//! An asynchronous IO runtime.
const std = @import("std");
const builtin = @import("builtin");

const Allocator = std.mem.Allocator;
pub const Mock = @import("ourio/Mock.zig");
const Queue = @import("ourio/queue.zig").Intrusive;
const io = @This();
const posix = std.posix;

/// True on the BSD-family and Apple targets listed below.
pub const has_kqueue = switch (builtin.os.tag) {
    .dragonfly,
    .freebsd,
    .ios,
    .macos,
    .netbsd,
    .openbsd,
    .tvos,
    .visionos,
    .watchos,
    => true,

    else => false,
};
/// True when targeting Linux.
pub const has_io_uring = builtin.os.tag == .linux;

pub const Task = @import("ourio/Task.zig");
/// Signature of the function stored in a task's `callback` field.
pub const Callback = *const fn (*Ring, Task) anyerror!void;
/// Callback that does nothing. Default value of `Context.cb`.
pub fn noopCallback(_: *Ring, _: Task) anyerror!void {}

/// Controls when `Ring.run` returns.
pub const RunCondition = enum {
    /// Return after a single submit-and-wait / reap cycle.
    once,
    /// Keep cycling until the backend reports done and the submission queue is empty.
    until_done,
    /// Never return on its own; only an error ends the loop.
    forever,
};

/// User-supplied values copied into a task when a request is queued.
pub const Context = struct {
    /// Copied into the task's `userdata` field.
    ptr: ?*anyopaque = null,
    /// Copied into the task's `msg` field.
    msg: u16 = 0,
    /// Copied into the task's `callback` field.
    cb: Callback = noopCallback,
};

/// Used for timeouts and deadlines. We make this struct extern because we will ptrCast it to the
/// linux kernel timespec struct
pub const Timespec = extern struct {
    sec: i64 = 0,
    nsec: i64 = 0,

    /// Returns true when both `sec` and `nsec` are zero.
    pub fn isZero(self: Timespec) bool {
        return self.sec == 0 and self.nsec == 0;
    }
};

/// The IO backend of a `Ring`: either the mock or the platform implementation selected at
/// compile time. Every method simply forwards to the active variant.
pub const Backend = union(enum) {
    mock: Mock,

    platform: switch (builtin.os.tag) {
        .dragonfly,
        .freebsd,
        .ios,
        .macos,
        .netbsd,
        .openbsd,
        .tvos,
        .visionos,
        .watchos,
        => @import("ourio/Kqueue.zig"),

        .linux => @import("ourio/Uring.zig"),

        else => @compileError("unsupported os"),
    },

    /// Creates a backend of the same variant as `self`. A mock parent yields a
    /// default-initialized mock; a platform parent delegates to its own `initChild`.
    pub fn initChild(self: *Backend, entries: u16) !Backend {
        switch (self.*) {
            .mock => return .{ .mock = .{} },
            .platform => |*p| return .{ .platform = try p.initChild(entries) },
        }
    }

    pub fn deinit(self: *Backend, gpa: Allocator) void {
        switch (self.*) {
            inline else => |*backend| backend.deinit(gpa),
        }
    }

    pub fn pollableFd(self: *Backend) !posix.fd_t {
        return switch (self.*) {
            inline else => |*backend| backend.pollableFd(),
        };
    }

    pub fn submitAndWait(self: *Backend, queue: *SubmissionQueue) !void {
        return switch (self.*) {
            inline else => |*backend| backend.submitAndWait(queue),
        };
    }

    pub fn submit(self: *Backend, queue: *SubmissionQueue) !void {
        return switch (self.*) {
            inline else => |*backend| backend.submit(queue),
        };
    }

    pub fn reapCompletions(
        self: *Backend,
        rt: *Ring,
    ) !void {
        return switch (self.*) {
            inline else => |*backend| backend.reapCompletions(rt),
        };
    }

    pub fn done(self: *Backend) bool {
        return switch (self.*) {
            inline else => |*backend| backend.done(),
        };
    }
};

pub const CompletionQueue = Queue(Task, .complete);
pub const FreeQueue = Queue(Task, .free);
pub const SubmissionQueue = Queue(Task, .in_flight);

