
10.2: Producers and consumers


    Now let’s make some threads to access this queue. Here’s the producer code:

    void *producer_entry(void *arg) {
        Shared *shared = (Shared *) arg;
    
        /* Push QUEUE_LENGTH-1 items, announcing each one first. */
        for (int i=0; i<QUEUE_LENGTH-1; i++) {
            printf("adding item %d\n", i);
            queue_push(shared->queue, i);
        }
        pthread_exit(NULL);
    }
    

    Here’s the consumer code:

    void *consumer_entry(void *arg) {
        int item;
        Shared *shared = (Shared *) arg;
    
        /* Pop the same number of items the producer pushes. */
        for (int i=0; i<QUEUE_LENGTH-1; i++) {
            item = queue_pop(shared->queue);
            printf("consuming item %d\n", item);
        }
        pthread_exit(NULL);
    }
    

    Here’s the parent code that starts the threads and waits for them to complete:

        pthread_t child[NUM_CHILDREN];
    
        Shared *shared = make_shared();
    
        child[0] = make_thread(producer_entry, shared);
        child[1] = make_thread(consumer_entry, shared);
    
        for (int i=0; i<NUM_CHILDREN; i++) {
            join_thread(child[i]);
        }
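
    The parent code relies on make_thread and join_thread, which are not shown in this section. As a minimal sketch, assuming they are thin wrappers around pthread_create and pthread_join that exit on failure, they might look like the following. The void * argument type, the set_flag entry function, and start_and_join_demo are hypothetical and exist only to make the example self-contained.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* Assumed helper: start a thread that runs entry(arg); exit on failure. */
pthread_t make_thread(void *(*entry)(void *), void *arg) {
    assert(entry != NULL);
    pthread_t thread;
    int ret = pthread_create(&thread, NULL, entry, arg);
    if (ret != 0) {
        fprintf(stderr, "pthread_create failed: %d\n", ret);
        exit(EXIT_FAILURE);
    }
    return thread;
}

/* Assumed helper: wait for a thread to finish; exit on failure. */
void join_thread(pthread_t thread) {
    int ret = pthread_join(thread, NULL);
    if (ret != 0) {
        fprintf(stderr, "pthread_join failed: %d\n", ret);
        exit(EXIT_FAILURE);
    }
}

/* Hypothetical entry function: sets the int that arg points to. */
void *set_flag(void *arg) {
    int *flag = (int *) arg;
    *flag = 1;
    return NULL;
}

/* Starts two children, waits for both, and returns how many ran. */
int start_and_join_demo(void) {
    int flags[2] = {0, 0};
    pthread_t child[2];

    for (int i = 0; i < 2; i++) {
        child[i] = make_thread(set_flag, &flags[i]);
    }
    for (int i = 0; i < 2; i++) {
        join_thread(child[i]);
    }
    /* Reading flags is safe here: both children have been joined. */
    return flags[0] + flags[1];
}
```

    Because the parent joins every child before reading the flags, start_and_join_demo() returns 2. The parent code above follows the same start-then-join pattern with the producer and consumer.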
    

    And finally here’s the shared structure that contains the queue:

    typedef struct {
        Queue *queue;
    } Shared;
    
    Shared *make_shared()
    {
        Shared *shared = check_malloc(sizeof(Shared));
        shared->queue = make_queue(QUEUE_LENGTH);
        return shared;
    }
    

    The code we have so far is a good starting place, but it has several problems:

    • Access to the queue is not thread safe. Different threads could access array, next_in, and next_out at the same time and leave the queue in a broken, “inconsistent” state.
    • If the consumer is scheduled first, it finds the queue empty, prints an error message, and exits. We would rather have the consumer block until the queue is not empty. Similarly, we would like the producer to block if the queue is full.
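
    To make the first problem concrete, here is a sketch that replays one unlucky interleaving of two unsynchronized pushes inside a single thread, so the result is deterministic. It assumes the queue is a circular buffer with the fields named above (array, next_in, next_out) plus a length; the make_queue shown here is a hypothetical stand-in, and queue_size and lost_update_demo are illustrative helpers, not the queue code this section builds on.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Assumed layout: a circular buffer with the fields named in the text. */
typedef struct {
    int *array;
    int length;
    int next_in;   /* index where the next item will be written */
    int next_out;  /* index of the next item to be read */
} Queue;

/* Hypothetical constructor; exits if allocation fails. */
Queue *make_queue(int length) {
    Queue *queue = malloc(sizeof(Queue));
    if (queue == NULL) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    queue->array = malloc(length * sizeof(int));
    if (queue->array == NULL) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    queue->length = length;
    queue->next_in = 0;
    queue->next_out = 0;
    return queue;
}

/* Number of items currently stored in the queue. */
int queue_size(Queue *queue) {
    return (queue->next_in - queue->next_out + queue->length) % queue->length;
}

/* Replays two pushes, by "thread A" and "thread B", where both read
   next_in before either one updates it. Returns the resulting size. */
int lost_update_demo(void) {
    Queue *queue = make_queue(4);

    int seen_by_a = queue->next_in;   /* A reads index 0 */
    int seen_by_b = queue->next_in;   /* B reads index 0 before A advances */
    assert(seen_by_a == seen_by_b);   /* both will write the same slot */

    queue->array[seen_by_a] = 10;     /* A stores its item */
    queue->array[seen_by_b] = 20;     /* B overwrites A's item */
    queue->next_in = (seen_by_a + 1) % queue->length;  /* A advances to 1 */
    queue->next_in = (seen_by_b + 1) % queue->length;  /* B also writes 1 */

    int size = queue_size(queue);
    free(queue->array);
    free(queue);
    return size;
}
```

    Two items were pushed, but lost_update_demo() returns 1: the item 10 has been overwritten, and next_in advanced only once. With real threads the same interleaving can happen whenever the scheduler switches between the read of next_in and its update.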

    In the next section, we solve the first problem with a Mutex. In the following section, we solve the second problem with condition variables.


    This page titled 10.2: Producers and consumers is shared under a CC BY-NC license and was authored, remixed, and/or curated by Allen B. Downey (Green Tea Press) .
