this repo has no description
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4const Condition = std.Thread.Condition;
5
/// Thread-safe, fixed-capacity FIFO queue holding up to `size` items of
/// type `T`.
///
/// `push`, `pop` and `poll` block on condition variables; `tryPush` and
/// `tryPop` never block. All public methods except `drain` take the internal
/// mutex themselves; `drain` expects the caller to hold it via `lock`/`unlock`.
///
/// `size` must be greater than zero.
pub fn Queue(
    comptime T: type,
    comptime size: usize,
) type {
    // The index helpers take remainders by `size` and `2 * size`.
    if (size == 0) @compileError("Queue size must be greater than zero");

    return struct {
        /// Ring storage. Slots are only meaningful between `read_index` and
        /// `write_index`.
        buf: [size]T = undefined,

        /// Read/write cursors, both kept modulo `2 * size`. Keeping one extra
        /// "lap" bit lets empty (cursors equal) be told apart from full
        /// (cursors exactly `size` apart) without wasting a slot.
        read_index: usize = 0,
        write_index: usize = 0,

        mutex: std.Thread.Mutex = .{},
        /// Producers wait here while the buffer is full.
        not_full: std.Thread.Condition = .{},
        /// Consumers wait here while the buffer is empty.
        not_empty: std.Thread.Condition = .{},

        /// Optionally supply an eventfd. Queue will write to this when receiving an event if it was
        /// empty. Can be helpful for applications using poll/epoll and tryPop. Note that any
        /// failure to write to the eventfd will be logged and a panic will occur.
        eventfd: ?std.posix.fd_t = null,

        const Self = @This();

        /// Pop an item from the queue. Blocks until an item is available.
        pub fn pop(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop, not `if`: condition variables may wake spuriously.
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            return self.popLH();
        }

        /// Push an item into the queue. Blocks until an item has been
        /// put in the queue.
        pub fn push(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Loop, not `if`: condition variables may wake spuriously.
            while (self.isFullLH()) {
                self.not_full.wait(&self.mutex);
            }
            self.pushLH(item);
        }

        /// Push an item into the queue. Returns true when the item
        /// was successfully placed in the queue, false if the queue
        /// was full. Never blocks waiting for space.
        pub fn tryPush(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            // Check and insert under one lock acquisition so another producer
            // cannot fill the queue between the check and the insert.
            if (self.isFullLH()) return false;
            self.pushLH(item);
            return true;
        }

        /// Pop an item from the queue. Returns null when no item is
        /// available.
        pub fn tryPop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();
            if (self.isEmptyLH()) return null;
            return self.popLH();
        }

        /// Poll the queue. This call blocks until events are in the queue.
        /// Nothing is removed from the queue.
        pub fn poll(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            while (self.isEmptyLH()) {
                self.not_empty.wait(&self.mutex);
            }
            // We did not consume anything, so hand the wake-up on to a
            // consumer that may still be blocked in `pop`.
            self.not_empty.signal();
        }

        /// Acquire the queue's mutex. Pair with `unlock`; intended for use
        /// around `drain`.
        pub fn lock(self: *Self) void {
            self.mutex.lock();
        }

        /// Release the mutex acquired with `lock`.
        pub fn unlock(self: *Self) void {
            self.mutex.unlock();
        }

        /// Used to efficiently drain the queue while the lock is externally held.
        /// Returns the next item, or null once the queue is empty.
        pub fn drain(self: *Self) ?T {
            if (self.isEmptyLH()) return null;
            return self.popLH();
        }

        /// Lock must be held. True when there is nothing to pop.
        fn isEmptyLH(self: *const Self) bool {
            return self.write_index == self.read_index;
        }

        /// Lock must be held. True when every slot is occupied.
        fn isFullLH(self: *const Self) bool {
            return self.mask2(self.write_index + self.buf.len) ==
                self.read_index;
        }

        /// Returns `true` if the queue is empty and `false` otherwise.
        pub fn isEmpty(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isEmptyLH();
        }

        /// Returns `true` if the queue is full and `false` otherwise.
        pub fn isFull(self: *Self) bool {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.isFullLH();
        }

        /// Returns the number of items currently stored. Does not lock.
        fn len(self: *const Self) usize {
            // When the write cursor has wrapped past `2 * size` and the read
            // cursor has not, add the lap back before subtracting.
            const wrap_offset = 2 * self.buf.len *
                @intFromBool(self.write_index < self.read_index);
            const adjusted_write_index = self.write_index + wrap_offset;
            return adjusted_write_index - self.read_index;
        }

        /// Returns `index` modulo the length of the backing slice.
        fn mask(self: *const Self, index: usize) usize {
            return index % self.buf.len;
        }

        /// Returns `index` modulo twice the length of the backing slice.
        fn mask2(self: *const Self, index: usize) usize {
            return index % (2 * self.buf.len);
        }

        /// Lock must be held and the queue must not be full. Stores `item`,
        /// wakes one waiting consumer, and notifies the eventfd (if any) when
        /// the queue transitioned from empty to non-empty.
        fn pushLH(self: *Self, item: T) void {
            std.debug.assert(!self.isFullLH());
            const was_empty = self.isEmptyLH();

            self.buf[self.mask(self.write_index)] = item;
            self.write_index = self.mask2(self.write_index + 1);

            // Signal on every push: with several blocked consumers, signalling
            // only on the empty -> non-empty edge can leave one asleep while
            // items are available. Signalling with no waiter is a no-op.
            self.not_empty.signal();

            if (was_empty) {
                if (self.eventfd) |fd| {
                    const val = @as([8]u8, @bitCast(@as(u64, 1)));
                    _ = std.posix.write(fd, &val) catch |err| {
                        std.log.err("writing to eventfd failed: {}", .{err});
                        @panic("eventfd write failure");
                    };
                }
            }
        }

        /// Lock must be held and the queue must not be empty. Removes and
        /// returns the oldest item and wakes one waiting producer.
        fn popLH(self: *Self) T {
            std.debug.assert(!self.isEmptyLH());
            const result = self.buf[self.mask(self.read_index)];
            self.read_index = self.mask2(self.read_index + 1);
            // A slot was just freed; every removal path (pop, tryPop, drain)
            // must let a blocked `push` know.
            self.not_full.signal();
            return result;
        }
    };
}
169
170const testing = std.testing;
171const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
test "Queue: simple push / pop" {
    // Items come back out in the same order they went in.
    var q: Queue(u8, 16) = .{};
    const items = [_]u8{ 1, 2 };
    for (items) |item| {
        q.push(item);
    }
    for (items) |want| {
        const got = q.pop();
        try testing.expectEqual(want, got);
    }
}
180
181const Thread = std.Thread;
/// Thread body: pushes 3 onto the queue, then pops one item and expects it
/// to be 2.
fn testPushPop(q: *Queue(u8, 2)) !void {
    q.push(3);
    const popped = q.pop();
    try testing.expectEqual(2, popped);
}
186
test "Fill, wait to push, pop once in another thread" {
    // Start with a full two-slot queue.
    var q: Queue(u8, 2) = .{};
    for ([_]u8{ 1, 2 }) |item| {
        q.push(item);
    }

    // The worker pushes 3 and then pops once (see `testPushPop`).
    const worker = try Thread.spawn(cfg, testPushPop, .{&q});

    // The queue is full, so a non-blocking push must be rejected.
    const accepted = q.tryPush(3);
    try testing.expectEqual(false, accepted);

    // Free a slot for the worker's push.
    const first = q.pop();
    try testing.expectEqual(1, first);

    worker.join();

    // The worker consumed 2, leaving only its own 3 behind.
    const last = q.pop();
    try testing.expectEqual(3, last);
    const leftover = q.tryPop();
    try testing.expectEqual(null, leftover);
}
198
/// Thread body: pushes the values 0 through 4, in order.
fn testPush(q: *Queue(u8, 2)) void {
    var value: u8 = 0;
    while (value < 5) : (value += 1) {
        q.push(value);
    }
}
206
test "Try to pop, fill from another thread" {
    // The producer pushes 0..4 into a two-slot queue; this thread must see
    // them in the same order.
    var q: Queue(u8, 2) = .{};
    const producer = try Thread.spawn(cfg, testPush, .{&q});

    var expected: u8 = 0;
    while (expected < 5) : (expected += 1) {
        try testing.expectEqual(expected, q.pop());
    }

    producer.join();
}
215
/// Thread body for "Fill, block, fill, block": pops two items slowly,
/// poking both condition variables in between to simulate spurious wake-ups.
fn sleepyPop(q: *Queue(u8, 2)) !void {
    const half_second = std.time.ns_per_s / 2;

    // Step 1: spin until the other thread has filled the queue.
    while (true) {
        if (q.isFull()) break;
        try Thread.yield();
    }

    // Step 2: fake a spurious wake-up on both conditions.
    q.not_full.signal();
    q.not_empty.signal();

    // Step 3: let the other thread run. A yield alone may not schedule it,
    // so also sleep for a full second. The queue stays full throughout, so
    // the blocked push must keep waiting.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s);

    // Step 4: free one slot, releasing the blocked push.
    try std.testing.expectEqual(1, q.pop());

    // Step 5: wait until the other thread has refilled the queue ...
    while (true) {
        if (q.isFull()) break;
        try Thread.yield();
    }
    // ... and give it time to block on a second push.
    std.time.sleep(half_second);

    // Step 6: another fake wake-up, followed by time for the other thread
    // to notice it is spurious and go back to waiting.
    q.not_full.signal();
    q.not_empty.signal();
    try Thread.yield();
    std.time.sleep(half_second);

    // Step 7: final pop; this releases the second blocked push.
    try std.testing.expectEqual(2, q.pop());
}
255
test "Fill, block, fill, block" {
    // Scenario: fill the queue, block on a third push until the background
    // thread pops, then block on a fourth push until it pops again. The
    // background thread also injects spurious wake-ups, so this test fails
    // if the `while` loop in `push` is turned into an `if`.

    var q: Queue(u8, 2) = .{};
    const popper = try Thread.spawn(cfg, sleepyPop, .{&q});

    q.push(1);
    q.push(2);

    // The queue is now full: this push should block until `sleepyPop` pops.
    const before = std.time.milliTimestamp();
    q.push(3);
    const after = std.time.milliTimestamp();

    // `sleepyPop` sleeps for a second before popping; require that the push
    // really waited (at least 900ms) rather than returning early.
    const elapsed_ms = after - before;
    try std.testing.expect(elapsed_ms > 900);

    // Blocks again until the background thread's second pop.
    q.push(4);

    // After that push has gone through the background thread is finished.
    popper.join();

    const third = q.pop();
    try std.testing.expectEqual(3, third);
    const fourth = q.pop();
    try std.testing.expectEqual(4, fourth);
}
283
/// Thread body for "Drain, block, drain, block": pushes two items slowly,
/// poking both condition variables in between to simulate spurious wake-ups.
fn sleepyPush(q: *Queue(u8, 1)) !void {
    const half_second = std.time.ns_per_s / 2;

    // Give the other thread time to start blocking in pop.
    try Thread.yield();
    std.time.sleep(half_second);

    // Fake a spurious wake-up on both conditions.
    q.not_full.signal();
    q.not_empty.signal();

    try Thread.yield();
    std.time.sleep(half_second);

    // Provide the first item for the other thread to pop.
    q.push(1);

    // Spin until that item has been taken.
    while (true) {
        if (q.isEmpty()) break;
        try Thread.yield();
    }

    // Let the other thread block in pop again.
    try Thread.yield();
    std.time.sleep(half_second);

    // Second fake wake-up.
    q.not_full.signal();
    q.not_empty.signal();

    q.push(2);
}
311
test "Drain, block, drain, block" {
    // Mirror image of fill/block/fill/block, exercised from the pop side.
    // This test should fail if the `while` loop in `pop` is turned into an
    // `if`.

    var q: Queue(u8, 1) = .{};
    const pusher = try Thread.spawn(cfg, sleepyPush, .{&q});

    const first = q.pop();
    try std.testing.expectEqual(1, first);
    const second = q.pop();
    try std.testing.expectEqual(2, second);

    pusher.join();
}
323
/// Thread body: pops a single item and expects it to be 1.
fn readerThread(q: *Queue(u8, 1)) !void {
    const got = q.pop();
    try testing.expectEqual(1, got);
}
327
test "2 readers" {
    // Two consumer threads, one producer (this thread).
    var q: Queue(u8, 1) = .{};

    var readers: [2]Thread = undefined;
    for (&readers) |*reader| {
        reader.* = try Thread.spawn(cfg, readerThread, .{&q});
    }

    // Give the readers a chance to start before producing anything.
    try Thread.yield();
    std.time.sleep(std.time.ns_per_s / 2);

    // One item per reader.
    for (0..readers.len) |_| {
        q.push(1);
    }

    for (readers) |reader| {
        reader.join();
    }
}
340
/// Thread body: pushes a single 1 onto the queue.
fn writerThread(q: *Queue(u8, 1)) !void {
    const item: u8 = 1;
    q.push(item);
}
344
test "2 writers" {
    // Two producer threads each push one item into a one-slot queue; this
    // thread consumes both.
    var q: Queue(u8, 1) = .{};

    var writers: [2]Thread = undefined;
    for (&writers) |*writer| {
        writer.* = try Thread.spawn(cfg, writerThread, .{&q});
    }

    for (0..writers.len) |_| {
        try testing.expectEqual(1, q.pop());
    }

    for (writers) |writer| {
        writer.join();
    }
}