/// An IO ring: a backend plus the task queues that feed it. Tasks are allocated with `gpa` and
/// recycled through `free_q`.
pub const Ring = struct {
    backend: Backend,
    gpa: Allocator,

    completion_q: CompletionQueue = .{},
    submission_q: SubmissionQueue = .{},
    free_q: FreeQueue = .{},

    /// Creates a ring backed by the platform backend.
    pub fn init(gpa: Allocator, entries: u16) !Ring {
        return .{
            .backend = .{ .platform = try .init(gpa, entries) },
            .gpa = gpa,
        };
    }

    /// Creates a new ring whose backend is derived from this ring's backend and which shares
    /// this ring's allocator.
    pub fn initChild(self: *Ring, entries: u16) !Ring {
        return .{
            .backend = try self.backend.initChild(entries),
            .gpa = self.gpa,
        };
    }

    /// Creates a ring backed by the mock backend.
    pub fn initMock(gpa: Allocator, entries: u16) !Ring {
        return .{
            .backend = .{ .mock = try .init(entries) },
            .gpa = gpa,
        };
    }

    /// Tears down the backend, then destroys every task still sitting in any of the queues.
    pub fn deinit(self: *Ring) void {
        self.backend.deinit(self.gpa);
        while (self.free_q.pop()) |task| self.gpa.destroy(task);
        while (self.submission_q.pop()) |task| self.gpa.destroy(task);
        while (self.completion_q.pop()) |task| self.gpa.destroy(task);
    }

    /// Repeatedly hands the submission queue to the backend and reaps completions until
    /// `condition` is satisfied. Any error from the backend ends the loop and is returned.
    pub fn run(self: *Ring, condition: RunCondition) !void {
        while (true) {
            try self.backend.submitAndWait(&self.submission_q);
            try self.backend.reapCompletions(self);
            switch (condition) {
                .once => return,
                .until_done => if (self.backend.done() and self.submission_q.empty()) return,
                .forever => {},
            }
        }
    }

    /// Returns a task from the free list, allocating a new one when the list is empty. The
    /// returned task's contents are whatever was left in it; callers overwrite it.
    pub fn getTask(self: *Ring) Allocator.Error!*Task {
        return self.free_q.pop() orelse try self.gpa.create(Task);
    }

    /// Shared tail of every request method: takes a task, fills it from `ctx` and `req`, and
    /// pushes it onto the submission queue. Nothing reaches the backend until `run`.
    fn enqueue(self: *Ring, ctx: Context, req: Request) Allocator.Error!*Task {
        const task = try self.getTask();
        task.* = .{
            .userdata = ctx.ptr,
            .msg = ctx.msg,
            .callback = ctx.cb,
            .req = req,
        };

        self.submission_q.push(task);
        return task;
    }

    /// Queues a `noop` request.
    pub fn noop(
        self: *Ring,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .noop);
    }

    /// Queues a `timer` request for `duration`.
    pub fn timer(
        self: *Ring,
        duration: Timespec,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .timer = duration });
    }

    /// Queues a `cancel` request targeting all tasks. The task carries no user context, so the
    /// remaining task fields keep their defaults.
    pub fn cancelAll(self: *Ring) Allocator.Error!void {
        const task = try self.getTask();
        task.* = .{
            .req = .{ .cancel = .all },
        };

        self.submission_q.push(task);
    }

    /// Queues an `accept` request on `fd`.
    pub fn accept(
        self: *Ring,
        fd: posix.fd_t,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .accept = fd });
    }

    /// Queues a `msg_ring` request that delivers `target_task` to `target`.
    ///
    /// `target_task` is the task that the target ring will receive. The callbacks of this task
    /// are what will be called when the target receives the message. `ctx` describes the
    /// sending task, which is the one returned.
    pub fn msgRing(
        self: *Ring,
        target: *Ring,
        target_task: *Task,
        ctx: Context,
    ) Allocator.Error!*Task {
        // This is the task to send the message
        const task = try self.enqueue(ctx, .{ .msg_ring = .{
            .target = target,
            .task = target_task,
        } });
        // Only mark the payload once the sending task has actually been queued, so an
        // allocation failure leaves `target_task` untouched.
        target_task.state = .in_flight;
        return task;
    }

    /// Queues a `recv` request on `fd`. The `buffer` slice is stored in the request, not
    /// copied.
    pub fn recv(
        self: *Ring,
        fd: posix.fd_t,
        buffer: []u8,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .recv = .{
            .fd = fd,
            .buffer = buffer,
        } });
    }

    /// Queues a `write` request on `fd`. The `buffer` slice is stored in the request, not
    /// copied.
    pub fn write(
        self: *Ring,
        fd: posix.fd_t,
        buffer: []const u8,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .write = .{
            .fd = fd,
            .buffer = buffer,
        } });
    }

    /// Queues a `writev` request on `fd`. The `vecs` slice is stored in the request, not
    /// copied.
    pub fn writev(
        self: *Ring,
        fd: posix.fd_t,
        vecs: []const posix.iovec_const,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .writev = .{
            .fd = fd,
            .vecs = vecs,
        } });
    }

    /// Queues a `close` request for `fd`.
    pub fn close(
        self: *Ring,
        fd: posix.fd_t,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .close = fd });
    }

    /// Queues a `poll` request on `fd` with the given event `mask`.
    pub fn poll(
        self: *Ring,
        fd: posix.fd_t,
        mask: u32,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .poll = .{ .fd = fd, .mask = mask } });
    }

    /// Queues a `socket` request.
    pub fn socket(
        self: *Ring,
        domain: u32,
        socket_type: u32,
        protocol: u32,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .socket = .{
            .domain = domain,
            .type = socket_type,
            .protocol = protocol,
        } });
    }

    /// Queues a `connect` request. The `addr` pointer is stored in the request, not copied.
    pub fn connect(
        self: *Ring,
        fd: posix.socket_t,
        addr: *posix.sockaddr,
        addr_len: posix.socklen_t,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .connect = .{
            .fd = fd,
            .addr = addr,
            .addr_len = addr_len,
        } });
    }

    /// Queues a `statx` request for `path`. `result` is the destination the op fills in.
    pub fn stat(
        self: *Ring,
        path: [:0]const u8,
        result: *Statx,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .statx = .{ .path = path, .result = result } });
    }

    /// Queues a `readv` request on `fd`. The `vecs` slice is stored in the request, not
    /// copied.
    pub fn readv(
        self: *Ring,
        fd: posix.fd_t,
        vecs: []const posix.iovec,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .readv = .{
            .fd = fd,
            .vecs = vecs,
        } });
    }

    /// Queues a `read` request on `fd`. The `buffer` slice is stored in the request, not
    /// copied.
    pub fn read(
        self: *Ring,
        fd: posix.fd_t,
        buffer: []u8,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .read = .{
            .fd = fd,
            .buffer = buffer,
        } });
    }

    /// Queues an `open` request for `path`.
    pub fn open(
        self: *Ring,
        path: [:0]const u8,
        flags: posix.O,
        mode: posix.mode_t,
        ctx: Context,
    ) Allocator.Error!*Task {
        return self.enqueue(ctx, .{ .open = .{ .path = path, .flags = flags, .mode = mode } });
    }

    /// Spawns a thread with a Ring instance. The thread will be idle and waiting to receive work
    /// via msgRing when this function returns. Call kill on the returned thread to signal it to
    /// shutdown.
    ///
    /// If the child ring cannot be created, the thread is joined, the `Thread` is freed and the
    /// initialization error is returned.
    pub fn spawnThread(self: *Ring, entries: u16) !*Thread {
        const thread = try self.gpa.create(Thread);
        errdefer self.gpa.destroy(thread);
        // `create` hands back undefined memory, so field defaults (notably `init_err`) must be
        // applied explicitly before the child can report through them.
        thread.* = .{ .thread = undefined };

        var wg: std.Thread.WaitGroup = .{};
        wg.start();
        thread.thread = try std.Thread.spawn(.{}, Thread.run, .{ thread, self, &wg, entries });
        wg.wait();

        if (thread.init_err) |err| {
            // The child has already left `run`; reap it before the errdefer frees its memory.
            thread.thread.join();
            return err;
        }

        return thread;
    }
};

