this repo has no description
at main 11 kB view raw
const std = @import("std");
const assert = std.debug.assert;
const atomic = std.atomic;
const Condition = std.Thread.Condition;

/// Thread safe. Fixed size. Blocking push and pop.
///
/// Returns a ring-buffer queue type that stores up to `size` items of type `T`.
/// `read_index` and `write_index` are kept modulo `2 * size`, so an empty queue
/// (`write_index == read_index`) can be told apart from a full one
/// (`write_index` is exactly `size` ahead of `read_index`) without a counter.
///
/// Functions whose name ends in `LH` do not take `mutex` themselves; they must
/// only be called while the caller already holds it.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // `mask` / `mask2` compute `index % size`; reject a zero-sized queue up
    // front with a readable message instead of a modulo-by-zero error.
    if (size == 0) @compileError("Queue size must be greater than zero");

    return struct {
        buf: [size]T = undefined,

        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        // `push` waits on this while the buffer is full; signalled whenever
        // an item is removed.
        not_full: Condition = .{},
        // `pop` and `poll` wait on this while the buffer is empty; signalled
        // whenever an item is added.
        not_empty: Condition = .{},

        /// Optionally supply an eventfd. Queue will write to this when receiving an event if it was
        /// empty. Can be helpful for applications using poll/epoll and tryPop. Note that any
        /// failure to write to the eventfd will be logged and a panic will occur.
        eventfd: ?std.posix.fd_t = null,

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop rather than `if`: the wait may return without an item
            // being available (spurious wake, or another consumer got there
            // first).
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            return self.popAndSignalLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop rather than `if`: the wait may return while the queue is
            // still (or again) full.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            self.pushLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full. Never blocks waiting for space.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            // The fullness check and the insertion happen under one lock
            // acquisition, so no other producer can take the free slot in
            // between.
            if (self.isFullLH()) return false;
            self.pushLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Poll the queue. This call blocks until events are in the queue
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            assert(!self.isEmptyLH());
        }

        /// Acquire the queue's mutex. Pair with `unlock`; intended for use
        /// around `drain`.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the queue's mutex previously acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally held
        ///
        /// Returns the next item, or null once the queue is empty. Does not
        /// lock; the caller must hold the lock via `lock`.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            return self.popAndSignalLH();
        }

        /// Empty means the two indices coincide.
        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Full means `write_index` is exactly one buffer length ahead of
        /// `read_index` (modulo twice the buffer length).
        fn isFullLH(self: *const Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items currently stored. Does not lock.
        fn len(self: *const Self) usize {
            // When the write index has wrapped but the read index has not,
            // add one full index period so the subtraction cannot underflow.
            const wrap_offset = 2 * self.buf.len *
                @intFromBool(self.write_index < self.read_index);
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Stores `item` at the write position and advances `write_index`.
        /// The queue must not be full.
        fn pushLH(self: *Self, item: T) void {
            assert(!self.isFullLH());
            const was_empty = self.isEmptyLH();

            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);

            // Wake one waiting consumer on every push. Signalling only on the
            // empty -> non-empty transition can strand a second waiting
            // consumer when two pushes land before the first one runs.
            self.not_empty.signal();

            // The eventfd is only written when the queue was empty.
            if (was_empty) {
                if (self.eventfd) |fd| {
                    const val = @as([8]u8, @bitCast(@as(u64, 1)));
                    _ = std.posix.write(fd, &val) catch |err| {
                        std.log.err("writing to eventfd failed: {}", .{err});
                        @panic("eventfd write failure");
                    };
                }
            }
        }

        /// Removes and returns the item at the read position. The queue must
        /// not be empty.
        fn popLH(self: *Self) T {
            assert(!self.isEmptyLH());
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            return result;
        }

        /// Removes the next item and wakes one producer that may be blocked
        /// in `push`. Every removal path goes through here so that a blocked
        /// producer is woken regardless of which pop variant freed the slot.
        fn popAndSignalLH(self: *Self) T {
            const result = self.popLH();
            self.not_full.signal();
            return result;
        }
    };
}

const testing = std.testing;
const Thread = std.Thread;
const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };

test "Queue: simple push / pop" {
    var queue: Queue(u8, 16) = .{};
    queue.push(1);
    queue.push(2);
    const pop = queue.pop();
    try testing.expectEqual(1, pop);
    try testing.expectEqual(2, queue.pop());
}

test "tryPush / tryPop never block" {
    var queue: Queue(u8, 2) = .{};
    try testing.expectEqual(null, queue.tryPop());
    try testing.expect(queue.tryPush(1));
    try testing.expect(queue.tryPush(2));
    try testing.expect(!queue.tryPush(3));
    try testing.expectEqual(1, queue.tryPop());
    try testing.expect(queue.tryPush(3));
    try testing.expectEqual(2, queue.tryPop());
    try testing.expectEqual(3, queue.tryPop());
    try testing.expectEqual(null, queue.tryPop());
}

fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    try testing.expectEqual(2, q.pop());
}

test "Fill, wait to push, pop once in another thread" {
    var queue: Queue(u8, 2) = .{};
    queue.push(1);
    queue.push(2);
    const t = try Thread.spawn(cfg, testPushPop, .{&queue});
    try testing.expectEqual(false, queue.tryPush(3));
    try testing.expectEqual(1, queue.pop());
    t.join();
    try testing.expectEqual(3, queue.pop());
    try testing.expectEqual(null, queue.tryPop());
}

fn testPush(q: *Queue(u8, 2)) void {
    q.push(0);
    q.push(1);
    q.push(2);
    q.push(3);
    q.push(4);
}

test "Try to pop, fill from another thread" {
    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, testPush, .{&queue});
    for (0..5) |idx| {
        try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
    }
    thread.join();
}

fn sleepyPop(q: *Queue(u8, 2)) !void {
    // First we wait for the queue to be full.
    while (!q.isFull())
        try Thread.yield();

    // Then we spuriously wake it up, because that's a thing that can
    // happen.
    q.not_full.signal();
    q.not_empty.signal();

    // Then give the other thread a good chance of waking up. It's not
    // clear that yield guarantees the other thread will be scheduled,
    // so we'll throw a sleep in here just to be sure. The queue is
    // still full and the push in the other thread is still blocked
    // waiting for space.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s);
    // Finally, let that other thread go.
    try std.testing.expectEqual(1, q.pop());

    // This won't continue until the other thread has had a chance to
    // put at least one item in the queue.
    while (!q.isFull())
        try Thread.yield();
    // But we want to ensure that there's a second push waiting, so
    // here's another sleep.
    std.time.sleep(std.time.ns_per_s / 2);

    // Another spurious wake...
    q.not_full.signal();
    q.not_empty.signal();
    // And another chance for the other thread to see that it's
    // spurious and go back to sleep.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Pop that thing and we're done.
    try std.testing.expectEqual(2, q.pop());
}

test "Fill, block, fill, block" {
    // Fill the queue, block while trying to write another item, have
    // a background thread unblock us, then block while trying to
    // write yet another thing. Have the background thread unblock
    // that too (after some time) then drain the queue. This test
    // fails if the while loop in `push` is turned into an `if`.

    var queue: Queue(u8, 2) = .{};
    const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
    queue.push(1);
    queue.push(2);
    const now = std.time.milliTimestamp();
    queue.push(3); // This one should block.
    const then = std.time.milliTimestamp();

    // Just to make sure the sleeps are yielding to this thread, make
    // sure it took at least 900ms to do the push.
    try std.testing.expect(then - now > 900);

    // This should block again, waiting for the other thread.
    queue.push(4);

    // And once that push has gone through, the other thread's done.
    thread.join();
    try std.testing.expectEqual(3, queue.pop());
    try std.testing.expectEqual(4, queue.pop());
}

fn sleepyPush(q: *Queue(u8, 1)) !void {
    // Try to ensure the other thread has already started trying to pop.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Stick something in the queue so it can be popped.
    q.push(1);
    // Ensure it's been popped.
    while (!q.isEmpty())
        try Thread.yield();
    // Give the other thread time to block again.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // Spurious wake
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}

test "Drain, block, drain, block" {
    // This is like fill/block/fill/block, but on the pop end. This
    // test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var queue: Queue(u8, 1) = .{};
    const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
    try std.testing.expectEqual(1, queue.pop());
    try std.testing.expectEqual(2, queue.pop());
    thread.join();
}

fn readerThread(q: *Queue(u8, 1)) !void {
    try testing.expectEqual(1, q.pop());
}

test "2 readers" {
    // 2 threads read, one thread writes
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);
    queue.push(1);
    queue.push(1);
    t1.join();
    t2.join();
}

fn writerThread(q: *Queue(u8, 1)) !void {
    q.push(1);
}

test "2 writers" {
    var queue: Queue(u8, 1) = .{};
    const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
    const t2 = try Thread.spawn(cfg, writerThread, .{&queue});

    try testing.expectEqual(1, queue.pop());
    try testing.expectEqual(1, queue.pop());
    t1.join();
    t2.join();
}

fn blockedWriter(q: *Queue(u8, 1)) void {
    q.push(2);
}

test "tryPop and drain wake a blocked push" {
    // The queue is full, so the writer thread blocks in `push`. Removing
    // the item with `tryPop` must wake it; otherwise `join` never returns.
    var queue: Queue(u8, 1) = .{};
    queue.push(1);
    const t = try Thread.spawn(cfg, blockedWriter, .{&queue});
    // Give the writer a good chance to actually be waiting.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 10);
    try testing.expectEqual(1, queue.tryPop());
    t.join();

    queue.lock();
    defer queue.unlock();
    try testing.expectEqual(2, queue.drain());
    try testing.expectEqual(null, queue.drain());
}