Parallel Grep
Now that you have seen how to build servers, let's take a step back from networking and focus on core concurrency primitives. In this chapter, we'll explore:
- Channels - ZIO's primary mechanism for communicating between tasks
- Worker pools - distributing work across multiple concurrent tasks
- File I/O - reading files asynchronously
- Multi-threaded execution - running tasks across multiple CPU cores
We'll build a parallel grep tool that searches for patterns in multiple files concurrently, using a worker pool pattern with channels to coordinate the work.
The Code
Replace the contents of src/main.zig with this:
const std = @import("std");
const zio = @import("zio");
// One pattern match: which file, which (1-based) line number, and the text
// of the matching line. `line` is heap-allocated by the worker (gpa.dupe in
// searchFile) and freed by the collector after printing — ownership travels
// through the results channel. `file_path` borrows the argv-backed slice.
const SearchResult = struct {
file_path: []const u8,
line_number: usize,
line: []const u8,
};
/// Scans one file line-by-line for `pattern` and sends a SearchResult for
/// every matching line to `results_channel`. Open failures are logged and
/// swallowed so one unreadable path doesn't abort the whole search. Each
/// matched line's text is duplicated with `gpa`; ownership transfers through
/// the channel to the receiver (the collector frees it).
fn searchFile(
gpa: std.mem.Allocator,
dir: zio.Dir,
path: []const u8,
pattern: []const u8,
results_channel: *zio.Channel(SearchResult),
) !void {
// Best-effort open: warn and skip instead of failing the worker.
const file = dir.openFile(path, .{}) catch |err| {
std.log.warn("Failed to open file {s}: {}", .{ path, err });
return;
};
defer file.close();
var read_buffer: [4096]u8 = undefined;
var reader = file.reader(&read_buffer);
var line_number: usize = 0;
while (true) {
// The returned slice keeps the trailing '\n', so the collector can
// print it verbatim without appending a newline.
// NOTE(review): a final line with no trailing newline appears to
// surface as EndOfStream and be skipped; lines longer than the
// 4096-byte buffer propagate via the `else` arm — confirm both
// against the targeted reader API.
const line = reader.interface.takeDelimiterInclusive('\n') catch |err| switch (err) {
error.EndOfStream => break,
// Prefer the concrete I/O error stored on the reader when present.
error.ReadFailed => |e| return reader.err orelse e,
else => |e| return e,
};
line_number += 1;
if (std.mem.indexOf(u8, line, pattern)) |_| {
const result = SearchResult{
.file_path = path,
.line_number = line_number,
// Copy: `line` points into read_buffer, which the next
// iteration will overwrite.
.line = try gpa.dupe(u8, line),
};
errdefer gpa.free(result.line); // don't leak the copy if send fails
try results_channel.send(result);
}
}
}
/// Worker-pool task: repeatedly pulls file paths from `work_channel` and
/// searches each one (forwarding matches to `results_channel` inside
/// searchFile) until the work channel is closed and drained. Per-file search
/// errors are logged and swallowed so one bad file doesn't stop the worker;
/// only channel closure or cancellation ends the loop.
fn worker(
gpa: std.mem.Allocator,
dir: zio.Dir,
id: usize,
work_channel: *zio.Channel([]const u8),
results_channel: *zio.Channel(SearchResult),
pattern: []const u8,
) zio.Cancelable!void {
while (true) {
const path = work_channel.receive() catch |err| switch (err) {
// Graceful shutdown signal: no more work will ever arrive.
error.ChannelClosed => {
std.log.info("Worker {} exiting", .{id});
return;
},
// Propagate cancellation so the owning group can tear us down.
error.Canceled => return error.Canceled,
};
std.log.info("Worker {} searching {s}", .{ id, path });
// Best effort: log the failure and move on to the next file.
searchFile(gpa, dir, path, pattern, results_channel) catch |err| {
std.log.warn("Worker {} error searching {s}: {}", .{ id, path, err });
};
}
}
/// Receives SearchResults and prints them to stdout until the results
/// channel is closed. Frees each result's heap-allocated `line`, completing
/// the ownership handoff that began with gpa.dupe in searchFile. Runs as its
/// own task so slow output never blocks the workers.
fn collector(
gpa: std.mem.Allocator,
results_channel: *zio.Channel(SearchResult),
) !void {
const stdout = zio.stdout();
var write_buffer: [4096]u8 = undefined;
// Created once, before the loop, and reused for every result.
var writer = stdout.writer(&write_buffer);
while (true) {
const result = results_channel.receive() catch |err| switch (err) {
error.ChannelClosed => return,
error.Canceled => return error.Canceled,
};
// No "\n" in the format string: `result.line` already ends with the
// '\n' preserved by takeDelimiterInclusive.
// NOTE(review): if print/flush fails, the current `result.line`
// leaks before the error propagates — acceptable here since main
// exits on a collector error, but worth confirming intent.
try writer.interface.print("{s}:{}: {s}", .{
result.file_path,
result.line_number,
result.line,
});
try writer.interface.flush();
// We own the line once received; free it after it has been printed.
gpa.free(result.line);
}
}
/// Entry point: parses `<pattern> <file1> [file2...]` from argv, starts a
/// multi-threaded runtime, spawns 4 workers plus one collector, feeds file
/// paths through a buffered channel, then shuts everything down in order
/// (close work channel -> wait workers -> close results channel -> wait
/// collector) so no results or output are lost.
pub fn main() !void {
// Thread-safe allocator: allocations cross task boundaries (workers
// allocate line copies that the collector frees).
const gpa = std.heap.smp_allocator;
const args = try std.process.argsAlloc(gpa);
defer std.process.argsFree(gpa, args);
if (args.len < 3) {
std.log.err("Usage: {s} <pattern> <file1> [file2...]", .{args[0]});
return error.InvalidArgs;
}
const pattern = args[1];
const files = args[2..];
// .auto: one executor (OS thread) per CPU core for real parallelism.
var rt = try zio.Runtime.init(gpa, .{ .executors = .auto });
defer rt.deinit();
const cwd = zio.Dir.cwd();
// Create channels
// Buffered (16 slots): main can enqueue paths ahead of the workers.
var work_buffer: [16][]const u8 = undefined;
var work_channel = zio.Channel([]const u8).init(&work_buffer);
// Unbuffered (empty buffer): each result is a direct handoff.
var results_channel = zio.Channel(SearchResult).init(&.{});
// Separate groups so workers and collector can be shut down in order;
// the deferred cancels only matter if we bail out early on an error.
var workers_group: zio.Group = .init;
defer workers_group.cancel();
var collector_group: zio.Group = .init;
defer collector_group.cancel();
// Start worker tasks
const num_workers = 4;
for (0..num_workers) |i| {
try workers_group.spawn(worker, .{ gpa, cwd, i, &work_channel, &results_channel, pattern });
}
// Start collector task
try collector_group.spawn(collector, .{ gpa, &results_channel });
// Distribute work
for (files) |file_path| {
work_channel.send(file_path) catch |err| switch (err) {
error.ChannelClosed => break,
error.Canceled => return error.Canceled,
};
}
// Close work channel to signal workers to exit
work_channel.close(.graceful);
// Wait for all workers to finish
try workers_group.wait();
// Now close results channel to signal collector to exit
results_channel.close(.graceful);
// Wait for collector to finish
try collector_group.wait();
std.log.info("Search complete.", .{});
}
Now build and run it:
$ zig build run -- "TODO" src/*.zig
info: Worker 0 searching src/main.zig
info: Worker 1 searching src/root.zig
src/main.zig:42: // TODO: Add error handling
src/root.zig:15: // TODO: Implement feature
info: Worker 0 exiting
info: Worker 1 exiting
info: Search complete.
How It Works
This program uses a worker pool pattern with channels to distribute file searching across multiple tasks. It consists of three types of tasks: the main coordinator, worker tasks, and a collector task.
Multi-threaded Runtime
First, notice how we initialize the runtime:
The .executors = .auto option tells ZIO to create one executor (OS thread) per CPU core. This means our tasks can truly run in parallel across multiple cores, not just concurrently on a single core.
In the previous examples, we didn't specify this option, so the runtime defaulted to a single executor. That was fine for I/O-bound network servers where tasks spend most of their time waiting. But for this CPU-bound workload (searching file contents), using multiple cores gives us real parallelism.
Channels
Before we dive into the tasks, let's understand the communication mechanism:
// Create channels
var work_buffer: [16][]const u8 = undefined;
var work_channel = zio.Channel([]const u8).init(&work_buffer);
var results_channel = zio.Channel(SearchResult).init(&.{});
A Channel is a typed queue for passing messages between tasks. Channels can be:
- Buffered (like work_channel with 16 slots) - send() blocks only when the buffer is full
- Unbuffered (like results_channel with an empty slice) - send() blocks until a receiver calls receive()
Channels provide a safe way to communicate between concurrent tasks without shared memory or locks.
Worker Pool
The program spawns 4 worker tasks that process files from a shared queue:
// Start worker tasks
const num_workers = 4;
for (0..num_workers) |i| {
try workers_group.spawn(worker, .{ gpa, cwd, i, &work_channel, &results_channel, pattern });
}
Each worker runs this loop:
fn worker(
gpa: std.mem.Allocator,
dir: zio.Dir,
id: usize,
work_channel: *zio.Channel([]const u8),
results_channel: *zio.Channel(SearchResult),
pattern: []const u8,
) zio.Cancelable!void {
while (true) {
const path = work_channel.receive() catch |err| switch (err) {
error.ChannelClosed => {
std.log.info("Worker {} exiting", .{id});
return;
},
error.Canceled => return error.Canceled,
};
std.log.info("Worker {} searching {s}", .{ id, path });
searchFile(gpa, dir, path, pattern, results_channel) catch |err| {
std.log.warn("Worker {} error searching {s}: {}", .{ id, path, err });
};
}
}
The worker:
- Receives a file path from work_channel
- When the channel is closed, the worker exits gracefully
- Searches the file for the pattern
- Sends any matches to results_channel
- Repeats until the work queue is exhausted
The Collector Task
A separate task collects and prints results:
fn collector(
gpa: std.mem.Allocator,
results_channel: *zio.Channel(SearchResult),
) !void {
const stdout = zio.stdout();
var write_buffer: [4096]u8 = undefined;
var writer = stdout.writer(&write_buffer);
while (true) {
const result = results_channel.receive() catch |err| switch (err) {
error.ChannelClosed => return,
error.Canceled => return error.Canceled,
};
try writer.interface.print("{s}:{}: {s}", .{
result.file_path,
result.line_number,
result.line,
});
try writer.interface.flush();
gpa.free(result.line);
}
}
The collector runs in its own task to avoid blocking workers. It creates the stdout writer once before the loop, then receives results, prints them, and frees the memory allocated by workers. This demonstrates how ownership can be transferred between tasks through channels.
Coordinating the Work
The main function orchestrates everything:
// Start worker tasks
const num_workers = 4;
for (0..num_workers) |i| {
try workers_group.spawn(worker, .{ gpa, cwd, i, &work_channel, &results_channel, pattern });
}
// Start collector task
try collector_group.spawn(collector, .{ gpa, &results_channel });
// Distribute work
for (files) |file_path| {
work_channel.send(file_path) catch |err| switch (err) {
error.ChannelClosed => break,
error.Canceled => return error.Canceled,
};
}
// Close work channel to signal workers to exit
work_channel.close(.graceful);
// Wait for all workers to finish
try workers_group.wait();
// Now close results channel to signal collector to exit
results_channel.close(.graceful);
// Wait for collector to finish
try collector_group.wait();
This shutdown sequence is important:
- Send all file paths to workers
- Close work_channel - workers will exit when they drain the queue
- Wait for workers to finish - ensures all results are sent
- Close results_channel - signals collector to exit
- Wait for collector to finish - ensures all output is printed
This is a graceful shutdown that ensures no work is lost and all tasks clean up properly.
Memory Management
Notice how memory flows through the system:
// In searchFile (called by workers)
const result = SearchResult{
.file_path = path,
.line_number = line_number,
.line = try gpa.dupe(u8, line), // Allocate
};
errdefer gpa.free(result.line); // Free if send fails
try results_channel.send(result);
// In collector
const result = results_channel.receive() catch ...
// ... print result ...
gpa.free(result.line); // Free
The worker allocates memory for each matching line, sends ownership through the channel, and the collector frees it. Channels ensure this transfer is safe - there's no risk of use-after-free or double-free.
Key Concepts
Multi-threaded Execution
ZIO's runtime can use multiple OS threads (executors) to run tasks in parallel:
- .executors = .auto - auto-detect based on CPU count (good for CPU-bound work)
- .executors = .exact(1) - single-threaded (default, good for I/O-bound work)
- .executors = .exact(N) - explicit number of threads
Tasks are automatically distributed across executors. Channels handle synchronization, so you don't need locks or atomic operations - just send and receive messages safely between tasks.
For I/O-bound workloads (like web servers), a single executor is often sufficient since tasks spend most of their time waiting. For CPU-bound workloads (like file processing, data analysis, or computation), multiple executors let you use all available CPU cores.
Channels
Channels are the primary way to communicate between tasks in ZIO. They provide:
- Type safety - channels are typed, so you can only send/receive values of the declared type
- Blocking semantics - send() and receive() suspend the task when the operation can't complete immediately
- Graceful closure - closed channels return error.ChannelClosed to signal no more data will be sent
Use buffered channels when you want to decouple producers and consumers, allowing bursts of work. Use unbuffered channels when you want direct handoff between tasks.
Worker Pools
The worker pool pattern is useful when you have:
- A queue of independent work items
- Tasks that can process items in parallel
- A fixed number of workers to limit resource usage
In our example, the 4 workers share the work queue, automatically load-balancing. If one worker gets a large file, others keep processing smaller files.
Graceful Shutdown
The shutdown sequence demonstrates structured concurrency with channels:
- Close the work channel - tells workers "no more work is coming"
- Wait for workers - ensures they finish processing what they have
- Close the results channel - tells collector "no more results are coming"
- Wait for collector - ensures it finishes printing
This pattern prevents data loss and ensures clean shutdown, even with complex task dependencies.
Comparing Concurrency Patterns
- Previous examples (TCP/HTTP servers): Tasks are created per client and live until the connection closes
- This example (parallel grep): Tasks are created upfront and share work through channels
Both patterns have their uses. Use per-client tasks when each client has its own long-lived state. Use worker pools when you have many small, independent work items to process.