/// A worker thread that owns its own `Ring`. Created by `Ring.spawnThread`.
pub const Thread = struct {
    thread: std.Thread,
    ring: io.Ring = undefined,
    /// Set by `run` when the child ring could not be initialized; `ring` is undefined then.
    init_err: ?anyerror = null,

    pub const Msg = enum {
        kill,
    };

    /// Thread entry point. Initializes `self.ring` as a child of `parent`, signals `wg`, then
    /// runs the ring until it is killed or fails. `wg` is signalled on the failure path too, so
    /// a waiter never blocks forever; the failure is recorded in `init_err` first.
    pub fn run(self: *Thread, parent: *io.Ring, wg: *std.Thread.WaitGroup, entries: u16) !void {
        self.ring = parent.initChild(entries) catch |err| {
            self.init_err = err;
            // Must be the last access to `self`: the waiter may free it once released.
            wg.finish();
            return err;
        };
        wg.finish();

        defer self.ring.deinit();

        // Run forever, because we may not start with a task. Inter-thread messaging means we could
        // receive work at any time
        self.ring.run(.forever) catch |err| {
            switch (err) {
                error.ThreadKilled => return,
                else => return err,
            }
        };
    }

    /// Kill sends a message to the thread telling it to exit. Callers of this thread can safely
    /// join and deinit the Thread in the Context callback
    pub fn kill(self: *Thread, rt: *io.Ring, ctx: Context) Allocator.Error!*io.Task {
        const target_task = try rt.getTask();
        // If queueing the message fails the task is in no queue, so nothing else would free it.
        errdefer rt.gpa.destroy(target_task);
        target_task.* = .{
            .userdata = self,
            .msg = @intFromEnum(Thread.Msg.kill),
            .callback = Thread.onCompletion,
            .result = .noop,
        };

        return rt.msgRing(&self.ring, target_task, ctx);
    }

    /// Blocks until the underlying OS thread has exited.
    pub fn join(self: Thread) void {
        self.thread.join();
    }

    /// Callback of the kill message; the error it returns is what `run` treats as a clean exit.
    fn onCompletion(_: *io.Ring, task: Task) anyerror!void {
        switch (task.msgToEnum(Thread.Msg)) {
            .kill => return error.ThreadKilled,
        }
    }
};

