a modern tui library written in zig
1const std = @import("std");
2const assert = std.debug.assert;
3const atomic = std.atomic;
4const Condition = std.Thread.Condition;
5
6const log = std.log.scoped(.queue);
7
8/// Thread safe. Fixed size. Blocking push and pop.
9pub fn Queue(
10 comptime T: type,
11 comptime size: usize,
12) type {
13 return struct {
14 buf: [size]T = undefined,
15
16 read_index: usize = 0,
17 write_index: usize = 0,
18
19 mutex: std.Thread.Mutex = .{},
20 // blocks when the buffer is full
21 not_full: Condition = .{},
22 // ...or empty
23 not_empty: Condition = .{},
24
25 const Self = @This();
26
27 /// Pop an item from the queue. Blocks until an item is available.
28 pub fn pop(self: *Self) T {
29 self.mutex.lock();
30 defer self.mutex.unlock();
31 while (self.isEmptyLH()) {
32 self.not_empty.wait(&self.mutex);
33 }
34 std.debug.assert(!self.isEmptyLH());
35 if (self.isFullLH()) {
36 // If we are full, wake up a push that might be
37 // waiting here.
38 self.not_full.signal();
39 }
40
41 const result = self.buf[self.mask(self.read_index)];
42 self.read_index = self.mask2(self.read_index + 1);
43 return result;
44 }
45
46 /// Push an item into the queue. Blocks until an item has been
47 /// put in the queue.
48 pub fn push(self: *Self, item: T) void {
49 self.mutex.lock();
50 defer self.mutex.unlock();
51 while (self.isFullLH()) {
52 self.not_full.wait(&self.mutex);
53 }
54 if (self.isEmptyLH()) {
55 // If we were empty, wake up a pop if it was waiting.
56 self.not_empty.signal();
57 }
58 std.debug.assert(!self.isFullLH());
59
60 self.buf[self.mask(self.write_index)] = item;
61 self.write_index = self.mask2(self.write_index + 1);
62 }
63
64 /// Push an item into the queue. Returns true when the item
65 /// was successfully placed in the queue, false if the queue
66 /// was full.
67 pub fn tryPush(self: *Self, item: T) bool {
68 self.mutex.lock();
69 if (self.isFullLH()) {
70 self.mutex.unlock();
71 return false;
72 }
73 self.mutex.unlock();
74 self.push(item);
75 return true;
76 }
77
78 /// Pop an item from the queue. Returns null when no item is
79 /// available.
80 pub fn tryPop(self: *Self) ?T {
81 self.mutex.lock();
82 if (self.isEmptyLH()) {
83 self.mutex.unlock();
84 return null;
85 }
86 self.mutex.unlock();
87 return self.pop();
88 }
89
90 /// Poll the queue. This call blocks until events are in the queue
91 pub fn poll(self: *Self) void {
92 self.mutex.lock();
93 defer self.mutex.unlock();
94 while (self.isEmptyLH()) {
95 self.not_empty.wait(&self.mutex);
96 }
97 std.debug.assert(!self.isEmptyLH());
98 }
99
100 fn isEmptyLH(self: Self) bool {
101 return self.write_index == self.read_index;
102 }
103
104 fn isFullLH(self: Self) bool {
105 return self.mask2(self.write_index + self.buf.len) ==
106 self.read_index;
107 }
108
109 /// Returns `true` if the queue is empty and `false` otherwise.
110 pub fn isEmpty(self: *Self) bool {
111 self.mutex.lock();
112 defer self.mutex.unlock();
113 return self.isEmptyLH();
114 }
115
116 /// Returns `true` if the queue is full and `false` otherwise.
117 pub fn isFull(self: *Self) bool {
118 self.mutex.lock();
119 defer self.mutex.unlock();
120 return self.isFullLH();
121 }
122
123 /// Returns the length
124 fn len(self: Self) usize {
125 const wrap_offset = 2 * self.buf.len *
126 @intFromBool(self.write_index < self.read_index);
127 const adjusted_write_index = self.write_index + wrap_offset;
128 return adjusted_write_index - self.read_index;
129 }
130
131 /// Returns `index` modulo the length of the backing slice.
132 fn mask(self: Self, index: usize) usize {
133 return index % self.buf.len;
134 }
135
136 /// Returns `index` modulo twice the length of the backing slice.
137 fn mask2(self: Self, index: usize) usize {
138 return index % (2 * self.buf.len);
139 }
140 };
141}
142
143const testing = std.testing;
144const cfg = Thread.SpawnConfig{ .allocator = testing.allocator };
145test "Queue: simple push / pop" {
146 var queue: Queue(u8, 16) = .{};
147 queue.push(1);
148 queue.push(2);
149 const pop = queue.pop();
150 try testing.expectEqual(1, pop);
151 try testing.expectEqual(2, queue.pop());
152}
153
154const Thread = std.Thread;
155fn testPushPop(q: *Queue(u8, 2)) !void {
156 q.push(3);
157 try testing.expectEqual(2, q.pop());
158}
159
160test "Fill, wait to push, pop once in another thread" {
161 var queue: Queue(u8, 2) = .{};
162 queue.push(1);
163 queue.push(2);
164 const t = try Thread.spawn(cfg, testPushPop, .{&queue});
165 try testing.expectEqual(false, queue.tryPush(3));
166 try testing.expectEqual(1, queue.pop());
167 t.join();
168 try testing.expectEqual(3, queue.pop());
169 try testing.expectEqual(null, queue.tryPop());
170}
171
172fn testPush(q: *Queue(u8, 2)) void {
173 q.push(0);
174 q.push(1);
175 q.push(2);
176 q.push(3);
177 q.push(4);
178}
179
180test "Try to pop, fill from another thread" {
181 var queue: Queue(u8, 2) = .{};
182 const thread = try Thread.spawn(cfg, testPush, .{&queue});
183 for (0..5) |idx| {
184 try testing.expectEqual(@as(u8, @intCast(idx)), queue.pop());
185 }
186 thread.join();
187}
188
189fn sleepyPop(q: *Queue(u8, 2)) !void {
190 // First we wait for the queue to be full.
191 while (!q.isFull())
192 try Thread.yield();
193
194 // Then we spuriously wake it up, because that's a thing that can
195 // happen.
196 q.not_full.signal();
197 q.not_empty.signal();
198
199 // Then give the other thread a good chance of waking up. It's not
200 // clear that yield guarantees the other thread will be scheduled,
201 // so we'll throw a sleep in here just to be sure. The queue is
202 // still full and the push in the other thread is still blocked
203 // waiting for space.
204 try Thread.yield();
205 std.time.sleep(std.time.ns_per_s);
206 // Finally, let that other thread go.
207 try std.testing.expectEqual(1, q.pop());
208
209 // This won't continue until the other thread has had a chance to
210 // put at least one item in the queue.
211 while (!q.isFull())
212 try Thread.yield();
213 // But we want to ensure that there's a second push waiting, so
214 // here's another sleep.
215 std.time.sleep(std.time.ns_per_s / 2);
216
217 // Another spurious wake...
218 q.not_full.signal();
219 q.not_empty.signal();
220 // And another chance for the other thread to see that it's
221 // spurious and go back to sleep.
222 try Thread.yield();
223 std.time.sleep(std.time.ns_per_s / 2);
224
225 // Pop that thing and we're done.
226 try std.testing.expectEqual(2, q.pop());
227}
228
229test "Fill, block, fill, block" {
230 // Fill the queue, block while trying to write another item, have
231 // a background thread unblock us, then block while trying to
232 // write yet another thing. Have the background thread unblock
233 // that too (after some time) then drain the queue. This test
234 // fails if the while loop in `push` is turned into an `if`.
235
236 var queue: Queue(u8, 2) = .{};
237 const thread = try Thread.spawn(cfg, sleepyPop, .{&queue});
238 queue.push(1);
239 queue.push(2);
240 const now = std.time.milliTimestamp();
241 queue.push(3); // This one should block.
242 const then = std.time.milliTimestamp();
243
244 // Just to make sure the sleeps are yielding to this thread, make
245 // sure it took at least 900ms to do the push.
246 try std.testing.expect(then - now > 900);
247
248 // This should block again, waiting for the other thread.
249 queue.push(4);
250
251 // And once that push has gone through, the other thread's done.
252 thread.join();
253 try std.testing.expectEqual(3, queue.pop());
254 try std.testing.expectEqual(4, queue.pop());
255}
256
257fn sleepyPush(q: *Queue(u8, 1)) !void {
258 // Try to ensure the other thread has already started trying to pop.
259 try Thread.yield();
260 std.time.sleep(std.time.ns_per_s / 2);
261
262 // Spurious wake
263 q.not_full.signal();
264 q.not_empty.signal();
265
266 try Thread.yield();
267 std.time.sleep(std.time.ns_per_s / 2);
268
269 // Stick something in the queue so it can be popped.
270 q.push(1);
271 // Ensure it's been popped.
272 while (!q.isEmpty())
273 try Thread.yield();
274 // Give the other thread time to block again.
275 try Thread.yield();
276 std.time.sleep(std.time.ns_per_s / 2);
277
278 // Spurious wake
279 q.not_full.signal();
280 q.not_empty.signal();
281
282 q.push(2);
283}
284
285test "Drain, block, drain, block" {
286 // This is like fill/block/fill/block, but on the pop end. This
287 // test should fail if the `while` loop in `pop` is turned into an
288 // `if`.
289
290 var queue: Queue(u8, 1) = .{};
291 const thread = try Thread.spawn(cfg, sleepyPush, .{&queue});
292 try std.testing.expectEqual(1, queue.pop());
293 try std.testing.expectEqual(2, queue.pop());
294 thread.join();
295}
296
297fn readerThread(q: *Queue(u8, 1)) !void {
298 try testing.expectEqual(1, q.pop());
299}
300
301test "2 readers" {
302 // 2 threads read, one thread writes
303 var queue: Queue(u8, 1) = .{};
304 const t1 = try Thread.spawn(cfg, readerThread, .{&queue});
305 const t2 = try Thread.spawn(cfg, readerThread, .{&queue});
306 try Thread.yield();
307 std.time.sleep(std.time.ns_per_s / 2);
308 queue.push(1);
309 queue.push(1);
310 t1.join();
311 t2.join();
312}
313
314fn writerThread(q: *Queue(u8, 1)) !void {
315 q.push(1);
316}
317
318test "2 writers" {
319 var queue: Queue(u8, 1) = .{};
320 const t1 = try Thread.spawn(cfg, writerThread, .{&queue});
321 const t2 = try Thread.spawn(cfg, writerThread, .{&queue});
322
323 try testing.expectEqual(1, queue.pop());
324 try testing.expectEqual(1, queue.pop());
325 t1.join();
326 t2.join();
327}