//! atproto relay implementation in zig
//! zlay.waow.tech
1//! generic thread pool with key-partitioned queues
2//!
3//! each worker has its own bounded ring buffer + mutex + condvar.
4//! `submit(key, item, shutdown)` routes to `workers[key % N]` for per-key ordering.
5//! blocks when the target queue is full (backpressure to caller).
6//! items stored by value in pre-allocated ring buffer (zero alloc per submit).
7
8const std = @import("std");
9const Allocator = std.mem.Allocator;
10
pub fn ThreadPool(comptime T: type, comptime processFn: fn (*T) void) type {
    return struct {
        const Self = @This();

        pub const Config = struct {
            num_workers: u16 = 8,
            queue_capacity: u16 = 4096,
            stack_size: usize = 4 * 1024 * 1024,
        };

        const Worker = struct {
            // ring buffer stored as a slice of T
            queue: []T,
            capacity: u16,
            head: u16 = 0, // next slot to read
            tail: u16 = 0, // next slot to write
            count: u16 = 0,
            mutex: std.Thread.Mutex = .{},
            cond: std.Thread.Condition = .{}, // "not empty" — workers wait here
            not_full: std.Thread.Condition = .{}, // "not full" — submitters wait here
            alive: bool = true,
            thread: ?std.Thread = null,
        };

        workers: []Worker,
        allocator: Allocator,

        /// allocate per-worker ring buffers and spawn one thread per worker.
        /// on any failure, everything allocated or spawned so far is released,
        /// so a failed init leaks no memory and leaves no threads running.
        pub fn init(allocator: Allocator, config: Config) !Self {
            // zero workers would make `submit` compute `key % 0`;
            // zero capacity would make `submit` block forever.
            std.debug.assert(config.num_workers > 0);
            std.debug.assert(config.queue_capacity > 0);

            const workers = try allocator.alloc(Worker, config.num_workers);
            errdefer allocator.free(workers);

            // on a mid-loop alloc failure, free exactly the queues that were
            // successfully allocated (slots past `queues_ready` are undefined)
            var queues_ready: usize = 0;
            errdefer {
                for (workers[0..queues_ready]) |*w| allocator.free(w.queue);
            }
            for (workers) |*w| {
                w.* = .{
                    .queue = try allocator.alloc(T, config.queue_capacity),
                    .capacity = config.queue_capacity,
                };
                queues_ready += 1;
            }

            const self = Self{
                .workers = workers,
                .allocator = allocator,
            };

            // on a mid-loop spawn failure, stop and join the threads already
            // running so they don't outlive the queues freed by the errdefers
            // above (errdefers run LIFO: join first, then free).
            var spawned: usize = 0;
            errdefer {
                for (workers[0..spawned]) |*w| {
                    w.mutex.lock();
                    w.alive = false;
                    w.cond.signal();
                    w.mutex.unlock();
                }
                for (workers[0..spawned]) |*w| {
                    if (w.thread) |t| t.join();
                }
            }

            // spawn worker threads. pointers into the heap-allocated `workers`
            // slice stay valid after `self` is returned by value.
            for (self.workers) |*w| {
                w.thread = try std.Thread.spawn(
                    .{ .stack_size = config.stack_size },
                    workerLoop,
                    .{w},
                );
                spawned += 1;
            }

            return self;
        }

        /// submit an item for processing, routed by key.
        /// blocks if the target worker's queue is full (backpressure).
        /// returns false only if shutdown was requested while waiting.
        pub fn submit(self: *Self, key: u64, item: T, stop: *std.atomic.Value(bool)) bool {
            // same key -> same worker, which is what preserves per-key FIFO order
            const idx = key % self.workers.len;
            const w = &self.workers[idx];

            w.mutex.lock();
            defer w.mutex.unlock();

            while (w.count == w.capacity) {
                if (stop.load(.acquire)) return false;
                // poll every 100ms so we notice shutdown promptly;
                // error.Timeout is the expected wakeup path, hence the catch {}
                w.not_full.timedWait(&w.mutex, 100 * std.time.ns_per_ms) catch {};
            }

            w.queue[w.tail] = item;
            w.tail = @intCast((@as(u32, w.tail) + 1) % @as(u32, w.capacity));
            w.count += 1;
            w.cond.signal();
            return true;
        }

        /// drain remaining items and join all worker threads.
        /// workers keep processing until their queues are empty (see
        /// workerLoop's exit condition), then exit and are joined here.
        pub fn shutdown(self: *Self) void {
            // signal all workers to stop
            for (self.workers) |*w| {
                w.mutex.lock();
                w.alive = false;
                w.cond.signal();
                w.not_full.broadcast(); // wake any blocked submitters
                w.mutex.unlock();
            }
            // join all threads
            for (self.workers) |*w| {
                if (w.thread) |t| {
                    t.join();
                    w.thread = null;
                }
            }
        }

        /// free queue storage. call only after `shutdown` — worker threads
        /// read these buffers while running.
        pub fn deinit(self: *Self) void {
            for (self.workers) |*w| {
                self.allocator.free(w.queue);
            }
            self.allocator.free(self.workers);
        }

        /// total pending items across all workers (diagnostic).
        /// takes each worker's mutex in turn, so the result is a snapshot
        /// that may be stale by the time it is returned.
        pub fn pendingCount(self: *Self) usize {
            var total: usize = 0;
            for (self.workers) |*w| {
                w.mutex.lock();
                defer w.mutex.unlock();
                total += w.count;
            }
            return total;
        }

        /// per-worker thread body: pop one item under the lock, process it
        /// outside the lock. exits only when the queue is empty AND shutdown
        /// was requested, which is what makes `shutdown` drain.
        fn workerLoop(w: *Worker) void {
            while (true) {
                var item: T = undefined;

                {
                    w.mutex.lock();
                    defer w.mutex.unlock();

                    while (w.count == 0 and w.alive) {
                        w.cond.wait(&w.mutex);
                    }

                    if (w.count == 0 and !w.alive) return;

                    // copy the item out so processFn runs without the lock held
                    item = w.queue[w.head];
                    w.head = @intCast((@as(u32, w.head) + 1) % @as(u32, w.capacity));
                    w.count -= 1;
                    w.not_full.signal(); // wake one blocked submitter
                }

                processFn(&item);
            }
        }
    };
}
150
151// --- tests ---
152
153const testing = std.testing;
154
155test "basic submit and process" {
156 const Item = struct {
157 value: u32,
158 processed: *std.atomic.Value(u32),
159 };
160
161 const S = struct {
162 fn process(item: *Item) void {
163 _ = item.processed.fetchAdd(item.value, .monotonic);
164 }
165 };
166
167 var shutdown: std.atomic.Value(bool) = .{ .raw = false };
168 var counter: std.atomic.Value(u32) = .{ .raw = 0 };
169 var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
170 .num_workers = 2,
171 .queue_capacity = 64,
172 .stack_size = 1 * 1024 * 1024,
173 });
174
175 // submit items
176 for (0..10) |i| {
177 const ok = pool.submit(i, .{
178 .value = @intCast(i + 1),
179 .processed = &counter,
180 }, &shutdown);
181 try testing.expect(ok);
182 }
183
184 pool.shutdown();
185 defer pool.deinit();
186
187 // sum of 1..10 = 55
188 try testing.expectEqual(@as(u32, 55), counter.load(.acquire));
189}
190
191test "per-key ordering preserved" {
192 // items with the same key should be processed in FIFO order
193 const Item = struct {
194 seq: u32,
195 results: *std.ArrayListUnmanaged(u32),
196 mutex: *std.Thread.Mutex,
197 allocator: Allocator,
198 };
199
200 const S = struct {
201 fn process(item: *Item) void {
202 item.mutex.lock();
203 defer item.mutex.unlock();
204 item.results.append(item.allocator, item.seq) catch {};
205 }
206 };
207
208 var shutdown: std.atomic.Value(bool) = .{ .raw = false };
209 var results: std.ArrayListUnmanaged(u32) = .{};
210 defer results.deinit(testing.allocator);
211 var mutex: std.Thread.Mutex = .{};
212
213 var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
214 .num_workers = 4,
215 .queue_capacity = 64,
216 .stack_size = 1 * 1024 * 1024,
217 });
218
219 // submit 20 items all with key=42 (same worker)
220 for (0..20) |i| {
221 const ok = pool.submit(42, .{
222 .seq = @intCast(i),
223 .results = &results,
224 .mutex = &mutex,
225 .allocator = testing.allocator,
226 }, &shutdown);
227 try testing.expect(ok);
228 }
229
230 pool.shutdown();
231 defer pool.deinit();
232
233 try testing.expectEqual(@as(usize, 20), results.items.len);
234 for (results.items, 0..) |val, i| {
235 try testing.expectEqual(@as(u32, @intCast(i)), val);
236 }
237}
238
239test "submit blocks when queue full, succeeds after drain" {
240 const Item = struct {
241 counter: *std.atomic.Value(u32),
242 };
243 const S = struct {
244 fn process(item: *Item) void {
245 // slow worker — gives time for queue to fill
246 std.posix.nanosleep(0, 5 * std.time.ns_per_ms);
247 _ = item.counter.fetchAdd(1, .monotonic);
248 }
249 };
250
251 var shutdown: std.atomic.Value(bool) = .{ .raw = false };
252 var counter: std.atomic.Value(u32) = .{ .raw = 0 };
253 var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
254 .num_workers = 1,
255 .queue_capacity = 4,
256 .stack_size = 1 * 1024 * 1024,
257 });
258
259 // submit more items than capacity — submit blocks until slots open
260 for (0..20) |_| {
261 const ok = pool.submit(0, .{ .counter = &counter }, &shutdown);
262 try testing.expect(ok);
263 }
264
265 pool.shutdown();
266 defer pool.deinit();
267
268 // all 20 should have been processed (none dropped)
269 try testing.expectEqual(@as(u32, 20), counter.load(.acquire));
270}
271
272test "submit returns false on shutdown" {
273 const Item = struct {
274 stop: *std.atomic.Value(bool),
275 };
276 const S = struct {
277 fn process(item: *Item) void {
278 // poll until shutdown — allows worker to exit promptly
279 while (!item.stop.load(.acquire)) {
280 std.posix.nanosleep(0, 5 * std.time.ns_per_ms);
281 }
282 }
283 };
284
285 var shutdown: std.atomic.Value(bool) = .{ .raw = false };
286 var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
287 .num_workers = 1,
288 .queue_capacity = 2,
289 .stack_size = 1 * 1024 * 1024,
290 });
291
292 // fill: 1 processing + 2 queued = capacity reached
293 _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
294 _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
295 _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
296
297 // signal shutdown — next submit should return false
298 shutdown.store(true, .release);
299 const ok = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
300 try testing.expect(!ok);
301
302 pool.shutdown();
303 defer pool.deinit();
304}
305
306test "pendingCount reflects queued items" {
307 const Item = struct { x: u32 };
308 const S = struct {
309 fn process(item: *Item) void {
310 _ = item;
311 // slow worker so items accumulate
312 std.posix.nanosleep(0, 10 * std.time.ns_per_ms);
313 }
314 };
315
316 var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
317 .num_workers = 1,
318 .queue_capacity = 64,
319 .stack_size = 1 * 1024 * 1024,
320 });
321
322 // initially empty
323 try testing.expectEqual(@as(usize, 0), pool.pendingCount());
324
325 pool.shutdown();
326 defer pool.deinit();
327}
328
329test "shutdown drains remaining items" {
330 const Item = struct {
331 counter: *std.atomic.Value(u32),
332 };
333 const S = struct {
334 fn process(item: *Item) void {
335 _ = item.counter.fetchAdd(1, .monotonic);
336 }
337 };
338
339 var shutdown: std.atomic.Value(bool) = .{ .raw = false };
340 var counter: std.atomic.Value(u32) = .{ .raw = 0 };
341 var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
342 .num_workers = 2,
343 .queue_capacity = 64,
344 .stack_size = 1 * 1024 * 1024,
345 });
346
347 for (0..30) |i| {
348 _ = pool.submit(i, .{ .counter = &counter }, &shutdown);
349 }
350
351 pool.shutdown();
352 defer pool.deinit();
353
354 // all 30 should have been processed (shutdown drains)
355 try testing.expectEqual(@as(u32, 30), counter.load(.acquire));
356}