pub const Op = enum {
    noop,
    deadline,
    timer,
    cancel,
    accept,
    msg_ring,
    recv,
    write,
    writev,
    close,
    poll,
    socket,
    connect,
    statx,
    readv,
    open,
    read,

    /// userbytes is meant to send slices of bytes between Ring instances or callbacks
    userbytes,
    /// userfd is meant to send file descriptors between Ring instances (using msgRing)
    userfd,
    /// usermsg is meant to send a u16 between runtime instances (using msgRing)
    usermsg,
    /// userptr is meant to send a pointer between runtime instances (using msgRing)
    userptr,
};

/// The arguments of a queued operation, tagged by `Op`.
pub const Request = union(Op) {
    noop,
    deadline: Timespec,
    timer: Timespec,
    cancel: union(enum) {
        all,
        task: *Task,
    },
    accept: posix.fd_t,
    msg_ring: struct {
        target: *Ring,
        task: *Task,
    },
    recv: struct {
        fd: posix.fd_t,
        buffer: []u8,
    },
    write: struct {
        fd: posix.fd_t,
        buffer: []const u8,
    },
    writev: struct {
        fd: posix.fd_t,
        vecs: []const posix.iovec_const,
    },
    close: posix.fd_t,
    poll: struct {
        fd: posix.fd_t,
        mask: u32,
    },
    socket: struct {
        domain: u32,
        type: u32,
        protocol: u32,
    },
    connect: struct {
        fd: posix.socket_t,
        addr: *posix.sockaddr,
        addr_len: posix.socklen_t,
    },
    statx: struct {
        path: [:0]const u8,
        result: *Statx, // this will be filled in by the op
    },
    readv: struct {
        fd: posix.fd_t,
        vecs: []const posix.iovec,
    },
    open: struct {
        path: [:0]const u8,
        flags: posix.O,
        mode: posix.mode_t,
    },
    read: struct {
        fd: posix.fd_t,
        buffer: []u8,
    },

    userbytes,
    userfd,
    usermsg,
    userptr,
};

