A modern TUI library written in Zig
at main 11 kB view raw
const std = @import("std");
const assert = std.debug.assert;
const atomic = std.atomic;
const Condition = std.Thread.Condition;

/// Thread safe. Fixed size. Blocking push and pop.
///
/// Returns a FIFO ring-buffer queue type holding up to `size` items of type
/// `T`. All `pub` methods take the internal mutex themselves, except `drain`,
/// which must be bracketed by `lock` / `unlock`.
///
/// Private helpers with the `LH` suffix ("lock held") must only be called
/// while the mutex is held.
///
/// `size` must be greater than zero; a zero size is rejected at compile time.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // The index arithmetic below takes values modulo `size` and `2 * size`,
    // so a zero-sized queue can never work. Fail early with a clear message.
    if (size == 0) @compileError("Queue requires a size greater than zero");

    return struct {
        /// Backing storage. Only slots between the read and write index hold
        /// valid items; the rest is undefined.
        buf: [size]T = undefined,

        /// Read and write positions, kept in the range [0, 2 * size). Using
        /// twice the buffer length lets "empty" (indices equal) be told apart
        /// from "full" (indices exactly `size` apart).
        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        /// Waited on by pushers while the buffer is full.
        not_full: Condition = .{},
        /// Waited on by poppers/pollers while the buffer is empty.
        not_empty: Condition = .{},

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop rather than `if`: a wake-up may be spurious, or another
            // consumer may have taken the item before we got the lock back.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
            return self.popAndSignalLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop rather than `if`, for the same reason as in `pop`.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            assert(!self.isFullLH());
            self.pushAndSignalLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full. Never blocks waiting for space.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isFullLH()) return false;
            self.pushAndSignalLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available. Never blocks waiting for an item.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Poll the queue. This call blocks until events are in the queue.
        /// Nothing is removed from the queue.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
        }

        /// Acquire the queue's mutex. Pair with `unlock`; intended for use
        /// around a sequence of `drain` calls.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the mutex acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally
        /// held. Returns the next item, or null once the queue is empty.
        ///
        /// Each removed item signals `not_full`, so a producer blocked in
        /// `push` is woken once the caller releases the lock.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Returns `true` when there are no items. Lock must be held.
        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Returns `true` when the write index is exactly one buffer length
        /// ahead of the read index (modulo `2 * size`). Lock must be held.
        fn isFullLH(self: *const Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items currently stored. Reads the indices
        /// without taking the mutex.
        fn len(self: *const Self) usize {
            // When the write index has wrapped past `2 * size` but the read
            // index has not, add the wrap back before subtracting.
            const wrap_offset = 2 * self.buf.len *
                @intFromBool(self.write_index < self.read_index);
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Stores `item` at the write position, advances the write index and
        /// wakes one waiter on `not_empty`. Lock must be held and the queue
        /// must not be full.
        fn pushAndSignalLH(self: *Self, item: T) void {
            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);
            // Signal on every push, not only on the empty -> non-empty
            // transition. With several blocked consumers, a second push can
            // land before the first woken consumer runs; skipping the signal
            // then would leave a consumer asleep with an item available.
            self.not_empty.signal();
        }

        /// Removes the item at the read position and wakes one waiter on
        /// `not_full`. Lock must be held and the queue must not be empty.
        fn popAndSignalLH(self: *Self) T {
            const result = self.popLH();
            // Signal on every pop, for the mirror-image reason given in
            // `pushAndSignalLH`.
            self.not_full.signal();
            return result;
        }

        /// Removes and returns the item at the read position without
        /// signalling. Lock must be held and the queue must not be empty.
        fn popLH(self: *Self) T {
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            return result;
        }
    };
}

const testing = std.testing;
const Thread = std.Thread;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };

test "Queue: simple push / pop" {
    var queue: Queue(u8, 16) = .{};
    queue.push(1);
    queue.push(2);
    const pop = queue.pop();
    try testing.expectEqual(1, pop);
    try testing.expectEqual(2, queue.pop());
}

fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    try testing.expectEqual(2, q.pop());
}

