Parallel Grep

Now that you have seen how to build servers, let's take a step back from networking and focus on core concurrency primitives. In this chapter, we'll explore:

  • Channels - ZIO's primary mechanism for communicating between tasks
  • Worker pools - distributing work across multiple concurrent tasks
  • File I/O - reading files asynchronously
  • Multi-threaded execution - running tasks across multiple CPU cores

We'll build a parallel grep tool that searches for patterns in multiple files concurrently, using a worker pool pattern with channels to coordinate the work.

The Code

Replace the contents of src/main.zig with this:

const std = @import("std");
const zio = @import("zio");

const SearchResult = struct {
    file_path: []const u8,
    line_number: usize,
    line: []const u8,
};

fn searchFile(
    gpa: std.mem.Allocator,
    dir: zio.Dir,
    path: []const u8,
    pattern: []const u8,
    results_channel: *zio.Channel(SearchResult),
) !void {
    const file = dir.openFile(path, .{}) catch |err| {
        std.log.warn("Failed to open file {s}: {}", .{ path, err });
        return;
    };
    defer file.close();

    var read_buffer: [4096]u8 = undefined;
    var reader = file.reader(&read_buffer);

    var line_number: usize = 0;
    while (true) {
        const line = reader.interface.takeDelimiterInclusive('\n') catch |err| switch (err) {
            error.EndOfStream => break,
            error.ReadFailed => |e| return reader.err orelse e,
            else => |e| return e,
        };
        line_number += 1;

        if (std.mem.indexOf(u8, line, pattern)) |_| {
            const result = SearchResult{
                .file_path = path,
                .line_number = line_number,
                .line = try gpa.dupe(u8, line),
            };
            errdefer gpa.free(result.line);
            try results_channel.send(result);
        }
    }
}

fn worker(
    gpa: std.mem.Allocator,
    dir: zio.Dir,
    id: usize,
    work_channel: *zio.Channel([]const u8),
    results_channel: *zio.Channel(SearchResult),
    pattern: []const u8,
) zio.Cancelable!void {
    while (true) {
        const path = work_channel.receive() catch |err| switch (err) {
            error.ChannelClosed => {
                std.log.info("Worker {} exiting", .{id});
                return;
            },
            error.Canceled => return error.Canceled,
        };

        std.log.info("Worker {} searching {s}", .{ id, path });
        searchFile(gpa, dir, path, pattern, results_channel) catch |err| {
            std.log.warn("Worker {} error searching {s}: {}", .{ id, path, err });
        };
    }
}

fn collector(
    gpa: std.mem.Allocator,
    results_channel: *zio.Channel(SearchResult),
) !void {
    const stdout = zio.stdout();
    var write_buffer: [4096]u8 = undefined;
    var writer = stdout.writer(&write_buffer);

    while (true) {
        const result = results_channel.receive() catch |err| switch (err) {
            error.ChannelClosed => return,
            error.Canceled => return error.Canceled,
        };

        try writer.interface.print("{s}:{}: {s}", .{
            result.file_path,
            result.line_number,
            result.line,
        });
        try writer.interface.flush();

        gpa.free(result.line);
    }
}

pub fn main() !void {
    const gpa = std.heap.smp_allocator;

    const args = try std.process.argsAlloc(gpa);
    defer std.process.argsFree(gpa, args);

    if (args.len < 3) {
        std.log.err("Usage: {s} <pattern> <file1> [file2...]", .{args[0]});
        return error.InvalidArgs;
    }

    const pattern = args[1];
    const files = args[2..];

    var rt = try zio.Runtime.init(gpa, .{ .executors = .auto });
    defer rt.deinit();

    const cwd = zio.Dir.cwd();

    // Create channels
    var work_buffer: [16][]const u8 = undefined;
    var work_channel = zio.Channel([]const u8).init(&work_buffer);

    var results_channel = zio.Channel(SearchResult).init(&.{});

    var workers_group: zio.Group = .init;
    defer workers_group.cancel();

    var collector_group: zio.Group = .init;
    defer collector_group.cancel();

    // Start worker tasks
    const num_workers = 4;
    for (0..num_workers) |i| {
        try workers_group.spawn(worker, .{ gpa, cwd, i, &work_channel, &results_channel, pattern });
    }

    // Start collector task
    try collector_group.spawn(collector, .{ gpa, &results_channel });

    // Distribute work
    for (files) |file_path| {
        work_channel.send(file_path) catch |err| switch (err) {
            error.ChannelClosed => break,
            error.Canceled => return error.Canceled,
        };
    }

    // Close work channel to signal workers to exit
    work_channel.close(.graceful);

    // Wait for all workers to finish
    try workers_group.wait();

    // Now close results channel to signal collector to exit
    results_channel.close(.graceful);

    // Wait for collector to finish
    try collector_group.wait();

    std.log.info("Search complete.", .{});
}

Now build and run it:

$ zig build run -- "TODO" src/*.zig
info: Worker 0 searching src/main.zig
info: Worker 1 searching src/root.zig
src/main.zig:42: // TODO: Add error handling
src/root.zig:15: // TODO: Implement feature
info: Worker 0 exiting
info: Worker 1 exiting
info: Search complete.

How It Works

This program uses a worker pool pattern with channels to distribute file searching across multiple tasks. It consists of three types of tasks: the main coordinator, worker tasks, and a collector task.

Multi-threaded Runtime

First, notice how we initialize the runtime:

var rt = try zio.Runtime.init(gpa, .{ .executors = .auto });

The .executors = .auto option tells ZIO to create one executor (OS thread) per CPU core. This means our tasks can truly run in parallel across multiple cores, not just concurrently on a single core.

In the previous examples, we didn't specify this option, so the runtime defaulted to a single executor. That was fine for I/O-bound network servers where tasks spend most of their time waiting. But for this CPU-bound workload (searching file contents), using multiple cores gives us real parallelism.

Channels

Before we dive into the tasks, let's understand the communication mechanism:

    // Create channels
    var work_buffer: [16][]const u8 = undefined;
    var work_channel = zio.Channel([]const u8).init(&work_buffer);

    var results_channel = zio.Channel(SearchResult).init(&.{});

A Channel is a typed queue for passing messages between tasks. Channels can be:

  • Buffered (like work_channel, backed by a 16-slot array) - send() blocks only when the buffer is full
  • Unbuffered (like results_channel, initialized with an empty buffer) - send() blocks until a receiver calls receive()

Channels provide a safe way for concurrent tasks to communicate without manual locking or shared mutable state.
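To see a channel in isolation, here is a minimal sketch that reuses only the APIs from the program above (zio.Runtime, zio.Group, zio.Channel, and its send/receive/close methods). Treat it as illustrative rather than exact: it assumes, as main() does, that Group.spawn picks up the surrounding runtime implicitly.

```zig
const std = @import("std");
const zio = @import("zio");

fn producer(ch: *zio.Channel(u32)) !void {
    // Queue three numbers, then close so the receiver knows we are done.
    for (0..3) |i| try ch.send(@intCast(i));
    ch.close(.graceful);
}

fn consumer(ch: *zio.Channel(u32)) zio.Cancelable!void {
    while (true) {
        const n = ch.receive() catch |err| switch (err) {
            error.ChannelClosed => return, // producer closed: no more values
            error.Canceled => return error.Canceled,
        };
        std.log.info("received {}", .{n});
    }
}

pub fn main() !void {
    var rt = try zio.Runtime.init(std.heap.smp_allocator, .{ .executors = .auto });
    defer rt.deinit();

    var buffer: [4]u32 = undefined; // 4-slot buffered channel
    var ch = zio.Channel(u32).init(&buffer);

    var group: zio.Group = .init;
    defer group.cancel();
    try group.spawn(producer, .{&ch});
    try group.spawn(consumer, .{&ch});
    try group.wait();
}
```

The same two-sided protocol - sender closes, receiver loops until error.ChannelClosed - is what the grep program uses, just with more tasks on each side.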

Worker Pool

The program spawns 4 worker tasks that process files from a shared queue:

    // Start worker tasks
    const num_workers = 4;
    for (0..num_workers) |i| {
        try workers_group.spawn(worker, .{ gpa, cwd, i, &work_channel, &results_channel, pattern });
    }

Each worker runs this loop:

fn worker(
    gpa: std.mem.Allocator,
    dir: zio.Dir,
    id: usize,
    work_channel: *zio.Channel([]const u8),
    results_channel: *zio.Channel(SearchResult),
    pattern: []const u8,
) zio.Cancelable!void {
    while (true) {
        const path = work_channel.receive() catch |err| switch (err) {
            error.ChannelClosed => {
                std.log.info("Worker {} exiting", .{id});
                return;
            },
            error.Canceled => return error.Canceled,
        };

        std.log.info("Worker {} searching {s}", .{ id, path });
        searchFile(gpa, dir, path, pattern, results_channel) catch |err| {
            std.log.warn("Worker {} error searching {s}: {}", .{ id, path, err });
        };
    }
}

The worker:

  1. Receives a file path from work_channel
  2. Searches that file for the pattern
  3. Sends any matches to results_channel
  4. Repeats until work_channel is closed and drained, then exits gracefully

The Collector Task

A separate task collects and prints results:

fn collector(
    gpa: std.mem.Allocator,
    results_channel: *zio.Channel(SearchResult),
) !void {
    const stdout = zio.stdout();
    var write_buffer: [4096]u8 = undefined;
    var writer = stdout.writer(&write_buffer);

    while (true) {
        const result = results_channel.receive() catch |err| switch (err) {
            error.ChannelClosed => return,
            error.Canceled => return error.Canceled,
        };

        try writer.interface.print("{s}:{}: {s}", .{
            result.file_path,
            result.line_number,
            result.line,
        });
        try writer.interface.flush();

        gpa.free(result.line);
    }
}

The collector runs in its own task to avoid blocking workers. It creates the stdout writer once before the loop, then receives results, prints them, and frees the memory allocated by workers. This demonstrates how ownership can be transferred between tasks through channels.

Coordinating the Work

The main function orchestrates everything:

    // Start worker tasks
    const num_workers = 4;
    for (0..num_workers) |i| {
        try workers_group.spawn(worker, .{ gpa, cwd, i, &work_channel, &results_channel, pattern });
    }

    // Start collector task
    try collector_group.spawn(collector, .{ gpa, &results_channel });

    // Distribute work
    for (files) |file_path| {
        work_channel.send(file_path) catch |err| switch (err) {
            error.ChannelClosed => break,
            error.Canceled => return error.Canceled,
        };
    }

    // Close work channel to signal workers to exit
    work_channel.close(.graceful);

    // Wait for all workers to finish
    try workers_group.wait();

    // Now close results channel to signal collector to exit
    results_channel.close(.graceful);

    // Wait for collector to finish
    try collector_group.wait();

This shutdown sequence is important:

  1. Send all file paths to workers
  2. Close work_channel - workers will exit when they drain the queue
  3. Wait for workers to finish - ensures all results are sent
  4. Close results_channel - signals collector to exit
  5. Wait for collector to finish - ensures all output is printed

This is a graceful shutdown that ensures no work is lost and all tasks clean up properly.

Memory Management

Notice how memory flows through the system:

// In searchFile (called by workers)
const result = SearchResult{
    .file_path = path,
    .line_number = line_number,
    .line = try gpa.dupe(u8, line),  // Allocate
};
errdefer gpa.free(result.line);  // Free if send fails
try results_channel.send(result);

// In collector
const result = results_channel.receive() catch ...
// ... print result ...
gpa.free(result.line);  // Free

The worker allocates memory for each matching line, sends ownership through the channel, and the collector frees it. Channels ensure this transfer is safe - there's no risk of use-after-free or double-free.

Key Concepts

Multi-threaded Execution

ZIO's runtime can use multiple OS threads (executors) to run tasks in parallel:

  • .executors = .auto - auto-detect based on CPU count (good for CPU-bound work)
  • .executors = .exact(1) - single-threaded (default, good for I/O-bound work)
  • .executors = .exact(N) - explicit number of threads

Tasks are automatically distributed across executors. Channels handle synchronization, so you don't need locks or atomic operations - just send and receive messages safely between tasks.

For I/O-bound workloads (like web servers), a single executor is often sufficient since tasks spend most of their time waiting. For CPU-bound workloads (like file processing, data analysis, or computation), multiple executors let you use all available CPU cores.
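As a sketch, the same initialization can pin the runtime to a fixed thread count using the .exact option listed above; .exact(4) here is an arbitrary illustration, not a recommendation:

```zig
const std = @import("std");
const zio = @import("zio");

pub fn main() !void {
    const gpa = std.heap.smp_allocator;

    // Exactly 4 executor threads, regardless of CPU count.
    // Use .exact(1) for I/O-bound servers, or .auto to match the CPU count.
    var rt = try zio.Runtime.init(gpa, .{ .executors = .exact(4) });
    defer rt.deinit();
}
```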

Channels

Channels are the primary way to communicate between tasks in ZIO. They provide:

  • Type safety - channels are typed, so you can only send/receive values of the declared type
  • Blocking semantics - send() and receive() suspend the task when the operation can't complete immediately
  • Graceful closure - closed channels return error.ChannelClosed to signal no more data will be sent

Use buffered channels when you want to decouple producers and consumers, allowing bursts of work. Use unbuffered channels when you want direct handoff between tasks.
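Both flavors come from the same init call and differ only in the buffer you hand over, mirroring the two channels created in main(). A minimal sketch, assuming init and close need no running tasks:

```zig
const std = @import("std");
const zio = @import("zio");

pub fn main() !void {
    // Buffered: senders can queue up to 16 paths before send() blocks.
    var slots: [16][]const u8 = undefined;
    var work = zio.Channel([]const u8).init(&slots);
    work.close(.graceful);

    // Unbuffered: every send() waits for a matching receive() - a direct handoff.
    var results = zio.Channel(u8).init(&.{});
    results.close(.graceful);
}
```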

Worker Pools

The worker pool pattern is useful when you have:

  • A queue of independent work items
  • Tasks that can process items in parallel
  • A fixed number of workers to limit resource usage

In our example, the 4 workers share the work queue, automatically load-balancing. If one worker gets a large file, others keep processing smaller files.

Graceful Shutdown

The shutdown sequence demonstrates structured concurrency with channels:

  1. Close the work channel - tells workers "no more work is coming"
  2. Wait for workers - ensures they finish processing what they have
  3. Close the results channel - tells collector "no more results are coming"
  4. Wait for collector - ensures it finishes printing

This pattern prevents data loss and ensures clean shutdown, even with complex task dependencies.

Comparing Concurrency Patterns

  • Previous examples (TCP/HTTP servers): Tasks are created per client and live until the connection closes
  • This example (parallel grep): Tasks are created upfront and share work through channels

Both patterns have their uses. Use per-client tasks when each client has its own long-lived state. Use worker pools when you have many small, independent work items to process.