a modern tui library written in zig
at v0.4.0 10 kB view raw
1const std = @import("std"); 2const assert = std.debug.assert; 3const atomic = std.atomic; 4const Condition = std.Thread.Condition; 5 6/// Thread safe. Fixed size. Blocking push and pop. 7pub fn Queue( 8 comptime T: type, 9 comptime size: usize, 10) type { 11 return struct { 12 buf: [size]T = undefined, 13 14 read_index: usize = 0, 15 write_index: usize = 0, 16 17 mutex: std.Thread.Mutex = .{}, 18 // blocks when the buffer is full 19 not_full: Condition = .{}, 20 // ...or empty 21 not_empty: Condition = .{}, 22 23 const Self = @This(); 24 25 /// Pop an item from the queue. Blocks until an item is available. 26 pub fn pop(self: *Self) T { 27 self.mutex.lock(); 28 defer self.mutex.unlock(); 29 while (self.isEmptyLH()) { 30 self.not_empty.wait(&self.mutex); 31 } 32 std.debug.assert(!self.isEmptyLH()); 33 if (self.isFullLH()) { 34 // If we are full, wake up a push that might be 35 // waiting here. 36 self.not_full.signal(); 37 } 38 39 const result = self.buf[self.mask(self.read_index)]; 40 self.read_index = self.mask2(self.read_index + 1); 41 return result; 42 } 43 44 /// Push an item into the queue. Blocks until an item has been 45 /// put in the queue. 46 pub fn push(self: *Self, item: T) void { 47 self.mutex.lock(); 48 defer self.mutex.unlock(); 49 while (self.isFullLH()) { 50 self.not_full.wait(&self.mutex); 51 } 52 if (self.isEmptyLH()) { 53 // If we were empty, wake up a pop if it was waiting. 54 self.not_empty.signal(); 55 } 56 std.debug.assert(!self.isFullLH()); 57 58 self.buf[self.mask(self.write_index)] = item; 59 self.write_index = self.mask2(self.write_index + 1); 60 } 61 62 /// Push an item into the queue. Returns true when the item 63 /// was successfully placed in the queue, false if the queue 64 /// was full. 65 pub fn tryPush(self: *Self, item: T) bool { 66 self.mutex.lock(); 67 if (self.isFullLH()) { 68 self.mutex.unlock(); 69 return false; 70 } 71 self.mutex.unlock(); 72 self.push(item); 73 return true; 74 } 75 76 /// Pop an item from the queue. Returns null when no item is 77 /// available. 78 pub fn tryPop(self: *Self) ?T { 79 self.mutex.lock(); 80 if (self.isEmptyLH()) { 81 self.mutex.unlock(); 82 return null; 83 } 84 self.mutex.unlock(); 85 return self.pop(); 86 } 87 88 /// Poll the queue. This call blocks until events are in the queue 89 pub fn poll(self: *Self) void { 90 self.mutex.lock(); 91 defer self.mutex.unlock(); 92 while (self.isEmptyLH()) { 93 self.not_empty.wait(&self.mutex); 94 } 95 std.debug.assert(!self.isEmptyLH()); 96 } 97 98 fn isEmptyLH(self: Self) bool { 99 return self.write_index == self.read_index; 100 } 101 102 fn isFullLH(self: Self) bool { 103 return self.mask2(self.write_index + self.buf.len) == 104 self.read_index; 105 } 106 107 /// Returns `true` if the queue is empty and `false` otherwise. 108 pub fn isEmpty(self: *Self) bool { 109 self.mutex.lock(); 110 defer self.mutex.unlock(); 111 return self.isEmptyLH(); 112 } 113 114 /// Returns `true` if the queue is full and `false` otherwise. 115 pub fn isFull(self: *Self) bool { 116 self.mutex.lock(); 117 defer self.mutex.unlock(); 118 return self.isFullLH(); 119 } 120 121 /// Returns the length 122 fn len(self: Self) usize { 123 const wrap_offset = 2 * self.buf.len * 124 @intFromBool(self.write_index < self.read_index); 125 const adjusted_write_index = self.write_index + wrap_offset; 126 return adjusted_write_index - self.read_index; 127 } 128 129 /// Returns `index` modulo the length of the backing slice. 130 fn mask(self: Self, index: usize) usize { 131 return index % self.buf.len; 132 } 133 134 /// Returns `index` modulo twice the length of the backing slice. 135 fn mask2(self: Self, index: usize) usize { 136 return index % (2 * self.buf.len); 137 } 138 }; 139} 140 141const testing = std.testing; 142const cfg = Thread.SpawnConfig{ .allocator = testing.allocator }; 143test "Queue: simple push / pop" { 144 var queue: Queue(u8, 16) = .{}; 145 queue.push(1); 146 queue.push(2); 147 const pop = queue.pop(); 148 try testing.expectEqual(1, pop); 149 try testing.expectEqual(2, queue.pop()); 150} 151 152const Thread = std.Thread; 153fn testPushPop(q: *Queue(u8, 2)) !void { 154 q.push(3); 155 try testing.expectEqual(2, q.pop()); 156} 157 158test "Fill, wait to push, pop once in another thread" { 159 var queue: Queue(u8, 2) = .{}; 160 queue.push(1); 161 queue.push(2); 162 const t = try Thread.spawn(cfg, testPushPop, .{&queue}); 163 try testing.expectEqual(false, queue.tryPush(3)); 164 try testing.expectEqual(1, queue.pop()); 165 t.join(); 166 try testing.expectEqual(3, queue.pop()); 167 try testing.expectEqual(null, queue.tryPop()); 168} 169 170fn testPush(q: *Queue(u8, 2)) void { 171 q.push(0); 172 q.push(1); 173 q.push(2); 174 q.push(3); 175 q.push(4); 176} 177 178test "Try to pop, fill from another thread" { 179 var queue: Queue(u8, 2) = .{}; 180 const thread = try Thread.spawn(cfg, testPush, .{&queue}); 181 for (0..5) |idx| { 182 try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop()); 183 } 184 thread.join(); 185} 186 187fn sleepyPop(q: *Queue(u8, 2)) !void { 188 // First we wait for the queue to be full. 189 while (!q.isFull()) 190 try Thread.yield(); 191 192 // Then we spuriously wake it up, because that's a thing that can 193 // happen. 194 q.not_full.signal(); 195 q.not_empty.signal(); 196 197 // Then give the other thread a good chance of waking up. It's not 198 // clear that yield guarantees the other thread will be scheduled, 199 // so we'll throw a sleep in here just to be sure. The queue is 200 // still full and the push in the other thread is still blocked 201 // waiting for space. 202 try Thread.yield(); 203 std.time.sleep(std.time.ns_per_s); 204 // Finally, let that other thread go. 205 try std.testing.expectEqual(1, q.pop()); 206 207 // This won't continue until the other thread has had a chance to 208 // put at least one item in the queue. 209 while (!q.isFull()) 210 try Thread.yield(); 211 // But we want to ensure that there's a second push waiting, so 212 // here's another sleep. 213 std.time.sleep(std.time.ns_per_s / 2); 214 215 // Another spurious wake... 216 q.not_full.signal(); 217 q.not_empty.signal(); 218 // And another chance for the other thread to see that it's 219 // spurious and go back to sleep. 220 try Thread.yield(); 221 std.time.sleep(std.time.ns_per_s / 2); 222 223 // Pop that thing and we're done. 224 try std.testing.expectEqual(2, q.pop()); 225} 226 227test "Fill, block, fill, block" { 228 // Fill the queue, block while trying to write another item, have 229 // a background thread unblock us, then block while trying to 230 // write yet another thing. Have the background thread unblock 231 // that too (after some time) then drain the queue. This test 232 // fails if the while loop in `push` is turned into an `if`. 233 234 var queue: Queue(u8, 2) = .{}; 235 const thread = try Thread.spawn(cfg, sleepyPop, .{&queue}); 236 queue.push(1); 237 queue.push(2); 238 const now = std.time.milliTimestamp(); 239 queue.push(3); // This one should block. 240 const then = std.time.milliTimestamp(); 241 242 // Just to make sure the sleeps are yielding to this thread, make 243 // sure it took at least 900ms to do the push. 244 try std.testing.expect(then - now > 900); 245 246 // This should block again, waiting for the other thread. 247 queue.push(4); 248 249 // And once that push has gone through, the other thread's done. 250 thread.join(); 251 try std.testing.expectEqual(3, queue.pop()); 252 try std.testing.expectEqual(4, queue.pop()); 253} 254 255fn sleepyPush(q: *Queue(u8, 1)) !void { 256 // Try to ensure the other thread has already started trying to pop. 257 try Thread.yield(); 258 std.time.sleep(std.time.ns_per_s / 2); 259 260 // Spurious wake 261 q.not_full.signal(); 262 q.not_empty.signal(); 263 264 try Thread.yield(); 265 std.time.sleep(std.time.ns_per_s / 2); 266 267 // Stick something in the queue so it can be popped. 268 q.push(1); 269 // Ensure it's been popped. 270 while (!q.isEmpty()) 271 try Thread.yield(); 272 // Give the other thread time to block again. 273 try Thread.yield(); 274 std.time.sleep(std.time.ns_per_s / 2); 275 276 // Spurious wake 277 q.not_full.signal(); 278 q.not_empty.signal(); 279 280 q.push(2); 281} 282 283test "Drain, block, drain, block" { 284 // This is like fill/block/fill/block, but on the pop end. This 285 // test should fail if the `while` loop in `pop` is turned into an 286 // `if`. 287 288 var queue: Queue(u8, 1) = .{}; 289 const thread = try Thread.spawn(cfg, sleepyPush, .{&queue}); 290 try std.testing.expectEqual(1, queue.pop()); 291 try std.testing.expectEqual(2, queue.pop()); 292 thread.join(); 293} 294 295fn readerThread(q: *Queue(u8, 1)) !void { 296 try testing.expectEqual(1, q.pop()); 297} 298 299test "2 readers" { 300 // 2 threads read, one thread writes 301 var queue: Queue(u8, 1) = .{}; 302 const t1 = try Thread.spawn(cfg, readerThread, .{&queue}); 303 const t2 = try Thread.spawn(cfg, readerThread, .{&queue}); 304 try Thread.yield(); 305 std.time.sleep(std.time.ns_per_s / 2); 306 queue.push(1); 307 queue.push(1); 308 t1.join(); 309 t2.join(); 310} 311 312fn writerThread(q: *Queue(u8, 1)) !void { 313 q.push(1); 314} 315 316test "2 writers" { 317 var queue: Queue(u8, 1) = .{}; 318 const t1 = try Thread.spawn(cfg, writerThread, .{&queue}); 319 const t2 = try Thread.spawn(cfg, writerThread, .{&queue}); 320 321 try testing.expectEqual(1, queue.pop()); 322 try testing.expectEqual(1, queue.pop()); 323 t1.join(); 324 t2.join(); 325}