test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    queue.push(1);
    queue.push(2);
    const t = try Thread.spawn(cfg, testPushPop, .{&queue});
    try testing.expectEqual(false, queue.tryPush(3));
    try testing.expectEqual(1, queue.pop());
    t.join();
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}

fn testPush(q: *Queue(u8, 2)) void {
    q.push(0);
    q.push(1);
    q.push(2);
    q.push(3);
    q.push(4);
}

test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, testPush, .{&queue});
    for (0..5) |idx| {
        try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
    }
    thread.join();
}

fn sleepyPop(q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
    // First we wait for the queue to be full.
    while (state.load(.acquire) < 1)
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal();
    q.not_empty.signal();

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, q.pop());

    // Wait for the other thread to signal it's ready for second push
    while (state.load(.acquire) < 2)
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Another spurious wake...
    q.not_full.signal();
    q.not_empty.signal();
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, q.pop());
}

test "Fill, block, fill, block" {
    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPop, .{ &queue, &state });
    queue.push(1);
    queue.push(2);
    state.store(1, .release);
    const now = std.time.milliTimestamp();
    queue.push(3); // This one should block.
    const then = std.time.milliTimestamp();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 5ms to do the push.
    try std.testing.expect(then - now > 5);

    state.store(2, .release);
    // This should block again, waiting for the other thread.
    queue.push(4);

    // And once that push has gone through, the other thread's done.
    thread.join();
    try std.testing.expectEqual(3, queue.pop());
    try std.testing.expectEqual(4, queue.pop());
}

fn sleepyPush(q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Stick something in the queue so it can be popped.
    q.push(1);
    // Ensure it's been popped.
    while (state.load(.acquire) < 1)
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}

test "Drain, block, drain, block" {
    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .{};
    var state = atomic.Value(u8).init(0);
    const thread = try Thread.spawn(cfg, sleepyPush, .{ &queue, &state });
    try std.testing.expectEqual(1, queue.pop());
    state.store(1, .release);
    try std.testing.expectEqual(2, queue.pop());
    thread.join();
}

fn readerThread(q: *Queue(u8, 1)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers" {
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}

fn writerThread(q: *Queue(u8, 1)) !void {
    q.push(1);
}

test "2 writers" {
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, writerThread, .{&queue});

    try testing.expectEqual(1, queue.pop());
    try testing.expectEqual(1, queue.pop());
    t1.join();
    t2.join();
}

fn wideReaderThread(q: *Queue(u8, 2)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers, capacity 2, back-to-back pushes" {
    // Both readers block on an empty queue, then the writer pushes two
    // items without waiting in between. Every push must wake a reader;
    // otherwise one reader can be left asleep with an item available and
    // the join below never returns.
    var queue: Queue(u8, 2) = .{};
    const t1 = try Thread.spawn(cfg, wideReaderThread, .{&queue});
    const t2 = try Thread.spawn(cfg, wideReaderThread, .{&queue});
    try Thread.yield();
    std.Thread.sleep(10 * std.time.ns_per_ms);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
    try testing.expectEqual(null, queue.tryPop());
}

fn blockedPush(q: *Queue(u8, 2), state: *atomic.Value(u8)) void {
    state.store(1, .release);
    // The queue is full at this point, so this blocks until space is made.
    q.push(3);
}

test "Drain wakes a blocked push" {
    // Fill the queue, let another thread block in `push`, then empty the
    // queue through lock/drain/unlock. The blocked push has to be woken by
    // the drain; otherwise the join below never returns.
    var queue: Queue(u8, 2) = .{};
    var state = atomic.Value(u8).init(0);
    queue.push(1);
    queue.push(2);
    const thread = try Thread.spawn(cfg, blockedPush, .{ &queue, &state });
    while (state.load(.acquire) < 1)
        try Thread.yield();
    // Give the writer a good chance to actually be waiting.
    std.Thread.sleep(10 * std.time.ns_per_ms);

    {
        queue.lock();
        defer queue.unlock();
        try testing.expectEqual(@as(?u8, 1), queue.drain());
        try testing.expectEqual(@as(?u8, 2), queue.drain());
        try testing.expectEqual(@as(?u8, null), queue.drain());
    }

    thread.join();
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}