/// The outcome of an operation, tagged by `Op`.
pub const Result = union(Op) {
    noop,
    deadline: ResultError!void,
    timer: ResultError!void,
    cancel: CancelError!void,
    accept: ResultError!posix.fd_t,
    msg_ring: ResultError!void,
    recv: RecvError!usize,
    write: ResultError!usize,
    writev: ResultError!usize,
    close: ResultError!void,
    poll: ResultError!void,
    socket: ResultError!posix.fd_t,
    connect: ResultError!void,
    statx: ResultError!*Statx,
    readv: ResultError!usize,
    open: ResultError!posix.fd_t,
    read: ResultError!usize,

    userbytes: anyerror![]const u8,
    userfd: anyerror!posix.fd_t,
    usermsg: u16,
    userptr: anyerror!?*anyopaque,
};

pub const ResultError = error{
    /// The request was invalid
    Invalid,
    /// The request was canceled
    Canceled,
    /// An unexpected error occurred
    Unexpected,
};

pub const CancelError = ResultError || error{
    /// The entry to cancel couldn't be found
    EntryNotFound,
    /// The entry couldn't be canceled
    NotCanceled,
};

pub const RecvError = ResultError || error{
    /// The connection was reset by the peer
    ConnectionResetByPeer,
};

test {
    _ = @import("ourio/Mock.zig");
    _ = @import("ourio/queue.zig");

    if (has_io_uring) _ = @import("ourio/Uring.zig");
    if (has_kqueue) _ = @import("ourio/Kqueue.zig");
}

/// Foo is only for testing
const Foo = struct {
    bar: usize = 0,

    fn callback(_: *io.Ring, task: io.Task) anyerror!void {
        const self = task.userdataCast(Foo);
        self.bar += 1;
    }
};

/// Follows the ABI of linux statx. Not all platforms will contain all information, or may contain
/// more information than requested
pub const Statx = extern struct {
    /// Mask of bits indicating filled fields
    mask: u32,

    /// Block size for filesystem I/O
    blksize: u32,

    /// Extra file attribute indicators
    attributes: u64,

    /// Number of hard links
    nlink: u32,

    /// User ID of owner
    uid: posix.uid_t,

    /// Group ID of owner
    gid: posix.gid_t,

    /// File type and mode
    mode: u16,
    __pad1: u16,

    /// Inode number
    ino: u64,

    /// Total size in bytes
    size: u64,

    /// Number of 512B blocks allocated
    blocks: u64,

    /// Mask to show what's supported in `attributes`.
    attributes_mask: u64,

    /// Last access file timestamp
    atime: Timestamp,

    /// Creation file timestamp
    btime: Timestamp,

    /// Last status change file timestamp
    ctime: Timestamp,

    /// Last modification file timestamp
    mtime: Timestamp,

    /// Major ID, if this file represents a device.
    rdev_major: u32,

    /// Minor ID, if this file represents a device.
    rdev_minor: u32,

    /// Major ID of the device containing the filesystem where this file resides.
    dev_major: u32,

    /// Minor ID of the device containing the filesystem where this file resides.
    dev_minor: u32,

    __pad2: [14]u64,

    pub const Timestamp = extern struct {
        sec: i64,
        nsec: u32,
        __pad: u32 = 0,
    };

    /// Extracts the major number from a packed device id. Only defined for the kqueue
    /// platforms; referencing it on any other target is a compile error.
    pub fn major(dev: u64) u32 {
        return switch (builtin.os.tag) {
            .macos, .visionos, .tvos, .ios, .watchos => @intCast((dev >> 24) & 0xff),
            .freebsd, .openbsd, .netbsd, .dragonfly => @intCast((dev >> 8) & 0xff),
            else => @compileError("unsupported OS for major()"),
        };
    }

    /// Extracts the minor number from a packed device id. Only defined for the kqueue
    /// platforms; referencing it on any other target is a compile error.
    pub fn minor(dev: u64) u32 {
        return switch (builtin.os.tag) {
            .macos, .ios, .visionos, .tvos, .watchos => @intCast(dev & 0xffffff),
            .openbsd => @intCast(dev & 0xff),
            .freebsd, .dragonfly, .netbsd => @intCast((dev & 0xff) | ((dev >> 12) & 0xfff00)),
            else => @compileError("unsupported OS for minor()"),
        };
    }
};

