//! Generic thread pool with key-partitioned queues.
//!
//! Each worker owns a bounded ring buffer guarded by a mutex + two condvars.
//! `submit(key, item, stop)` routes to `workers[key % N]`, so items sharing a
//! key are processed in FIFO order by a single worker. `submit` blocks when
//! the target queue is full (backpressure to the caller). Items are stored by
//! value in a pre-allocated ring buffer (zero allocations per submit).

const std = @import("std");
const Allocator = std.mem.Allocator;

/// Build a pool type whose workers call `processFn` on each submitted `T`.
pub fn ThreadPool(comptime T: type, comptime processFn: fn (*T) void) type {
    return struct {
        const Self = @This();

        pub const Config = struct {
            num_workers: u16 = 8,
            queue_capacity: u16 = 4096,
            stack_size: usize = 4 * 1024 * 1024,
        };

        const Worker = struct {
            // ring buffer stored as a slice of T
            queue: []T,
            capacity: u16,
            head: u16 = 0, // next slot to read
            tail: u16 = 0, // next slot to write
            count: u16 = 0,
            mutex: std.Thread.Mutex = .{},
            cond: std.Thread.Condition = .{}, // "not empty" — workers wait here
            not_full: std.Thread.Condition = .{}, // "not full" — submitters wait here
            alive: bool = true,
            thread: ?std.Thread = null,
        };

        workers: []Worker,
        allocator: Allocator,

        /// Allocate per-worker queues and spawn one thread per worker.
        /// On any failure, everything allocated/spawned so far is released
        /// (the original version leaked queues, the workers slice, and any
        /// already-spawned threads on a mid-init error).
        pub fn init(allocator: Allocator, config: Config) !Self {
            // FIX: zero workers would make `key % 0` divide by zero in
            // submit; a zero-capacity ring could never accept an item.
            std.debug.assert(config.num_workers > 0);
            std.debug.assert(config.queue_capacity > 0);

            const workers = try allocator.alloc(Worker, config.num_workers);
            errdefer allocator.free(workers);

            // FIX: free the queues allocated so far if a later queue
            // allocation (or thread spawn) fails.
            var queues_ready: usize = 0;
            errdefer for (workers[0..queues_ready]) |*w| allocator.free(w.queue);

            for (workers) |*w| {
                w.* = .{
                    .queue = try allocator.alloc(T, config.queue_capacity),
                    .capacity = config.queue_capacity,
                };
                queues_ready += 1;
            }

            const self = Self{
                .workers = workers,
                .allocator = allocator,
            };

            // FIX: if a later spawn fails, tell the threads that were
            // already started to exit and join them instead of leaking them.
            errdefer {
                for (workers) |*w| {
                    w.mutex.lock();
                    w.alive = false;
                    w.cond.signal();
                    w.mutex.unlock();
                }
                for (workers) |*w| {
                    if (w.thread) |t| t.join();
                }
            }

            // spawn worker threads (worker pointers are stable: `workers`
            // is heap-allocated, so they survive `init` returning by value)
            for (self.workers) |*w| {
                w.thread = try std.Thread.spawn(
                    .{ .stack_size = config.stack_size },
                    workerLoop,
                    .{w},
                );
            }
            return self;
        }

        /// Submit an item for processing, routed by key.
        /// Blocks if the target worker's queue is full (backpressure).
        /// Returns false if shutdown was requested — via `stop` or via
        /// `shutdown()` — before the item could be enqueued.
        pub fn submit(self: *Self, key: u64, item: T, stop: *std.atomic.Value(bool)) bool {
            // explicit cast keeps indexing portable to targets where
            // usize is narrower than u64 (the modulo result fits: < len)
            const idx: usize = @intCast(key % self.workers.len);
            const w = &self.workers[idx];
            w.mutex.lock();
            defer w.mutex.unlock();
            while (w.count == w.capacity) {
                if (stop.load(.acquire)) return false;
                // FIX: honor pool shutdown too — the worker has exited (or
                // will after draining), so waiting here could spin forever
                if (!w.alive) return false;
                // poll every 100ms so we notice shutdown promptly
                w.not_full.timedWait(&w.mutex, 100 * std.time.ns_per_ms) catch {};
            }
            // FIX: refuse to enqueue after shutdown() — no worker will ever
            // dequeue the item, so returning true would silently drop it
            if (!w.alive) return false;
            w.queue[w.tail] = item;
            w.tail = @intCast((@as(u32, w.tail) + 1) % @as(u32, w.capacity));
            w.count += 1;
            w.cond.signal();
            return true;
        }

        /// Drain remaining items and join all worker threads.
        /// Workers finish everything already queued before exiting.
        pub fn shutdown(self: *Self) void {
            // signal all workers to stop
            for (self.workers) |*w| {
                w.mutex.lock();
                w.alive = false;
                w.cond.signal();
                w.not_full.broadcast(); // wake any blocked submitters
                w.mutex.unlock();
            }
            // join all threads
            for (self.workers) |*w| {
                if (w.thread) |t| {
                    t.join();
                    w.thread = null;
                }
            }
        }

        /// Free queue storage. Call only after `shutdown()`.
        pub fn deinit(self: *Self) void {
            for (self.workers) |*w| {
                self.allocator.free(w.queue);
            }
            self.allocator.free(self.workers);
        }

        /// Total pending items across all workers (diagnostic only — the
        /// value may already be stale by the time the caller reads it).
        pub fn pendingCount(self: *Self) usize {
            var total: usize = 0;
            for (self.workers) |*w| {
                w.mutex.lock();
                defer w.mutex.unlock();
                total += w.count;
            }
            return total;
        }

        /// Per-worker loop: dequeue under the lock, process outside it so a
        /// slow `processFn` never blocks submitters longer than necessary.
        /// Exits once `alive` is false AND the queue is fully drained.
        fn workerLoop(w: *Worker) void {
            while (true) {
                var item: T = undefined;
                {
                    w.mutex.lock();
                    defer w.mutex.unlock();
                    while (w.count == 0 and w.alive) {
                        w.cond.wait(&w.mutex);
                    }
                    if (w.count == 0 and !w.alive) return;
                    item = w.queue[w.head];
                    w.head = @intCast((@as(u32, w.head) + 1) % @as(u32, w.capacity));
                    w.count -= 1;
                    w.not_full.signal(); // wake one blocked submitter
                }
                processFn(&item);
            }
        }
    };
}

