atproto relay implementation in zig zlay.waow.tech
at main 356 lines 11 kB view raw
//! generic thread pool with key-partitioned queues
//!
//! each worker has its own bounded ring buffer + mutex + condvar.
//! `submit(key, item, shutdown)` routes to `workers[key % N]` for per-key ordering.
//! blocks when the target queue is full (backpressure to caller).
//! items stored by value in pre-allocated ring buffer (zero alloc per submit).

const std = @import("std");
const Allocator = std.mem.Allocator;

pub fn ThreadPool(comptime T: type, comptime processFn: fn (*T) void) type {
    return struct {
        const Self = @This();

        pub const Config = struct {
            num_workers: u16 = 8,
            queue_capacity: u16 = 4096,
            stack_size: usize = 4 * 1024 * 1024,
        };

        const Worker = struct {
            // ring buffer stored as a slice of T
            queue: []T,
            capacity: u16,
            head: u16 = 0, // next slot to read
            tail: u16 = 0, // next slot to write
            count: u16 = 0,
            mutex: std.Thread.Mutex = .{},
            cond: std.Thread.Condition = .{}, // "not empty" — workers wait here
            not_full: std.Thread.Condition = .{}, // "not full" — submitters wait here
            alive: bool = true,
            thread: ?std.Thread = null,
        };

        workers: []Worker,
        allocator: Allocator,

        /// allocate per-worker queues and spawn one thread per worker.
        /// on any failure, everything allocated or spawned so far is
        /// cleaned up (queues freed, running threads signalled and joined),
        /// so a failed init leaks nothing.
        pub fn init(allocator: Allocator, config: Config) !Self {
            const workers = try allocator.alloc(Worker, config.num_workers);
            errdefer allocator.free(workers);

            // allocate queues; on failure free only the queues created so far
            var allocated: usize = 0;
            errdefer for (workers[0..allocated]) |*w| allocator.free(w.queue);
            for (workers) |*w| {
                w.* = .{
                    .queue = try allocator.alloc(T, config.queue_capacity),
                    .capacity = config.queue_capacity,
                };
                allocated += 1;
            }

            // spawn worker threads; if a spawn fails, stop and join the
            // threads that are already running before propagating the error
            var spawned: usize = 0;
            errdefer {
                for (workers[0..spawned]) |*w| {
                    w.mutex.lock();
                    w.alive = false;
                    w.cond.signal();
                    w.mutex.unlock();
                }
                for (workers[0..spawned]) |*w| {
                    if (w.thread) |t| t.join();
                }
            }
            for (workers) |*w| {
                w.thread = try std.Thread.spawn(
                    .{ .stack_size = config.stack_size },
                    workerLoop,
                    .{w},
                );
                spawned += 1;
            }

            return .{
                .workers = workers,
                .allocator = allocator,
            };
        }

        /// submit an item for processing, routed by key.
        /// blocks if the target worker's queue is full (backpressure).
        /// returns false only if shutdown was requested while waiting.
        /// NOTE(review): if `shutdown()` runs without `stop` being set first,
        /// a blocked submitter can enqueue into a queue whose worker already
        /// exited and the item is silently dropped — callers should set
        /// `stop` before calling `shutdown()`; confirm against call sites.
        pub fn submit(self: *Self, key: u64, item: T, stop: *std.atomic.Value(bool)) bool {
            const idx = key % self.workers.len;
            const w = &self.workers[idx];

            w.mutex.lock();
            defer w.mutex.unlock();

            while (w.count == w.capacity) {
                if (stop.load(.acquire)) return false;
                // poll every 100ms so we notice shutdown promptly even if
                // no signal arrives; spurious wakeups just re-check the loop
                w.not_full.timedWait(&w.mutex, 100 * std.time.ns_per_ms) catch {};
            }

            w.queue[w.tail] = item;
            // widen to u32 for the modular increment so the arithmetic
            // cannot overflow u16 before the wrap
            w.tail = @intCast((@as(u32, w.tail) + 1) % @as(u32, w.capacity));
            w.count += 1;
            w.cond.signal();
            return true;
        }

        /// drain remaining items and join all worker threads.
        /// workers keep processing until their queues are empty, so nothing
        /// already submitted is dropped.
        pub fn shutdown(self: *Self) void {
            // signal all workers to stop
            for (self.workers) |*w| {
                w.mutex.lock();
                w.alive = false;
                w.cond.signal();
                w.not_full.broadcast(); // wake any blocked submitters
                w.mutex.unlock();
            }
            // join all threads
            for (self.workers) |*w| {
                if (w.thread) |t| {
                    t.join();
                    w.thread = null;
                }
            }
        }

        /// free queue storage. call only after `shutdown()` has joined the
        /// worker threads — deallocation itself cannot fail.
        pub fn deinit(self: *Self) void {
            for (self.workers) |*w| {
                self.allocator.free(w.queue);
            }
            self.allocator.free(self.workers);
        }

        /// total pending items across all workers (diagnostic).
        /// counts queued items only; an item currently inside `processFn`
        /// has already been removed from its queue and is not included.
        pub fn pendingCount(self: *Self) usize {
            var total: usize = 0;
            for (self.workers) |*w| {
                w.mutex.lock();
                defer w.mutex.unlock();
                total += w.count;
            }
            return total;
        }

        /// per-worker loop: pop items FIFO under the lock, process outside it.
        /// exits only when the queue is empty AND `alive` is false, which is
        /// what makes `shutdown()` drain everything already submitted.
        fn workerLoop(w: *Worker) void {
            while (true) {
                var item: T = undefined;

                {
                    w.mutex.lock();
                    defer w.mutex.unlock();

                    while (w.count == 0 and w.alive) {
                        w.cond.wait(&w.mutex);
                    }

                    if (w.count == 0 and !w.alive) return;

                    item = w.queue[w.head];
                    w.head = @intCast((@as(u32, w.head) + 1) % @as(u32, w.capacity));
                    w.count -= 1;
                    w.not_full.signal(); // wake one blocked submitter
                }

                // run the callback outside the lock so submitters and
                // `pendingCount` are not blocked by slow processing
                processFn(&item);
            }
        }
    };
}

