# Channels and Queues

A queue is a data structure for passing work from one part of a program to another.

One thread puts work into the queue. Another thread takes work out.

```text
producer -> queue -> consumer
```

This pattern is common in concurrent programs.

A channel is usually a queue with synchronization built in. The Zig standard library provides the low-level building blocks: threads, mutexes, condition variables, and atomics. With those tools, you can build a queue that is safe to share between threads.

#### Why Queues Matter

Without a queue, the code that creates work has to hand it directly to the code that performs it, and both sides must agree on when that handoff happens.

That creates tight coupling.

With a queue, one thread can say:

```text
Here is some work.
```

Another thread can later say:

```text
Give me the next piece of work.
```

The producer does not need to know exactly when the consumer runs.

The consumer does not need to know exactly when the producer creates work.

#### A Simple Job Type

Start with a small job type:

```zig
const Job = struct {
    id: u32,
};
```

A producer will push `Job` values into a queue.

A worker will pop `Job` values from the queue.

#### Queue State

A thread-safe queue needs four pieces of state:

```zig
const std = @import("std");

const JobQueue = struct {
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    jobs: std.ArrayList(Job),
    shutdown: bool = false,
};
```

The fields have clear roles:

| Field | Role |
|---|---|
| `mutex` | Protects the queue state |
| `condition` | Wakes workers when jobs arrive |
| `jobs` | Stores pending jobs |
| `shutdown` | Tells workers to exit |

The mutex protects both `jobs` and `shutdown`.

#### Initializing the Queue

Because `std.ArrayList` needs an allocator, the queue should be initialized with one.

```zig
const JobQueue = struct {
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    jobs: std.ArrayList(Job),
    shutdown: bool = false,

    fn init(allocator: std.mem.Allocator) JobQueue {
        return .{
            .jobs = std.ArrayList(Job).init(allocator),
        };
    }

    fn deinit(self: *JobQueue) void {
        self.jobs.deinit();
    }
};
```

Now whoever creates the queue chooses the allocator and is responsible for calling `deinit`.

```zig
var queue = JobQueue.init(allocator);
defer queue.deinit();
```

#### Pushing Work

To push work, lock the mutex, append the job, then signal one waiting worker.

```zig
fn push(self: *JobQueue, job: Job) !void {
    self.mutex.lock();
    defer self.mutex.unlock();

    try self.jobs.append(job);
    self.condition.signal();
}
```

The order matters.

First, change the shared state:

```zig
try self.jobs.append(job);
```

Then wake a worker:

```zig
self.condition.signal();
```

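The signal means: the queue may now contain work. If `append` fails, `try` returns before the signal, so no worker is woken for a job that was never stored.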

#### Popping Work

Popping is more interesting.

If there is no work, the worker should sleep. If shutdown has been requested, the worker should stop.

```zig
fn pop(self: *JobQueue) ?Job {
    self.mutex.lock();
    defer self.mutex.unlock();

    while (self.jobs.items.len == 0 and !self.shutdown) {
        self.condition.wait(&self.mutex);
    }

    if (self.jobs.items.len == 0 and self.shutdown) {
        return null;
    }

    return self.jobs.orderedRemove(0);
}
```

This function returns `?Job`.

If it returns a job, the worker should process it.

If it returns `null`, the worker should exit.

#### Why the Wait Uses `while`

This part must use `while`, not `if`:

```zig
while (self.jobs.items.len == 0 and !self.shutdown) {
    self.condition.wait(&self.mutex);
}
```

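A worker may wake up even if no job is available. Condition variables allow such spurious wakeups, so returning from `wait` proves nothing about the queue.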

Also, several workers may wake up, and another worker may take the job first.

So every worker must check the condition again after waking.

#### Full Queue Example

```zig
const std = @import("std");

const Job = struct {
    id: u32,
};

const JobQueue = struct {
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    jobs: std.ArrayList(Job),
    shutdown: bool = false,

    fn init(allocator: std.mem.Allocator) JobQueue {
        return .{
            .jobs = std.ArrayList(Job).init(allocator),
        };
    }

    fn deinit(self: *JobQueue) void {
        self.jobs.deinit();
    }

    fn push(self: *JobQueue, job: Job) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        try self.jobs.append(job);
        self.condition.signal();
    }

    fn pop(self: *JobQueue) ?Job {
        self.mutex.lock();
        defer self.mutex.unlock();

        while (self.jobs.items.len == 0 and !self.shutdown) {
            self.condition.wait(&self.mutex);
        }

        if (self.jobs.items.len == 0 and self.shutdown) {
            return null;
        }

        return self.jobs.orderedRemove(0);
    }

    fn stop(self: *JobQueue) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.shutdown = true;
        self.condition.broadcast();
    }
};

fn worker(queue: *JobQueue) void {
    while (true) {
        const maybe_job = queue.pop();

        if (maybe_job) |job| {
            std.debug.print("worker got job {}\n", .{job.id});
        } else {
            std.debug.print("worker stopping\n", .{});
            return;
        }
    }
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    const allocator = gpa.allocator();

    var queue = JobQueue.init(allocator);
    defer queue.deinit();

    const t1 = try std.Thread.spawn(.{}, worker, .{&queue});
    const t2 = try std.Thread.spawn(.{}, worker, .{&queue});

    try queue.push(.{ .id = 1 });
    try queue.push(.{ .id = 2 });
    try queue.push(.{ .id = 3 });

    // Request shutdown. Workers still drain any jobs left in the queue
    // before `pop` returns null.
    queue.stop();

    t1.join();
    t2.join();
}
```

The output order is not guaranteed. One worker may get two jobs. The other may get one. That is normal.

#### The Shutdown Path

The queue has a `stop` method:

```zig
fn stop(self: *JobQueue) void {
    self.mutex.lock();
    defer self.mutex.unlock();

    self.shutdown = true;
    self.condition.broadcast();
}
```

It uses `broadcast`, not `signal`.

Why?

There may be several sleeping workers. Shutdown affects all of them. Each one should wake up, see `shutdown == true`, and exit if no jobs remain.

#### Ordered Remove Is Simple but Slow

This line is easy to understand:

```zig
return self.jobs.orderedRemove(0);
```

It removes the first item and shifts the remaining items left.

That keeps FIFO order, but each pop takes time proportional to the number of pending jobs.

For learning, this is fine.

For production, use a ring buffer, linked list, or another queue structure with efficient front removal.
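As one possible replacement for `orderedRemove(0)`, the sketch below stores jobs in a fixed-size circular array, so removing the front item is a constant-time index update. `RingBuffer`, `pushBack`, and `popFront` are hypothetical names chosen for this example, not standard library API. The type is not thread-safe by itself: it is meant to sit behind the same mutex and condition variable as before.

```zig
const std = @import("std");

const Job = struct {
    id: u32,
};

/// Hypothetical fixed-capacity FIFO backed by a circular array.
/// `capacity` must be at least 1.
fn RingBuffer(comptime T: type, comptime capacity: usize) type {
    return struct {
        const Self = @This();

        items: [capacity]T = undefined,
        head: usize = 0, // index of the oldest item
        len: usize = 0, // number of stored items

        /// Adds an item at the back. Returns error.Full when no slot is free.
        fn pushBack(self: *Self, item: T) error{Full}!void {
            if (self.len == capacity) return error.Full;
            const tail = (self.head + self.len) % capacity;
            self.items[tail] = item;
            self.len += 1;
        }

        /// Removes the oldest item without shifting, or returns null when empty.
        fn popFront(self: *Self) ?T {
            if (self.len == 0) return null;
            const item = self.items[self.head];
            self.head = (self.head + 1) % capacity;
            self.len -= 1;
            return item;
        }
    };
}

pub fn main() !void {
    var ring = RingBuffer(Job, 4){};

    try ring.pushBack(.{ .id = 1 });
    try ring.pushBack(.{ .id = 2 });
    try ring.pushBack(.{ .id = 3 });
    if (ring.popFront()) |job| std.debug.print("popped {d}\n", .{job.id});

    // Slot 3 is the last one, so the push of job 5 wraps around to slot 0.
    try ring.pushBack(.{ .id = 4 });
    try ring.pushBack(.{ .id = 5 });
    while (ring.popFront()) |job| std.debug.print("popped {d}\n", .{job.id});
}
```

The program prints the ids 1 through 5 in order. The trade-off is the fixed capacity: a full buffer must either reject the push, as here, or make the producer wait.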

#### FIFO Order

The queue above is FIFO.

FIFO means first in, first out.

```text
push 1
push 2
push 3

pop -> 1
pop -> 2
pop -> 3
```

FIFO is the usual order for job queues.

Some programs need different behavior:

| Queue type | Meaning |
|---|---|
| FIFO queue | Oldest job first |
| LIFO stack | Newest job first |
| Priority queue | Most important job first |
| Ring buffer | Fixed-size circular storage |

The synchronization idea stays the same. Protect the structure with a mutex. Sleep with a condition variable. Wake workers when the state changes.

#### Bounded Queues

The queue above can grow until memory runs out.

A bounded queue has a maximum size.

That is useful when producers may create work faster than consumers can process it.

A bounded queue needs two waiting conditions:

```text
not empty
not full
```

Consumers wait while the queue is empty.

Producers wait while the queue is full.

This gives backpressure. Backpressure means fast producers are forced to slow down when the system is overloaded.
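The two waiting conditions can be sketched as two condition variables that share one mutex. This is a minimal illustration, not a tuned implementation: `BoundedQueue`, `not_empty`, `not_full`, the capacity of 2, and the `error.Closed` result are all assumptions made for this example.

```zig
const std = @import("std");

const Job = struct {
    id: u32,
};

/// Hypothetical bounded queue that holds at most `capacity` pending jobs.
const BoundedQueue = struct {
    const capacity = 2; // assumed limit, chosen small to force blocking

    mutex: std.Thread.Mutex = .{},
    not_empty: std.Thread.Condition = .{},
    not_full: std.Thread.Condition = .{},
    slots: [capacity]Job = undefined,
    head: usize = 0,
    len: usize = 0,
    shutdown: bool = false,

    /// Blocks while the queue is full. Fails once the queue has been stopped.
    fn push(self: *BoundedQueue, job: Job) error{Closed}!void {
        self.mutex.lock();
        defer self.mutex.unlock();

        while (self.len == capacity and !self.shutdown) {
            self.not_full.wait(&self.mutex);
        }
        if (self.shutdown) return error.Closed;

        self.slots[(self.head + self.len) % capacity] = job;
        self.len += 1;
        self.not_empty.signal(); // a consumer may now proceed
    }

    /// Blocks while the queue is empty. Returns null once stopped and drained.
    fn pop(self: *BoundedQueue) ?Job {
        self.mutex.lock();
        defer self.mutex.unlock();

        while (self.len == 0 and !self.shutdown) {
            self.not_empty.wait(&self.mutex);
        }
        if (self.len == 0) return null;

        const job = self.slots[self.head];
        self.head = (self.head + 1) % capacity;
        self.len -= 1;
        self.not_full.signal(); // a producer may now proceed
        return job;
    }

    fn stop(self: *BoundedQueue) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.shutdown = true;
        self.not_empty.broadcast();
        self.not_full.broadcast();
    }
};

fn consumer(queue: *BoundedQueue) void {
    while (queue.pop()) |job| {
        std.debug.print("consumed job {d}\n", .{job.id});
    }
}

pub fn main() !void {
    var queue = BoundedQueue{};
    const t = try std.Thread.spawn(.{}, consumer, .{&queue});

    // With room for only 2 jobs, the producer has to wait for the consumer.
    var id: u32 = 1;
    while (id <= 5) : (id += 1) {
        try queue.push(.{ .id = id });
    }

    queue.stop();
    t.join();
}
```

Here `push` is where backpressure appears: the producer sleeps on `not_full` until a consumer frees a slot. Stopping wakes both sides, so neither producers nor consumers stay blocked after shutdown.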

#### Channels

A channel is a higher-level version of this idea.

A channel usually has operations like:

```text
send(value)
receive() -> value
close()
```

Internally, a channel often contains a queue, a mutex, and one or more condition variables.

The main benefit of a channel is that it hides the locking details.

Instead of writing:

```zig
mutex.lock();
defer mutex.unlock();
condition.wait(&mutex);
```

user code can say:

```text
channel.send(job)
job = channel.receive()
```

Zig gives you enough low-level tools to build this abstraction when your program needs it.
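One way such an abstraction might look is a generic type that wraps the queue pattern from this section. `Channel`, `send`, `receive`, `close`, and `error.Closed` are hypothetical names for this sketch, and the code assumes the same managed `std.ArrayList` API used in the examples above.

```zig
const std = @import("std");

/// Hypothetical channel built from a list, a mutex, and a condition variable.
fn Channel(comptime T: type) type {
    return struct {
        const Self = @This();

        mutex: std.Thread.Mutex = .{},
        condition: std.Thread.Condition = .{},
        queue: std.ArrayList(T),
        closed: bool = false,

        fn init(allocator: std.mem.Allocator) Self {
            return .{ .queue = std.ArrayList(T).init(allocator) };
        }

        fn deinit(self: *Self) void {
            self.queue.deinit();
        }

        /// Queues a value. Fails if the channel is closed or allocation fails.
        fn send(self: *Self, value: T) !void {
            self.mutex.lock();
            defer self.mutex.unlock();

            if (self.closed) return error.Closed;
            try self.queue.append(value);
            self.condition.signal();
        }

        /// Blocks until a value arrives. Returns null once closed and drained.
        fn receive(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.queue.items.len == 0 and !self.closed) {
                self.condition.wait(&self.mutex);
            }
            if (self.queue.items.len == 0) return null;
            return self.queue.orderedRemove(0);
        }

        /// Wakes every waiting receiver so it can observe the closed state.
        fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.closed = true;
            self.condition.broadcast();
        }
    };
}

fn receiver(channel: *Channel(u32)) void {
    // No locking is visible at the call site.
    while (channel.receive()) |value| {
        std.debug.print("received {d}\n", .{value});
    }
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var channel = Channel(u32).init(gpa.allocator());
    defer channel.deinit();

    const t = try std.Thread.spawn(.{}, receiver, .{&channel});

    try channel.send(10);
    try channel.send(20);
    channel.close();

    t.join();
}
```

The caller only sees `send`, `receive`, and `close`. Values sent before `close` are still delivered, mirroring how `stop` lets workers drain the job queue.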

#### Ownership of Jobs

A queue passes ownership.

For small values like this:

```zig
const Job = struct {
    id: u32,
};
```

the queue stores the whole value.

Larger jobs often hold slices or pointers to data stored elsewhere:

```zig
const Job = struct {
    path: []const u8,
    output: []u8,
};
```

Then lifetime matters.

If a job contains a pointer, the pointed-to data must stay alive until the worker is done with the job.

A safe beginner rule:

Store owned data in the job, or make the owner explicit.

Do not queue pointers to temporary stack data.

#### Bad Lifetime Example

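This design is unsafe. The sketch assumes a `Job` with only a `path` field:

```zig
fn submit(queue: *JobQueue) !void {
    // `path` lives in this function's stack frame.
    var path = [_]u8{ 'a', '.', 't', 'x', 't' };

    // BUG: the queued slice still points into that frame after `submit` returns.
    try queue.push(.{
        .path = path[0..],
    });
}
```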

The array `path` disappears when `submit` returns.

A worker may later receive a slice pointing to invalid memory.

Better designs allocate the path, store it in longer-lived state, or process the job before the data goes out of scope.
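The first option, allocating the path, might look like the sketch below. `FileJob`, `create`, `destroy`, and `makeJob` are hypothetical names for this example. The job copies the bytes onto the heap, so it stays valid after the function that built the path has returned.

```zig
const std = @import("std");

/// Hypothetical job that owns its path: the bytes live on the heap
/// until `destroy` is called.
const FileJob = struct {
    path: []u8,

    /// Copies `path` so the job no longer depends on the caller's memory.
    fn create(allocator: std.mem.Allocator, path: []const u8) !FileJob {
        return .{ .path = try allocator.dupe(u8, path) };
    }

    /// Called by whoever finishes the job, normally the worker.
    fn destroy(self: FileJob, allocator: std.mem.Allocator) void {
        allocator.free(self.path);
    }
};

fn makeJob(allocator: std.mem.Allocator, id: u32) !FileJob {
    // Stack memory: it is gone as soon as makeJob returns.
    var buf: [32]u8 = undefined;
    const path = try std.fmt.bufPrint(&buf, "file-{d}.txt", .{id});

    // The job stores a heap copy, so returning or queueing it is safe.
    return FileJob.create(allocator, path);
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const job = try makeJob(allocator, 7);
    defer job.destroy(allocator);

    std.debug.print("job path: {s}\n", .{job.path});
}
```

With a queue in between, pushing the job hands ownership to the worker, and the worker calls `destroy` after processing. The allocator must then be safe to use from more than one thread.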

#### One Queue, Many Workers

A common pattern is:

```text
main thread creates queue
main thread starts workers
main thread pushes jobs
workers pop jobs
main thread stops queue
main thread joins workers
```

This pattern is called a worker pool.

It is useful when you have many jobs and a fixed number of worker threads.

The queue controls communication. The workers do not need to know where jobs come from.
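The steps above can be sketched with an array of threads instead of individually named ones. This is an illustration under assumptions: `worker_count`, `runPool`, and the per-worker `handled` counters are hypothetical additions, and the queue is the one from the full example, repeated so the sketch compiles alone.

```zig
const std = @import("std");

const Job = struct { id: u32 };

// Same queue as in the full example above.
const JobQueue = struct {
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    jobs: std.ArrayList(Job),
    shutdown: bool = false,

    fn init(allocator: std.mem.Allocator) JobQueue {
        return .{ .jobs = std.ArrayList(Job).init(allocator) };
    }
    fn deinit(self: *JobQueue) void {
        self.jobs.deinit();
    }
    fn push(self: *JobQueue, job: Job) !void {
        self.mutex.lock();
        defer self.mutex.unlock();
        try self.jobs.append(job);
        self.condition.signal();
    }
    fn pop(self: *JobQueue) ?Job {
        self.mutex.lock();
        defer self.mutex.unlock();
        while (self.jobs.items.len == 0 and !self.shutdown) {
            self.condition.wait(&self.mutex);
        }
        if (self.jobs.items.len == 0) return null;
        return self.jobs.orderedRemove(0);
    }
    fn stop(self: *JobQueue) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.shutdown = true;
        self.condition.broadcast();
    }
};

const worker_count = 4; // assumed pool size

/// Each worker counts jobs in its own slot, so the counters need no lock.
fn worker(queue: *JobQueue, handled: *u32) void {
    while (queue.pop()) |_| {
        handled.* += 1;
    }
}

/// Runs `job_count` jobs through the pool and returns how many were handled.
fn runPool(allocator: std.mem.Allocator, job_count: u32) !u32 {
    var queue = JobQueue.init(allocator);
    defer queue.deinit();

    var threads: [worker_count]std.Thread = undefined;
    var handled = [_]u32{0} ** worker_count;
    var started: usize = 0;
    // If spawning or pushing fails, still stop and join what was started.
    errdefer {
        queue.stop();
        for (threads[0..started]) |t| t.join();
    }

    while (started < worker_count) : (started += 1) {
        threads[started] = try std.Thread.spawn(.{}, worker, .{ &queue, &handled[started] });
    }

    var id: u32 = 1;
    while (id <= job_count) : (id += 1) {
        try queue.push(.{ .id = id });
    }

    queue.stop();
    for (threads) |t| t.join();

    var total: u32 = 0;
    for (handled) |n| total += n;
    return total;
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    const total = try runPool(gpa.allocator(), 20);
    std.debug.print("workers handled {d} jobs\n", .{total});
}
```

How the 20 jobs are split between workers varies from run to run, but the total is always 20, because `stop` lets workers drain the queue before they exit.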

#### Good Queue Discipline

A good concurrent queue has these rules:

| Rule | Reason |
|---|---|
| All shared state is protected by one mutex | Avoids data races |
| Waiting uses `while` | Rechecks the state after spurious or late wakeups |
| Producers signal after pushing | Wakes sleeping consumers |
| Shutdown uses `broadcast` | Wakes all workers |
| Jobs have clear ownership | Prevents invalid pointers |
| Long work happens outside the lock | Keeps the queue responsive |

The queue should only be locked while reading or changing queue state.

The worker should unlock before processing the job.

That is why `pop` only removes the job and returns it. The `defer` releases the mutex as `pop` returns, so the worker processes the job without holding the lock.

#### The Main Rule

A queue is a boundary between threads.

One side produces work. The other side consumes work.

The queue owns the synchronization. The rest of the program should not touch its internal mutex, condition variable, or list directly.

This is the clean design:

```text
push job
pop job
stop queue
```

The details stay inside the queue.

