a modern tui library written in zig
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4const Condition = std.Thread.Condition;
5
6/// Thread safe. Fixed size. Blocking push and pop.
7pub fn Queue(
8 comptime T: type,
9 comptime size: usize,
10) type {
11 return struct {
12 buf: [size]T = undefined,
13
14 read_index: usize = 0,
15 write_index: usize = 0,
16
17 mutex: std.Thread.Mutex = .{},
18 // blocks when the buffer is full
19 not_full: Condition = .{},
20 // ...or empty
21 not_empty: Condition = .{},
22
23 const Self = @This();
24
25 /// Pop an item from the queue. Blocks until an item is available.
26 pub fn pop(self: *Self) T {
27 self.mutex.lock();
28 defer self.mutex.unlock();
29 while (self.isEmptyLH()) {
30 self.not_empty.wait(&self.mutex);
31 }
32 std.debug.assert(!self.isEmptyLH());
33 if (self.isFullLH()) {
34 // If we are full, wake up a push that might be
35 // waiting here.
36 self.not_full.signal();
37 }
38
39 const result = self.buf[self.mask(self.read_index)];
40 self.read_index = self.mask2(self.read_index + 1);
41 return result;
42 }
43
44 /// Push an item into the queue. Blocks until an item has been
45 /// put in the queue.
46 pub fn push(self: *Self, item: T) void {
47 self.mutex.lock();
48 defer self.mutex.unlock();
49 while (self.isFullLH()) {
50 self.not_full.wait(&self.mutex);
51 }
52 if (self.isEmptyLH()) {
53 // If we were empty, wake up a pop if it was waiting.
54 self.not_empty.signal();
55 }
56 std.debug.assert(!self.isFullLH());
57
58 self.buf[self.mask(self.write_index)] = item;
59 self.write_index = self.mask2(self.write_index + 1);
60 }
61
62 /// Push an item into the queue. Returns true when the item
63 /// was successfully placed in the queue, false if the queue
64 /// was full.
65 pub fn tryPush(self: *Self, item: T) bool {
66 self.mutex.lock();
67 if (self.isFullLH()) {
68 self.mutex.unlock();
69 return false;
70 }
71 self.mutex.unlock();
72 self.push(item);
73 return true;
74 }
75
76 /// Pop an item from the queue. Returns null when no item is
77 /// available.
78 pub fn tryPop(self: *Self) ?T {
79 self.mutex.lock();
80 if (self.isEmptyLH()) {
81 self.mutex.unlock();
82 return null;
83 }
84 self.mutex.unlock();
85 return self.pop();
86 }
87
88 /// Poll the queue. This call blocks until events are in the queue
89 pub fn poll(self: *Self) void {
90 self.mutex.lock();
91 defer self.mutex.unlock();
92 while (self.isEmptyLH()) {
93 self.not_empty.wait(&self.mutex);
94 }
95 std.debug.assert(!self.isEmptyLH());
96 }
97
98 fn isEmptyLH(self: Self) bool {
99 return self.write_index == self.read_index;
100 }
101
102 fn isFullLH(self: Self) bool {
103 return self.mask2(self.write_index + self.buf.len) ==
104 self.read_index;
105 }
106
107 /// Returns `true` if the queue is empty and `false` otherwise.
108 pub fn isEmpty(self: *Self) bool {
109 self.mutex.lock();
110 defer self.mutex.unlock();
111 return self.isEmptyLH();
112 }
113
114 /// Returns `true` if the queue is full and `false` otherwise.
115 pub fn isFull(self: *Self) bool {
116 self.mutex.lock();
117 defer self.mutex.unlock();
118 return self.isFullLH();
119 }
120
121 /// Returns the length
122 fn len(self: Self) usize {
123 const wrap_offset = 2 * self.buf.len *
124 @intFromBool(self.write_index < self.read_index);
125 const adjusted_write_index = self.write_index + wrap_offset;
126 return adjusted_write_index - self.read_index;
127 }
128
129 /// Returns `index` modulo the length of the backing slice.
130 fn mask(self: Self, index: usize) usize {
131 return index % self.buf.len;
132 }
133
134 /// Returns `index` modulo twice the length of the backing slice.
135 fn mask2(self: Self, index: usize) usize {
136 return index % (2 * self.buf.len);
137 }
138 };
139}
140
141const testing = std.testing;
142const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
143test "Queue: simple push / pop" {
144 var queue: Queue(u8, 16) = .{};
145 queue.push(1);
146 queue.push(2);
147 const pop = queue.pop();
148 try testing.expectEqual(1, pop);
149 try testing.expectEqual(2, queue.pop());
150}
151
152const Thread = std.Thread;
153fn testPushPop(q: *Queue(u8, 2)) !void {
154 q.push(3);
155 try testing.expectEqual(2, q.pop());
156}
157
158test "Fill, wait to push, pop once in another thread" {
159 var queue: Queue(u8, 2) = .{};
160 queue.push(1);
161 queue.push(2);
162 const t = try Thread.spawn(cfg, testPushPop, .{&queue});
163 try testing.expectEqual(false, queue.tryPush(3));
164 try testing.expectEqual(1, queue.pop());
165 t.join();
166 try testing.expectEqual(3, queue.pop());
167 try testing.expectEqual(null, queue.tryPop());
168}
169
170fn testPush(q: *Queue(u8, 2)) void {
171 q.push(0);
172 q.push(1);
173 q.push(2);
174 q.push(3);
175 q.push(4);
176}
177
178test "Try to pop, fill from another thread" {
179 var queue: Queue(u8, 2) = .{};
180 const thread = try Thread.spawn(cfg, testPush, .{&queue});
181 for (0..5) |idx| {
182 try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
183 }
184 thread.join();
185}
186
187fn sleepyPop(q: *Queue(u8, 2)) !void {
188 // First we wait for the queue to be full.
189 while (!q.isFull())
190 try Thread.yield();
191
192 // Then we spuriously wake it up, because that's a thing that can
193 // happen.
194 q.not_full.signal();
195 q.not_empty.signal();
196
197 // Then give the other thread a good chance of waking up. It's not
198 // clear that yield guarantees the other thread will be scheduled,
199 // so we'll throw a sleep in here just to be sure. The queue is
200 // still full and the push in the other thread is still blocked
201 // waiting for space.
202 try Thread.yield();
203 std.time.sleep(std.time.ns_per_s);
204 // Finally, let that other thread go.
205 try std.testing.expectEqual(1, q.pop());
206
207 // This won't continue until the other thread has had a chance to
208 // put at least one item in the queue.
209 while (!q.isFull())
210 try Thread.yield();
211 // But we want to ensure that there's a second push waiting, so
212 // here's another sleep.
213 std.time.sleep(std.time.ns_per_s / 2);
214
215 // Another spurious wake...
216 q.not_full.signal();
217 q.not_empty.signal();
218 // And another chance for the other thread to see that it's
219 // spurious and go back to sleep.
220 try Thread.yield();
221 std.time.sleep(std.time.ns_per_s / 2);
222
223 // Pop that thing and we're done.
224 try std.testing.expectEqual(2, q.pop());
225}
226
227test "Fill, block, fill, block" {
228 // Fill the queue, block while trying to write another item, have
229 // a background thread unblock us, then block while trying to
230 // write yet another thing. Have the background thread unblock
231 // that too (after some time) then drain the queue. This test
232 // fails if the while loop in `push` is turned into an `if`.
233
234 var queue: Queue(u8, 2) = .{};
235 const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
236 queue.push(1);
237 queue.push(2);
238 const now = std.time.milliTimestamp();
239 queue.push(3); // This one should block.
240 const then = std.time.milliTimestamp();
241
242 // Just to make sure the sleeps are yielding to this thread, make
243 // sure it took at least 900ms to do the push.
244 try std.testing.expect(then - now > 900);
245
246 // This should block again, waiting for the other thread.
247 queue.push(4);
248
249 // And once that push has gone through, the other thread's done.
250 thread.join();
251 try std.testing.expectEqual(3, queue.pop());
252 try std.testing.expectEqual(4, queue.pop());
253}
254
255fn sleepyPush(q: *Queue(u8, 1)) !void {
256 // Try to ensure the other thread has already started trying to pop.
257 try Thread.yield();
258 std.time.sleep(std.time.ns_per_s / 2);
259
260 // Spurious wake
261 q.not_full.signal();
262 q.not_empty.signal();
263
264 try Thread.yield();
265 std.time.sleep(std.time.ns_per_s / 2);
266
267 // Stick something in the queue so it can be popped.
268 q.push(1);
269 // Ensure it's been popped.
270 while (!q.isEmpty())
271 try Thread.yield();
272 // Give the other thread time to block again.
273 try Thread.yield();
274 std.time.sleep(std.time.ns_per_s / 2);
275
276 // Spurious wake
277 q.not_full.signal();
278 q.not_empty.signal();
279
280 q.push(2);
281}
282
283test "Drain, block, drain, block" {
284 // This is like fill/block/fill/block, but on the pop end. This
285 // test should fail if the `while` loop in `pop` is turned into an
286 // `if`.
287
288 var queue: Queue(u8, 1) = .{};
289 const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
290 try std.testing.expectEqual(1, queue.pop());
291 try std.testing.expectEqual(2, queue.pop());
292 thread.join();
293}
294
295fn readerThread(q: *Queue(u8, 1)) !void {
296 try testing.expectEqual(1, q.pop());
297}
298
299test "2 readers" {
300 // 2 threads read, one thread writes
301 var queue: Queue(u8, 1) = .{};
302 const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
303 const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
304 try Thread.yield();
305 std.time.sleep(std.time.ns_per_s / 2);
306 queue.push(1);
307 queue.push(1);
308 t1.join();
309 t2.join();
310}
311
312fn writerThread(q: *Queue(u8, 1)) !void {
313 q.push(1);
314}
315
316test "2 writers" {
317 var queue: Queue(u8, 1) = .{};
318 const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
319 const t2 = try Thread.spawn(cfg, writerThread, .{&queue});
320
321 try testing.expectEqual(1, queue.pop());
322 try testing.expectEqual(1, queue.pop());
323 t1.join();
324 t2.join();
325}