// --- tests ---

const testing = std.testing;

test "basic submit and process" {
    const Item = struct {
        value: u32,
        processed: *std.atomic.Value(u32),
    };
    const S = struct {
        fn process(item: *Item) void {
            _ = item.processed.fetchAdd(item.value, .monotonic);
        }
    };
    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });
    // submit items
    for (0..10) |i| {
        const ok = pool.submit(i, .{
            .value = @intCast(i + 1),
            .processed = &counter,
        }, &shutdown);
        try testing.expect(ok);
    }
    pool.shutdown();
    defer pool.deinit();
    // sum of 1..10 = 55
    try testing.expectEqual(@as(u32, 55), counter.load(.acquire));
}

test "per-key ordering preserved" {
    // items with the same key should be processed in FIFO order
    const Item = struct {
        seq: u32,
        results: *std.ArrayListUnmanaged(u32),
        mutex: *std.Thread.Mutex,
        allocator: Allocator,
    };
    const S = struct {
        fn process(item: *Item) void {
            item.mutex.lock();
            defer item.mutex.unlock();
            item.results.append(item.allocator, item.seq) catch {};
        }
    };
    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var results: std.ArrayListUnmanaged(u32) = .{};
    defer results.deinit(testing.allocator);
    var mutex: std.Thread.Mutex = .{};
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 4,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });
    // submit 20 items all with key=42 (same worker)
    for (0..20) |i| {
        const ok = pool.submit(42, .{
            .seq = @intCast(i),
            .results = &results,
            .mutex = &mutex,
            .allocator = testing.allocator,
        }, &shutdown);
        try testing.expect(ok);
    }
    pool.shutdown();
    defer pool.deinit();
    try testing.expectEqual(@as(usize, 20), results.items.len);
    for (results.items, 0..) |val, i| {
        try testing.expectEqual(@as(u32, @intCast(i)), val);
    }
}

test "submit blocks when queue full, succeeds after drain" {
    const Item = struct {
        counter: *std.atomic.Value(u32),
    };
    const S = struct {
        fn process(item: *Item) void {
            // slow worker — gives time for queue to fill
            std.posix.nanosleep(0, 5 * std.time.ns_per_ms);
            _ = item.counter.fetchAdd(1, .monotonic);
        }
    };
    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 4,
        .stack_size = 1 * 1024 * 1024,
    });
    // submit more items than capacity — submit blocks until slots open
    for (0..20) |_| {
        const ok = pool.submit(0, .{ .counter = &counter }, &shutdown);
        try testing.expect(ok);
    }
    pool.shutdown();
    defer pool.deinit();
    // all 20 should have been processed (none dropped)
    try testing.expectEqual(@as(u32, 20), counter.load(.acquire));
}

test "submit returns false on shutdown" {
    const Item = struct {
        stop: *std.atomic.Value(bool),
    };
    const S = struct {
        fn process(item: *Item) void {
            // poll until shutdown — allows worker to exit promptly
            while (!item.stop.load(.acquire)) {
                std.posix.nanosleep(0, 5 * std.time.ns_per_ms);
            }
        }
    };
    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 2,
        .stack_size = 1 * 1024 * 1024,
    });
    // fill: 1 processing + 2 queued = capacity reached
    _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
    _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
    _ = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
    // signal shutdown — next submit should return false
    shutdown.store(true, .release);
    const ok = pool.submit(0, .{ .stop = &shutdown }, &shutdown);
    try testing.expect(!ok);
    pool.shutdown();
    defer pool.deinit();
}

test "pendingCount reflects queued items" {
    const Item = struct { x: u32 };
    const S = struct {
        fn process(item: *Item) void {
            _ = item;
            // slow worker so items accumulate
            std.posix.nanosleep(0, 10 * std.time.ns_per_ms);
        }
    };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 1,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });
    // initially empty
    try testing.expectEqual(@as(usize, 0), pool.pendingCount());
    pool.shutdown();
    defer pool.deinit();
}

test "shutdown drains remaining items" {
    const Item = struct {
        counter: *std.atomic.Value(u32),
    };
    const S = struct {
        fn process(item: *Item) void {
            _ = item.counter.fetchAdd(1, .monotonic);
        }
    };
    var shutdown: std.atomic.Value(bool) = .{ .raw = false };
    var counter: std.atomic.Value(u32) = .{ .raw = 0 };
    var pool = try ThreadPool(Item, S.process).init(testing.allocator, .{
        .num_workers = 2,
        .queue_capacity = 64,
        .stack_size = 1 * 1024 * 1024,
    });
    for (0..30) |i| {
        _ = pool.submit(i, .{ .counter = &counter }, &shutdown);
    }
    pool.shutdown();
    defer pool.deinit();
    // all 30 should have been processed (shutdown drains)
    try testing.expectEqual(@as(u32, 30), counter.load(.acquire));
}