An asynchronous IO runtime
1const std = @import("std");
2const builtin = @import("builtin");
3
4const Allocator = std.mem.Allocator;
5pub const Mock = @import("ourio/Mock.zig");
6const Queue = @import("ourio/queue.zig").Intrusive;
7const io = @This();
8const posix = std.posix;
9
10pub const has_kqueue = switch (builtin.os.tag) {
11 .dragonfly,
12 .freebsd,
13 .ios,
14 .macos,
15 .netbsd,
16 .openbsd,
17 .tvos,
18 .visionos,
19 .watchos,
20 => true,
21
22 else => false,
23};
24pub const has_io_uring = builtin.os.tag == .linux;
25
26pub const Task = @import("ourio/Task.zig");
27pub const Callback = *const fn (*Ring, Task) anyerror!void;
28pub fn noopCallback(_: *Ring, _: Task) anyerror!void {}
29
30pub const RunCondition = enum {
31 once,
32 until_done,
33 forever,
34};
35
36pub const Context = struct {
37 ptr: ?*anyopaque = null,
38 msg: u16 = 0,
39 cb: Callback = noopCallback,
40};
41
42/// Used for timeouts and deadlines. We make this struct extern because we will ptrCast it to the
43/// linux kernel timespec struct
44pub const Timespec = extern struct {
45 sec: i64 = 0,
46 nsec: i64 = 0,
47
48 pub fn isZero(self: Timespec) bool {
49 return self.sec == 0 and self.nsec == 0;
50 }
51};
52
53pub const Backend = union(enum) {
54 mock: Mock,
55
56 platform: switch (builtin.os.tag) {
57 .dragonfly,
58 .freebsd,
59 .ios,
60 .macos,
61 .netbsd,
62 .openbsd,
63 .tvos,
64 .visionos,
65 .watchos,
66 => @import("ourio/Kqueue.zig"),
67
68 .linux => @import("ourio/Uring.zig"),
69
70 else => @compileError("unsupported os"),
71 },
72
73 pub fn initChild(self: *Backend, entries: u16) !Backend {
74 switch (self.*) {
75 .mock => return .{ .mock = .{} },
76 .platform => |*p| return .{ .platform = try p.initChild(entries) },
77 }
78 }
79
80 pub fn deinit(self: *Backend, gpa: Allocator) void {
81 switch (self.*) {
82 inline else => |*backend| backend.deinit(gpa),
83 }
84 }
85
86 pub fn pollableFd(self: *Backend) !posix.fd_t {
87 return switch (self.*) {
88 inline else => |*backend| backend.pollableFd(),
89 };
90 }
91
92 pub fn submitAndWait(self: *Backend, queue: *SubmissionQueue) !void {
93 return switch (self.*) {
94 inline else => |*backend| backend.submitAndWait(queue),
95 };
96 }
97
98 pub fn submit(self: *Backend, queue: *SubmissionQueue) !void {
99 return switch (self.*) {
100 inline else => |*backend| backend.submit(queue),
101 };
102 }
103
104 pub fn reapCompletions(
105 self: *Backend,
106 rt: *Ring,
107 ) !void {
108 return switch (self.*) {
109 inline else => |*backend| backend.reapCompletions(rt),
110 };
111 }
112
113 pub fn done(self: *Backend) bool {
114 return switch (self.*) {
115 inline else => |*backend| backend.done(),
116 };
117 }
118};
119
120pub const CompletionQueue = Queue(Task, .complete);
121pub const FreeQueue = Queue(Task, .free);
122pub const SubmissionQueue = Queue(Task, .in_flight);
123
124pub const Ring = struct {
125 backend: Backend,
126 gpa: Allocator,
127
128 completion_q: CompletionQueue = .{},
129 submission_q: SubmissionQueue = .{},
130 free_q: FreeQueue = .{},
131
132 pub fn init(gpa: Allocator, entries: u16) !Ring {
133 return .{
134 .backend = .{ .platform = try .init(gpa, entries) },
135 .gpa = gpa,
136 };
137 }
138
139 pub fn initChild(self: *Ring, entries: u16) !Ring {
140 return .{
141 .backend = try self.backend.initChild(entries),
142 .gpa = self.gpa,
143 };
144 }
145
146 pub fn initMock(gpa: Allocator, entries: u16) !Ring {
147 return .{
148 .backend = .{ .mock = try .init(entries) },
149 .gpa = gpa,
150 .free_q = .{},
151 .submission_q = .{},
152 .completion_q = .{},
153 };
154 }
155
156 pub fn deinit(self: *Ring) void {
157 self.backend.deinit(self.gpa);
158 while (self.free_q.pop()) |task| self.gpa.destroy(task);
159 while (self.submission_q.pop()) |task| self.gpa.destroy(task);
160 while (self.completion_q.pop()) |task| self.gpa.destroy(task);
161 }
162
163 pub fn run(self: *Ring, condition: RunCondition) !void {
164 while (true) {
165 try self.backend.submitAndWait(&self.submission_q);
166 try self.backend.reapCompletions(self);
167 switch (condition) {
168 .once => return,
169 .until_done => if (self.backend.done() and self.submission_q.empty()) return,
170 .forever => {},
171 }
172 }
173 }
174
175 pub fn getTask(self: *Ring) Allocator.Error!*Task {
176 return self.free_q.pop() orelse try self.gpa.create(Task);
177 }
178
179 pub fn noop(
180 self: *Ring,
181 ctx: Context,
182 ) Allocator.Error!*Task {
183 const task = try self.getTask();
184 task.* = .{
185 .userdata = ctx.ptr,
186 .msg = ctx.msg,
187 .callback = ctx.cb,
188 .req = .noop,
189 };
190
191 self.submission_q.push(task);
192 return task;
193 }
194
195 pub fn timer(
196 self: *Ring,
197 duration: Timespec,
198 ctx: Context,
199 ) Allocator.Error!*Task {
200 const task = try self.getTask();
201 task.* = .{
202 .userdata = ctx.ptr,
203 .msg = ctx.msg,
204 .callback = ctx.cb,
205 .req = .{ .timer = duration },
206 };
207
208 self.submission_q.push(task);
209 return task;
210 }
211
212 pub fn cancelAll(self: *Ring) Allocator.Error!void {
213 const task = try self.getTask();
214 task.* = .{
215 .req = .{ .cancel = .all },
216 };
217
218 self.submission_q.push(task);
219 }
220
221 pub fn accept(
222 self: *Ring,
223 fd: posix.fd_t,
224 ctx: Context,
225 ) Allocator.Error!*Task {
226 const task = try self.getTask();
227 task.* = .{
228 .userdata = ctx.ptr,
229 .msg = ctx.msg,
230 .callback = ctx.cb,
231 .req = .{ .accept = fd },
232 };
233
234 self.submission_q.push(task);
235 return task;
236 }
237
238 pub fn msgRing(
239 self: *Ring,
240 target: *Ring,
241 target_task: *Task, // The task that the target ring will receive. The callbacks of
242 // this task are what will be called when the target receives the message
243
244 ctx: Context,
245 ) Allocator.Error!*Task {
246 // This is the task to send the message
247 const task = try self.getTask();
248 task.* = .{
249 .userdata = ctx.ptr,
250 .msg = ctx.msg,
251 .callback = ctx.cb,
252 .req = .{ .msg_ring = .{
253 .target = target,
254 .task = target_task,
255 } },
256 };
257 target_task.state = .in_flight;
258 self.submission_q.push(task);
259 return task;
260 }
261
262 pub fn recv(
263 self: *Ring,
264 fd: posix.fd_t,
265 buffer: []u8,
266 ctx: Context,
267 ) Allocator.Error!*Task {
268 const task = try self.getTask();
269 task.* = .{
270 .userdata = ctx.ptr,
271 .msg = ctx.msg,
272 .callback = ctx.cb,
273 .req = .{ .recv = .{
274 .fd = fd,
275 .buffer = buffer,
276 } },
277 };
278
279 self.submission_q.push(task);
280 return task;
281 }
282
283 pub fn write(
284 self: *Ring,
285 fd: posix.fd_t,
286 buffer: []const u8,
287 ctx: Context,
288 ) Allocator.Error!*Task {
289 const task = try self.getTask();
290 task.* = .{
291 .userdata = ctx.ptr,
292 .msg = ctx.msg,
293 .callback = ctx.cb,
294 .req = .{ .write = .{
295 .fd = fd,
296 .buffer = buffer,
297 } },
298 };
299
300 self.submission_q.push(task);
301 return task;
302 }
303
304 pub fn writev(
305 self: *Ring,
306 fd: posix.fd_t,
307 vecs: []const posix.iovec_const,
308 ctx: Context,
309 ) Allocator.Error!*Task {
310 const task = try self.getTask();
311 task.* = .{
312 .userdata = ctx.ptr,
313 .msg = ctx.msg,
314 .callback = ctx.cb,
315 .req = .{ .writev = .{
316 .fd = fd,
317 .vecs = vecs,
318 } },
319 };
320
321 self.submission_q.push(task);
322 return task;
323 }
324
325 pub fn close(
326 self: *Ring,
327 fd: posix.fd_t,
328 ctx: Context,
329 ) Allocator.Error!*Task {
330 const task = try self.getTask();
331 task.* = .{
332 .userdata = ctx.ptr,
333 .msg = ctx.msg,
334 .callback = ctx.cb,
335 .req = .{ .close = fd },
336 };
337
338 self.submission_q.push(task);
339 return task;
340 }
341
342 pub fn poll(
343 self: *Ring,
344 fd: posix.fd_t,
345 mask: u32,
346 ctx: Context,
347 ) Allocator.Error!*Task {
348 const task = try self.getTask();
349 task.* = .{
350 .userdata = ctx.ptr,
351 .msg = ctx.msg,
352 .callback = ctx.cb,
353 .req = .{ .poll = .{ .fd = fd, .mask = mask } },
354 };
355
356 self.submission_q.push(task);
357 return task;
358 }
359
360 pub fn socket(
361 self: *Ring,
362 domain: u32,
363 socket_type: u32,
364 protocol: u32,
365 ctx: Context,
366 ) Allocator.Error!*Task {
367 const task = try self.getTask();
368 task.* = .{
369 .userdata = ctx.ptr,
370 .msg = ctx.msg,
371 .callback = ctx.cb,
372 .req = .{ .socket = .{ .domain = domain, .type = socket_type, .protocol = protocol } },
373 };
374
375 self.submission_q.push(task);
376 return task;
377 }
378
379 pub fn connect(
380 self: *Ring,
381 fd: posix.socket_t,
382 addr: *posix.sockaddr,
383 addr_len: posix.socklen_t,
384 ctx: Context,
385 ) Allocator.Error!*Task {
386 const task = try self.getTask();
387 task.* = .{
388 .userdata = ctx.ptr,
389 .msg = ctx.msg,
390 .callback = ctx.cb,
391 .req = .{ .connect = .{ .fd = fd, .addr = addr, .addr_len = addr_len } },
392 };
393
394 self.submission_q.push(task);
395 return task;
396 }
397
398 pub fn stat(
399 self: *Ring,
400 path: [:0]const u8,
401 result: *Statx,
402 ctx: Context,
403 ) Allocator.Error!*Task {
404 const task = try self.getTask();
405 task.* = .{
406 .userdata = ctx.ptr,
407 .msg = ctx.msg,
408 .callback = ctx.cb,
409 .req = .{ .statx = .{ .path = path, .result = result } },
410 };
411
412 self.submission_q.push(task);
413 return task;
414 }
415
416 pub fn readv(
417 self: *Ring,
418 fd: posix.fd_t,
419 vecs: []const posix.iovec,
420 ctx: Context,
421 ) Allocator.Error!*Task {
422 const task = try self.getTask();
423 task.* = .{
424 .userdata = ctx.ptr,
425 .msg = ctx.msg,
426 .callback = ctx.cb,
427 .req = .{ .readv = .{
428 .fd = fd,
429 .vecs = vecs,
430 } },
431 };
432
433 self.submission_q.push(task);
434 return task;
435 }
436
437 pub fn read(
438 self: *Ring,
439 fd: posix.fd_t,
440 buffer: []u8,
441 ctx: Context,
442 ) Allocator.Error!*Task {
443 const task = try self.getTask();
444 task.* = .{
445 .userdata = ctx.ptr,
446 .msg = ctx.msg,
447 .callback = ctx.cb,
448 .req = .{ .read = .{
449 .fd = fd,
450 .buffer = buffer,
451 } },
452 };
453
454 self.submission_q.push(task);
455 return task;
456 }
457
458 pub fn open(
459 self: *Ring,
460 path: [:0]const u8,
461 flags: posix.O,
462 mode: posix.mode_t,
463 ctx: Context,
464 ) Allocator.Error!*Task {
465 const task = try self.getTask();
466 task.* = .{
467 .userdata = ctx.ptr,
468 .msg = ctx.msg,
469 .callback = ctx.cb,
470 .req = .{ .open = .{ .path = path, .flags = flags, .mode = mode } },
471 };
472
473 self.submission_q.push(task);
474 return task;
475 }
476
477 /// Spawns a thread with a Ring instance. The thread will be idle and waiting to receive work
478 /// via msgRing when this function returns. Call kill on the returned thread to signal it to
479 /// shutdown.
480 pub fn spawnThread(self: *Ring, entries: u16) !*Thread {
481 const thread = try self.gpa.create(Thread);
482 errdefer self.gpa.destroy(thread);
483
484 var wg: std.Thread.WaitGroup = .{};
485 wg.start();
486 thread.thread = try std.Thread.spawn(.{}, Thread.run, .{ thread, self, &wg, entries });
487 wg.wait();
488
489 return thread;
490 }
491};
492
493pub const Thread = struct {
494 thread: std.Thread,
495 ring: io.Ring = undefined,
496
497 pub const Msg = enum {
498 kill,
499 };
500
501 pub fn run(self: *Thread, parent: *io.Ring, wg: *std.Thread.WaitGroup, entries: u16) !void {
502 self.ring = try parent.initChild(entries);
503 wg.finish();
504
505 defer self.ring.deinit();
506
507 // Run forever, because we may not start with a task. Inter-thread messaging means we could
508 // receive work at any time
509 self.ring.run(.forever) catch |err| {
510 switch (err) {
511 error.ThreadKilled => return,
512 else => return err,
513 }
514 };
515 }
516
517 /// Kill sends a message to the thread telling it to exit. Callers of this thread can safely
518 /// join and deinit the Thread in the Context callback
519 pub fn kill(self: *Thread, rt: *io.Ring, ctx: Context) Allocator.Error!*io.Task {
520 const target_task = try rt.getTask();
521 target_task.* = .{
522 .userdata = self,
523 .msg = @intFromEnum(Thread.Msg.kill),
524 .callback = Thread.onCompletion,
525 .result = .noop,
526 };
527
528 return rt.msgRing(&self.ring, target_task, ctx);
529 }
530
531 pub fn join(self: Thread) void {
532 self.thread.join();
533 }
534
535 fn onCompletion(_: *io.Ring, task: Task) anyerror!void {
536 switch (task.msgToEnum(Thread.Msg)) {
537 .kill => return error.ThreadKilled,
538 }
539 }
540};
541
542pub const Op = enum {
543 noop,
544 deadline,
545 timer,
546 cancel,
547 accept,
548 msg_ring,
549 recv,
550 write,
551 writev,
552 close,
553 poll,
554 socket,
555 connect,
556 statx,
557 readv,
558 open,
559 read,
560
561 /// userbytes is meant to send slices of bytes between Ring instances or callbacks
562 userbytes,
563 /// userfd is meant to send file descriptors between Ring instances (using msgRing)
564 userfd,
565 /// usermsg is meant to send a u16 between runtime instances (using msgRing)
566 usermsg,
567 /// userptr is meant to send a pointer between runtime instances (using msgRing)
568 userptr,
569};
570
571pub const Request = union(Op) {
572 noop,
573 deadline: Timespec,
574 timer: Timespec,
575 cancel: union(enum) {
576 all,
577 task: *Task,
578 },
579 accept: posix.fd_t,
580 msg_ring: struct {
581 target: *Ring,
582 task: *Task,
583 },
584 recv: struct {
585 fd: posix.fd_t,
586 buffer: []u8,
587 },
588 write: struct {
589 fd: posix.fd_t,
590 buffer: []const u8,
591 },
592 writev: struct {
593 fd: posix.fd_t,
594 vecs: []const posix.iovec_const,
595 },
596 close: posix.fd_t,
597 poll: struct {
598 fd: posix.fd_t,
599 mask: u32,
600 },
601 socket: struct {
602 domain: u32,
603 type: u32,
604 protocol: u32,
605 },
606 connect: struct {
607 fd: posix.socket_t,
608 addr: *posix.sockaddr,
609 addr_len: posix.socklen_t,
610 },
611 statx: struct {
612 path: [:0]const u8,
613 result: *Statx, // this will be filled in by the op
614 },
615 readv: struct {
616 fd: posix.fd_t,
617 vecs: []const posix.iovec,
618 },
619 open: struct {
620 path: [:0]const u8,
621 flags: posix.O,
622 mode: posix.mode_t,
623 },
624 read: struct {
625 fd: posix.fd_t,
626 buffer: []u8,
627 },
628
629 userbytes,
630 userfd,
631 usermsg,
632 userptr,
633};
634
635pub const Result = union(Op) {
636 noop,
637 deadline: ResultError!void,
638 timer: ResultError!void,
639 cancel: CancelError!void,
640 accept: ResultError!posix.fd_t,
641 msg_ring: ResultError!void,
642 recv: RecvError!usize,
643 write: ResultError!usize,
644 writev: ResultError!usize,
645 close: ResultError!void,
646 poll: ResultError!void,
647 socket: ResultError!posix.fd_t,
648 connect: ResultError!void,
649 statx: ResultError!*Statx,
650 readv: ResultError!usize,
651 open: ResultError!posix.fd_t,
652 read: ResultError!usize,
653
654 userbytes: anyerror![]const u8,
655 userfd: anyerror!posix.fd_t,
656 usermsg: u16,
657 userptr: anyerror!?*anyopaque,
658};
659
660pub const ResultError = error{
661 /// The request was invalid
662 Invalid,
663 /// The request was canceled
664 Canceled,
665 /// An unexpected error occured
666 Unexpected,
667};
668
669pub const CancelError = ResultError || error{
670 /// The entry to cancel couldn't be found
671 EntryNotFound,
672 /// The entry couldn't be canceled
673 NotCanceled,
674};
675
676pub const RecvError = ResultError || error{
677 /// The entry to cancel couldn't be found
678 ConnectionResetByPeer,
679};
680
681test {
682 _ = @import("ourio/Mock.zig");
683 _ = @import("ourio/queue.zig");
684
685 if (has_io_uring) _ = @import("ourio/Uring.zig");
686 if (has_kqueue) _ = @import("ourio/Kqueue.zig");
687}
688
689/// Foo is only for testing
690const Foo = struct {
691 bar: usize = 0,
692
693 fn callback(_: *io.Ring, task: io.Task) anyerror!void {
694 const self = task.userdataCast(Foo);
695 self.bar += 1;
696 }
697};
698
699/// Follows the ABI of linux statx. Not all platforms will contain all information, or may contain
700/// more information than requested
701pub const Statx = extern struct {
702 /// Mask of bits indicating filled fields
703 mask: u32,
704
705 /// Block size for filesystem I/O
706 blksize: u32,
707
708 /// Extra file attribute indicators
709 attributes: u64,
710
711 /// Number of hard links
712 nlink: u32,
713
714 /// User ID of owner
715 uid: posix.uid_t,
716
717 /// Group ID of owner
718 gid: posix.gid_t,
719
720 /// File type and mode
721 mode: u16,
722 __pad1: u16,
723
724 /// Inode number
725 ino: u64,
726
727 /// Total size in bytes
728 size: u64,
729
730 /// Number of 512B blocks allocated
731 blocks: u64,
732
733 /// Mask to show what's supported in `attributes`.
734 attributes_mask: u64,
735
736 /// Last access file timestamp
737 atime: Timestamp,
738
739 /// Creation file timestamp
740 btime: Timestamp,
741
742 /// Last status change file timestamp
743 ctime: Timestamp,
744
745 /// Last modification file timestamp
746 mtime: Timestamp,
747
748 /// Major ID, if this file represents a device.
749 rdev_major: u32,
750
751 /// Minor ID, if this file represents a device.
752 rdev_minor: u32,
753
754 /// Major ID of the device containing the filesystem where this file resides.
755 dev_major: u32,
756
757 /// Minor ID of the device containing the filesystem where this file resides.
758 dev_minor: u32,
759
760 __pad2: [14]u64,
761
762 pub const Timestamp = extern struct {
763 sec: i64,
764 nsec: u32,
765 __pad: u32 = 0,
766 };
767
768 pub fn major(dev: u64) u32 {
769 return switch (@import("builtin").target.os.tag) {
770 .macos, .visionos, .tvos, .ios, .watchos => @intCast((dev >> 24) & 0xff),
771 .freebsd, .openbsd, .netbsd, .dragonfly => @intCast((dev >> 8) & 0xff),
772 else => @compileError("unsupported OS for major()"),
773 };
774 }
775
776 pub fn minor(dev: u64) u32 {
777 return switch (@import("builtin").target.os.tag) {
778 .macos, .ios, .visionos, .tvos, .watchos => @intCast(dev & 0xffffff),
779 .openbsd => @intCast(dev & 0xff),
780 .freebsd => @intCast((dev & 0xff) | ((dev >> 12) & 0xfff00)),
781 .dragonfly => @intCast((dev & 0xff) | ((dev >> 12) & 0xfff00)),
782 .netbsd => @intCast((dev & 0xff) | ((dev >> 12) & 0xfff00)),
783 else => @compileError("unsupported OS for minor()"),
784 };
785 }
786};
787
788test "runtime: noop" {
789 var rt: io.Ring = try .init(std.testing.allocator, 16);
790 defer rt.deinit();
791
792 var foo: Foo = .{};
793
794 const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
795
796 // noop is triggered synchronously with submit. If we wait, we'll be waiting forever
797 _ = try rt.noop(ctx);
798 try rt.run(.once);
799 try std.testing.expectEqual(1, foo.bar);
800 _ = try rt.noop(ctx);
801 _ = try rt.noop(ctx);
802 try rt.run(.once);
803 try std.testing.expectEqual(3, foo.bar);
804}
805
806test "runtime: timer" {
807 var rt: io.Ring = try .init(std.testing.allocator, 16);
808 defer rt.deinit();
809
810 var foo: Foo = .{};
811
812 const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
813
814 const start = std.time.nanoTimestamp();
815 const end = start + 100 * std.time.ns_per_ms;
816 _ = try rt.timer(.{ .nsec = 100 * std.time.ns_per_ms }, ctx);
817 try rt.run(.once);
818 try std.testing.expect(std.time.nanoTimestamp() > end);
819 try std.testing.expectEqual(1, foo.bar);
820}
821
822test "runtime: poll" {
823 var rt: io.Ring = try .init(std.testing.allocator, 16);
824 defer rt.deinit();
825
826 var foo: Foo = .{};
827 const pipe = try posix.pipe2(.{ .CLOEXEC = true });
828
829 const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
830
831 _ = try rt.poll(pipe[0], posix.POLL.IN, ctx);
832 try std.testing.expectEqual(1, rt.submission_q.len());
833
834 _ = try posix.write(pipe[1], "io_uring is the best");
835 try rt.run(.once);
836 try std.testing.expectEqual(1, foo.bar);
837}
838
839test "runtime: deadline doesn't call user callback" {
840 const gpa = std.testing.allocator;
841 var rt = try io.Ring.init(gpa, 16);
842 defer rt.deinit();
843
844 var foo: Foo = .{};
845 const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
846 const task = try rt.noop(ctx);
847 try task.setDeadline(&rt, .{ .sec = 1 });
848
849 try rt.run(.until_done);
850
851 // Callback only called once
852 try std.testing.expectEqual(1, foo.bar);
853}
854
855test "runtime: timeout" {
856 const gpa = std.testing.allocator;
857 var rt = try io.Ring.init(gpa, 16);
858 defer rt.deinit();
859
860 var foo: Foo = .{};
861 const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
862
863 const delay = 1 * std.time.ns_per_ms;
864 _ = try rt.timer(.{ .nsec = delay }, ctx);
865
866 const start = std.time.nanoTimestamp();
867 try rt.run(.until_done);
868 try std.testing.expect(start + delay < std.time.nanoTimestamp());
869 try std.testing.expectEqual(1, foo.bar);
870}
871
872test "runtime: cancel" {
873 const gpa = std.testing.allocator;
874 var rt = try io.Ring.init(gpa, 16);
875 defer rt.deinit();
876
877 var foo: Foo = .{};
878 const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
879
880 const delay = 1 * std.time.ns_per_s;
881 const task = try rt.timer(.{ .nsec = delay }, ctx);
882
883 try task.cancel(&rt, .{});
884
885 const start = std.time.nanoTimestamp();
886 try rt.run(.until_done);
887 // Expect that we didn't delay long enough
888 try std.testing.expect(start + delay > std.time.nanoTimestamp());
889 try std.testing.expectEqual(1, foo.bar);
890}
891
892test "runtime: cancel all" {
893 const gpa = std.testing.allocator;
894 var rt = try io.Ring.init(gpa, 16);
895 defer rt.deinit();
896
897 const Foo2 = struct {
898 bar: usize = 0,
899
900 fn callback(_: *io.Ring, task: io.Task) anyerror!void {
901 const self = task.userdataCast(@This());
902 const result = task.result.?;
903 _ = result.timer catch |err| {
904 switch (err) {
905 error.Canceled => self.bar += 1,
906 else => {},
907 }
908 };
909 }
910 };
911 var foo: Foo2 = .{};
912 const ctx: Context = .{ .ptr = &foo, .cb = Foo2.callback };
913
914 const delay = 1 * std.time.ns_per_s;
915 _ = try rt.timer(.{ .nsec = delay }, ctx);
916 _ = try rt.timer(.{ .nsec = delay }, ctx);
917 _ = try rt.timer(.{ .nsec = delay }, ctx);
918 _ = try rt.timer(.{ .nsec = delay }, ctx);
919
920 try rt.cancelAll();
921 const start = std.time.nanoTimestamp();
922 try rt.run(.until_done);
923 // Expect that we didn't delay long enough
924 try std.testing.expect(start + delay > std.time.nanoTimestamp());
925 try std.testing.expectEqual(4, foo.bar);
926}
927
928test "runtime: msgRing" {
929 const gpa = std.testing.allocator;
930 var rt1 = try io.Ring.init(gpa, 16);
931 defer rt1.deinit();
932
933 var rt2 = try rt1.initChild(16);
934 defer rt2.deinit();
935
936 const Foo2 = struct {
937 rt1: bool = false,
938 rt2: bool = false,
939
940 const Msg = enum { rt1, rt2 };
941
942 fn callback(_: *io.Ring, task: io.Task) anyerror!void {
943 const self = task.userdataCast(@This());
944 const msg = task.msgToEnum(Msg);
945 switch (msg) {
946 .rt1 => self.rt1 = true,
947 .rt2 => self.rt2 = true,
948 }
949 }
950 };
951
952 var foo: Foo2 = .{};
953
954 // The task we will send from rt1 to rt2
955 const target_task = try rt1.getTask();
956 target_task.* = .{
957 .userdata = &foo,
958 .callback = Foo2.callback,
959 .msg = @intFromEnum(Foo2.Msg.rt2),
960 .result = .{ .usermsg = 0 },
961 };
962
963 _ = try rt1.msgRing(
964 &rt2,
965 target_task,
966 .{ .cb = Foo2.callback, .msg = @intFromEnum(Foo2.Msg.rt1), .ptr = &foo },
967 );
968
969 try rt1.run(.until_done);
970 try std.testing.expect(foo.rt1);
971 try std.testing.expect(!foo.rt2);
972 try rt2.run(.until_done);
973 try std.testing.expect(foo.rt1);
974 try std.testing.expect(foo.rt2);
975}
976
977test "runtime: spawnThread" {
978 const gpa = std.testing.allocator;
979 var rt = try io.Ring.init(gpa, 16);
980 defer rt.deinit();
981
982 const thread = try rt.spawnThread(4);
983
984 const Foo2 = struct {
985 kill: bool = false,
986 did_work: bool = false,
987
988 gpa: Allocator,
989 thread: *Thread,
990
991 const Msg = enum { kill, work };
992
993 fn callback(_: *io.Ring, task: io.Task) anyerror!void {
994 const self = task.userdataCast(@This());
995 const msg = task.msgToEnum(Msg);
996 switch (msg) {
997 .kill => {
998 self.kill = true;
999 self.thread.join();
1000 self.gpa.destroy(self.thread);
1001 },
1002 .work => self.did_work = true,
1003 }
1004 }
1005 };
1006
1007 var foo: Foo2 = .{ .thread = thread, .gpa = gpa };
1008
1009 // Send work to the thread
1010 const target_task = try rt.getTask();
1011 target_task.* = .{
1012 .userdata = &foo,
1013 .callback = Foo2.callback,
1014 .msg = @intFromEnum(Foo2.Msg.work),
1015 .result = .{ .usermsg = 0 },
1016 };
1017
1018 _ = try rt.msgRing(&thread.ring, target_task, .{});
1019
1020 try rt.run(.until_done);
1021 _ = try thread.kill(&rt, .{
1022 .ptr = &foo,
1023 .cb = Foo2.callback,
1024 .msg = @intFromEnum(Foo2.Msg.kill),
1025 });
1026 try rt.run(.until_done);
1027
1028 try std.testing.expect(foo.did_work);
1029 try std.testing.expect(foo.kill);
1030}
1031
1032test "runtime: stat" {
1033 const gpa = std.testing.allocator;
1034 var rt = try io.Ring.init(gpa, 16);
1035 defer rt.deinit();
1036
1037 var foo: Foo = .{};
1038 const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
1039
1040 var statx: Statx = undefined;
1041 const task = try rt.stat("build.zig", &statx, ctx);
1042
1043 try rt.run(.until_done);
1044 try std.testing.expect(task.result != null);
1045}