test "runtime: noop" {
    var rt: io.Ring = try .init(std.testing.allocator, 16);
    defer rt.deinit();

    var foo: Foo = .{};

    const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };

    // noop is triggered synchronously with submit. If we wait, we'll be waiting forever
    _ = try rt.noop(ctx);
    try rt.run(.once);
    try std.testing.expectEqual(1, foo.bar);
    _ = try rt.noop(ctx);
    _ = try rt.noop(ctx);
    try rt.run(.once);
    try std.testing.expectEqual(3, foo.bar);
}

test "runtime: timer" {
    var rt: io.Ring = try .init(std.testing.allocator, 16);
    defer rt.deinit();

    var foo: Foo = .{};

    const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };

    const start = std.time.nanoTimestamp();
    const end = start + 100 * std.time.ns_per_ms;
    _ = try rt.timer(.{ .nsec = 100 * std.time.ns_per_ms }, ctx);
    try rt.run(.once);
    try std.testing.expect(std.time.nanoTimestamp() > end);
    try std.testing.expectEqual(1, foo.bar);
}

test "runtime: poll" {
    var rt: io.Ring = try .init(std.testing.allocator, 16);
    defer rt.deinit();

    var foo: Foo = .{};
    const pipe = try posix.pipe2(.{ .CLOEXEC = true });
    defer posix.close(pipe[0]);
    defer posix.close(pipe[1]);

    const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };

    _ = try rt.poll(pipe[0], posix.POLL.IN, ctx);
    try std.testing.expectEqual(1, rt.submission_q.len());

    _ = try posix.write(pipe[1], "io_uring is the best");
    try rt.run(.once);
    try std.testing.expectEqual(1, foo.bar);
}

test "runtime: deadline doesn't call user callback" {
    const gpa = std.testing.allocator;
    var rt = try io.Ring.init(gpa, 16);
    defer rt.deinit();

    var foo: Foo = .{};
    const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
    const task = try rt.noop(ctx);
    try task.setDeadline(&rt, .{ .sec = 1 });

    try rt.run(.until_done);

    // Callback only called once
    try std.testing.expectEqual(1, foo.bar);
}

test "runtime: timeout" {
    const gpa = std.testing.allocator;
    var rt = try io.Ring.init(gpa, 16);
    defer rt.deinit();

    var foo: Foo = .{};
    const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };

    const delay = 1 * std.time.ns_per_ms;
    _ = try rt.timer(.{ .nsec = delay }, ctx);

    const start = std.time.nanoTimestamp();
    try rt.run(.until_done);
    try std.testing.expect(start + delay < std.time.nanoTimestamp());
    try std.testing.expectEqual(1, foo.bar);
}

test "runtime: cancel" {
    const gpa = std.testing.allocator;
    var rt = try io.Ring.init(gpa, 16);
    defer rt.deinit();

    var foo: Foo = .{};
    const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };

    const delay = 1 * std.time.ns_per_s;
    // One full second is expressed in `sec`; a nanosecond field of 1e9 is out of range for a
    // normalized timespec.
    const task = try rt.timer(.{ .sec = 1 }, ctx);

    try task.cancel(&rt, .{});

    const start = std.time.nanoTimestamp();
    try rt.run(.until_done);
    // Expect that we didn't delay long enough
    try std.testing.expect(start + delay > std.time.nanoTimestamp());
    try std.testing.expectEqual(1, foo.bar);
}

