Channels and Queues

A queue is a data structure for passing work from one part of a program to another.

One thread puts work into the queue. Another thread takes work out.

producer -> queue -> consumer

This pattern is common in concurrent programs.

A channel is usually a queue with synchronization built in. Zig’s standard library gives you low-level synchronization tools, such as mutexes, condition variables, atomics, and threads. With those tools, you can build a queue that works safely between threads.

Why Queues Matter

Without a queue, the code that produces work often has to hand it directly to the code that consumes it.

That creates tight coupling.

With a queue, one thread can say:

Here is some work.

Another thread can later say:

Give me the next piece of work.

The producer does not need to know exactly when the consumer runs.

The consumer does not need to know exactly when the producer creates work.

A Simple Job Type

Start with a small job type:

const Job = struct {
    id: u32,
};

A producer will push Job values into a queue.

A worker will pop Job values from the queue.

Queue State

A thread-safe queue needs four pieces of state:

const std = @import("std");

const JobQueue = struct {
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    jobs: std.ArrayList(Job),
    shutdown: bool = false,
};

The fields have clear roles:

| Field | Role |
|---|---|
| mutex | Protects the queue state |
| condition | Wakes workers when jobs arrive |
| jobs | Stores pending jobs |
| shutdown | Tells workers to exit |

The mutex protects both jobs and shutdown.

Initializing the Queue

Because std.ArrayList needs an allocator, the queue should be initialized with one.

const JobQueue = struct {
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    jobs: std.ArrayList(Job),
    shutdown: bool = false,

    fn init(allocator: std.mem.Allocator) JobQueue {
        return .{
            .jobs = std.ArrayList(Job).init(allocator),
        };
    }

    fn deinit(self: *JobQueue) void {
        self.jobs.deinit();
    }
};

Now the owner of the queue controls memory.

var queue = JobQueue.init(allocator);
defer queue.deinit();

Pushing Work

To push work, lock the mutex, append the job, then signal one waiting worker.

fn push(self: *JobQueue, job: Job) !void {
    self.mutex.lock();
    defer self.mutex.unlock();

    try self.jobs.append(job);
    self.condition.signal();
}

The order matters.

First, change the shared state:

try self.jobs.append(job);

Then wake a worker:

self.condition.signal();

The signal means: the queue may now contain work.

Popping Work

Popping is more interesting.

If there is no work, the worker should sleep. If shutdown has been requested, the worker should stop.

fn pop(self: *JobQueue) ?Job {
    self.mutex.lock();
    defer self.mutex.unlock();

    while (self.jobs.items.len == 0 and !self.shutdown) {
        self.condition.wait(&self.mutex);
    }

    if (self.jobs.items.len == 0 and self.shutdown) {
        return null;
    }

    return self.jobs.orderedRemove(0);
}

This function returns ?Job.

If it returns a job, the worker should process it.

If it returns null, the worker should exit.

Why the Wait Uses while

This part must use while, not if:

while (self.jobs.items.len == 0 and !self.shutdown) {
    self.condition.wait(&self.mutex);
}

A worker may wake up even if no job is available. Condition variables allow spurious wakeups.

Also, several workers may wake up, and another worker may take the job first.

So every worker must check the condition again after waking.

Full Queue Example

const std = @import("std");

const Job = struct {
    id: u32,
};

const JobQueue = struct {
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    jobs: std.ArrayList(Job),
    shutdown: bool = false,

    fn init(allocator: std.mem.Allocator) JobQueue {
        return .{
            .jobs = std.ArrayList(Job).init(allocator),
        };
    }

    fn deinit(self: *JobQueue) void {
        self.jobs.deinit();
    }

    fn push(self: *JobQueue, job: Job) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        try self.jobs.append(job);
        self.condition.signal();
    }

    fn pop(self: *JobQueue) ?Job {
        self.mutex.lock();
        defer self.mutex.unlock();

        while (self.jobs.items.len == 0 and !self.shutdown) {
            self.condition.wait(&self.mutex);
        }

        if (self.jobs.items.len == 0 and self.shutdown) {
            return null;
        }

        return self.jobs.orderedRemove(0);
    }

    fn stop(self: *JobQueue) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.shutdown = true;
        self.condition.broadcast();
    }
};

fn worker(queue: *JobQueue) void {
    while (true) {
        const maybe_job = queue.pop();

        if (maybe_job) |job| {
            std.debug.print("worker got job {}\n", .{job.id});
        } else {
            std.debug.print("worker stopping\n", .{});
            return;
        }
    }
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    const allocator = gpa.allocator();

    var queue = JobQueue.init(allocator);
    defer queue.deinit();

    const t1 = try std.Thread.spawn(.{}, worker, .{&queue});
    const t2 = try std.Thread.spawn(.{}, worker, .{&queue});

    try queue.push(.{ .id = 1 });
    try queue.push(.{ .id = 2 });
    try queue.push(.{ .id = 3 });

    queue.stop();

    t1.join();
    t2.join();
}

The output order is not guaranteed. One worker may get two jobs. The other may get one. That is normal.

The Shutdown Path

The queue has a stop method:

fn stop(self: *JobQueue) void {
    self.mutex.lock();
    defer self.mutex.unlock();

    self.shutdown = true;
    self.condition.broadcast();
}

It uses broadcast, not signal.

Why?

There may be several sleeping workers. Shutdown affects all of them. Each one should wake up, see shutdown == true, and exit if no jobs remain.

Ordered Remove Is Simple but Slow

This line is easy to understand:

return self.jobs.orderedRemove(0);

It removes the first item and shifts the remaining items left.

That keeps FIFO order, but each removal takes time proportional to the number of queued jobs.

For learning, this is fine.

For production, use a ring buffer, linked list, or another queue structure with efficient front removal.
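As a rough sketch of the ring buffer option, the code below keeps a head index and a length over a fixed array, so removing the front item moves an index instead of shifting elements. The name RingQueue, its methods, and the compile-time capacity are all hypothetical choices for illustration. It has no locking; it only shows the storage side.

```zig
const std = @import("std");

// Hypothetical fixed-capacity FIFO. The name and API are illustrative.
fn RingQueue(comptime T: type, comptime capacity: usize) type {
    return struct {
        const Self = @This();

        items: [capacity]T = undefined,
        head: usize = 0, // index of the oldest item
        len: usize = 0, // number of stored items

        // Stores value at the tail. Returns false when the buffer is full.
        fn pushBack(self: *Self, value: T) bool {
            if (self.len == capacity) return false;
            const tail = (self.head + self.len) % capacity;
            self.items[tail] = value;
            self.len += 1;
            return true;
        }

        // Removes the oldest item without shifting the rest.
        // Returns null when the buffer is empty.
        fn popFront(self: *Self) ?T {
            if (self.len == 0) return null;
            const value = self.items[self.head];
            self.head = (self.head + 1) % capacity;
            self.len -= 1;
            return value;
        }
    };
}
```

Both operations do a constant amount of work, no matter how many items are stored. The cost is a fixed capacity that must be chosen up front.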

FIFO Order

The queue above is FIFO.

FIFO means first in, first out.

push 1
push 2
push 3

pop -> 1
pop -> 2
pop -> 3

FIFO is the usual order for job queues.

Some programs need different behavior:

| Queue type | Meaning |
|---|---|
| FIFO queue | Oldest job first |
| LIFO stack | Newest job first |
| Priority queue | Most important job first |
| Ring buffer | Fixed-size circular storage |

The synchronization idea stays the same. Protect the structure with a mutex. Sleep with a condition variable. Wake workers when the state changes.

Bounded Queues

The queue above can grow until memory runs out.

A bounded queue has a maximum size.

That is useful when producers may create work faster than consumers can process it.

A bounded queue needs two waiting conditions:

not empty
not full

Consumers wait while the queue is empty.

Producers wait while the queue is full.

This gives backpressure. Backpressure means fast producers are forced to slow down when the system is overloaded.
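The two waiting conditions can be sketched as follows. This is a minimal illustration, not a complete design: BoundedQueue, not_empty, and not_full are hypothetical names, storage is a fixed array, and shutdown handling is left out to keep the focus on backpressure. It assumes a Zig release where std.Thread.Mutex and std.Thread.Condition behave as in the listings above.

```zig
const std = @import("std");

// Hypothetical bounded queue. Names and capacity are illustrative.
fn BoundedQueue(comptime T: type, comptime capacity: usize) type {
    return struct {
        const Self = @This();

        mutex: std.Thread.Mutex = .{},
        not_empty: std.Thread.Condition = .{},
        not_full: std.Thread.Condition = .{},
        items: [capacity]T = undefined,
        head: usize = 0, // index of the oldest item
        len: usize = 0, // number of stored items

        // Blocks the producer while the queue is full.
        fn push(self: *Self, value: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.len == capacity) {
                self.not_full.wait(&self.mutex);
            }

            self.items[(self.head + self.len) % capacity] = value;
            self.len += 1;
            self.not_empty.signal(); // a consumer may now proceed
        }

        // Blocks the consumer while the queue is empty.
        fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.len == 0) {
                self.not_empty.wait(&self.mutex);
            }

            const value = self.items[self.head];
            self.head = (self.head + 1) % capacity;
            self.len -= 1;
            self.not_full.signal(); // a producer may now proceed
            return value;
        }
    };
}
```

Each side waits on its own condition and signals the other. A producer that runs ahead simply sleeps in push until a consumer frees a slot.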

Channels

A channel is a higher-level version of this idea.

A channel usually has operations like:

send(value)
receive() -> value
close()

Internally, a channel often contains a queue, a mutex, and one or more condition variables.

The main benefit of a channel is that it hides the locking details.

Instead of writing:

mutex.lock();
defer mutex.unlock();
condition.wait(&mutex);

user code can say:

channel.send(job)
job = channel.receive()

Zig gives you enough low-level tools to build this abstraction when your program needs it.
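One possible shape for such an abstraction is sketched below. It is deliberately tiny: a channel that holds at most one value at a time. Channel, send, receive, close, and error.Closed are names chosen for this illustration, not a standard library API. A single condition variable with broadcast is used for simplicity, at the cost of some extra wakeups.

```zig
const std = @import("std");

// Hypothetical one-slot channel. Not part of the Zig standard library.
fn Channel(comptime T: type) type {
    return struct {
        const Self = @This();

        mutex: std.Thread.Mutex = .{},
        changed: std.Thread.Condition = .{},
        slot: ?T = null,
        closed: bool = false,

        // Blocks until the slot is free. Fails once the channel is closed.
        fn send(self: *Self, value: T) error{Closed}!void {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.slot != null and !self.closed) {
                self.changed.wait(&self.mutex);
            }
            if (self.closed) return error.Closed;

            self.slot = value;
            self.changed.broadcast();
        }

        // Blocks until a value arrives.
        // Returns null once the channel is closed and empty.
        fn receive(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.slot == null and !self.closed) {
                self.changed.wait(&self.mutex);
            }

            const value = self.slot orelse return null;
            self.slot = null;
            self.changed.broadcast();
            return value;
        }

        // Wakes every waiting sender and receiver.
        fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.closed = true;
            self.changed.broadcast();
        }
    };
}
```

User code only calls send, receive, and close. The mutex and the condition variable never appear outside the struct.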

Ownership of Jobs

A queue passes ownership.

For small values like this:

const Job = struct {
    id: u32,
};

the queue stores the whole value.

For larger jobs, the job may hold slices, which are pointers to data stored elsewhere:

const Job = struct {
    path: []const u8,
    output: []u8,
};

Then lifetime matters.

If a job contains a pointer, the pointed-to data must stay alive until the worker is done with the job.

A safe beginner rule:

Store owned data in the job, or make the owner explicit.

Do not queue pointers to temporary stack data.

Bad Lifetime Example

This design is unsafe. The example sets only the path field to keep the focus on lifetime:

fn submit(queue: *JobQueue) !void {
    var path = [_]u8{ 'a', '.', 't', 'x', 't' };

    try queue.push(.{
        .path = path[0..],
    });
}

The array path lives in the stack frame of submit, so it is no longer valid once submit returns.

A worker may later receive a slice pointing to invalid memory.

Better designs allocate the path, store it in longer-lived state, or process the job before the data goes out of scope.
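The first option, allocating the path, might look like the sketch below. The helpers create and destroy are hypothetical, and this Job carries only a path. The job copies the bytes onto the heap, so it no longer depends on the caller's stack frame. Whoever finishes the job, usually the worker, must free it with the same allocator.

```zig
const std = @import("std");

const Job = struct {
    path: []u8, // owned by the job and freed in destroy

    // Hypothetical helper: copies the path so the job owns its data.
    fn create(allocator: std.mem.Allocator, path: []const u8) !Job {
        return Job{ .path = try allocator.dupe(u8, path) };
    }

    // Hypothetical helper: releases the copy. Call it once, after processing.
    fn destroy(self: Job, allocator: std.mem.Allocator) void {
        allocator.free(self.path);
    }
};
```

A producer would call Job.create before pushing. The worker would call job.destroy after processing. That makes the owner explicit at every step.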

One Queue, Many Workers

A common pattern is:

main thread creates queue
main thread starts workers
main thread pushes jobs
workers pop jobs
main thread stops queue
main thread joins workers

This pattern is called a worker pool.

It is useful when you have many jobs and a fixed number of worker threads.

The queue controls communication. The workers do not need to know where jobs come from.

Good Queue Discipline

A good concurrent queue has these rules:

| Rule | Reason |
|---|---|
| All shared state is protected by one mutex | Avoids data races |
| Waiting uses while | Handles wakeups correctly |
| Producers signal after pushing | Wakes sleeping consumers |
| Shutdown uses broadcast | Wakes all workers |
| Jobs have clear ownership | Prevents invalid pointers |
| Long work happens outside the lock | Keeps the queue responsive |

The queue should only be locked while changing queue state.

The worker should unlock before processing the job.

That is why pop only removes the job while holding the mutex. The deferred unlock releases the mutex as pop returns, and processing happens afterward.

The Main Rule

A queue is a boundary between threads.

One side produces work. The other side consumes work.

The queue owns the synchronization. The rest of the program should not touch its internal mutex, condition variable, or list directly.

This is the clean design:

push job
pop job
stop queue

The details stay inside the queue.