a modern tui library written in zig
at v0.2.0 10 kB view raw
1const std = @import("std"); 2const assert = std.debug.assert; 3const atomic = std.atomic; 4const Condition = std.Thread.Condition; 5 6const log = std.log.scoped(.queue); 7 8/// Thread safe. Fixed size. Blocking push and pop. 9pub fn Queue( 10 comptime T: type, 11 comptime size: usize, 12) type { 13 return struct { 14 buf: [size]T = undefined, 15 16 read_index: usize = 0, 17 write_index: usize = 0, 18 19 mutex: std.Thread.Mutex = .{}, 20 // blocks when the buffer is full 21 not_full: Condition = .{}, 22 // ...or empty 23 not_empty: Condition = .{}, 24 25 const Self = @This(); 26 27 /// Pop an item from the queue. Blocks until an item is available. 28 pub fn pop(self: *Self) T { 29 self.mutex.lock(); 30 defer self.mutex.unlock(); 31 while (self.isEmptyLH()) { 32 self.not_empty.wait(&self.mutex); 33 } 34 std.debug.assert(!self.isEmptyLH()); 35 if (self.isFullLH()) { 36 // If we are full, wake up a push that might be 37 // waiting here. 38 self.not_full.signal(); 39 } 40 41 const result = self.buf[self.mask(self.read_index)]; 42 self.read_index = self.mask2(self.read_index + 1); 43 return result; 44 } 45 46 /// Push an item into the queue. Blocks until an item has been 47 /// put in the queue. 48 pub fn push(self: *Self, item: T) void { 49 self.mutex.lock(); 50 defer self.mutex.unlock(); 51 while (self.isFullLH()) { 52 self.not_full.wait(&self.mutex); 53 } 54 if (self.isEmptyLH()) { 55 // If we were empty, wake up a pop if it was waiting. 56 self.not_empty.signal(); 57 } 58 std.debug.assert(!self.isFullLH()); 59 60 self.buf[self.mask(self.write_index)] = item; 61 self.write_index = self.mask2(self.write_index + 1); 62 } 63 64 /// Push an item into the queue. Returns true when the item 65 /// was successfully placed in the queue, false if the queue 66 /// was full. 67 pub fn tryPush(self: *Self, item: T) bool { 68 self.mutex.lock(); 69 if (self.isFullLH()) { 70 self.mutex.unlock(); 71 return false; 72 } 73 self.mutex.unlock(); 74 self.push(item); 75 return true; 76 } 77 78 /// Pop an item from the queue. Returns null when no item is 79 /// available. 80 pub fn tryPop(self: *Self) ?T { 81 self.mutex.lock(); 82 if (self.isEmptyLH()) { 83 self.mutex.unlock(); 84 return null; 85 } 86 self.mutex.unlock(); 87 return self.pop(); 88 } 89 90 /// Poll the queue. This call blocks until events are in the queue 91 pub fn poll(self: *Self) void { 92 self.mutex.lock(); 93 defer self.mutex.unlock(); 94 while (self.isEmptyLH()) { 95 self.not_empty.wait(&self.mutex); 96 } 97 std.debug.assert(!self.isEmptyLH()); 98 } 99 100 fn isEmptyLH(self: Self) bool { 101 return self.write_index == self.read_index; 102 } 103 104 fn isFullLH(self: Self) bool { 105 return self.mask2(self.write_index + self.buf.len) == 106 self.read_index; 107 } 108 109 /// Returns `true` if the queue is empty and `false` otherwise. 110 pub fn isEmpty(self: *Self) bool { 111 self.mutex.lock(); 112 defer self.mutex.unlock(); 113 return self.isEmptyLH(); 114 } 115 116 /// Returns `true` if the queue is full and `false` otherwise. 117 pub fn isFull(self: *Self) bool { 118 self.mutex.lock(); 119 defer self.mutex.unlock(); 120 return self.isFullLH(); 121 } 122 123 /// Returns the length 124 fn len(self: Self) usize { 125 const wrap_offset = 2 * self.buf.len * 126 @intFromBool(self.write_index < self.read_index); 127 const adjusted_write_index = self.write_index + wrap_offset; 128 return adjusted_write_index - self.read_index; 129 } 130 131 /// Returns `index` modulo the length of the backing slice. 132 fn mask(self: Self, index: usize) usize { 133 return index % self.buf.len; 134 } 135 136 /// Returns `index` modulo twice the length of the backing slice. 137 fn mask2(self: Self, index: usize) usize { 138 return index % (2 * self.buf.len); 139 } 140 }; 141} 142 143const testing = std.testing; 144const cfg = Thread.SpawnConfig{ .allocator = testing.allocator }; 145test "Queue: simple push / pop" { 146 var queue: Queue(u8, 16) = .{}; 147 queue.push(1); 148 queue.push(2); 149 const pop = queue.pop(); 150 try testing.expectEqual(1, pop); 151 try testing.expectEqual(2, queue.pop()); 152} 153 154const Thread = std.Thread; 155fn testPushPop(q: *Queue(u8, 2)) !void { 156 q.push(3); 157 try testing.expectEqual(2, q.pop()); 158} 159 160test "Fill, wait to push, pop once in another thread" { 161 var queue: Queue(u8, 2) = .{}; 162 queue.push(1); 163 queue.push(2); 164 const t = try Thread.spawn(cfg, testPushPop, .{&queue}); 165 try testing.expectEqual(false, queue.tryPush(3)); 166 try testing.expectEqual(1, queue.pop()); 167 t.join(); 168 try testing.expectEqual(3, queue.pop()); 169 try testing.expectEqual(null, queue.tryPop()); 170} 171 172fn testPush(q: *Queue(u8, 2)) void { 173 q.push(0); 174 q.push(1); 175 q.push(2); 176 q.push(3); 177 q.push(4); 178} 179 180test "Try to pop, fill from another thread" { 181 var queue: Queue(u8, 2) = .{}; 182 const thread = try Thread.spawn(cfg, testPush, .{&queue}); 183 for (0..5) |idx| { 184 try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop()); 185 } 186 thread.join(); 187} 188 189fn sleepyPop(q: *Queue(u8, 2)) !void { 190 // First we wait for the queue to be full. 191 while (!q.isFull()) 192 try Thread.yield(); 193 194 // Then we spuriously wake it up, because that's a thing that can 195 // happen. 196 q.not_full.signal(); 197 q.not_empty.signal(); 198 199 // Then give the other thread a good chance of waking up. It's not 200 // clear that yield guarantees the other thread will be scheduled, 201 // so we'll throw a sleep in here just to be sure. The queue is 202 // still full and the push in the other thread is still blocked 203 // waiting for space. 204 try Thread.yield(); 205 std.time.sleep(std.time.ns_per_s); 206 // Finally, let that other thread go. 207 try std.testing.expectEqual(1, q.pop()); 208 209 // This won't continue until the other thread has had a chance to 210 // put at least one item in the queue. 211 while (!q.isFull()) 212 try Thread.yield(); 213 // But we want to ensure that there's a second push waiting, so 214 // here's another sleep. 215 std.time.sleep(std.time.ns_per_s / 2); 216 217 // Another spurious wake... 218 q.not_full.signal(); 219 q.not_empty.signal(); 220 // And another chance for the other thread to see that it's 221 // spurious and go back to sleep. 222 try Thread.yield(); 223 std.time.sleep(std.time.ns_per_s / 2); 224 225 // Pop that thing and we're done. 226 try std.testing.expectEqual(2, q.pop()); 227} 228 229test "Fill, block, fill, block" { 230 // Fill the queue, block while trying to write another item, have 231 // a background thread unblock us, then block while trying to 232 // write yet another thing. Have the background thread unblock 233 // that too (after some time) then drain the queue. This test 234 // fails if the while loop in `push` is turned into an `if`. 235 236 var queue: Queue(u8, 2) = .{}; 237 const thread = try Thread.spawn(cfg, sleepyPop, .{&queue}); 238 queue.push(1); 239 queue.push(2); 240 const now = std.time.milliTimestamp(); 241 queue.push(3); // This one should block. 242 const then = std.time.milliTimestamp(); 243 244 // Just to make sure the sleeps are yielding to this thread, make 245 // sure it took at least 900ms to do the push. 246 try std.testing.expect(then - now > 900); 247 248 // This should block again, waiting for the other thread. 249 queue.push(4); 250 251 // And once that push has gone through, the other thread's done. 252 thread.join(); 253 try std.testing.expectEqual(3, queue.pop()); 254 try std.testing.expectEqual(4, queue.pop()); 255} 256 257fn sleepyPush(q: *Queue(u8, 1)) !void { 258 // Try to ensure the other thread has already started trying to pop. 259 try Thread.yield(); 260 std.time.sleep(std.time.ns_per_s / 2); 261 262 // Spurious wake 263 q.not_full.signal(); 264 q.not_empty.signal(); 265 266 try Thread.yield(); 267 std.time.sleep(std.time.ns_per_s / 2); 268 269 // Stick something in the queue so it can be popped. 270 q.push(1); 271 // Ensure it's been popped. 272 while (!q.isEmpty()) 273 try Thread.yield(); 274 // Give the other thread time to block again. 275 try Thread.yield(); 276 std.time.sleep(std.time.ns_per_s / 2); 277 278 // Spurious wake 279 q.not_full.signal(); 280 q.not_empty.signal(); 281 282 q.push(2); 283} 284 285test "Drain, block, drain, block" { 286 // This is like fill/block/fill/block, but on the pop end. This 287 // test should fail if the `while` loop in `pop` is turned into an 288 // `if`. 289 290 var queue: Queue(u8, 1) = .{}; 291 const thread = try Thread.spawn(cfg, sleepyPush, .{&queue}); 292 try std.testing.expectEqual(1, queue.pop()); 293 try std.testing.expectEqual(2, queue.pop()); 294 thread.join(); 295} 296 297fn readerThread(q: *Queue(u8, 1)) !void { 298 try testing.expectEqual(1, q.pop()); 299} 300 301test "2 readers" { 302 // 2 threads read, one thread writes 303 var queue: Queue(u8, 1) = .{}; 304 const t1 = try Thread.spawn(cfg, readerThread, .{&queue}); 305 const t2 = try Thread.spawn(cfg, readerThread, .{&queue}); 306 try Thread.yield(); 307 std.time.sleep(std.time.ns_per_s / 2); 308 queue.push(1); 309 queue.push(1); 310 t1.join(); 311 t2.join(); 312} 313 314fn writerThread(q: *Queue(u8, 1)) !void { 315 q.push(1); 316} 317 318test "2 writers" { 319 var queue: Queue(u8, 1) = .{}; 320 const t1 = try Thread.spawn(cfg, writerThread, .{&queue}); 321 const t2 = try Thread.spawn(cfg, writerThread, .{&queue}); 322 323 try testing.expectEqual(1, queue.pop()); 324 try testing.expectEqual(1, queue.pop()); 325 t1.join(); 326 t2.join(); 327}