# Build a Thread Pool

A thread pool is a group of worker threads that wait for jobs.

Instead of creating a new thread for every task, you create a fixed number of threads once. Then you send jobs to them.

This is useful because creating threads is expensive. Reusing threads is cheaper and more predictable.

A thread pool usually has this shape:

```text
main thread
  creates worker threads
  pushes jobs into a queue

worker threads
  wait for jobs
  take one job
  run it
  repeat
```

#### The Goal

We will build a small thread pool that can run jobs like this:

```zig
try pool.submit(Job{
    .number = 42,
});
```

Each worker will take a job and process it:

```text
worker 0 processed job 42
worker 1 processed job 43
worker 2 processed job 44
```

This first version will use:

```text
std.Thread
std.Thread.Mutex
std.Thread.Condition
std.ArrayList
```

A production thread pool needs more details, but this version shows the core idea.

#### The Job Type

Start with a small job:

```zig
const Job = struct {
    number: usize,
};
```

A real job might contain a file path, a network request, a function pointer, or a pointer to application data.

For learning, one number is enough.
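For a taste of what a more flexible job could look like, here is a sketch of a job that carries a type-erased function pointer plus a context pointer. Nothing else in this chapter depends on it, and the names are illustrative:

```zig
const std = @import("std");

// Sketch of a more general Job: a worker would call `job.run(job.ctx)`.
// `ctx` points at caller-owned data of any type.
const Job = struct {
    run: *const fn (ctx: *anyopaque) void,
    ctx: *anyopaque,
};

fn printNumber(ctx: *anyopaque) void {
    // Recover the concrete type the caller put behind the opaque pointer.
    const n: *usize = @ptrCast(@alignCast(ctx));
    std.debug.print("got {d}\n", .{n.*});
}
```

With this shape, the pool never needs to know what a job does; the submitter decides.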

#### The Thread Pool Type

A thread pool needs shared state.

```zig
const ThreadPool = struct {
    allocator: std.mem.Allocator,
    threads: []std.Thread,
    jobs: std.ArrayList(Job),
    mutex: std.Thread.Mutex,
    condition: std.Thread.Condition,
    stopped: bool,
};
```

Each field has a role.

`threads` stores the worker threads.

`jobs` stores pending work.

`mutex` protects shared data.

`condition` lets workers sleep until new work arrives.

`stopped` tells workers when to exit.

#### Initializing the Pool

We need a function that creates workers.

```zig
fn init(allocator: std.mem.Allocator, count: usize) !ThreadPool {
    var pool = ThreadPool{
        .allocator = allocator,
        .threads = try allocator.alloc(std.Thread, count),
        .jobs = std.ArrayList(Job).init(allocator),
        .mutex = .{},
        .condition = .{},
        .stopped = false,
    };

    for (pool.threads, 0..) |*thread, id| {
        thread.* = try std.Thread.spawn(.{}, workerMain, .{ &pool, id });
    }

    return pool;
}
```

This creates `count` threads.

Each thread runs `workerMain`.

Each worker receives a pointer to the pool, `&pool`, and its own `id`.

The `id` lets us print which worker handled a job.

There is one subtle issue here: this version passes `&pool` to the threads and then returns `pool` by value. Returning copies the struct to a new address, so the workers may be left holding a pointer to the old, now-invalid stack location.

So we should allocate the pool itself on the heap.

Use this safer design:

```zig
fn create(allocator: std.mem.Allocator, count: usize) !*ThreadPool {
    const pool = try allocator.create(ThreadPool);
    errdefer allocator.destroy(pool);

    pool.* = ThreadPool{
        .allocator = allocator,
        .threads = try allocator.alloc(std.Thread, count),
        .jobs = std.ArrayList(Job).init(allocator),
        .mutex = .{},
        .condition = .{},
        .stopped = false,
    };

    errdefer allocator.free(pool.threads);
    errdefer pool.jobs.deinit();

    for (pool.threads, 0..) |*thread, id| {
        thread.* = try std.Thread.spawn(.{}, workerMain, .{ pool, id });
    }

    return pool;
}
```

Now the pool has a stable address.

That matters because worker threads keep a pointer to it.

One gap remains: if `spawn` fails partway through the loop, the `errdefer`s free the pool while already-spawned workers still point at it. A production pool would stop and join those workers before cleaning up; this tutorial accepts the simplification.

#### Submitting Jobs

To submit a job, we lock the mutex, append the job, wake one worker, and then release the mutex (the `defer` runs last).

```zig
fn submit(self: *ThreadPool, job: Job) !void {
    self.mutex.lock();
    defer self.mutex.unlock();

    try self.jobs.append(job);
    self.condition.signal();
}
```

The mutex is required because several threads can access `jobs`.

The main thread appends jobs.

Worker threads remove jobs.

Without a mutex, two threads might modify the list at the same time and corrupt memory.

#### Worker Threads

A worker repeats this loop:

```text
wait for a job
take the job
run the job
```

Here is the worker function:

```zig
fn workerMain(pool: *ThreadPool, id: usize) void {
    while (true) {
        pool.mutex.lock();

        while (pool.jobs.items.len == 0 and !pool.stopped) {
            pool.condition.wait(&pool.mutex);
        }

        if (pool.stopped and pool.jobs.items.len == 0) {
            pool.mutex.unlock();
            return;
        }

        const job = pool.jobs.orderedRemove(0);
        pool.mutex.unlock();

        processJob(id, job);
    }
}
```

The condition variable is used here:

```zig
pool.condition.wait(&pool.mutex);
```

This puts the worker to sleep until another thread calls:

```zig
self.condition.signal();
```

or:

```zig
self.condition.broadcast();
```

The `while` loop around `wait` is important. A condition variable can wake up even when there is no job ready, a so-called spurious wakeup, and another worker may have already taken the job that triggered the signal. The worker must re-check the condition after every wakeup.
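For contrast, replacing that loop with a single `if` would be wrong. This fragment shows the broken shape, not code to copy:

```zig
// Wrong: `wait` can return spuriously, and another worker may have
// taken the job already, so a single check is not enough.
if (pool.jobs.items.len == 0 and !pool.stopped) {
    pool.condition.wait(&pool.mutex);
}
// The queue may still be empty here, and `orderedRemove(0)` would crash.
```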

#### Processing a Job

For this project, processing only prints a message.

```zig
fn processJob(worker_id: usize, job: Job) void {
    std.debug.print("worker {d} processed job {d}\n", .{ worker_id, job.number });
}
```

In a real program, this is where you would do work.

Examples:

```text
resize an image
parse a file
compress data
handle a request
run a build step
```

#### Stopping the Pool

A thread pool needs a clean shutdown.

We need to tell workers to stop, wake them, and join each thread.

```zig
fn destroy(self: *ThreadPool) void {
    self.mutex.lock();
    self.stopped = true;
    self.condition.broadcast();
    self.mutex.unlock();

    for (self.threads) |thread| {
        thread.join();
    }

    self.jobs.deinit();
    self.allocator.free(self.threads);

    const allocator = self.allocator;
    allocator.destroy(self);
}
```

The call to `broadcast` wakes all workers.

```zig
self.condition.broadcast();
```

If we only used `signal`, one worker might wake up while others stay asleep forever.

#### Complete Program

Put this in `src/main.zig`:

```zig
const std = @import("std");

const Job = struct {
    number: usize,
};

const ThreadPool = struct {
    allocator: std.mem.Allocator,
    threads: []std.Thread,
    jobs: std.ArrayList(Job),
    mutex: std.Thread.Mutex,
    condition: std.Thread.Condition,
    stopped: bool,

    fn create(allocator: std.mem.Allocator, count: usize) !*ThreadPool {
        const pool = try allocator.create(ThreadPool);
        errdefer allocator.destroy(pool);

        pool.* = ThreadPool{
            .allocator = allocator,
            .threads = try allocator.alloc(std.Thread, count),
            .jobs = std.ArrayList(Job).init(allocator),
            .mutex = .{},
            .condition = .{},
            .stopped = false,
        };

        errdefer allocator.free(pool.threads);
        errdefer pool.jobs.deinit();

        for (pool.threads, 0..) |*thread, id| {
            thread.* = try std.Thread.spawn(.{}, workerMain, .{ pool, id });
        }

        return pool;
    }

    fn submit(self: *ThreadPool, job: Job) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        try self.jobs.append(job);
        self.condition.signal();
    }

    fn destroy(self: *ThreadPool) void {
        self.mutex.lock();
        self.stopped = true;
        self.condition.broadcast();
        self.mutex.unlock();

        for (self.threads) |thread| {
            thread.join();
        }

        self.jobs.deinit();
        self.allocator.free(self.threads);

        const allocator = self.allocator;
        allocator.destroy(self);
    }
};

fn workerMain(pool: *ThreadPool, id: usize) void {
    while (true) {
        pool.mutex.lock();

        while (pool.jobs.items.len == 0 and !pool.stopped) {
            pool.condition.wait(&pool.mutex);
        }

        if (pool.stopped and pool.jobs.items.len == 0) {
            pool.mutex.unlock();
            return;
        }

        const job = pool.jobs.orderedRemove(0);
        pool.mutex.unlock();

        processJob(id, job);
    }
}

fn processJob(worker_id: usize, job: Job) void {
    std.debug.print("worker {d} processed job {d}\n", .{ worker_id, job.number });
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer {
        const status = gpa.deinit();
        if (status == .leak) {
            std.debug.print("memory leak detected\n", .{});
        }
    }

    const allocator = gpa.allocator();

    const pool = try ThreadPool.create(allocator, 4);
    defer pool.destroy();

    for (0..20) |i| {
        try pool.submit(Job{
            .number = i,
        });
    }
}
```

Run it:

```bash
zig build run
```

You may see output like this:

```text
worker 0 processed job 0
worker 1 processed job 1
worker 2 processed job 2
worker 3 processed job 3
worker 0 processed job 4
worker 1 processed job 5
```

The exact order can change.

That is normal. Threads run concurrently, and the operating system decides when each thread gets CPU time.

#### Why the Output Order Changes

Do not expect this:

```text
worker 0 processed job 0
worker 1 processed job 1
worker 2 processed job 2
worker 3 processed job 3
```

every time.

You might get:

```text
worker 2 processed job 0
worker 2 processed job 1
worker 0 processed job 2
worker 3 processed job 3
```

That does not mean the program is wrong.

Concurrency means many possible execution orders are valid. Correct concurrent code should not depend on a specific thread schedule.

#### Why We Use a Mutex

The job queue is shared.

This is unsafe:

```zig
try self.jobs.append(job);
```

while another thread is doing:

```zig
const job = pool.jobs.orderedRemove(0);
```

If both happen at the same time, the list's length and backing memory can be corrupted.

The mutex creates a protected region:

```zig
self.mutex.lock();
defer self.mutex.unlock();

try self.jobs.append(job);
```

Only one thread can hold the mutex at a time.

#### Why We Use a Condition Variable

Without a condition variable, workers might do this:

```zig
while (true) {
    if (jobs.items.len > 0) {
        runJob();
    }
}
```

That is called busy waiting.

It wastes CPU because the worker keeps checking even when no job exists.

A condition variable lets the worker sleep:

```zig
pool.condition.wait(&pool.mutex);
```

When the main thread submits a job, it wakes a worker:

```zig
self.condition.signal();
```

This is more efficient.

#### The Cost of `orderedRemove(0)`

Our worker removes the first job:

```zig
const job = pool.jobs.orderedRemove(0);
```

This keeps jobs in first-in, first-out order.

But it has a cost: removing index `0` from an array list shifts every later element left by one, which is O(n) in the queue length.

For a small project, that is fine.

For a real thread pool, you would usually use a ring buffer or another queue structure so removing the next job is constant time.
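Here is a minimal sketch of such a ring buffer. The fixed capacity of 64 and the `error.QueueFull` behavior are illustrative choices, not part of the pool above. Both `push` and `pop` are O(1):

```zig
const std = @import("std");

const Job = struct {
    number: usize,
};

// Fixed-capacity ring buffer. Jobs wrap around the array instead of
// shifting, so no element ever moves after it is stored.
const JobQueue = struct {
    buffer: [64]Job = undefined,
    head: usize = 0, // index of the next job to pop
    len: usize = 0, // number of queued jobs

    fn push(self: *JobQueue, job: Job) !void {
        if (self.len == self.buffer.len) return error.QueueFull;
        self.buffer[(self.head + self.len) % self.buffer.len] = job;
        self.len += 1;
    }

    fn pop(self: *JobQueue) ?Job {
        if (self.len == 0) return null;
        const job = self.buffer[self.head];
        self.head = (self.head + 1) % self.buffer.len;
        self.len -= 1;
        return job;
    }
};
```

The pool would still guard this queue with the same mutex; only the data structure changes.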

#### A Better Job Queue Later

A more serious thread pool might use:

```text
ring buffer
linked queue
work stealing queues
bounded queue
priority queue
```

Each design has different behavior.

A bounded queue can prevent the producer from creating unlimited work.

A priority queue can run important jobs first.

Work stealing can improve CPU usage when different workers have uneven workloads.

For this beginner version, a simple `ArrayList` keeps the code readable.

#### Add a Small Delay

To make concurrency easier to see, add a short sleep in `processJob`:

```zig
fn processJob(worker_id: usize, job: Job) void {
    std.time.sleep(100 * std.time.ns_per_ms);
    std.debug.print("worker {d} processed job {d}\n", .{ worker_id, job.number });
}
```

Now each job sleeps for about 100 milliseconds.

With one worker, 20 jobs would take about 2 seconds.

With four workers, the same jobs should finish in roughly 500 milliseconds, because up to four jobs sleep at the same time.
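To check this yourself, you can time the run with `std.time.Timer`, replacing the `defer pool.destroy()` in `main` with an explicit call. A sketch of the changed part of `main`:

```zig
var timer = try std.time.Timer.start();

for (0..20) |i| {
    try pool.submit(Job{ .number = i });
}
pool.destroy(); // blocks until every worker has finished and joined

const elapsed_ms = timer.read() / std.time.ns_per_ms;
std.debug.print("20 jobs took {d} ms\n", .{elapsed_ms});
```

With the 100 ms sleep in place, the printed time should land near 500 ms for four workers.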

#### Handling Submit After Stop

Our current `submit` function allows jobs even if the pool is stopped.

A stricter version should reject that:

```zig
fn submit(self: *ThreadPool, job: Job) !void {
    self.mutex.lock();
    defer self.mutex.unlock();

    if (self.stopped) {
        return error.ThreadPoolStopped;
    }

    try self.jobs.append(job);
    self.condition.signal();
}
```

This makes the API safer.

Once shutdown begins, no new work should enter the pool.
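A caller can then handle the rejection explicitly. A sketch, with an illustrative message:

```zig
pool.submit(Job{ .number = 99 }) catch |err| switch (err) {
    error.ThreadPoolStopped => {
        // The pool is shutting down; drop or redirect the job.
        std.debug.print("pool is shutting down, job dropped\n", .{});
    },
    else => |e| return e, // propagate allocation failures and the rest
};
```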

#### What You Learned

A thread pool is a fixed group of workers that reuse threads.

You created worker threads with `std.Thread.spawn`.

You protected shared state with `std.Thread.Mutex`.

You let workers sleep with `std.Thread.Condition`.

You submitted jobs into a queue.

You shut down the pool cleanly with `broadcast` and `join`.

You also saw one of the core rules of concurrent programming: shared mutable data must be protected.

Concurrency gives speed only when the program is structured carefully. A small thread pool is a good first step because it shows the basic moving parts without hiding them.

