An asynchronous IO runtime

initial commit

pulled code out of horizon

+2
.gitignore
.zig-cache/
zig-out/
+18
LICENSE
Copyright 2025 Tim Culverhouse

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+167
README.md
# Ourio

Ourio (pronounced "oreo", think "Ouroboros") is an asynchronous IO runtime
built heavily around the semantics of io_uring. The design is inspired by
[libxev](https://github.com/mitchellh/libxev), which is in turn inspired by
[TigerBeetle](https://github.com/tigerbeetle/tigerbeetle).

Ourio takes a slightly different approach: it is designed to encourage a
message-passing style of asynchronous IO. Users of the library give each task
a Context, which contains a pointer, a callback, *and a message*. The message is
implemented as a u16, and generally you should use an enum for it. The idea is
that you can minimize the number of callback functions required by tagging tasks
with a small amount of semantic meaning in the `msg` field.

Ourio has io_uring and kqueue backends. Ourio supports the `msg_ring`
capability of io_uring to pass a completion from one ring to another. This
allows a multithreaded application to implement message passing using io_uring
(or kqueue, if that's your flavor). Multithreaded applications should plan to
use one `Runtime` per thread. Submission onto the runtime is not thread safe;
any message passing must occur using `msg_ring` rather than by directly
submitting a task to another runtime.

Ourio also includes a fully mockable IO runtime to make it easy to unit test
your async code; a sketch of what that looks like is at the end of this README.

## Tasks

### Deadlines and Cancelation

Each IO operation creates a `Task`. When scheduling a task on the runtime, the
caller receives a pointer to the `Task`, at which point they may cancel it or
set a deadline:

```zig
// Timers are always relative time
const task = try rt.timer(.{ .sec = 3 }, .{ .cb = onCompletion, .msg = 0 });

// If the deadline expired, the task will be sent to the onCompletion callback
// with a result of error.Canceled. Deadlines are always absolute time
try task.setDeadline(rt, .{ .sec = std.time.timestamp() + 3 });

// Alternatively, we can hold on to the pointer for the task while it is with
// the runtime and cancel it. The Context we give to the cancel function lets
// us know the result of the cancelation, but we will also receive a message
// from the original task with error.Canceled. We can ignore the cancel result
// by using the default context value
try task.cancel(rt, .{});
```

### Passing tasks between threads

Say we `accept` a connection in one thread, and want to send the file descriptor
to another for handling.

```zig
const target_task = try main_rt.getTask();
target_task.* = .{
    .userdata = &foo,
    .msg = @intFromEnum(Msg.some_message),
    .callback = Worker.onCompletion,
    .req = .{ .userfd = fd },
};

// Send target_task from the main_rt thread to the thread_rt Runtime. The
// thread_rt Runtime will then process the task as a completion, ie
// Worker.onCompletion will be called with this task. That thread can then
// schedule a recv, a write, etc on the file descriptor it just received.
_ = try main_rt.msgRing(thread_rt, target_task, .{});
```

### Multiple Runtimes on the same thread

You can have multiple Runtimes in a single thread. One could be a priority
Runtime, or handle specific types of tasks, etc. Poll any runtime from any other
runtime.

```zig
const fd = try rt1.backend.pollableFd();
_ = try rt2.poll(fd, posix.POLL.IN, .{
    .cb = onCompletion,
    .msg = @intFromEnum(Msg.rt1_has_completions),
});
```

## Example

An example implementation of an asynchronous writer to two file descriptors:

```zig
const std = @import("std");
const io = @import("ourio");
const posix = std.posix;
const Allocator = std.mem.Allocator;

pub const MultiWriter = struct {
    fd1: posix.fd_t,
    fd1_written: usize = 0,

    fd2: posix.fd_t,
    fd2_written: usize = 0,

    buf: std.ArrayListUnmanaged(u8) = .empty,

    pub const Msg = enum { fd1, fd2 };

    pub fn init(fd1: posix.fd_t, fd2: posix.fd_t) MultiWriter {
        return .{ .fd1 = fd1, .fd2 = fd2 };
    }

    pub fn write(self: *MultiWriter, gpa: Allocator, bytes: []const u8) !void {
        try self.buf.appendSlice(gpa, bytes);
    }

    pub fn flush(self: *MultiWriter, rt: *io.Runtime) !void {
        if (self.fd1_written < self.buf.items.len) {
            _ = try rt.write(self.fd1, self.buf.items[self.fd1_written..], .{
                .ptr = self,
                .msg = @intFromEnum(Msg.fd1),
                .cb = MultiWriter.onCompletion,
            });
        }

        if (self.fd2_written < self.buf.items.len) {
            _ = try rt.write(self.fd2, self.buf.items[self.fd2_written..], .{
                .ptr = self,
                .msg = @intFromEnum(Msg.fd2),
                .cb = MultiWriter.onCompletion,
            });
        }
    }

    pub fn onCompletion(rt: *io.Runtime, task: io.Task) anyerror!void {
        const self = task.userdataCast(MultiWriter);
        const result = task.result.?;

        const n = try result.write;
        switch (task.msgToEnum(MultiWriter.Msg)) {
            .fd1 => self.fd1_written += n,
            .fd2 => self.fd2_written += n,
        }

        const len = self.buf.items.len;

        if (self.fd1_written < len or self.fd2_written < len)
            return self.flush(rt);

        self.fd1_written = 0;
        self.fd2_written = 0;
        self.buf.clearRetainingCapacity();
    }
};

pub fn main() !void {
    var gpa: std.heap.DebugAllocator(.{}) = .init;
    var rt: io.Runtime = try .init(gpa.allocator(), 16);
    defer rt.deinit();

    // Pretend I created some files
    const fd1: posix.fd_t = 5;
    const fd2: posix.fd_t = 6;

    var mw: MultiWriter = .init(fd1, fd2);
    defer mw.buf.deinit(gpa.allocator());
    try mw.write(gpa.allocator(), "Hello, world!");
    try mw.flush(&rt);

    try rt.run(.until_done);
}
```
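## Testing with the mock runtime

The mock backend stubs each request type with a `*_cb` hook and never touches real file
descriptors. A minimal sketch of a unit test (assuming the module is imported as `ourio`; the
fd value and byte count are arbitrary):

```zig
const std = @import("std");
const io = @import("ourio");

const Fixture = struct {
    bytes_read: usize = 0,

    fn onCompletion(_: *io.Runtime, task: io.Task) anyerror!void {
        const self = task.userdataCast(Fixture);
        self.bytes_read = try task.result.?.recv;
    }
};

test "recv completes through the mock backend" {
    var rt: io.Runtime = try .initMock(std.testing.allocator, 16);
    defer rt.deinit();

    // Stub the recv request; unstubbed request types fail with error.NoMockCallback
    rt.backend.mock.recv_cb = struct {
        fn recv(_: *io.Task) io.Result {
            return .{ .recv = 5 };
        }
    }.recv;

    var fixture: Fixture = .{};
    var buf: [16]u8 = undefined;
    _ = try rt.recv(0, &buf, .{ .ptr = &fixture, .cb = Fixture.onCompletion });

    try rt.run(.until_done);
    try std.testing.expectEqual(@as(usize, 5), fixture.bytes_read);
}
```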
+23
build.zig
const std = @import("std");

pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    const io_mod = b.createModule(.{
        .root_source_file = b.path("src/main.zig"),
        .target = target,
        .optimize = optimize,
    });
    const tls_dep = b.dependency("tls", .{ .target = target, .optimize = optimize });
    io_mod.addImport("tls", tls_dep.module("tls"));

    const lib_unit_tests = b.addTest(.{
        .root_module = io_mod,
    });

    const run_lib_unit_tests = b.addRunArtifact(lib_unit_tests);

    const test_step = b.step("test", "Run unit tests");
    test_step.dependOn(&run_lib_unit_tests.step);
}
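As written, the build script only wires the module into a test step; nothing in this commit
exposes it to downstream consumers. A hypothetical sketch of what that could look like,
mirroring the `createModule` call above (`b.addModule` publishes the module under a name
dependents can import):

```zig
// Hypothetical addition, not part of this commit: publish the module as
// "ourio" so that dependents can `@import("ourio")` after fetching the package.
const ourio_mod = b.addModule("ourio", .{
    .root_source_file = b.path("src/main.zig"),
    .target = target,
    .optimize = optimize,
});
ourio_mod.addImport("tls", tls_dep.module("tls"));
```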
+20
build.zig.zon
.{
    .name = .ourio,
    .version = "0.0.0",
    .fingerprint = 0xbfe65736d1b3cffe, // Changing this has security and trust implications.

    .minimum_zig_version = "0.14.0",

    .dependencies = .{
        .tls = .{
            .url = "git+https://github.com/ianic/tls.zig#8250aa9184fbad99983b32411bbe1a5d2fd6f4b7",
            .hash = "tls-0.1.0-ER2e0pU3BQB-UD2_s90uvppceH_h4KZxtHCrCct8L054",
        },
    },

    .paths = .{
        "build.zig",
        "build.zig.zon",
        "src",
    },
}
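For the other direction, a downstream project's build.zig.zon would declare ourio as a
dependency. A sketch with illustrative values only (the URL is a guess at the canonical repo;
`zig fetch --save` records the real `.hash`, and the required `.fingerprint` is generated per
project by `zig init`):

```zig
.{
    .name = .my_app,
    .version = "0.0.0",
    .dependencies = .{
        .ourio = .{
            // Illustrative URL; run `zig fetch --save <url>` to fill in the
            // matching `.hash` field automatically.
            .url = "git+https://github.com/rockorager/ourio",
        },
    },
    .paths = .{ "build.zig", "build.zig.zon", "src" },
}
```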
+825
src/Kqueue.zig
const Kqueue = @This();

const std = @import("std");
const builtin = @import("builtin");

const io = @import("main.zig");

const Allocator = std.mem.Allocator;
const EV = std.c.EV;
const EVFILT = std.c.EVFILT;
const Queue = @import("queue.zig").Intrusive;
const assert = std.debug.assert;
const posix = std.posix;

gpa: Allocator,
kq: posix.fd_t,
/// Items we have prepared and waiting to be put into kqueue
submission_queue: std.ArrayListUnmanaged(posix.Kevent) = .empty,

/// Tasks that have been submitted to kqueue
in_flight: Queue(io.Task, .in_flight) = .{},

/// Tasks which were completed synchronously during submission
synchronous_queue: Queue(io.Task, .complete) = .{},

/// Queue for other kqueue instances to send "completion" tasks to this thread
msg_ring_queue: Queue(io.Task, .complete) = .{},

/// Mutex to guard access to the msg_ring_queue. We *could* merge these tasks with the
/// synchronous_queue, however we want to limit contention and msg_ring is probably pretty rare
/// compared to synchronous tasks
msg_ring_mutex: std.Thread.Mutex = .{},

/// List of timers, sorted descending so when we pop we get the next timer to expire
timers: std.ArrayListUnmanaged(Timer) = .empty,

events: [128]posix.Kevent = undefined,
event_idx: usize = 0,

const Timer = union(enum) {
    /// a deadline timer cancels a task if it fires
    deadline: struct {
        /// the deadline task. If the parent completes before the deadline, the parent will set the
        /// deadline task state to .free
        task: *io.Task,

        /// the task to cancel if the deadline expires
        parent: *io.Task,
    },

    /// a regular timer
    timeout: struct {
        /// The task for the timer
        task: *io.Task,

        /// Absolute time in ms the timer was added
        added_ms: i64,
    },

    /// Timer expires in the return value milliseconds from now
    fn expiresInMs(self: Timer, now_abs: i64) i64 {
        switch (self) {
            .deadline => |deadline| {
                const ts = deadline.task.req.deadline;
                const expires_ms = ts.sec * std.time.ms_per_s + @divTrunc(ts.nsec, std.time.ns_per_ms);
                return expires_ms - now_abs;
            },

            // timeouts are relative, so we add the time it was added to the queue
            .timeout => |timeout| {
                const ts = timeout.task.req.timer;
                const expires_ms = ts.sec * std.time.ms_per_s +
                    @divTrunc(ts.nsec, std.time.ns_per_ms) +
                    timeout.added_ms;
                return expires_ms - now_abs;
            },
        }
    }

    /// returns a timespec suitable for a kevent timeout. Relative to now
    fn timespec(self: Timer) posix.timespec {
        const expires = self.expiresInMs(std.time.milliTimestamp());
        return .{ .sec = @divFloor(expires, 1000), .nsec = @mod(expires, 1000) * std.time.ns_per_ms };
    }

    fn lessThan(now_ms: i64, lhs: Timer, rhs: Timer) bool {
        // reverse sort (we want soonest expiring last)
        return lhs.expiresInMs(now_ms) > rhs.expiresInMs(now_ms);
    }
};

/// Messages we are waiting on using an EVFILT.USER
const UserMsg = enum {
    /// A general wakeup message
    wakeup,

    fn fromInt(v: i64) UserMsg {
        return @enumFromInt(v);
    }
};

/// Initialize a Ring
pub fn init(gpa: Allocator, _: u16) !Kqueue {
    const kq = try posix.kqueue();

    // Register a wakeup EVFILT.USER to wake up this kqueue
    var kevent = evSet(
        @intFromEnum(UserMsg.wakeup),
        EVFILT.USER,
        EV.ADD | EV.CLEAR,
        null,
    );
    kevent.fflags = std.c.NOTE.FFNOP;
    _ = try posix.kevent(kq, &.{kevent}, &.{}, null);

    return .{ .gpa = gpa, .kq = kq };
}

pub fn deinit(self: *Kqueue, gpa: Allocator) void {
    while (self.msg_ring_queue.pop()) |task| gpa.destroy(task);
    while (self.in_flight.pop()) |task| gpa.destroy(task);

    self.submission_queue.deinit(gpa);
    self.timers.deinit(gpa);

    posix.close(self.kq);
    self.* = undefined;
}

/// Initializes a child Ring which can be woken up by self. This must be called from the thread
/// which will operate the child ring. Initializes with the same queue size as the parent
pub fn initChild(self: Kqueue, entries: u16) !Kqueue {
    return init(self.gpa, entries);
}

pub fn submitAndWait(self: *Kqueue, queue: *io.SubmissionQueue) !void {
    defer self.submission_queue.clearRetainingCapacity();
    while (queue.pop()) |task| {
        try self.prepTask(task);

        // If this task is queued and has a deadline we need to schedule a timer
        if (task.deadline) |deadline| {
            try self.addTimer(.{ .deadline = .{ .task = deadline, .parent = task } });
        }
    }

    // Sort our timers
    const now = std.time.milliTimestamp();
    std.sort.insertion(Timer, self.timers.items, now, Timer.lessThan);

    if (self.synchronous_queue.empty()) {
        // We don't have any synchronous completions, so we need to wait for some from kqueue
        return self.wait();
    }

    // We already have completions from synchronous tasks. Submit our queued events and grab any new
    // completions for processing. We do so with a 0 timeout so that we are only grabbing already
    // completed items
    const timeout: posix.timespec = .{ .sec = 0, .nsec = 0 };
    self.event_idx = try posix.kevent(self.kq, self.submission_queue.items, &self.events, &timeout);
}

fn wait(self: *Kqueue) !void {
    assert(self.synchronous_queue.empty());

    // Go through our timers until the first unexpired one
    while (true) {
        const t = self.timers.getLastOrNull() orelse break;
        const timeout: posix.timespec = t.timespec();

        self.event_idx = try posix.kevent(
            self.kq,
            self.submission_queue.items,
            &self.events,
            &timeout,
        );
        return;
    }

    // We had no timers so we wait indefinitely
    self.event_idx = try posix.kevent(
        self.kq,
        self.submission_queue.items,
        &self.events,
        null,
    );
}

pub fn submit(self: *Kqueue, queue: *io.SubmissionQueue) !void {
    defer self.submission_queue.clearRetainingCapacity();
    while (queue.pop()) |task| {
        try self.prepTask(task);

        // If this task is queued and has a deadline we need to schedule a timer
        if (task.state == .in_flight and task.deadline != null) {
            const deadline = task.deadline.?;
            try self.addTimer(.{ .deadline = .{ .task = deadline, .parent = task } });
        }
    }

    // Sort our timers
    const now = std.time.milliTimestamp();
    std.sort.insertion(Timer, self.timers.items, now, Timer.lessThan);

    // For submit, we don't try to reap any completions. Calls to submit will likely be relying on a
    // `poll` of the kqueue. We check in reapCompletions if we have no reaped events and grab them
    // there if needed
    const timeout: posix.timespec = .{ .sec = 0, .nsec = 0 };
    _ = try posix.kevent(self.kq, self.submission_queue.items, &.{}, &timeout);
}

/// preps a task to be submitted into the kqueue
fn prepTask(self: *Kqueue, task: *io.Task) !void {
    return switch (task.req) {
        .accept => |req| {
            self.in_flight.push(task);
            const kevent = evSet(@intCast(req), EVFILT.READ, EV.ADD, task);
            try self.submission_queue.append(self.gpa, kevent);
        },

        .cancel => |req| {
            // Cancel tasks are handled like this:
            // 0. If the task is not in flight, we don't do anything
            // 1. We set the cancel_task state as canceled
            // 2. If there is a kqueue event associated with it, we prep a kevent with EV.DELETE
            // 3. We add the task to the synchronous queue. This lets us ensure we hold the
            //    cancel state until we've submitted the kevent associated with it. If any
            //    completions occur while the state is canceled, we will ignore them.
            // 4. Callbacks are not called in the submit phase. We only call the callback of the
            //    cancel_task with error.Canceled in reapCompletions
            // 5. In reapCompletions, we will return both the task and the cancel_task to the free
            //    list

            self.synchronous_queue.push(task);

            task.result = .{ .cancel = {} };
            switch (req) {
                .all => {
                    while (self.in_flight.head) |t| {
                        defer self.synchronous_queue.push(t);

                        self.cancelTask(t) catch continue;
                    }

                    while (self.timers.getLastOrNull()) |tmr| {
                        switch (tmr) {
                            inline else => |v| {
                                defer self.synchronous_queue.push(v.task);
                                self.cancelTask(v.task) catch continue;
                            },
                        }
                    }
                },
                .task => |task_to_cancel| {
                    switch (task_to_cancel.state) {
                        .free, .canceled, .complete => {
                            task.result = .{ .cancel = error.NotCanceled };
                            return;
                        },
                        .in_flight => {},
                    }
                    try self.cancelTask(task_to_cancel);
                },
            }
        },

        .close => |req| {
            task.result = .{ .close = {} };
            self.synchronous_queue.push(task);
            posix.close(req);
        },

        .connect => |req| {
            // Set nonblocking. Call connect. Then add it to the kqueue. This will return as
            // writeable when the connect is complete
            const arg: posix.O = .{ .NONBLOCK = true };
            const arg_u32: u32 = @bitCast(arg);
            _ = posix.fcntl(req.fd, posix.F.SETFL, arg_u32) catch {
                task.result = .{ .connect = error.Unexpected };
                self.synchronous_queue.push(task);
                return;
            };

            if (posix.connect(req.fd, req.addr, req.addr_len)) {
                // We connected immediately. No need to add to kqueue. Just push to the synchronous
                // queue to call the callback later
                task.result = .{ .connect = {} };
                self.synchronous_queue.push(task);
            } else |err| {
                switch (err) {
                    error.WouldBlock => {
                        self.in_flight.push(task);
                        // This is the error we expect. Add the event to kqueue
                        const kevent = evSet(@intCast(req.fd), EVFILT.WRITE, EV.ADD | EV.ONESHOT, task);
                        try self.submission_queue.append(self.gpa, kevent);
                    },
                    else => {
                        task.result = .{ .connect = error.Unexpected };
                        self.synchronous_queue.push(task);
                    },
                }
            }
        },

        // deadlines are handled separately
        .deadline => unreachable,

        .msg_ring => |req| {
            const target = req.target;

            target.backend.platform.msg_ring_mutex.lock();
            target.backend.platform.msg_ring_queue.push(req.task);
            target.backend.platform.msg_ring_mutex.unlock();

            task.result = .{ .msg_ring = {} };
            self.synchronous_queue.push(task);

            // wake up the other ring
            var kevent = evSet(
                @intFromEnum(UserMsg.wakeup),
                EVFILT.USER,
                0,
                null,
            );
            kevent.fflags |= std.c.NOTE.TRIGGER;
            // Trigger the wakeup
            _ = try posix.kevent(target.backend.platform.kq, &.{kevent}, &.{}, null);
        },

        .noop => {
            task.result = .noop;
            self.synchronous_queue.push(task);
        },

        .poll => |req| {
            self.in_flight.push(task);
            if (req.mask & posix.POLL.IN != 0) {
                const kevent = evSet(@intCast(req.fd), EVFILT.READ, EV.ADD | EV.ONESHOT, task);
                try self.submission_queue.append(self.gpa, kevent);
            }
            if (req.mask & posix.POLL.OUT != 0) {
                const kevent = evSet(@intCast(req.fd), EVFILT.WRITE, EV.ADD | EV.ONESHOT, task);
                try self.submission_queue.append(self.gpa, kevent);
            }
        },

        .recv => |req| {
            self.in_flight.push(task);
            const kevent = evSet(@intCast(req.fd), EVFILT.READ, EV.ADD | EV.ONESHOT, task);
            try self.submission_queue.append(self.gpa, kevent);
        },

        .socket => |req| {
            self.synchronous_queue.push(task);
            if (posix.socket(req.domain, req.type, req.protocol)) |fd|
                task.result = .{ .socket = fd }
            else |_|
                task.result = .{ .socket = error.Unexpected };
        },

        .timer => {
            const now = std.time.milliTimestamp();
            try self.addTimer(.{ .timeout = .{ .task = task, .added_ms = now } });
        },

        // user* fields are never seen by the runtime, only for internal message passing
        .userfd, .usermsg, .userptr => unreachable,

        .write => |req| {
            self.in_flight.push(task);
            const kevent = evSet(@intCast(req.fd), EVFILT.WRITE, EV.ADD | EV.ONESHOT, task);
            try self.submission_queue.append(self.gpa, kevent);
        },

        .writev => |req| {
            self.in_flight.push(task);
            const kevent = evSet(@intCast(req.fd), EVFILT.WRITE, EV.ADD | EV.ONESHOT, task);
            try self.submission_queue.append(self.gpa, kevent);
        },
    };
}

fn cancelTask(self: *Kqueue, task: *io.Task) !void {
    task.state = .canceled;
    if (task.deadline) |d| d.state = .canceled;

    switch (task.req) {
        // Handled synchronously. These most likely could not be canceled, but even if one was,
        // there is nothing more to do either way
        .cancel,
        .close,
        .msg_ring,
        .noop,
        .socket,
        .userfd,
        .usermsg,
        .userptr,
        => {},

        .accept => |cancel_req| {
            self.in_flight.remove(task);
            task.result = .{ .accept = error.Canceled };
            const kevent = evSet(@intCast(cancel_req), EVFILT.READ, EV.DELETE, task);
            try self.submission_queue.append(self.gpa, kevent);
        },

        .connect => |cancel_req| {
            self.in_flight.remove(task);
            task.result = .{ .connect = error.Canceled };
            const kevent = evSet(
                @intCast(cancel_req.fd),
                EVFILT.WRITE,
                EV.DELETE,
                task,
            );
            try self.submission_queue.append(self.gpa, kevent);
        },

        .deadline => {
            // What does it mean to cancel a deadline? We remove the deadline from
            // the parent and the timer from our list
            for (self.timers.items, 0..) |t, i| {
                if (t == .deadline and t.deadline.task == task) {
                    // Set the parent deadline to null
                    t.deadline.parent.deadline = null;
                    // Remove the timer
                    _ = self.timers.orderedRemove(i);
                    task.result = .{ .deadline = error.Canceled };
                    return;
                }
            } else task.result = .{ .cancel = error.EntryNotFound };
        },

        .poll => |cancel_req| {
            self.in_flight.remove(task);
            task.result = .{ .poll = error.Canceled };
            if (cancel_req.mask & posix.POLL.IN != 0) {
                const kevent = evSet(@intCast(cancel_req.fd), EVFILT.READ, EV.DELETE, task);
                try self.submission_queue.append(self.gpa, kevent);
            }
            if (cancel_req.mask & posix.POLL.OUT != 0) {
                const kevent = evSet(@intCast(cancel_req.fd), EVFILT.WRITE, EV.DELETE, task);
                try self.submission_queue.append(self.gpa, kevent);
            }
        },

        .recv => |cancel_req| {
            self.in_flight.remove(task);
            task.result = .{ .recv = error.Canceled };
            const kevent = evSet(
                @intCast(cancel_req.fd),
                EVFILT.READ,
                EV.DELETE,
                task,
            );
            try self.submission_queue.append(self.gpa, kevent);
        },

        .timer => {
            for (self.timers.items, 0..) |t, i| {
                if (t == .timeout and t.timeout.task == task) {
                    // Remove the timer
                    _ = self.timers.orderedRemove(i);
                    task.result = .{ .timer = error.Canceled };
                    return;
                }
            } else task.result = .{ .cancel = error.EntryNotFound };
        },

        .write => |cancel_req| {
            self.in_flight.remove(task);
            task.result = .{ .write = error.Canceled };
            const kevent = evSet(
                @intCast(cancel_req.fd),
                EVFILT.WRITE,
                EV.DELETE,
                task,
            );
            try self.submission_queue.append(self.gpa, kevent);
        },

        .writev => |cancel_req| {
            self.in_flight.remove(task);
            task.result = .{ .writev = error.Canceled };
            const kevent = evSet(
                @intCast(cancel_req.fd),
                EVFILT.WRITE,
                EV.DELETE,
                task,
            );
            try self.submission_queue.append(self.gpa, kevent);
        },
    }
}

fn addTimer(self: *Kqueue, t: Timer) !void {
    try self.timers.append(self.gpa, t);
}

fn evSet(ident: usize, filter: i16, flags: u16, ptr: ?*anyopaque) posix.Kevent {
    return switch (builtin.os.tag) {
        .netbsd,
        .dragonfly,
        .openbsd,
        .macos,
        .ios,
        .tvos,
        .watchos,
        .visionos,
        => .{
            .ident = ident,
            .filter = filter,
            .flags = flags,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(ptr),
        },

        .freebsd => .{
            .ident = ident,
            .filter = filter,
            .flags = flags,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(ptr),
            ._ext = &.{ 0, 0, 0, 0 },
        },

        else => @compileError("kqueue not supported"),
    };
}

pub fn done(self: *Kqueue) bool {
    if (self.timers.items.len == 0 and
        self.in_flight.empty() and
        self.submission_queue.items.len == 0)
    {
        self.msg_ring_mutex.lock();
        defer self.msg_ring_mutex.unlock();
        return self.msg_ring_queue.empty();
    }

    return false;
}

/// Return a file descriptor which can be used to poll the ring for completions
pub fn pollableFd(self: Kqueue) !posix.fd_t {
    return self.kq;
}

pub fn reapCompletions(self: *Kqueue, rt: *io.Runtime) anyerror!void {
    defer self.event_idx = 0;

    if (self.event_idx == 0) {
        const timeout: posix.timespec = .{ .sec = 0, .nsec = 0 };
        self.event_idx = try posix.kevent(
            self.kq,
            self.submission_queue.items,
            &self.events,
            &timeout,
        );
    }

    for (self.events[0..self.event_idx]) |event| {
        // if the event is a USER filter, we check our msg_ring_queue
        if (event.filter == EVFILT.USER) {
            switch (UserMsg.fromInt(event.data)) {
                .wakeup => {
                    // We got a message in our msg_ring_queue
                    self.msg_ring_mutex.lock();
                    defer self.msg_ring_mutex.unlock();

                    while (self.msg_ring_queue.pop()) |task| {
                        // For canceled msg_rings we do nothing
                        if (task.state == .canceled) continue;

                        defer self.releaseTask(rt, task);
                        if (task.result == null) task.result = .noop;
                        try task.callback(rt, task.*);
                    }
                },
            }
            continue;
        }

        const task: *io.Task = @ptrFromInt(event.udata);
        if (task.state == .canceled) continue;
        try self.handleCompletion(rt, task, event);
    }

    while (self.synchronous_queue.pop()) |task| {
        try self.handleSynchronousCompletion(rt, task);
    }

    const now = std.time.milliTimestamp();
    while (self.timers.getLastOrNull()) |t| {
        if (t.expiresInMs(now) > 0) break;
        _ = self.timers.pop();
        try self.handleExpiredTimer(rt, t);
    }
}

/// Handle a completion which was done synchronously. The work has already been done, we just need
/// to call the callback and return the task(s) to the free list
fn handleSynchronousCompletion(
    self: *Kqueue,
    rt: *io.Runtime,
    task: *io.Task,
) !void {
    switch (task.req) {
        // async tasks. These can be handled synchronously in a cancel all
        .accept,
        .poll,
        .recv,
        .write,
        .writev,

        // Timers can be handled synchronously in a cancel all
        .deadline,
        .timer,

        .connect, // connect is handled both sync and async
        .close,
        .msg_ring,
        .noop,
        .socket,
        .userfd,
        .usermsg,
        .userptr,
        => {
            assert(task.result != null);
            defer self.releaseTask(rt, task);
            try task.callback(rt, task.*);
        },

        .cancel => |c| {
            assert(task.result != null);
            defer self.releaseTask(rt, task);
            try task.callback(rt, task.*);

            switch (c) {
                .all => {},

                .task => |ct| {
                    // If the cancel had an error, we don't need to return the task_to_cancel
                    _ = task.result.?.cancel catch return;
                    // On success, it is our job to call the canceled tasks' callback and return the
                    // task to the free list
                    defer self.releaseTask(rt, ct);
                    const result: io.Result = switch (ct.req) {
                        .accept => .{ .accept = error.Canceled },
                        .cancel => .{ .cancel = error.Canceled },
                        .close => .{ .close = error.Canceled },
                        .connect => .{ .connect = error.Canceled },
                        .deadline => .{ .deadline = error.Canceled },
                        .msg_ring => .{ .msg_ring = error.Canceled },
                        .noop => unreachable,
                        .poll => .{ .poll = error.Canceled },
                        .recv => .{ .recv = error.Canceled },
                        .socket => .{ .socket = error.Canceled },
                        .timer => .{ .timer = error.Canceled },
                        .userfd, .usermsg, .userptr => unreachable,
                        .write => .{ .write = error.Canceled },
                        .writev => .{ .writev = error.Canceled },
                    };
                    ct.result = result;
                    try ct.callback(rt, ct.*);
                },
            }
        },
    }
}

fn dataToE(result: i64) std.posix.E {
    // When EV.ERROR is set, kevent reports the errno as a positive value in data
    if (result > 0) {
        return @as(std.posix.E, @enumFromInt(result));
    }
    return .SUCCESS;
}

fn unexpectedError(err: posix.E) posix.UnexpectedError {
    std.log.err("unexpected posix error: {}", .{err});
    return error.Unexpected;
}

fn handleCompletion(
    self: *Kqueue,
    rt: *io.Runtime,
    task: *io.Task,
    event: posix.Kevent,
) !void {
    switch (task.req) {
        .cancel,
        .close,
        .deadline,
        .msg_ring,
        .noop,
        .socket,
        .timer,
        .userfd,
        .usermsg,
        .userptr,
        => unreachable,

        .accept => |req| {
            // Accept is a multishot request, so we don't remove it from the in_flight queue
            if (event.flags & EV.ERROR != 0) {
                // Interpret data as an errno
                const err = unexpectedError(dataToE(event.data));
                task.result = .{ .accept = err };
                return task.callback(rt, task.*);
            }

            if (posix.accept(req, null, null, 0)) |fd|
                task.result = .{ .accept = fd }
            else |_|
                task.result = .{ .accept = error.Unexpected };
            return task.callback(rt, task.*);
        },

        .connect => {
            defer self.releaseTask(rt, task);
            self.in_flight.remove(task);
            if (event.flags & EV.ERROR != 0) {
                // Interpret data as an errno
                const err = unexpectedError(dataToE(event.data));
                task.result = .{ .connect = err };
            } else task.result = .{ .connect = {} };
            return task.callback(rt, task.*);
        },

        .poll => {
            defer self.releaseTask(rt, task);
            self.in_flight.remove(task);
            if (event.flags & EV.ERROR != 0) {
                // Interpret data as an errno
                const err = unexpectedError(dataToE(event.data));
                task.result = .{ .poll = err };
            } else task.result = .{ .poll = {} };
            return task.callback(rt, task.*);
        },

        .recv => |req| {
            defer self.releaseTask(rt, task);
            self.in_flight.remove(task);
            if (event.flags & EV.ERROR != 0) {
                // Interpret data as an errno
                const err = unexpectedError(dataToE(event.data));
                task.result = .{ .recv = err };
                return task.callback(rt, task.*);
            }
            if (posix.recv(req.fd, req.buffer, 0)) |n|
                task.result = .{ .recv = n }
            else |_|
                task.result = .{ .recv = error.Unexpected };
            return task.callback(rt, task.*);
        },

        .write => |req| {
            defer self.releaseTask(rt, task);
            self.in_flight.remove(task);
            if (event.flags & EV.ERROR != 0) {
                // Interpret data as an errno
                const err = unexpectedError(dataToE(event.data));
                task.result = .{ .write = err };
                return task.callback(rt, task.*);
            }
            if (posix.write(req.fd, req.buffer)) |n|
                task.result = .{ .write = n }
            else |_|
                task.result = .{ .write = error.Unexpected };
            return task.callback(rt, task.*);
        },

        .writev => |req| {
            defer self.releaseTask(rt, task);
            self.in_flight.remove(task);
            if (event.flags & EV.ERROR != 0) {
                // Interpret data as an errno
                const err = unexpectedError(dataToE(event.data));
                task.result = .{ .writev = err };
                return task.callback(rt, task.*);
            }
            if (posix.writev(req.fd, req.vecs)) |n|
                task.result = .{ .writev = n }
            else |_|
                task.result = .{ .writev = error.Unexpected };
            return task.callback(rt, task.*);
        },
    }
}

fn releaseTask(self: *Kqueue, rt: *io.Runtime, task: *io.Task) void {
    rt.free_q.push(task);
    if (task.deadline) |d| {
        // remove the deadline
        for (self.timers.items, 0..) |t, i| {
            if (t == .deadline and t.deadline.task == d) {
                // Remove the timer
                _ = self.timers.orderedRemove(i);
                rt.free_q.push(t.deadline.task);
                return;
            }
        }
    }
}

fn handleExpiredTimer(self: *Kqueue, rt: *io.Runtime, t: Timer) !void {
    switch (t) {
        .deadline => |deadline| {
            defer self.releaseTask(rt, deadline.task);
            if (deadline.task.state == .canceled) return;

            try deadline.parent.cancel(rt, .{});
        },

        .timeout => |timeout| {
            const task = timeout.task;
            defer self.releaseTask(rt, task);
            if (task.state == .canceled) return;
            task.result = .{ .timer = {} };
            try task.callback(rt, task.*);
        },
    }
}
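One subtlety worth calling out in the Kqueue backend above: `timer` requests are relative while
`deadline` requests are absolute, and `Timer.expiresInMs` normalizes both to "milliseconds from
now" before the timer list is reverse-sorted. A self-contained sketch of that arithmetic (all
constants made up for illustration):

```zig
const std = @import("std");

test "timer vs deadline expiry arithmetic" {
    const now_ms: i64 = 2_000;

    // A timer stores a duration; the backend records when it was added and
    // it expires at added_ms + duration.
    const added_ms: i64 = 1_000;
    const duration_ms: i64 = 3 * std.time.ms_per_s;
    try std.testing.expectEqual(@as(i64, 2_000), (added_ms + duration_ms) - now_ms);

    // A deadline is an absolute wall-clock time; no offset is added.
    const deadline_ms: i64 = 5_000;
    try std.testing.expectEqual(@as(i64, 3_000), deadline_ms - now_ms);
}
```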
+89
src/Mock.zig
const Mock = @This();

const std = @import("std");

const io = @import("main.zig");

const Allocator = std.mem.Allocator;
const Queue = @import("queue.zig").Intrusive;
const assert = std.debug.assert;
const posix = std.posix;

completions: Queue(io.Task, .complete) = .{},

accept_cb: ?*const fn (*io.Task) io.Result = null,
cancel_cb: ?*const fn (*io.Task) io.Result = null,
close_cb: ?*const fn (*io.Task) io.Result = null,
connect_cb: ?*const fn (*io.Task) io.Result = null,
deadline_cb: ?*const fn (*io.Task) io.Result = null,
msg_ring_cb: ?*const fn (*io.Task) io.Result = null,
noop_cb: ?*const fn (*io.Task) io.Result = null,
poll_cb: ?*const fn (*io.Task) io.Result = null,
recv_cb: ?*const fn (*io.Task) io.Result = null,
socket_cb: ?*const fn (*io.Task) io.Result = null,
timer_cb: ?*const fn (*io.Task) io.Result = null,
write_cb: ?*const fn (*io.Task) io.Result = null,
writev_cb: ?*const fn (*io.Task) io.Result = null,

userfd_cb: ?*const fn (*io.Task) io.Result = null,
usermsg_cb: ?*const fn (*io.Task) io.Result = null,
userptr_cb: ?*const fn (*io.Task) io.Result = null,

/// Initialize a Ring
pub fn init(_: u16) !Mock {
    return .{};
}

pub fn deinit(self: *Mock, _: Allocator) void {
    self.* = undefined;
}

pub fn done(self: *Mock) bool {
    return self.completions.empty();
}

/// Initializes a child Ring which can be woken up by self. This must be called from the thread
/// which will operate the child ring. Initializes with the same queue size as the parent
pub fn initChild(_: Mock, entries: u16) !Mock {
    return init(entries);
}

/// Return a file descriptor which can be used to poll the ring for completions
pub fn pollableFd(_: *Mock) !posix.fd_t {
    return -1;
}

pub fn submitAndWait(self: *Mock, queue: *Queue(io.Task, .in_flight)) !void {
    return self.submit(queue);
}

pub fn submit(self: *Mock, queue: *Queue(io.Task, .in_flight)) !void {
    while (queue.pop()) |task| {
        task.result = switch (task.req) {
            .accept => if (self.accept_cb) |cb| cb(task) else return error.NoMockCallback,
            .cancel => if (self.cancel_cb) |cb| cb(task) else return error.NoMockCallback,
            .close => if (self.close_cb) |cb| cb(task) else return error.NoMockCallback,
            .connect => if (self.connect_cb) |cb| cb(task) else return error.NoMockCallback,
            .deadline => if (self.deadline_cb) |cb| cb(task) else return error.NoMockCallback,
            .msg_ring => if (self.msg_ring_cb) |cb| cb(task) else return error.NoMockCallback,
            .noop => if (self.noop_cb) |cb| cb(task) else return error.NoMockCallback,
            .poll => if (self.poll_cb) |cb| cb(task) else return error.NoMockCallback,
            .recv => if (self.recv_cb) |cb| cb(task) else return error.NoMockCallback,
            .socket => if (self.socket_cb) |cb| cb(task) else return error.NoMockCallback,
            .timer => if (self.timer_cb) |cb| cb(task) else return error.NoMockCallback,
            .userfd => if (self.userfd_cb) |cb| cb(task) else return error.NoMockCallback,
            .usermsg => if (self.usermsg_cb) |cb| cb(task) else return error.NoMockCallback,
            .userptr => if (self.userptr_cb) |cb| cb(task) else return error.NoMockCallback,
            .write => if (self.write_cb) |cb| cb(task) else return error.NoMockCallback,
            .writev => if (self.writev_cb) |cb| cb(task) else return error.NoMockCallback,
        };
        self.completions.push(task);
    }
}

pub fn reapCompletions(self: *Mock, rt: *io.Runtime) anyerror!void {
    while (self.completions.pop()) |task| {
        try task.callback(rt, task.*);
        rt.free_q.push(task);
    }
}
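Because `submit` returns `error.NoMockCallback` for any request type without a registered hook,
tests fail fast when they exercise IO they didn't stub. A small sketch (assuming the module is
imported as `ourio`):

```zig
const std = @import("std");
const io = @import("ourio");

test "unstubbed requests fail fast" {
    var rt: io.Runtime = try .initMock(std.testing.allocator, 16);
    defer rt.deinit();

    // No noop_cb is registered, so submission surfaces the error
    _ = try rt.noop(.{});
    try std.testing.expectError(error.NoMockCallback, rt.run(.until_done));
}
```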
+77
src/Task.zig
const Task = @This();

const std = @import("std");
const io = @import("main.zig");

const Allocator = std.mem.Allocator;
const Runtime = io.Runtime;

userdata: ?*anyopaque = null,
msg: u16 = 0,
callback: io.Callback = io.noopCallback,
req: io.Request = .noop,

result: ?io.Result = null,

state: enum {
    /// The task is free to be scheduled
    free,

    /// The task is in flight and may not be rescheduled. Some operations generate multiple
    /// completions, so it is possible to receive a task in Callback and the task is still
    /// considered to be in flight
    in_flight,

    /// The task was completed
    complete,

    /// The operation was canceled
    canceled,
} = .free,

/// Deadline task for this task to complete by, in absolute time. Null if there is no deadline
deadline: ?*Task = null,

next: ?*Task = null,
prev: ?*Task = null,

pub fn setDeadline(
    self: *Task,
    rt: *Runtime,
    deadline: io.Timespec,
) Allocator.Error!void {
    std.debug.assert(!deadline.isZero());
    const task = try rt.getTask();

    task.* = .{
        .callback = io.noopCallback,
        .userdata = null,
        .msg = 0,
        .req = .{ .deadline = deadline },
    };

    self.deadline = task;
}

pub fn cancel(
    self: *Task,
    rt: *Runtime,
    ctx: io.Context,
) Allocator.Error!void {
    const task = try rt.getTask();
    task.* = .{
        .callback = ctx.cb,
        .msg = ctx.msg,
        .userdata = ctx.ptr,
        .req = .{ .cancel = .{ .task = self } },
    };
    rt.submission_q.push(task);
}

pub fn userdataCast(self: Task, comptime T: type) *T {
    return @ptrCast(@alignCast(self.userdata));
}

pub fn msgToEnum(self: Task, comptime Enum: type) Enum {
    return @enumFromInt(self.msg);
}
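The README cancels with the default (ignored) Context; `Task.cancel` above also accepts a full
Context, so a single callback can observe both sides of a cancelation via the `msg` tag. A
sketch under the same API (names are illustrative):

```zig
const std = @import("std");
const io = @import("ourio");

const App = struct {
    canceled: bool = false,

    const Msg = enum { tick, cancel_result };

    fn onCompletion(_: *io.Runtime, task: io.Task) anyerror!void {
        const self = task.userdataCast(App);
        switch (task.msgToEnum(Msg)) {
            // The canceled timer completes with error.Canceled
            .tick => task.result.?.timer catch |err| switch (err) {
                error.Canceled => self.canceled = true,
                else => return err,
            },
            // The cancel task itself reports whether the cancelation succeeded
            .cancel_result => try task.result.?.cancel,
        }
    }
};

fn scheduleAndCancel(rt: *io.Runtime, app: *App) !void {
    const task = try rt.timer(
        .{ .sec = 60 },
        .{ .ptr = app, .msg = @intFromEnum(App.Msg.tick), .cb = App.onCompletion },
    );
    try task.cancel(rt, .{
        .ptr = app,
        .msg = @intFromEnum(App.Msg.cancel_result),
        .cb = App.onCompletion,
    });
}
```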
+375
src/Uring.zig
const Uring = @This();

const std = @import("std");
const builtin = @import("builtin");

const io = @import("main.zig");

const Allocator = std.mem.Allocator;
const Queue = @import("queue.zig").Intrusive;
const assert = std.debug.assert;
const linux = std.os.linux;
const posix = std.posix;

const common_flags: u32 =
    linux.IORING_SETUP_SUBMIT_ALL | // Keep submitting events even if one had an error
    linux.IORING_SETUP_CLAMP | // Clamp entries to system supported max
    linux.IORING_SETUP_DEFER_TASKRUN | // Defer work until we submit tasks. Requires SINGLE_ISSUER
    linux.IORING_SETUP_COOP_TASKRUN | // Don't interrupt userspace when task is complete
    linux.IORING_SETUP_SINGLE_ISSUER; // Only a single thread will issue tasks

const msg_ring_received_cqe = 1 << 8;

ring: linux.IoUring,
in_flight: Queue(io.Task, .in_flight) = .{},
eventfd: ?posix.fd_t = null,

/// Initialize a Ring
pub fn init(_: Allocator, entries: u16) !Uring {
    var params = std.mem.zeroInit(linux.io_uring_params, .{
        .flags = common_flags,
        .sq_thread_idle = 1000,
    });

    return .{ .ring = try .init_params(entries, &params) };
}

pub fn deinit(self: *Uring, gpa: Allocator) void {
    while (self.in_flight.pop()) |task| gpa.destroy(task);

    if (self.ring.fd >= 0) {
        self.ring.deinit();
    }
    if (self.eventfd) |fd| {
        posix.close(fd);
        self.eventfd = null;
    }
    self.* = undefined;
}

/// Initializes a child Ring which can be woken up by self. This must be called from the thread
/// which will operate the child ring. Initializes with the same queue size as the parent
pub fn initChild(self: Uring, entries: u16) !Uring {
    const flags: u32 = common_flags | linux.IORING_SETUP_ATTACH_WQ;

    var params = std.mem.zeroInit(linux.io_uring_params, .{
        .flags = flags,
        .sq_thread_idle = 1000,
        .wq_fd = @as(u32, @bitCast(self.ring.fd)),
    });

    return .{ .ring = try .init_params(entries, &params) };
}

pub fn done(self: *Uring) bool {
    return self.in_flight.empty();
}

/// Return a file descriptor which can be used to poll the ring for completions
pub fn pollableFd(self: *Uring) !posix.fd_t {
    if (self.eventfd) |fd| return fd;
    const fd: posix.fd_t = @intCast(linux.eventfd(0, linux.EFD.CLOEXEC | linux.EFD.NONBLOCK));
    try self.ring.register_eventfd(fd);
    self.eventfd = fd;
    return fd;
}

pub fn submitAndWait(self: *Uring, queue: *io.SubmissionQueue) !void {
    var sqes_available = self.sqesAvailable();
    while (queue.pop()) |task| {
        const sqes_required = sqesRequired(task);
        // Flush pending SQEs until there is room for this task; otherwise the
        // popped task would be dropped without ever being prepped
        while (sqes_available < sqes_required) {
            sqes_available += try self.ring.submit();
        }
        sqes_available -= sqes_required;
        self.prepTask(task);
    }

    while (true) {
        _ = self.ring.submit_and_wait(1) catch |err| {
            switch (err) {
                error.SignalInterrupt => continue,
                else => return err,
            }
        };
        return;
    }
}

pub fn submit(self: *Uring, queue: *io.SubmissionQueue) !void {
    var sqes_available = self.sqesAvailable();
    while (queue.pop()) |task| {
        const sqes_required = sqesRequired(task);
        // Same flushing as in submitAndWait so no popped task is lost
        while (sqes_available < sqes_required) {
            sqes_available += try self.ring.submit();
        }
        sqes_available -= sqes_required;
        self.prepTask(task);
    }
    const n = try self.ring.submit();
    _ = try self.ring.enter(n, 0, linux.IORING_ENTER_GETEVENTS);
}

fn sqesRequired(task: *const io.Task) u32 {
    return if (task.deadline == null) 1 else 2;
}

fn sqesAvailable(self: *Uring) u32 {
    return @intCast(self.ring.sq.sqes.len - self.ring.sq_ready());
}

fn prepTask(self: *Uring, task: *io.Task) void {
    self.in_flight.push(task);
    switch (task.req) {
        .noop => {
            const sqe = self.getSqe();
            sqe.prep_nop();
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        // Deadlines are always prepared from their parent task
        .deadline => unreachable,

        .timer => |*t| {
            const sqe = self.getSqe();
            sqe.prep_timeout(@ptrCast(t), 0, linux.IORING_TIMEOUT_REALTIME);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .cancel => |c| {
            const sqe = self.getSqe();
            switch (c) {
                .all => sqe.prep_cancel(0, linux.IORING_ASYNC_CANCEL_ANY),
                .task => |t| sqe.prep_cancel(@intFromPtr(t), 0),
            }
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .accept => |fd| {
            const sqe = self.getSqe();
            sqe.prep_multishot_accept(fd, null, null, 0);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .msg_ring => |msg| {
            const sqe = self.getSqe();
            const fd = msg.target.backend.platform.ring.fd;
            sqe.prep_rw(.MSG_RING, fd, 0, 0, @intFromPtr(msg.task));
            sqe.user_data = @intFromPtr(task);
            // Pass flags on the sent CQE. We use this to distinguish between a received message and
            // a message from our own loop
            sqe.rw_flags |= linux.IORING_MSG_RING_FLAGS_PASS;
            sqe.splice_fd_in |= msg_ring_received_cqe;
            self.prepDeadline(task, sqe);
        },

        .recv => |req| {
            const sqe = self.getSqe();
            sqe.prep_recv(req.fd, req.buffer, 0);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .write => |req| {
            const sqe = self.getSqe();
            sqe.prep_write(req.fd, req.buffer, 0);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .writev => |req| {
            const sqe = self.getSqe();
            sqe.prep_writev(req.fd, req.vecs, 0);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .close => |fd| {
            const sqe = self.getSqe();
            sqe.prep_close(fd);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .poll => |req| {
            const sqe = self.getSqe();
            sqe.prep_poll_add(req.fd, req.mask);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .socket => |req| {
            const sqe = self.getSqe();
            sqe.prep_socket(req.domain, req.type, req.protocol, 0);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        .connect => |req| {
            const sqe = self.getSqe();
            sqe.prep_connect(req.fd, req.addr, req.addr_len);
            sqe.user_data = @intFromPtr(task);
            self.prepDeadline(task, sqe);
        },

        // user* is only sent internally between rings and higher level wrappers
        .userfd, .usermsg, .userptr => unreachable,
    }
}

fn prepDeadline(self: *Uring, parent_task: *io.Task, parent_sqe: *linux.io_uring_sqe) void {
    const task = parent_task.deadline orelse return;
    self.in_flight.push(task);
    assert(task.req == .deadline);
    parent_sqe.flags |= linux.IOSQE_IO_LINK;

    const sqe = self.getSqe();
    const flags = linux.IORING_TIMEOUT_ABS | // absolute time
        linux.IORING_TIMEOUT_REALTIME; // use the realtime clock (as opposed to boot time)
    sqe.prep_link_timeout(@ptrCast(&task.req.deadline), flags);
    sqe.user_data = @intFromPtr(task);
}

/// Get an sqe from the ring. Caller should only call this function if they are sure we have an SQE
/// available. Asserts that we have one available
fn getSqe(self: *Uring) *linux.io_uring_sqe {
    assert(self.ring.sq.sqes.len > self.ring.sq_ready());
    return self.ring.get_sqe() catch unreachable;
}

pub fn reapCompletions(self: *Uring, rt: *io.Runtime) anyerror!void {
    var cqes: [64]linux.io_uring_cqe = undefined;
    const n = self.ring.copy_cqes(&cqes, 0) catch |err| {
        switch (err) {
            error.SignalInterrupt => return,
            else => return err,
        }
    };
    for (cqes[0..n]) |cqe| {
        const task: *io.Task = @ptrFromInt(cqe.user_data);

        task.result = switch (task.req) {
            .noop => .noop,

            // Deadlines we don't do anything for, these are always sent to a noopCallback
            .deadline => .{ .deadline = {} },

            .timer => .{ .timer = switch (cqeToE(cqe.res)) {
                .SUCCESS, .TIME => {},
                .INVAL, .FAULT => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .cancel => .{ .cancel = switch (cqeToE(cqe.res)) {
                .SUCCESS => {},
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                .NOENT => io.CancelError.EntryNotFound,
                .ALREADY => io.CancelError.NotCanceled,
                else => |e| unexpectedError(e),
            } },

            .accept => .{ .accept = switch (cqeToE(cqe.res)) {
                .SUCCESS => cqe.res,
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .msg_ring => .{ .msg_ring = switch (cqeToE(cqe.res)) {
                .SUCCESS => {},
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .recv => .{ .recv = switch (cqeToE(cqe.res)) {
                .SUCCESS => @intCast(cqe.res),
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                .CONNRESET => io.RecvError.ConnectionResetByPeer,
                else => |e| unexpectedError(e),
            } },

            .write => .{ .write = switch (cqeToE(cqe.res)) {
                .SUCCESS => @intCast(cqe.res),
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .writev => .{ .writev = switch (cqeToE(cqe.res)) {
                .SUCCESS => @intCast(cqe.res),
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .close => .{ .close = switch (cqeToE(cqe.res)) {
                .SUCCESS => {},
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .poll => .{ .poll = switch (cqeToE(cqe.res)) {
                .SUCCESS => {},
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .socket => .{ .socket = switch (cqeToE(cqe.res)) {
                .SUCCESS => @intCast(cqe.res),
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .connect => .{ .connect = switch (cqeToE(cqe.res)) {
                .SUCCESS => {},
                .INVAL => io.ResultError.Invalid,
                .CANCELED => io.ResultError.Canceled,
                else => |e| unexpectedError(e),
            } },

            .usermsg => .{ .usermsg = @intCast(cqe.res) },

            // userfd should never reach the runtime
            .userfd, .userptr => unreachable,
        };

        try task.callback(rt, task.*);

        if (cqe.flags & msg_ring_received_cqe != 0) {
            // This message was received from another ring. We don't decrement inflight for this.
            // But we do need to set the task as free because we will add it to our free list
            rt.free_q.push(task);
        } else if (cqe.flags & linux.IORING_CQE_F_MORE == 0) {
            // If the cqe doesn't have IORING_CQE_F_MORE set, then this task is complete and free to
            // be rescheduled
            task.state = .complete;
            self.in_flight.remove(task);
            rt.free_q.push(task);
        }
    }
}

fn cqeToE(result: i32) std.posix.E {
    if (result > -4096 and result < 0) {
        return @as(std.posix.E, @enumFromInt(-result));
    }
    return .SUCCESS;
}

fn unexpectedError(err: posix.E) posix.UnexpectedError {
    std.log.err("unexpected posix error: {}", .{err});
    return error.Unexpected;
}
+717
src/main.zig
··· 1 + const std = @import("std"); 2 + const builtin = @import("builtin"); 3 + 4 + const Allocator = std.mem.Allocator; 5 + pub const Mock = @import("Mock.zig"); 6 + const Queue = @import("queue.zig").Intrusive; 7 + const io = @This(); 8 + const posix = std.posix; 9 + 10 + pub const has_kqueue = switch (builtin.os.tag) { 11 + .dragonfly, 12 + .freebsd, 13 + .ios, 14 + .macos, 15 + .netbsd, 16 + .openbsd, 17 + .tvos, 18 + .visionos, 19 + .watchos, 20 + => true, 21 + 22 + else => false, 23 + }; 24 + pub const has_io_uring = builtin.os.tag == .linux; 25 + 26 + pub const Task = @import("Task.zig"); 27 + pub const Callback = *const fn (*Runtime, Task) anyerror!void; 28 + pub fn noopCallback(_: *Runtime, _: Task) anyerror!void {} 29 + 30 + pub const RunCondition = enum { 31 + once, 32 + until_done, 33 + forever, 34 + }; 35 + 36 + pub const Context = struct { 37 + ptr: ?*anyopaque = null, 38 + msg: u16 = 0, 39 + cb: Callback = noopCallback, 40 + }; 41 + 42 + /// Used for timeouts and deadlines. We make this struct extern because we will ptrCast it to the 43 + /// linux kernel timespec struct 44 + pub const Timespec = extern struct { 45 + sec: i64 = 0, 46 + nsec: i64 = 0, 47 + 48 + pub fn isZero(self: Timespec) bool { 49 + return self.sec == 0 and self.nsec == 0; 50 + } 51 + }; 52 + 53 + pub const Backend = union(enum) { 54 + mock: Mock, 55 + 56 + platform: switch (builtin.os.tag) { 57 + .dragonfly, 58 + .freebsd, 59 + .ios, 60 + .macos, 61 + .netbsd, 62 + .openbsd, 63 + .tvos, 64 + .visionos, 65 + .watchos, 66 + => @import("Kqueue.zig"), 67 + 68 + .linux => @import("Uring.zig"), 69 + 70 + else => @compileError("unsupported os"), 71 + }, 72 + 73 + pub fn initChild(self: *Backend, entries: u16) !Backend { 74 + switch (self.*) { 75 + .mock => return .{ .mock = .{} }, 76 + .platform => |*p| return .{ .platform = try p.initChild(entries) }, 77 + } 78 + } 79 + 80 + pub fn deinit(self: *Backend, gpa: Allocator) void { 81 + switch (self.*) { 82 + inline else => |*backend| backend.deinit(gpa), 83 + } 84 + } 85 + 86 + pub fn pollableFd(self: *Backend) !posix.fd_t { 87 + return switch (self.*) { 88 + inline else => |*backend| backend.pollableFd(), 89 + }; 90 + } 91 + 92 + pub fn submitAndWait(self: *Backend, queue: *SubmissionQueue) !void { 93 + return switch (self.*) { 94 + inline else => |*backend| backend.submitAndWait(queue), 95 + }; 96 + } 97 + 98 + pub fn submit(self: *Backend, queue: *SubmissionQueue) !void { 99 + return switch (self.*) { 100 + inline else => |*backend| backend.submit(queue), 101 + }; 102 + } 103 + 104 + pub fn reapCompletions( 105 + self: *Backend, 106 + rt: *Runtime, 107 + ) !void { 108 + return switch (self.*) { 109 + inline else => |*backend| backend.reapCompletions(rt), 110 + }; 111 + } 112 + 113 + pub fn done(self: *Backend) bool { 114 + return switch (self.*) { 115 + inline else => |*backend| backend.done(), 116 + }; 117 + } 118 + }; 119 + 120 + pub const CompletionQueue = Queue(Task, .complete); 121 + pub const FreeQueue = Queue(Task, .free); 122 + pub const SubmissionQueue = Queue(Task, .in_flight); 123 + 124 + pub const Runtime = struct { 125 + backend: Backend, 126 + gpa: Allocator, 127 + 128 + completion_q: CompletionQueue = .{}, 129 + submission_q: SubmissionQueue = .{}, 130 + free_q: FreeQueue = .{}, 131 + 132 + pub fn init(gpa: Allocator, entries: u16) !Runtime { 133 + return .{ 134 + .backend = .{ .platform = try .init(gpa, entries) }, 135 + .gpa = gpa, 136 + }; 137 + } 138 + 139 + pub fn initChild(self: *Runtime, entries: u16) !Runtime { 140 + return .{ 141 + 
.backend = try self.backend.initChild(entries), 142 + .gpa = self.gpa, 143 + }; 144 + } 145 + 146 + pub fn initMock(gpa: Allocator, entries: u16) !Runtime { 147 + return .{ 148 + .backend = .{ .mock = try .init(entries) }, 149 + .gpa = gpa, 150 + .free_q = .{}, 151 + .submission_q = .{}, 152 + .completion_q = .{}, 153 + }; 154 + } 155 + 156 + pub fn deinit(self: *Runtime) void { 157 + self.backend.deinit(self.gpa); 158 + while (self.free_q.pop()) |task| self.gpa.destroy(task); 159 + while (self.submission_q.pop()) |task| self.gpa.destroy(task); 160 + while (self.completion_q.pop()) |task| self.gpa.destroy(task); 161 + } 162 + 163 + pub fn run(self: *Runtime, condition: RunCondition) !void { 164 + while (true) { 165 + try self.backend.submitAndWait(&self.submission_q); 166 + try self.backend.reapCompletions(self); 167 + switch (condition) { 168 + .once => return, 169 + .until_done => if (self.backend.done() and self.submission_q.empty()) return, 170 + .forever => {}, 171 + } 172 + } 173 + } 174 + 175 + pub fn getTask(self: *Runtime) Allocator.Error!*Task { 176 + return self.free_q.pop() orelse try self.gpa.create(Task); 177 + } 178 + 179 + pub fn noop( 180 + self: *Runtime, 181 + ctx: Context, 182 + ) Allocator.Error!*Task { 183 + const task = try self.getTask(); 184 + task.* = .{ 185 + .userdata = ctx.ptr, 186 + .msg = ctx.msg, 187 + .callback = ctx.cb, 188 + .req = .noop, 189 + }; 190 + 191 + self.submission_q.push(task); 192 + return task; 193 + } 194 + 195 + pub fn timer( 196 + self: *Runtime, 197 + duration: Timespec, 198 + ctx: Context, 199 + ) Allocator.Error!*Task { 200 + const task = try self.getTask(); 201 + task.* = .{ 202 + .userdata = ctx.ptr, 203 + .msg = ctx.msg, 204 + .callback = ctx.cb, 205 + .req = .{ .timer = duration }, 206 + }; 207 + 208 + self.submission_q.push(task); 209 + return task; 210 + } 211 + 212 + pub fn cancelAll(self: *Runtime) Allocator.Error!void { 213 + const task = try self.getTask(); 214 + task.* = .{ 215 + .req = .{ .cancel = .all }, 216 + }; 217 + 218 + self.submission_q.push(task); 219 + } 220 + 221 + pub fn accept( 222 + self: *Runtime, 223 + fd: posix.fd_t, 224 + ctx: Context, 225 + ) Allocator.Error!*Task { 226 + const task = try self.getTask(); 227 + task.* = .{ 228 + .userdata = ctx.ptr, 229 + .msg = ctx.msg, 230 + .callback = ctx.cb, 231 + .req = .{ .accept = fd }, 232 + }; 233 + 234 + self.submission_q.push(task); 235 + return task; 236 + } 237 + 238 + pub fn msgRing( 239 + self: *Runtime, 240 + target: *Runtime, 241 + target_task: *Task, // The task that the target ring will receive. 
The callbacks of 242 + // this task are what will be called when the target receives the message 243 + 244 + ctx: Context, 245 + ) Allocator.Error!*Task { 246 + // This is the task to send the message 247 + const task = try self.getTask(); 248 + task.* = .{ 249 + .userdata = ctx.ptr, 250 + .msg = ctx.msg, 251 + .callback = ctx.cb, 252 + .req = .{ .msg_ring = .{ 253 + .target = target, 254 + .task = target_task, 255 + } }, 256 + }; 257 + target_task.state = .in_flight; 258 + self.submission_q.push(task); 259 + return task; 260 + } 261 + 262 + pub fn recv( 263 + self: *Runtime, 264 + fd: posix.fd_t, 265 + buffer: []u8, 266 + ctx: Context, 267 + ) Allocator.Error!*Task { 268 + const task = try self.getTask(); 269 + task.* = .{ 270 + .userdata = ctx.ptr, 271 + .msg = ctx.msg, 272 + .callback = ctx.cb, 273 + .req = .{ .recv = .{ 274 + .fd = fd, 275 + .buffer = buffer, 276 + } }, 277 + }; 278 + 279 + self.submission_q.push(task); 280 + return task; 281 + } 282 + 283 + pub fn write( 284 + self: *Runtime, 285 + fd: posix.fd_t, 286 + buffer: []const u8, 287 + ctx: Context, 288 + ) Allocator.Error!*Task { 289 + const task = try self.getTask(); 290 + task.* = .{ 291 + .userdata = ctx.ptr, 292 + .msg = ctx.msg, 293 + .callback = ctx.cb, 294 + .req = .{ .write = .{ 295 + .fd = fd, 296 + .buffer = buffer, 297 + } }, 298 + }; 299 + 300 + self.submission_q.push(task); 301 + return task; 302 + } 303 + 304 + pub fn writev( 305 + self: *Runtime, 306 + fd: posix.fd_t, 307 + vecs: []const posix.iovec_const, 308 + ctx: Context, 309 + ) Allocator.Error!*Task { 310 + const task = try self.getTask(); 311 + task.* = .{ 312 + .userdata = ctx.ptr, 313 + .msg = ctx.msg, 314 + .callback = ctx.cb, 315 + .req = .{ .writev = .{ 316 + .fd = fd, 317 + .vecs = vecs, 318 + } }, 319 + }; 320 + 321 + self.submission_q.push(task); 322 + return task; 323 + } 324 + 325 + pub fn close( 326 + self: *Runtime, 327 + fd: posix.fd_t, 328 + ctx: Context, 329 + ) Allocator.Error!*Task { 330 + const task = try self.getTask(); 331 + task.* = .{ 332 + .userdata = ctx.ptr, 333 + .msg = ctx.msg, 334 + .callback = ctx.cb, 335 + .req = .{ .close = fd }, 336 + }; 337 + 338 + self.submission_q.push(task); 339 + return task; 340 + } 341 + 342 + pub fn poll( 343 + self: *Runtime, 344 + fd: posix.fd_t, 345 + mask: u32, 346 + ctx: Context, 347 + ) Allocator.Error!*Task { 348 + const task = try self.getTask(); 349 + task.* = .{ 350 + .userdata = ctx.ptr, 351 + .msg = ctx.msg, 352 + .callback = ctx.cb, 353 + .req = .{ .poll = .{ .fd = fd, .mask = mask } }, 354 + }; 355 + 356 + self.submission_q.push(task); 357 + return task; 358 + } 359 + 360 + pub fn socket( 361 + self: *Runtime, 362 + domain: u32, 363 + socket_type: u32, 364 + protocol: u32, 365 + ctx: Context, 366 + ) Allocator.Error!*Task { 367 + const task = try self.getTask(); 368 + task.* = .{ 369 + .userdata = ctx.ptr, 370 + .msg = ctx.msg, 371 + .callback = ctx.cb, 372 + .req = .{ .socket = .{ .domain = domain, .type = socket_type, .protocol = protocol } }, 373 + }; 374 + 375 + self.submission_q.push(task); 376 + return task; 377 + } 378 + 379 + pub fn connect( 380 + self: *Runtime, 381 + fd: posix.socket_t, 382 + addr: *posix.sockaddr, 383 + addr_len: posix.socklen_t, 384 + ctx: Context, 385 + ) Allocator.Error!*Task { 386 + const task = try self.getTask(); 387 + task.* = .{ 388 + .userdata = ctx.ptr, 389 + .msg = ctx.msg, 390 + .callback = ctx.cb, 391 + .req = .{ .connect = .{ .fd = fd, .addr = addr, .addr_len = addr_len } }, 392 + }; 393 + 394 + self.submission_q.push(task); 395 + return 
task;
396 +     }
397 + };
398 +
399 + pub const Op = enum {
400 +     noop,
401 +     deadline,
402 +     timer,
403 +     cancel,
404 +     accept,
405 +     msg_ring,
406 +     recv,
407 +     write,
408 +     writev,
409 +     close,
410 +     poll,
411 +     socket,
412 +     connect,
413 +
414 +     /// userfd is meant to send file descriptors between Runtime instances (using msgRing)
415 +     userfd,
416 +     /// usermsg is meant to send a u16 between Runtime instances (using msgRing)
417 +     usermsg,
418 +     /// userptr is meant to send a pointer between Runtime instances (using msgRing)
419 +     userptr,
420 + };
421 +
422 + pub const Request = union(Op) {
423 +     noop,
424 +     deadline: Timespec,
425 +     timer: Timespec,
426 +     cancel: union(enum) {
427 +         all,
428 +         task: *Task,
429 +     },
430 +     accept: posix.fd_t,
431 +     msg_ring: struct {
432 +         target: *Runtime,
433 +         task: *Task,
434 +     },
435 +     recv: struct {
436 +         fd: posix.fd_t,
437 +         buffer: []u8,
438 +     },
439 +     write: struct {
440 +         fd: posix.fd_t,
441 +         buffer: []const u8,
442 +     },
443 +     writev: struct {
444 +         fd: posix.fd_t,
445 +         vecs: []const posix.iovec_const,
446 +     },
447 +     close: posix.fd_t,
448 +     poll: struct {
449 +         fd: posix.fd_t,
450 +         mask: u32,
451 +     },
452 +     socket: struct {
453 +         domain: u32,
454 +         type: u32,
455 +         protocol: u32,
456 +     },
457 +     connect: struct {
458 +         fd: posix.socket_t,
459 +         addr: *posix.sockaddr,
460 +         addr_len: posix.socklen_t,
461 +     },
462 +
463 +     userfd,
464 +     usermsg,
465 +     userptr,
466 + };
467 +
468 + pub const Result = union(Op) {
469 +     noop,
470 +     deadline: ResultError!void,
471 +     timer: ResultError!void,
472 +     cancel: CancelError!void,
473 +     accept: ResultError!posix.fd_t,
474 +     msg_ring: ResultError!void,
475 +     recv: RecvError!usize,
476 +     write: ResultError!usize,
477 +     writev: ResultError!usize,
478 +     close: ResultError!void,
479 +     poll: ResultError!void,
480 +     socket: ResultError!posix.fd_t,
481 +     connect: ResultError!void,
482 +
483 +     userfd: anyerror!posix.fd_t,
484 +     usermsg: u16,
485 +     userptr: anyerror!?*anyopaque,
486 + };
487 +
488 + pub const ResultError = error{
489 +     /// The request was invalid
490 +     Invalid,
491 +     /// The request was canceled
492 +     Canceled,
493 +     /// An unexpected error occurred
494 +     Unexpected,
495 + };
496 +
497 + pub const CancelError = ResultError || error{
498 +     /// The entry to cancel couldn't be found
499 +     EntryNotFound,
500 +     /// The entry couldn't be canceled
501 +     NotCanceled,
502 + };
503 +
504 + pub const RecvError = ResultError || error{
505 +     /// The connection was reset by the peer
506 +     ConnectionResetByPeer,
507 + };
508 +
509 + test {
510 +     _ = @import("net.zig");
511 +     _ = @import("queue.zig");
512 +     _ = @import("tls.zig");
513 +
514 +     _ = @import("Mock.zig");
515 +
516 +     if (has_io_uring) _ = @import("Uring.zig");
517 +     if (has_kqueue) _ = @import("Kqueue.zig");
518 + }
519 +
520 + /// Foo is only for testing
521 + const Foo = struct {
522 +     bar: usize = 0,
523 +
524 +     fn callback(_: *io.Runtime, task: io.Task) anyerror!void {
525 +         const self = task.userdataCast(Foo);
526 +         self.bar += 1;
527 +     }
528 + };
529 +
530 + test "runtime: noop" {
531 +     var rt: io.Runtime = try .init(std.testing.allocator, 16);
532 +     defer rt.deinit();
533 +
534 +     var foo: Foo = .{};
535 +
536 +     const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback };
537 +
538 +     // noop is triggered synchronously with submit.
If we wait, we'll be waiting forever 539 + _ = try rt.noop(ctx); 540 + try rt.run(.once); 541 + try std.testing.expectEqual(1, foo.bar); 542 + _ = try rt.noop(ctx); 543 + _ = try rt.noop(ctx); 544 + try rt.run(.once); 545 + try std.testing.expectEqual(3, foo.bar); 546 + } 547 + 548 + test "runtime: timer" { 549 + var rt: io.Runtime = try .init(std.testing.allocator, 16); 550 + defer rt.deinit(); 551 + 552 + var foo: Foo = .{}; 553 + 554 + const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback }; 555 + 556 + const start = std.time.nanoTimestamp(); 557 + const end = start + 100 * std.time.ns_per_ms; 558 + _ = try rt.timer(.{ .nsec = 100 * std.time.ns_per_ms }, ctx); 559 + try rt.run(.once); 560 + try std.testing.expect(std.time.nanoTimestamp() > end); 561 + try std.testing.expectEqual(1, foo.bar); 562 + } 563 + 564 + test "runtime: poll" { 565 + var rt: io.Runtime = try .init(std.testing.allocator, 16); 566 + defer rt.deinit(); 567 + 568 + var foo: Foo = .{}; 569 + const pipe = try posix.pipe2(.{ .CLOEXEC = true }); 570 + 571 + const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback }; 572 + 573 + _ = try rt.poll(pipe[0], posix.POLL.IN, ctx); 574 + try std.testing.expectEqual(1, rt.submission_q.len()); 575 + 576 + _ = try posix.write(pipe[1], "io_uring is the best"); 577 + try rt.run(.once); 578 + try std.testing.expectEqual(1, foo.bar); 579 + } 580 + 581 + test "runtime: deadline doesn't call user callback" { 582 + const gpa = std.testing.allocator; 583 + var rt = try io.Runtime.init(gpa, 16); 584 + defer rt.deinit(); 585 + 586 + var foo: Foo = .{}; 587 + const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback }; 588 + const task = try rt.noop(ctx); 589 + try task.setDeadline(&rt, .{ .sec = 1 }); 590 + 591 + try rt.run(.until_done); 592 + 593 + // Callback only called once 594 + try std.testing.expectEqual(1, foo.bar); 595 + } 596 + 597 + test "runtime: timeout" { 598 + const gpa = std.testing.allocator; 599 + var rt = try io.Runtime.init(gpa, 16); 600 + defer rt.deinit(); 601 + 602 + var foo: Foo = .{}; 603 + const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback }; 604 + 605 + const delay = 1 * std.time.ns_per_ms; 606 + _ = try rt.timer(.{ .nsec = delay }, ctx); 607 + 608 + const start = std.time.nanoTimestamp(); 609 + try rt.run(.until_done); 610 + try std.testing.expect(start + delay < std.time.nanoTimestamp()); 611 + try std.testing.expectEqual(1, foo.bar); 612 + } 613 + 614 + test "runtime: cancel" { 615 + const gpa = std.testing.allocator; 616 + var rt = try io.Runtime.init(gpa, 16); 617 + defer rt.deinit(); 618 + 619 + var foo: Foo = .{}; 620 + const ctx: Context = .{ .ptr = &foo, .cb = Foo.callback }; 621 + 622 + const delay = 1 * std.time.ns_per_s; 623 + const task = try rt.timer(.{ .nsec = delay }, ctx); 624 + 625 + try task.cancel(&rt, .{}); 626 + 627 + const start = std.time.nanoTimestamp(); 628 + try rt.run(.until_done); 629 + // Expect that we didn't delay long enough 630 + try std.testing.expect(start + delay > std.time.nanoTimestamp()); 631 + try std.testing.expectEqual(1, foo.bar); 632 + } 633 + 634 + test "runtime: cancel all" { 635 + const gpa = std.testing.allocator; 636 + var rt = try io.Runtime.init(gpa, 16); 637 + defer rt.deinit(); 638 + 639 + const Foo2 = struct { 640 + bar: usize = 0, 641 + 642 + fn callback(_: *io.Runtime, task: io.Task) anyerror!void { 643 + const self = task.userdataCast(@This()); 644 + const result = task.result.?; 645 + _ = result.timer catch |err| { 646 + switch (err) { 647 + error.Canceled => self.bar += 1, 648 + else => {}, 649 + } 650 + 
}; 651 + } 652 + }; 653 + var foo: Foo2 = .{}; 654 + const ctx: Context = .{ .ptr = &foo, .cb = Foo2.callback }; 655 + 656 + const delay = 1 * std.time.ns_per_s; 657 + _ = try rt.timer(.{ .nsec = delay }, ctx); 658 + _ = try rt.timer(.{ .nsec = delay }, ctx); 659 + _ = try rt.timer(.{ .nsec = delay }, ctx); 660 + _ = try rt.timer(.{ .nsec = delay }, ctx); 661 + 662 + try rt.cancelAll(); 663 + const start = std.time.nanoTimestamp(); 664 + try rt.run(.until_done); 665 + // Expect that we didn't delay long enough 666 + try std.testing.expect(start + delay > std.time.nanoTimestamp()); 667 + try std.testing.expectEqual(4, foo.bar); 668 + } 669 + 670 + test "runtime: msgRing" { 671 + const gpa = std.testing.allocator; 672 + var rt1 = try io.Runtime.init(gpa, 16); 673 + defer rt1.deinit(); 674 + 675 + var rt2 = try rt1.initChild(16); 676 + defer rt2.deinit(); 677 + 678 + const Foo2 = struct { 679 + rt1: bool = false, 680 + rt2: bool = false, 681 + 682 + const Msg = enum { rt1, rt2 }; 683 + 684 + fn callback(_: *io.Runtime, task: io.Task) anyerror!void { 685 + const self = task.userdataCast(@This()); 686 + const msg = task.msgToEnum(Msg); 687 + switch (msg) { 688 + .rt1 => self.rt1 = true, 689 + .rt2 => self.rt2 = true, 690 + } 691 + } 692 + }; 693 + 694 + var foo: Foo2 = .{}; 695 + 696 + // The task we will send from rt1 to rt2 697 + const target_task = try rt1.getTask(); 698 + target_task.* = .{ 699 + .userdata = &foo, 700 + .callback = Foo2.callback, 701 + .msg = @intFromEnum(Foo2.Msg.rt2), 702 + .result = .{ .usermsg = 0 }, 703 + }; 704 + 705 + _ = try rt1.msgRing( 706 + &rt2, 707 + target_task, 708 + .{ .cb = Foo2.callback, .msg = @intFromEnum(Foo2.Msg.rt1), .ptr = &foo }, 709 + ); 710 + 711 + try rt1.run(.until_done); 712 + try std.testing.expect(foo.rt1); 713 + try std.testing.expect(!foo.rt2); 714 + try rt2.run(.until_done); 715 + try std.testing.expect(foo.rt1); 716 + try std.testing.expect(foo.rt2); 717 + }
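The tests above double as usage documentation, but the core pattern is worth calling out: one callback can service many operations by dispatching on the task's `msg`. A minimal sketch of that pattern, where the `Conn` type, its buffer size, and the `Msg` names are illustrative rather than part of the library:

```zig
const std = @import("std");
const io = @import("main.zig");

const Conn = struct {
    buf: [4096]u8 = undefined,

    const Msg = enum { accepted, received };

    // One callback services both operations; `msg` says which one completed.
    fn onCompletion(rt: *io.Runtime, task: io.Task) anyerror!void {
        const self = task.userdataCast(Conn);
        switch (task.msgToEnum(Msg)) {
            .accepted => {
                // The accept result carries the client fd; start reading from it.
                const fd = try task.result.?.accept;
                _ = try rt.recv(fd, &self.buf, .{
                    .ptr = self,
                    .msg = @intFromEnum(Msg.received),
                    .cb = Conn.onCompletion,
                });
            },
            .received => {
                const n = try task.result.?.recv;
                std.log.debug("read {d} bytes", .{n});
            },
        }
    }
};

// Kick things off by accepting on an already-listening socket.
pub fn acceptOne(rt: *io.Runtime, listener: std.posix.fd_t, conn: *Conn) !void {
    _ = try rt.accept(listener, .{
        .ptr = conn,
        .msg = @intFromEnum(Conn.Msg.accepted),
        .cb = Conn.onCompletion,
    });
}
```

Because `msg` is a plain u16, the `@intFromEnum`/`msgToEnum` round trip carries this semantic tag through the runtime with no allocation.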
+218
src/net.zig
··· 1 + const std = @import("std");
2 + const io = @import("main.zig");
3 +
4 + const posix = std.posix;
5 + const Allocator = std.mem.Allocator;
6 + const Uri = std.Uri;
7 +
8 + const assert = std.debug.assert;
9 +
10 + pub fn tcpConnectToHost(
11 +     rt: *io.Runtime,
12 +     host: []const u8,
13 +     port: u16,
14 +     ctx: io.Context,
15 + ) !*ConnectTask {
16 +     // TODO: getAddressList could be rewritten to be async. It accesses the filesystem and could
17 +     // make a DNS request
18 +     const list = try std.net.getAddressList(rt.gpa, host, port);
19 +     defer list.deinit();
20 +
21 +     if (list.addrs.len == 0) return error.AddressNotFound;
22 +     // Take the first resolved address
23 +     const addr = list.addrs[0];
24 +
25 +     return tcpConnectToAddr(rt, addr, ctx);
26 + }
27 +
28 + pub fn tcpConnectToAddr(
29 +     rt: *io.Runtime,
30 +     addr: std.net.Address,
31 +     ctx: io.Context,
32 + ) Allocator.Error!*ConnectTask {
33 +     const conn = try rt.gpa.create(ConnectTask);
34 +     errdefer rt.gpa.destroy(conn);
35 +
36 +     conn.* = .{
37 +         .ctx = ctx,
38 +
39 +         .addr = addr,
40 +         .fd = null,
41 +         .task = undefined,
42 +     };
43 +
44 +     conn.task = try rt.socket(
45 +         conn.addr.any.family,
46 +         posix.SOCK.STREAM | posix.SOCK.CLOEXEC,
47 +         posix.IPPROTO.TCP,
48 +         .{ .ptr = conn, .msg = @intFromEnum(ConnectTask.Msg.socket), .cb = ConnectTask.handleMsg },
49 +     );
50 +
51 +     return conn;
52 + }
53 +
54 + pub const ConnectTask = struct {
55 +     ctx: io.Context,
56 +
57 +     addr: std.net.Address,
58 +     fd: ?posix.fd_t,
59 +
60 +     /// Task is the current task we are operating on. We store this to provide cancelation
61 +     task: *io.Task,
62 +
63 +     pub const Msg = enum {
64 +         socket,
65 +         connect,
66 +     };
67 +
68 +     /// Cancels the current task. Not guaranteed to actually cancel. The user's callback will get
69 +     /// error.Canceled if cancelation was successful; otherwise the operation will complete as
70 +     /// normal and this is essentially a no-op
71 +     pub fn cancel(self: *ConnectTask, rt: *io.Runtime) void {
72 +         self.task.cancel(rt, .{}) catch {};
73 +     }
74 +
75 +     pub fn handleMsg(rt: *io.Runtime, task: io.Task) anyerror!void {
76 +         const self = task.userdataCast(ConnectTask);
77 +         const result = task.result.?;
78 +         switch (task.msgToEnum(Msg)) {
79 +             .socket => {
80 +                 assert(result == .socket);
81 +                 self.fd = result.socket catch |err| {
82 +                     defer rt.gpa.destroy(self);
83 +                     try self.ctx.cb(rt, .{
84 +                         .userdata = self.ctx.ptr,
85 +                         .msg = self.ctx.msg,
86 +                         .result = .{ .userfd = err },
87 +                         .callback = self.ctx.cb,
88 +                         .req = .userfd,
89 +                     });
90 +                     return;
91 +                 };
92 +
93 +                 self.task = try rt.connect(
94 +                     self.fd.?,
95 +                     &self.addr.any,
96 +                     self.addr.getOsSockLen(),
97 +                     .{ .ptr = self, .msg = @intFromEnum(Msg.connect), .cb = ConnectTask.handleMsg },
98 +                 );
99 +             },
100 +
101 +             .connect => {
102 +                 assert(result == .connect);
103 +                 defer rt.gpa.destroy(self);
104 +
105 +                 _ = result.connect catch |err| {
106 +                     try self.ctx.cb(rt, .{
107 +                         .userdata = self.ctx.ptr,
108 +                         .msg = self.ctx.msg,
109 +                         .result = .{ .userfd = err },
110 +                         .callback = self.ctx.cb,
111 +                         .req = .userfd,
112 +                     });
113 +                     _ = try rt.close(self.fd.?, .{});
114 +                     return;
115 +                 };
116 +
117 +                 try self.ctx.cb(rt, .{
118 +                     .userdata = self.ctx.ptr,
119 +                     .msg = self.ctx.msg,
120 +                     .result = .{ .userfd = self.fd.?
}, 121 + .callback = self.ctx.cb, 122 + .req = .userfd, 123 + }); 124 + }, 125 + } 126 + } 127 + }; 128 + 129 + test "tcp connect" { 130 + var rt: io.Runtime = try .init(std.testing.allocator, 16); 131 + defer rt.deinit(); 132 + 133 + const addr: std.net.Address = try .parseIp4("127.0.0.1", 80); 134 + 135 + { 136 + // Happy path 137 + const conn = try tcpConnectToAddr(&rt, addr, .{}); 138 + errdefer std.testing.allocator.destroy(conn); 139 + 140 + const task1 = rt.submission_q.pop().?; 141 + defer std.testing.allocator.destroy(task1); 142 + try std.testing.expect(task1.req == .socket); 143 + try std.testing.expect(rt.submission_q.pop() == null); 144 + 145 + const fd: posix.fd_t = 7; 146 + try ConnectTask.handleMsg(&rt, .{ 147 + .userdata = conn, 148 + .msg = @intFromEnum(ConnectTask.Msg.socket), 149 + .result = .{ .socket = fd }, 150 + .req = .userfd, 151 + }); 152 + 153 + const task2 = rt.submission_q.pop().?; 154 + defer std.testing.allocator.destroy(task2); 155 + try std.testing.expect(task2.req == .connect); 156 + try std.testing.expect(rt.submission_q.pop() == null); 157 + 158 + try ConnectTask.handleMsg(&rt, .{ 159 + .userdata = conn, 160 + .msg = @intFromEnum(ConnectTask.Msg.connect), 161 + .result = .{ .connect = {} }, 162 + .req = .userfd, 163 + }); 164 + try std.testing.expect(rt.submission_q.pop() == null); 165 + } 166 + 167 + { 168 + // socket error 169 + const conn = try tcpConnectToAddr(&rt, addr, .{}); 170 + errdefer std.testing.allocator.destroy(conn); 171 + 172 + const task1 = rt.submission_q.pop().?; 173 + defer std.testing.allocator.destroy(task1); 174 + 175 + try ConnectTask.handleMsg(&rt, .{ 176 + .userdata = conn, 177 + .msg = @intFromEnum(ConnectTask.Msg.socket), 178 + .result = .{ .socket = error.Canceled }, 179 + .req = .userfd, 180 + }); 181 + try std.testing.expect(rt.submission_q.pop() == null); 182 + } 183 + 184 + { 185 + // connect error 186 + const conn = try tcpConnectToAddr(&rt, addr, .{}); 187 + errdefer std.testing.allocator.destroy(conn); 188 + 189 + const task1 = rt.submission_q.pop().?; 190 + defer std.testing.allocator.destroy(task1); 191 + try std.testing.expect(task1.req == .socket); 192 + try std.testing.expect(rt.submission_q.pop() == null); 193 + 194 + const fd: posix.fd_t = 7; 195 + try ConnectTask.handleMsg(&rt, .{ 196 + .userdata = conn, 197 + .msg = @intFromEnum(ConnectTask.Msg.socket), 198 + .result = .{ .socket = fd }, 199 + .req = .userfd, 200 + }); 201 + 202 + const task2 = rt.submission_q.pop().?; 203 + defer std.testing.allocator.destroy(task2); 204 + try std.testing.expect(task2.req == .connect); 205 + try std.testing.expect(rt.submission_q.pop() == null); 206 + 207 + try ConnectTask.handleMsg(&rt, .{ 208 + .userdata = conn, 209 + .msg = @intFromEnum(ConnectTask.Msg.connect), 210 + .result = .{ .connect = error.Canceled }, 211 + .req = .noop, 212 + }); 213 + const task3 = rt.submission_q.pop().?; 214 + defer std.testing.allocator.destroy(task3); 215 + try std.testing.expect(task3.req == .close); 216 + try std.testing.expect(rt.submission_q.pop() == null); 217 + } 218 + }
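As the test above shows, both the socket and connect steps report back through a single `userfd` result, so callers handle one completion shape for the whole connect. A sketch of driving it end to end; the `App` type, `connectExample` helper, and the example host are illustrative:

```zig
const std = @import("std");
const io = @import("main.zig");
const net = @import("net.zig");

const App = struct {
    fd: ?std.posix.fd_t = null,

    fn onConnect(_: *io.Runtime, task: io.Task) anyerror!void {
        const self = task.userdataCast(App);
        // Success and failure both arrive as a userfd result.
        self.fd = try task.result.?.userfd;
    }
};

// Illustrative driver: resolve, connect, and run until the fd arrives.
pub fn connectExample(rt: *io.Runtime) !std.posix.fd_t {
    var app: App = .{};
    _ = try net.tcpConnectToHost(rt, "example.com", 80, .{
        .ptr = &app,
        .cb = App.onConnect,
    });
    try rt.run(.until_done);
    return app.fd orelse error.ConnectFailed;
}
```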
+111
src/queue.zig
··· 1 + // This code is mostly a copy of the intrusive queue code from libxev. I've modified it to be a
2 + // doubly linked list that also ensures a certain state is set on each node when put into the list
3 + //
4 + // MIT License
5 + //
6 + // Copyright (c) 2023 Mitchell Hashimoto
7 + //
8 + // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
9 + // associated documentation files (the "Software"), to deal in the Software without restriction,
10 + // including without limitation the rights to use, copy, modify, merge, publish, distribute,
11 + // sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
12 + // furnished to do so, subject to the following conditions:
13 + //
14 + // The above copyright notice and this permission notice shall be included in all copies or
15 + // substantial portions of the Software.
16 + //
17 + // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
18 + // NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 + // NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
20 + // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 + // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 +
23 + const std = @import("std");
24 + const assert = std.debug.assert;
25 +
26 + /// An intrusive queue implementation. The type T must have fields "next" and "prev" of type
27 + /// `?*T`, and a field "state" which is an enum containing a value that matches the passed-in
28 + /// enum literal
29 + pub fn Intrusive(comptime T: type, comptime state: @Type(.enum_literal)) type {
30 +     return struct {
31 +         const Self = @This();
32 +
33 +         const set_state = state;
34 +
35 +         /// Head is the front of the queue and tail is the back of the queue.
36 +         head: ?*T = null,
37 +         tail: ?*T = null,
38 +
39 +         /// Enqueue a new element to the back of the queue.
40 +         pub fn push(self: *Self, v: *T) void {
41 +             assert(v.next == null);
42 +             v.state = set_state;
43 +
44 +             if (self.tail) |tail| {
45 +                 // If we have elements in the queue, then we add a new tail.
46 +                 tail.next = v;
47 +                 v.prev = tail;
48 +                 self.tail = v;
49 +             } else {
50 +                 // No elements in the queue, so we set up the initial state.
51 +                 self.head = v;
52 +                 self.tail = v;
53 +             }
54 +         }
55 +
56 +         /// Dequeue the next element from the queue.
57 +         pub fn pop(self: *Self) ?*T {
58 +             // The next element is in "head".
59 +             const next = self.head orelse return null;
60 +
61 +             // If the head and tail are equal this is the last element
62 +             // so we also set tail to null so we can now be empty.
63 +             if (self.head == self.tail) self.tail = null;
64 +
65 +             // Head is whatever is next (if we're the last element,
66 +             // this will be null).
67 +             self.head = next.next;
68 +             if (self.head) |head| head.prev = null;
69 +
70 +             // We set the "next" field to null so that this element
71 +             // can be inserted again.
72 +             next.next = null;
73 +             next.prev = null;
74 +             return next;
75 +         }
76 +
77 +         /// Returns true if the queue is empty.
78 +         pub fn empty(self: Self) bool {
79 +             return self.head == null;
80 +         }
81 +
82 +         /// Removes the item from the queue.
Asserts that Queue contains the item 83 + pub fn remove(self: *Self, item: *T) void { 84 + assert(self.hasItem(item)); 85 + if (item.prev) |prev| prev.next = item.next else self.head = item.next; 86 + 87 + if (item.next) |next| next.prev = item.prev else self.tail = item.prev; 88 + 89 + item.prev = null; 90 + item.next = null; 91 + } 92 + 93 + pub fn hasItem(self: Self, item: *T) bool { 94 + var maybe_node = self.head; 95 + while (maybe_node) |node| { 96 + if (node == item) return true; 97 + maybe_node = node.next; 98 + } else return false; 99 + } 100 + 101 + pub fn len(self: Self) usize { 102 + var count: usize = 0; 103 + var maybe_node = self.head; 104 + while (maybe_node) |node| { 105 + count += 1; 106 + maybe_node = node.next; 107 + } 108 + return count; 109 + } 110 + }; 111 + }
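For reference, any type can live in this queue as long as it carries the intrusive fields; this illustrative `Node` mirrors the layout `Task` is expected to have:

```zig
const std = @import("std");
const Intrusive = @import("queue.zig").Intrusive;

const Node = struct {
    // Fields the queue requires: it links nodes through next/prev and stamps state.
    next: ?*Node = null,
    prev: ?*Node = null,
    state: enum { free, queued } = .free,

    value: u32 = 0,
};

test "illustrative: intrusive queue" {
    var q: Intrusive(Node, .queued) = .{};

    var a: Node = .{ .value = 1 };
    var b: Node = .{ .value = 2 };
    q.push(&a);
    q.push(&b);
    try std.testing.expectEqual(2, q.len());

    // pop is FIFO and resets the links so a node can be requeued.
    const first = q.pop().?;
    try std.testing.expectEqual(1, first.value);
    try std.testing.expect(first.next == null and first.prev == null);
}
```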
+466
src/tls.zig
··· 1 + const std = @import("std"); 2 + const tls = @import("tls"); 3 + const io = @import("main.zig"); 4 + 5 + const Allocator = std.mem.Allocator; 6 + const CertBundle = tls.config.cert.Bundle; 7 + const assert = std.debug.assert; 8 + const mem = std.mem; 9 + const posix = std.posix; 10 + 11 + pub const Client = struct { 12 + gpa: Allocator, 13 + fd: posix.fd_t, 14 + tls: tls.nonblock.Connection, 15 + recv_task: ?*io.Task = null, 16 + 17 + read_buf: [tls.max_ciphertext_record_len]u8 = undefined, 18 + read_end: usize = 0, 19 + 20 + cleartext_buf: std.ArrayListUnmanaged(u8) = .empty, 21 + ciphertext_buf: std.ArrayListUnmanaged(u8) = .empty, 22 + written: usize = 0, 23 + 24 + userdata: ?*anyopaque = null, 25 + callback: *const fn (*io.Runtime, io.Task) anyerror!void = io.noopCallback, 26 + close_msg: u16 = 0, 27 + write_msg: u16 = 0, 28 + recv_msg: u16 = 0, 29 + 30 + pub const HandshakeTask = struct { 31 + userdata: ?*anyopaque, 32 + callback: io.Callback, 33 + msg: u16, 34 + 35 + fd: posix.fd_t, 36 + buffer: [tls.max_ciphertext_record_len]u8 = undefined, 37 + read_end: usize = 0, 38 + handshake: tls.nonblock.Client, 39 + task: *io.Task, 40 + 41 + pub fn handleMsg(rt: *io.Runtime, task: io.Task) anyerror!void { 42 + const self = task.userdataCast(HandshakeTask); 43 + const result = task.result.?; 44 + 45 + switch (result) { 46 + .write => { 47 + _ = result.write catch |err| { 48 + defer rt.gpa.destroy(self); 49 + // send the error to the callback 50 + try self.callback(rt, .{ 51 + .userdata = self.userdata, 52 + .msg = self.msg, 53 + .result = .{ .userptr = err }, 54 + .callback = self.callback, 55 + .req = .userptr, 56 + }); 57 + return; 58 + }; 59 + 60 + if (self.handshake.done()) { 61 + defer rt.gpa.destroy(self); 62 + // Handshake is done. Create a client and deliver it to the callback 63 + const client = try self.initClient(rt.gpa); 64 + try self.callback(rt, .{ 65 + .userdata = self.userdata, 66 + .msg = self.msg, 67 + .result = .{ .userptr = client }, 68 + .callback = self.callback, 69 + .req = .userptr, 70 + }); 71 + return; 72 + } 73 + 74 + // Arm a recv task 75 + self.task = try rt.recv(self.fd, &self.buffer, .{ 76 + .ptr = self, 77 + .cb = handleMsg, 78 + }); 79 + }, 80 + 81 + .recv => { 82 + const n = result.recv catch |err| { 83 + defer rt.gpa.destroy(self); 84 + // send the error to the callback 85 + try self.callback(rt, .{ 86 + .userdata = self.userdata, 87 + .msg = self.msg, 88 + .result = .{ .userptr = err }, 89 + .callback = self.callback, 90 + .req = .userptr, 91 + }); 92 + return; 93 + }; 94 + 95 + self.read_end += n; 96 + const slice = self.buffer[0..self.read_end]; 97 + var scratch: [tls.max_ciphertext_record_len]u8 = undefined; 98 + const r = try self.handshake.run(slice, &scratch); 99 + 100 + if (r.unused_recv.len > 0) { 101 + // Arm a recv task 102 + self.task = try rt.recv(self.fd, self.buffer[self.read_end..], .{ 103 + .ptr = self, 104 + .cb = handleMsg, 105 + }); 106 + return; 107 + } 108 + 109 + if (r.send.len > 0) { 110 + // Queue another send 111 + @memcpy(self.buffer[0..r.send.len], r.send); 112 + self.task = try rt.write( 113 + self.fd, 114 + self.buffer[0..r.send.len], 115 + .{ .ptr = self, .cb = HandshakeTask.handleMsg }, 116 + ); 117 + return; 118 + } 119 + 120 + if (self.handshake.done()) { 121 + defer rt.gpa.destroy(self); 122 + // Handshake is done. 
Create a client and deliver it to the callback
123 +                         const client = try self.initClient(rt.gpa);
124 +                         try self.callback(rt, .{
125 +                             .userdata = self.userdata,
126 +                             .msg = self.msg,
127 +                             .result = .{ .userptr = client },
128 +                             .callback = self.callback,
129 +                             .req = .userptr,
130 +                         });
131 +                         return;
132 +                     }
133 +                 },
134 +
135 +                 else => unreachable,
136 +             }
137 +         }
138 +
139 +         fn initClient(self: *HandshakeTask, gpa: Allocator) !*Client {
140 +             const client = try gpa.create(Client);
141 +             client.* = .{
142 +                 .gpa = gpa,
143 +                 .fd = self.fd,
144 +                 .tls = .{ .cipher = self.handshake.inner.cipher },
145 +             };
146 +             return client;
147 +         }
148 +
149 +         /// Tries to cancel the handshake. The callback will receive error.Canceled if cancelation
150 +         /// was successful; otherwise the handshake will proceed
151 +         pub fn cancel(self: *HandshakeTask, rt: *io.Runtime) void {
152 +             self.task.cancel(rt, .{}) catch {};
153 +         }
154 +     };
155 +
156 +     const Msg = enum {
157 +         write,
158 +         recv,
159 +         close_notify,
160 +     };
161 +
162 +     /// Initializes a handshake, which will ultimately deliver a Client to the callback via a
163 +     /// userptr result
164 +     pub fn init(
165 +         rt: *io.Runtime,
166 +         fd: posix.fd_t,
167 +         opts: tls.config.Client,
168 +         ctx: io.Context,
169 +     ) !*HandshakeTask {
170 +         const hs = try rt.gpa.create(HandshakeTask);
171 +         hs.* = .{
172 +             .userdata = ctx.ptr,
173 +             .callback = ctx.cb,
174 +             .msg = ctx.msg,
175 +
176 +             .fd = fd,
177 +             .handshake = .init(opts),
178 +             .task = undefined,
179 +         };
180 +
181 +         const result = try hs.handshake.run("", &hs.buffer);
182 +         const hs_ctx: io.Context = .{ .ptr = hs, .cb = HandshakeTask.handleMsg };
183 +         hs.task = try rt.write(hs.fd, result.send, hs_ctx);
184 +         return hs;
185 +     }
186 +
187 +     pub fn deinit(self: *Client, gpa: Allocator) void {
188 +         self.ciphertext_buf.deinit(gpa);
189 +         self.cleartext_buf.deinit(gpa);
190 +     }
191 +
192 +     pub fn close(self: *Client, gpa: Allocator, rt: *io.Runtime) !void {
193 +         // close notify is 2 bytes long
194 +         const len = self.tls.encryptedLength(2);
195 +         try self.ciphertext_buf.ensureUnusedCapacity(gpa, len);
196 +         const buf = self.ciphertext_buf.unusedCapacitySlice();
197 +         const msg = try self.tls.close(buf);
198 +
199 +         self.ciphertext_buf.items.len += msg.len;
200 +         _ = try rt.write(self.fd, self.ciphertext_buf.items[self.written..], .{
201 +             .ptr = self,
202 +             .cb = Client.onCompletion,
203 +             .msg = @intFromEnum(Client.Msg.close_notify),
204 +         });
205 +
206 +         if (self.recv_task) |task| {
207 +             try task.cancel(rt, .{});
208 +             self.recv_task = null;
209 +         }
210 +     }
211 +
212 +     fn onCompletion(rt: *io.Runtime, task: io.Task) anyerror!void {
213 +         const self = task.userdataCast(Client);
214 +         const result = task.result.?;
215 +
216 +         switch (task.msgToEnum(Client.Msg)) {
217 +             .recv => {
218 +                 assert(result == .recv);
219 +                 self.recv_task = null;
220 +                 const n = result.recv catch |err| {
221 +                     return self.callback(rt, .{
222 +                         .userdata = self.userdata,
223 +                         .msg = self.recv_msg,
224 +                         .callback = self.callback,
225 +                         .req = .{ .recv = .{ .fd = self.fd, .buffer = &self.read_buf } },
226 +                         .result = .{ .recv = err },
227 +                     });
228 +                 };
229 +                 self.read_end += n;
230 +                 const end = self.read_end;
231 +                 const r = try self.tls.decrypt(self.read_buf[0..end], self.read_buf[0..end]);
232 +
233 +                 if (r.cleartext.len > 0) {
234 +                     try self.callback(rt, .{
235 +                         .userdata = self.userdata,
236 +                         .msg = self.recv_msg,
237 +                         .callback = self.callback,
238 +                         .req = .{ .recv = .{ .fd = self.fd, .buffer =
&self.read_buf } },
239 +                         .result = .{ .recv = r.cleartext.len },
240 +                     });
241 +                 }
242 +                 mem.copyForwards(u8, &self.read_buf, r.unused_ciphertext);
243 +                 self.read_end = r.unused_ciphertext.len;
244 +
245 +                 if (r.closed) {
246 +                     _ = try rt.close(self.fd, self.closeContext());
247 +                     return;
248 +                 }
249 +
250 +                 self.recv_task = try rt.recv(
251 +                     self.fd,
252 +                     self.read_buf[self.read_end..],
253 +                     self.recvContext(),
254 +                 );
255 +             },
256 +
257 +             .write => {
258 +                 assert(result == .write);
259 +                 const n = result.write catch {
260 +                     return self.callback(rt, .{
261 +                         .userdata = self.userdata,
262 +                         .msg = self.write_msg,
263 +                         .callback = self.callback,
264 +                         .req = .{ .write = .{ .fd = self.fd, .buffer = self.ciphertext_buf.items } },
265 +                         .result = .{ .write = error.Unexpected },
266 +                     });
267 +                 };
268 +                 self.written += n;
269 +
270 +                 if (self.written < self.ciphertext_buf.items.len) {
271 +                     _ = try rt.write(
272 +                         self.fd,
273 +                         self.ciphertext_buf.items[self.written..],
274 +                         self.writeContext(),
275 +                     );
276 +                 } else {
277 +                     defer {
278 +                         self.written = 0;
279 +                         self.ciphertext_buf.clearRetainingCapacity();
280 +                     }
281 +                     return self.callback(rt, .{
282 +                         .userdata = self.userdata,
283 +                         .msg = self.write_msg,
284 +                         .callback = self.callback,
285 +                         .req = .{ .write = .{ .fd = self.fd, .buffer = self.ciphertext_buf.items } },
286 +                         .result = .{ .write = self.written },
287 +                     });
288 +                 }
289 +             },
290 +
291 +             .close_notify => {
292 +                 assert(result == .write);
293 +                 const n = result.write catch {
294 +                     return self.callback(rt, .{
295 +                         .userdata = self.userdata,
296 +                         .msg = self.close_msg,
297 +                         .callback = self.callback,
298 +                         .req = .{ .close = self.fd },
299 +                         .result = .{ .close = error.Unexpected },
300 +                     });
301 +                 };
302 +
303 +                 self.written += n;
304 +
305 +                 if (self.written < self.ciphertext_buf.items.len) {
306 +                     _ = try rt.write(self.fd, self.ciphertext_buf.items[self.written..], .{
307 +                         .ptr = self,
308 +                         .cb = Client.onCompletion,
309 +                         .msg = @intFromEnum(Client.Msg.close_notify),
310 +                     });
311 +                 } else {
312 +                     self.written = 0;
313 +                     self.ciphertext_buf.clearRetainingCapacity();
314 +                     _ = try rt.close(self.fd, self.closeContext());
315 +                 }
316 +             },
317 +         }
318 +     }
319 +
320 +     pub fn recv(self: *Client, rt: *io.Runtime) !void {
321 +         if (self.recv_task != null) return;
322 +         self.recv_task = try rt.recv(
323 +             self.fd,
324 +             self.read_buf[self.read_end..],
325 +             self.recvContext(),
326 +         );
327 +     }
328 +
329 +     pub fn write(self: *Client, gpa: Allocator, bytes: []const u8) Allocator.Error!void {
330 +         try self.cleartext_buf.appendSlice(gpa, bytes);
331 +     }
332 +
333 +     pub fn flush(self: *Client, gpa: Allocator, rt: *io.Runtime) !void {
334 +         const len = self.tls.encryptedLength(self.cleartext_buf.items.len);
335 +         try self.ciphertext_buf.ensureUnusedCapacity(gpa, len);
336 +         const slice = self.ciphertext_buf.unusedCapacitySlice();
337 +         const result = try self.tls.encrypt(self.cleartext_buf.items, slice);
338 +         // encrypt wrote directly into our unused capacity; extend items to cover the ciphertext
339 +         self.ciphertext_buf.items.len += result.ciphertext.len;
340 +         self.cleartext_buf.replaceRangeAssumeCapacity(0, result.cleartext_pos, "");
341 +
342 +         // Write any ciphertext that hasn't been flushed to the socket yet
343 +         _ = try rt.write(
344 +             self.fd,
345 +             self.ciphertext_buf.items[self.written..],
346 +             self.writeContext(),
347 +         );
348 +     }
349 +
350 +     fn closeContext(self: Client) io.Context {
351 +         return .{ .ptr = self.userdata, .cb = self.callback, .msg = self.close_msg };
352 +     }
353 +
354 +     fn recvContext(self: *Client) io.Context {
355 +         return .{
356 +             .ptr = self,
357 +             .cb =
Client.onCompletion, 358 + .msg = @intFromEnum(Client.Msg.recv), 359 + }; 360 + } 361 + 362 + fn writeContext(self: *Client) io.Context { 363 + return .{ 364 + .ptr = self, 365 + .cb = Client.onCompletion, 366 + .msg = @intFromEnum(Client.Msg.write), 367 + }; 368 + } 369 + }; 370 + 371 + test "tls: Client" { 372 + const net = @import("net.zig"); 373 + const gpa = std.testing.allocator; 374 + 375 + var rt = try io.Runtime.init(gpa, 16); 376 + defer rt.deinit(); 377 + 378 + const Foo = struct { 379 + const Self = @This(); 380 + gpa: Allocator, 381 + fd: ?posix.fd_t = null, 382 + tls: ?*Client = null, 383 + 384 + const Msg = enum { 385 + connect, 386 + handshake, 387 + close, 388 + write, 389 + recv, 390 + }; 391 + 392 + fn callback(_: *io.Runtime, task: io.Task) anyerror!void { 393 + const self = task.userdataCast(Self); 394 + const result = task.result.?; 395 + errdefer { 396 + if (self.tls) |client| { 397 + client.deinit(self.gpa); 398 + self.gpa.destroy(client); 399 + self.tls = null; 400 + } 401 + } 402 + 403 + switch (task.msgToEnum(Msg)) { 404 + .connect => { 405 + self.fd = try result.userfd; 406 + }, 407 + .handshake => { 408 + const ptr = try result.userptr; 409 + self.tls = @ptrCast(@alignCast(ptr)); 410 + self.tls.?.userdata = self; 411 + self.tls.?.close_msg = @intFromEnum(@This().Msg.close); 412 + self.tls.?.write_msg = @intFromEnum(@This().Msg.write); 413 + self.tls.?.recv_msg = @intFromEnum(@This().Msg.recv); 414 + self.tls.?.callback = @This().callback; 415 + }, 416 + .close => { 417 + self.tls.?.deinit(self.gpa); 418 + self.gpa.destroy(self.tls.?); 419 + self.tls = null; 420 + self.fd = null; 421 + }, 422 + 423 + else => {}, 424 + } 425 + } 426 + }; 427 + 428 + var foo: Foo = .{ .gpa = gpa }; 429 + defer { 430 + if (foo.tls) |client| { 431 + client.deinit(gpa); 432 + gpa.destroy(client); 433 + } 434 + if (foo.fd) |fd| posix.close(fd); 435 + } 436 + 437 + _ = try net.tcpConnectToHost( 438 + &rt, 439 + "google.com", 440 + 443, 441 + .{ .ptr = &foo, .cb = Foo.callback, .msg = @intFromEnum(Foo.Msg.connect) }, 442 + ); 443 + 444 + try rt.run(.until_done); 445 + 446 + try std.testing.expect(foo.fd != null); 447 + 448 + var bundle: CertBundle = .{}; 449 + try bundle.rescan(gpa); 450 + defer bundle.deinit(gpa); 451 + 452 + _ = try Client.init( 453 + &rt, 454 + foo.fd.?, 455 + .{ .root_ca = bundle, .host = "google.com" }, 456 + .{ .ptr = &foo, .cb = Foo.callback, .msg = @intFromEnum(Foo.Msg.handshake) }, 457 + ); 458 + try rt.run(.until_done); 459 + try std.testing.expect(foo.tls != null); 460 + 461 + try foo.tls.?.recv(&rt); 462 + try foo.tls.?.close(gpa, &rt); 463 + try rt.run(.until_done); 464 + try std.testing.expect(foo.tls == null); 465 + try std.testing.expect(foo.fd == null); 466 + }
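One design note on the `Client` API above: `write` only buffers cleartext, and `flush` encrypts the buffer and schedules the socket write, so several writes can be batched into a single TLS record. A minimal sketch, assuming `client` was already delivered by the handshake callback; the helper name and request bytes are illustrative:

```zig
const std = @import("std");
const io = @import("main.zig");
const tls_client = @import("tls.zig");

// Illustrative helper: queue an HTTP request on an established TLS client.
pub fn sendRequest(client: *tls_client.Client, gpa: std.mem.Allocator, rt: *io.Runtime) !void {
    // write() only appends cleartext to an internal buffer; no IO happens yet.
    try client.write(gpa, "GET / HTTP/1.1\r\n");
    try client.write(gpa, "Host: example.com\r\n\r\n");

    // flush() encrypts the buffered cleartext and schedules the socket write;
    // completion is reported to client.callback tagged with client.write_msg.
    try client.flush(gpa, rt);

    // Arm a read; decrypted bytes reach client.callback tagged with client.recv_msg.
    try client.recv(rt);
}
```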