Skip to main content
Engineering LibreTexts

10.1: The work queue

  • Page ID
    40634
  • \( \newcommand{\vecs}[1]{\overset { \scriptstyle \rightharpoonup} {\mathbf{#1}} } \) \( \newcommand{\vecd}[1]{\overset{-\!-\!\rightharpoonup}{\vphantom{a}\smash {#1}}} \)\(\newcommand{\id}{\mathrm{id}}\) \( \newcommand{\Span}{\mathrm{span}}\) \( \newcommand{\kernel}{\mathrm{null}\,}\) \( \newcommand{\range}{\mathrm{range}\,}\) \( \newcommand{\RealPart}{\mathrm{Re}}\) \( \newcommand{\ImaginaryPart}{\mathrm{Im}}\) \( \newcommand{\Argument}{\mathrm{Arg}}\) \( \newcommand{\norm}[1]{\| #1 \|}\) \( \newcommand{\inner}[2]{\langle #1, #2 \rangle}\) \( \newcommand{\Span}{\mathrm{span}}\) \(\newcommand{\id}{\mathrm{id}}\) \( \newcommand{\Span}{\mathrm{span}}\) \( \newcommand{\kernel}{\mathrm{null}\,}\) \( \newcommand{\range}{\mathrm{range}\,}\) \( \newcommand{\RealPart}{\mathrm{Re}}\) \( \newcommand{\ImaginaryPart}{\mathrm{Im}}\) \( \newcommand{\Argument}{\mathrm{Arg}}\) \( \newcommand{\norm}[1]{\| #1 \|}\) \( \newcommand{\inner}[2]{\langle #1, #2 \rangle}\) \( \newcommand{\Span}{\mathrm{span}}\)\(\newcommand{\AA}{\unicode[.8,0]{x212B}}\)

    In some multi-threaded programs, threads are organized to perform different tasks. Often they communicate with each other using a queue, where some threads, called “producers”, put data into the queue and other threads, called “consumers”, take data out.

    For example, in applications with a graphical user interface, there might be one thread that runs the GUI, responding to user events, and another thread that processes user requests. In that case, the GUI thread might put requests into a queue and the “back end” thread might take requests out and process them.

    To support this organization, we need a queue implementation that is “thread safe”, which means that both threads (or more than two) can access the queue at the same time. And we need to handle the special cases when the queue is empty and, if the size of the queue is bounded, when the queue is full.

    I’ll start with a simple queue that is not thread safe, then we’ll see what goes wrong and fix it. The code for this example is in the repository for this book, in a folder called queue. The file queue.c contains a basic implementation of a circular buffer, which you can read about at https://en.Wikipedia.org/wiki/Circular_buffer.

    Here’s the structure definition:

    typedef struct {
        int *array;
        int length;
        int next_in;
        int next_out;
    } Queue;
    

    array is the array that contains the elements of the queue. For this example the elements are ints, but more generally they would be structures that contain user events, items of work, etc.

    length is the length of the array. next_in is an index into the array that indices where the next element should be added; similarly, next_out is the index of the next element that should be removed.

    make_queue allocates space for this structure and initializes the fields:

    Queue *make_queue(int length)
    {
        Queue *queue = (Queue *) malloc(sizeof(Queue));
        queue->length = length + 1;
        queue->array = (int *) malloc(length * sizeof(int));
        queue->next_in = 0;
        queue->next_out = 0;
        return queue;
    }
    

    The initial value for next_out needs some explaining. Since the queue is initially empty, there is no next element to remove, so next_out is invalid. Setting next_out == next_in is a special case that indicates that the queue is empty, so we can write:

    int queue_empty(Queue *queue)
    {
        return (queue->next_in == queue->next_out);
    }
    

    Now we can add elements to the queue using queue_push:

    void queue_push(Queue *queue, int item) {
        if (queue_full(queue)) {
            perror_exit("queue is full");
        }
      
        queue->array[queue->next_in] = item;
        queue->next_in = queue_incr(queue, queue->next_in);
    }
    

    If the queue is full, queue_push prints an error message and exits. I will explain queue_full soon.

    If the queue is not full, queue_push inserts the new element and then increments next_in using queue_incr:

    int queue_incr(Queue *queue, int i)
    {
        return (i+1) % queue->length;
    }
    

    When the index, i, gets to the end of the array, it wraps around to 0. And that’s where we run into a tricky part. If we keep adding elements to the queue, eventually next_in wraps around and catches up with next_out. But if next_in == next_out, we would incorrectly conclude that the queue was empty.

    To avoid that, we define another special case to indicate that the queue is full:

    int queue_full(Queue *queue)
    {
        return (queue_incr(queue, queue->next_in) == queue->next_out);
    }
    

    If incrementing next_in lands on next_out, that means we can’t add another element without making the queue seem empty. So we stop one element before the “end” (keeping in mind that the end of the queue can be anywhere, not necessarily the end of the array).

    Now we can write queue_pop, which removes and returns the next element from the queue:

    int queue_pop(Queue *queue) {
        if (queue_empty(queue)) {
            perror_exit("queue is empty");
        }
      
        int item = queue->array[queue->next_out];
        queue->next_out = queue_incr(queue, queue->next_out);
        return item;
    }
    

    If you try to pop from an empty queue, queue_pop prints an error message and exits.


    This page titled 10.1: The work queue is shared under a CC BY-NC license and was authored, remixed, and/or curated by Allen B. Downey (Green Tea Press) .

    • Was this article helpful?