test "runtime: cancel all" {
    const gpa = std.testing.allocator;
    var rt = try io.Ring.init(gpa, 16);
    defer rt.deinit();

    const Foo2 = struct {
        bar: usize = 0,

        fn callback(_: *io.Ring, task: io.Task) anyerror!void {
            const self = task.userdataCast(@This());
            const result = task.result.?;
            _ = result.timer catch |err| {
                switch (err) {
                    error.Canceled => self.bar += 1,
                    else => {},
                }
            };
        }
    };
    var foo: Foo2 = .{};
    const ctx: Context = .{ .ptr = &foo, .cb = Foo2.callback };

    const delay = 1 * std.time.ns_per_s;
    // One full second is expressed in `sec`; a nanosecond field of 1e9 is out of range for a
    // normalized timespec.
    const one_second: Timespec = .{ .sec = 1 };
    _ = try rt.timer(one_second, ctx);
    _ = try rt.timer(one_second, ctx);
    _ = try rt.timer(one_second, ctx);
    _ = try rt.timer(one_second, ctx);

    try rt.cancelAll();
    const start = std.time.nanoTimestamp();
    try rt.run(.until_done);
    // Expect that we didn't delay long enough
    try std.testing.expect(start + delay > std.time.nanoTimestamp());
    try std.testing.expectEqual(4, foo.bar);
}

test "runtime: msgRing" {
    const gpa = std.testing.allocator;
    var rt1 = try io.Ring.init(gpa, 16);
    defer rt1.deinit();

    var rt2 = try rt1.initChild(16);
    defer rt2.deinit();

    const Foo2 = struct {
        rt1: bool = false,
        rt2: bool = false,

        const Msg = enum { rt1, rt2 };

        fn callback(_: *io.Ring, task: io.Task) anyerror!void {
            const self = task.userdataCast(@This());
            const msg = task.msgToEnum(Msg);
            switch (msg) {
                .rt1 => self.rt1 = true,
                .rt2 => self.rt2 = true,
            }
        }
    };

    var foo: Foo2 = .{};

    // The task we will send from rt1 to rt2
    const target_task = try rt1.getTask();
    target_task.* = .{
        .userdata = &foo,
        .callback = Foo2.callback,
        .msg = @intFromEnum(Foo2.Msg.rt2),
        .result = .{ .usermsg = 0 },
    };

    _ = try rt1.msgRing(
        &rt2,
        target_task,
        .{ .cb = Foo2.callback, .msg = @intFromEnum(Foo2.Msg.rt1), .ptr = &foo },
    );

    try rt1.run(.until_done);
    try std.testing.expect(foo.rt1);
    try std.testing.expect(!foo.rt2);
    try rt2.run(.until_done);
    try std.testing.expect(foo.rt1);
    try std.testing.expect(foo.rt2);
}

test "runtime: spawnThread" {
    const gpa = std.testing.allocator;
    var rt = try io.Ring.init(gpa, 16);
    defer rt.deinit();

    const thread = try rt.spawnThread(4);

    const Foo2 = struct {
        kill: bool = false,
        did_work: bool = false,

        gpa: Allocator,
        thread: *Thread,

        const Msg = enum { kill, work };

        fn callback(_: *io.Ring, task: io.Task) anyerror!void {
            const self = task.userdataCast(@This());
            const msg = task.msgToEnum(Msg);
            switch (msg) {
                .kill => {
                    self.kill = true;
                    self.thread.join();
                    self.gpa.destroy(self.thread);
                },
                .work => self.did_work = true,
            }
        }
    };

    var foo: Foo2 = .{ .thread = thread, .gpa = gpa };

    // Send work to the thread
    const target_task = try rt.getTask();
    target_task.* = .{
        .userdata = &foo,
        .callback = Foo2.callback,
        .msg = @intFromEnum(Foo2.Msg.work),
        .result = .{ .usermsg = 0 },
    };

    _ = try rt.msgRing(&thread.ring, target_task, .{});

    try rt.run(.until_done);
    _ = try thread.kill(&rt, .{
        .ptr = &foo,
        .cb = Foo2.callback,
        .msg = @intFromEnum(Foo2.Msg.kill),
    });
    try rt.run(.until_done);

    try std.testing.expect(foo.did_work);
    try std.testing.expect(foo.kill);
}

test "runtime: stat" {
    const gpa = std.testing.allocator;
    var rt = try io.Ring.init(gpa, 16);
    defer rt.deinit();

    var foo: Foo = .{};
    const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };

    var statx: Statx = undefined;
    const task = try rt.stat("build.zig", &statx, ctx);

    try rt.run(.until_done);
    try std.testing.expect(task.result != null);
}