// --- tests ---

const testing = std.testing;

test "basic submit and process" {
    const Item = struct {
        value: u32,
        processed: *std.atomic.Value(u32),
    };

    const S = struct {
        fn process(item: *Item) void {
            _ = item.processed.fetchAdd(item.value, .monotonic);
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });

    // submit items
    for (0..10) |i| {
        const ok = pool.submit(i, .{
            .value = @intCast(i + 1),
            .processed = &counter,
        }, &shutdown);
        try testing.expect(ok);
    }

    pool.shutdown();
    defer pool.deinit();

    // sum of 1..10 = 55
    try testing.expectEqual(@as(u32, 55), counter.load(.acquire));
}

test "per-key ordering preserved" {
    // items with the same key should be processed in FIFO order
    const Item = struct {
        seq: u32,
        results: *std.ArrayListUnmanaged(u32),
        mutex: *std.Thread.Mutex,
        allocator: Allocator,
    };

    const S = struct {
        fn process(item: *Item) void {
            item.mutex.lock();
            defer item.mutex.unlock();
            item.results.append(item.allocator, item.seq) catch {};
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var results: std.ArrayListUnmanaged(u32) = .{};
    defer results.deinit(testing.allocator);
    var mutex: std.Thread.Mutex = .{};

    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 4,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });

    // submit 20 items all with key=42 (same worker)
    for (0..20) |i| {
        const ok = pool.submit(42, .{
            .seq = @intCast(i),
            .results = &results,
            .mutex = &mutex,
            .allocator = testing.allocator,
        }, &shutdown);
        try testing.expect(ok);
    }

    pool.shutdown();
    defer pool.deinit();

    try testing.expectEqual(@as(usize, 20), results.items.len);
    for (results.items, 0..) |val, i| {
        try testing.expectEqual(@as(u32, @intCast(i)), val);
    }
}

test "submit blocks when queue full, succeeds after drain" {
    const Item = struct {
        counter: *std.atomic.Value(u32),
    };
    const S = struct {
        fn process(item: *Item) void {
            // slow worker — gives time for queue to fill
            std.posix.nanosleep(0, 5 * std.time.ns_per_ms);
            _ = item.counter.fetchAdd(1, .monotonic);
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 4,
        .stack_size = 1 * 1024 * 1024,
    });

    // submit more items than capacity — submit blocks until slots open
    for (0..20) |_| {
        const ok = pool.submit(0, .{ .counter = &counter }, &shutdown);
        try testing.expect(ok);
    }

    pool.shutdown();
    defer pool.deinit();

    // all 20 should have been processed (none dropped)
    try testing.expectEqual(@as(u32, 20), counter.load(.acquire));
}

test "submit returns false on shutdown" {
    const Item = struct {
        stop: *std.atomic.Value(bool),
    };
    const S = struct {
        fn process(item: *Item) void {
            // poll until shutdown — allows worker to exit promptly
            while (!item.stop.load(.acquire)) {
                std.posix.nanosleep(0, 5 * std.time.ns_per_ms);
            }
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 2,
        .stack_size = 1 * 1024 * 1024,
    });

    // fill: 1 processing + 2 queued = capacity reached
    _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
    _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
    _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);

    // signal shutdown — next submit should return false
    shutdown.store(true, .release);
    const ok = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
    try testing.expect(!ok);

    pool.shutdown();
    defer pool.deinit();
}

test "pendingCount reflects queued items" {
    const Item = struct { x: u32 };
    const S = struct {
        fn process(item: *Item) void {
            _ = item;
            // slow worker so items accumulate
            std.posix.nanosleep(0, 10 * std.time.ns_per_ms);
        }
    };

    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });

    // initially empty
    try testing.expectEqual(@as(usize, 0), pool.pendingCount());

    pool.shutdown();
    defer pool.deinit();
}

test "shutdown drains remaining items" {
    const Item = struct {
        counter: *std.atomic.Value(u32),
    };
    const S = struct {
        fn process(item: *Item) void {
            _ = item.counter.fetchAdd(1, .monotonic);
        }
    };

    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });

    for (0..30) |i| {
        _ = pool.submit(i, .{ .counter = &counter }, &shutdown);
    }

    pool.shutdown();
    defer pool.deinit();

    // all 30 should have been processed (shutdown drains)
    try testing.expectEqual(@as(u32, 30), counter.load(.acquire));
}