a modern tui library written in zig
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4const Condition = std.Thread.Condition;
5
6/// Thread safe. Fixed size. Blocking push and pop.
7pub fn Queue(
8 comptime T: type,
9 comptime size: usize,
10) type {
11 return struct {
12 buf: [size]T = undefined,
13
14 read_index: usize = 0,
15 write_index: usize = 0,
16
17 mutex: std.Thread.Mutex = .{},
18 // blocks when the buffer is full
19 not_full: Condition = .{},
20 // ...or empty
21 not_empty: Condition = .{},
22
23 const Self = @This();
24
25 /// Pop an item from the queue. Blocks until an item is available.
26 pub fn pop(self: *Self) T {
27 self.mutex.lock();
28 defer self.mutex.unlock();
29 while (self.isEmptyLH()) {
30 self.not_empty.wait(&self.mutex);
31 }
32 std.debug.assert(!self.isEmptyLH());
33 return self.popAndSignalLH();
34 }
35
36 /// Push an item into the queue. Blocks until an item has been
37 /// put in the queue.
38 pub fn push(self: *Self, item: T) void {
39 self.mutex.lock();
40 defer self.mutex.unlock();
41 while (self.isFullLH()) {
42 self.not_full.wait(&self.mutex);
43 }
44 std.debug.assert(!self.isFullLH());
45 self.pushAndSignalLH(item);
46 }
47
48 /// Push an item into the queue. Returns true when the item
49 /// was successfully placed in the queue, false if the queue
50 /// was full.
51 pub fn tryPush(self: *Self, item: T) bool {
52 self.mutex.lock();
53 defer self.mutex.unlock();
54 if (self.isFullLH()) return false;
55 self.pushAndSignalLH(item);
56 return true;
57 }
58
59 /// Pop an item from the queue. Returns null when no item is
60 /// available.
61 pub fn tryPop(self: *Self) ?T {
62 self.mutex.lock();
63 defer self.mutex.unlock();
64 if (self.isEmptyLH()) return null;
65 return self.popAndSignalLH();
66 }
67
68 /// Poll the queue. This call blocks until events are in the queue
69 pub fn poll(self: *Self) void {
70 self.mutex.lock();
71 defer self.mutex.unlock();
72 while (self.isEmptyLH()) {
73 self.not_empty.wait(&self.mutex);
74 }
75 std.debug.assert(!self.isEmptyLH());
76 }
77
78 pub fn lock(self: *Self) void {
79 self.mutex.lock();
80 }
81
82 pub fn unlock(self: *Self) void {
83 self.mutex.unlock();
84 }
85
86 /// Used to efficiently drain the queue while the lock is externally held
87 pub fn drain(self: *Self) ?T {
88 if (self.isEmptyLH()) return null;
89 return self.popLH();
90 }
91
92 fn isEmptyLH(self: Self) bool {
93 return self.write_index == self.read_index;
94 }
95
96 fn isFullLH(self: Self) bool {
97 return self.mask2(self.write_index + self.buf.len) ==
98 self.read_index;
99 }
100
101 /// Returns `true` if the queue is empty and `false` otherwise.
102 pub fn isEmpty(self: *Self) bool {
103 self.mutex.lock();
104 defer self.mutex.unlock();
105 return self.isEmptyLH();
106 }
107
108 /// Returns `true` if the queue is full and `false` otherwise.
109 pub fn isFull(self: *Self) bool {
110 self.mutex.lock();
111 defer self.mutex.unlock();
112 return self.isFullLH();
113 }
114
115 /// Returns the length
116 fn len(self: Self) usize {
117 const wrap_offset = 2 * self.buf.len *
118 @intFromBool(self.write_index < self.read_index);
119 const adjusted_write_index = self.write_index + wrap_offset;
120 return adjusted_write_index - self.read_index;
121 }
122
123 /// Returns `index` modulo the length of the backing slice.
124 fn mask(self: Self, index: usize) usize {
125 return index % self.buf.len;
126 }
127
128 /// Returns `index` modulo twice the length of the backing slice.
129 fn mask2(self: Self, index: usize) usize {
130 return index % (2 * self.buf.len);
131 }
132
133 fn pushAndSignalLH(self: *Self, item: T) void {
134 const was_empty = self.isEmptyLH();
135 self.buf[self.mask(self.write_index)] = item;
136 self.write_index = self.mask2(self.write_index + 1);
137 if (was_empty) {
138 self.not_empty.signal();
139 }
140 }
141
142 fn popAndSignalLH(self: *Self) T {
143 const was_full = self.isFullLH();
144 const result = self.popLH();
145 if (was_full) {
146 self.not_full.signal();
147 }
148 return result;
149 }
150
151 fn popLH(self: *Self) T {
152 const result = self.buf[self.mask(self.read_index)];
153 self.read_index = self.mask2(self.read_index + 1);
154 return result;
155 }
156 };
157}
158
159const testing = std.testing;
160const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
161test "Queue: simple push / pop" {
162 var queue: Queue(u8, 16) = .{};
163 queue.push(1);
164 queue.push(2);
165 const pop = queue.pop();
166 try testing.expectEqual(1, pop);
167 try testing.expectEqual(2, queue.pop());
168}
169
170const Thread = std.Thread;
171fn testPushPop(q: *Queue(u8, 2)) !void {
172 q.push(3);
173 try testing.expectEqual(2, q.pop());
174}
175
176test "Fill, wait to push, pop once in another thread" {
177 var queue: Queue(u8, 2) = .{};
178 queue.push(1);
179 queue.push(2);
180 const t = try Thread.spawn(cfg, testPushPop, .{&queue});
181 try testing.expectEqual(false, queue.tryPush(3));
182 try testing.expectEqual(1, queue.pop());
183 t.join();
184 try testing.expectEqual(3, queue.pop());
185 try testing.expectEqual(null, queue.tryPop());
186}
187
188fn testPush(q: *Queue(u8, 2)) void {
189 q.push(0);
190 q.push(1);
191 q.push(2);
192 q.push(3);
193 q.push(4);
194}
195
196test "Try to pop, fill from another thread" {
197 var queue: Queue(u8, 2) = .{};
198 const thread = try Thread.spawn(cfg, testPush, .{&queue});
199 for (0..5) |idx| {
200 try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
201 }
202 thread.join();
203}
204
205fn sleepyPop(q: *Queue(u8, 2), state: *atomic.Value(u8)) !void {
206 // First we wait for the queue to be full.
207 while (state.load(.acquire) < 1)
208 try Thread.yield();
209
210 // Then we spuriously wake it up, because that's a thing that can
211 // happen.
212 q.not_full.signal();
213 q.not_empty.signal();
214
215 // Then give the other thread a good chance of waking up. It's not
216 // clear that yield guarantees the other thread will be scheduled,
217 // so we'll throw a sleep in here just to be sure. The queue is
218 // still full and the push in the other thread is still blocked
219 // waiting for space.
220 try Thread.yield();
221 std.Thread.sleep(10 * std.time.ns_per_ms);
222 // Finally, let that other thread go.
223 try std.testing.expectEqual(1, q.pop());
224
225 // Wait for the other thread to signal it's ready for second push
226 while (state.load(.acquire) < 2)
227 try Thread.yield();
228 // But we want to ensure that there's a second push waiting, so
229 // here's another sleep.
230 std.Thread.sleep(10 * std.time.ns_per_ms);
231
232 // Another spurious wake...
233 q.not_full.signal();
234 q.not_empty.signal();
235 // And another chance for the other thread to see that it's
236 // spurious and go back to sleep.
237 try Thread.yield();
238 std.Thread.sleep(10 * std.time.ns_per_ms);
239
240 // Pop that thing and we're done.
241 try std.testing.expectEqual(2, q.pop());
242}
243
244test "Fill, block, fill, block" {
245 // Fill the queue, block while trying to write another item, have
246 // a background thread unblock us, then block while trying to
247 // write yet another thing. Have the background thread unblock
248 // that too (after some time) then drain the queue. This test
249 // fails if the while loop in `push` is turned into an `if`.
250
251 var queue: Queue(u8, 2) = .{};
252 var state = atomic.Value(u8).init(0);
253 const thread = try Thread.spawn(cfg, sleepyPop, .{ &queue, &state });
254 queue.push(1);
255 queue.push(2);
256 state.store(1, .release);
257 const now = std.time.milliTimestamp();
258 queue.push(3); // This one should block.
259 const then = std.time.milliTimestamp();
260
261 // Just to make sure the sleeps are yielding to this thread, make
262 // sure it took at least 5ms to do the push.
263 try std.testing.expect(then - now > 5);
264
265 state.store(2, .release);
266 // This should block again, waiting for the other thread.
267 queue.push(4);
268
269 // And once that push has gone through, the other thread's done.
270 thread.join();
271 try std.testing.expectEqual(3, queue.pop());
272 try std.testing.expectEqual(4, queue.pop());
273}
274
275fn sleepyPush(q: *Queue(u8, 1), state: *atomic.Value(u8)) !void {
276 // Try to ensure the other thread has already started trying to pop.
277 try Thread.yield();
278 std.Thread.sleep(10 * std.time.ns_per_ms);
279
280 // Spurious wake
281 q.not_full.signal();
282 q.not_empty.signal();
283
284 try Thread.yield();
285 std.Thread.sleep(10 * std.time.ns_per_ms);
286
287 // Stick something in the queue so it can be popped.
288 q.push(1);
289 // Ensure it's been popped.
290 while (state.load(.acquire) < 1)
291 try Thread.yield();
292 // Give the other thread time to block again.
293 try Thread.yield();
294 std.Thread.sleep(10 * std.time.ns_per_ms);
295
296 // Spurious wake
297 q.not_full.signal();
298 q.not_empty.signal();
299
300 q.push(2);
301}
302
303test "Drain, block, drain, block" {
304 // This is like fill/block/fill/block, but on the pop end. This
305 // test should fail if the `while` loop in `pop` is turned into an
306 // `if`.
307
308 var queue: Queue(u8, 1) = .{};
309 var state = atomic.Value(u8).init(0);
310 const thread = try Thread.spawn(cfg, sleepyPush, .{ &queue, &state });
311 try std.testing.expectEqual(1, queue.pop());
312 state.store(1, .release);
313 try std.testing.expectEqual(2, queue.pop());
314 thread.join();
315}
316
317fn readerThread(q: *Queue(u8, 1)) !void {
318 try testing.expectEqual(1, q.pop());
319}
320
321test "2 readers" {
322 // 2 threads read, one thread writes
323 var queue: Queue(u8, 1) = .{};
324 const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
325 const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
326 try Thread.yield();
327 std.Thread.sleep(10 * std.time.ns_per_ms);
328 queue.push(1);
329 queue.push(1);
330 t1.join();
331 t2.join();
332}
333
334fn writerThread(q: *Queue(u8, 1)) !void {
335 q.push(1);
336}
337
338test "2 writers" {
339 var queue: Queue(u8, 1) = .{};
340 const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
341 const t2 = try Thread.spawn(cfg, writerThread, .{&queue});
342
343 try testing.expectEqual(1, queue.pop());
344 try testing.expectEqual(1, queue.pop());
345 t1.join();
346 t2.join();
347}