// An asynchronous IO runtime
// (code-viewer header: branch main, 7.5 kB)
const std = @import("std");
const io = @import("ourio");

const posix = std.posix;
const Allocator = std.mem.Allocator;
const Uri = std.Uri;

const assert = std.debug.assert;

pub const dns = @import("net/dns.zig");

/// Resolves `host`/`port` and starts a TCP connection to the first address returned.
///
/// Name resolution goes through `std.net.getAddressList` and is performed synchronously before
/// any task is queued. Returns `error.AddressNotFound` when resolution yields no addresses;
/// otherwise behaves like `tcpConnectToAddr`.
pub fn tcpConnectToHost(
    rt: *io.Ring,
    host: []const u8,
    port: u16,
    ctx: io.Context,
) !*ConnectTask {
    // TODO: getAddressList could be rewritten to be async. It accesses the filesystem and could
    // make a DNS request
    const list = try std.net.getAddressList(rt.gpa, host, port);
    defer list.deinit();

    if (list.addrs.len == 0) return error.AddressNotFound;

    // The address is copied into the ConnectTask, so it does not need to outlive `list`.
    return tcpConnectToAddr(rt, list.addrs[0], ctx);
}

/// Starts a TCP connection to `addr`.
///
/// Allocates a `ConnectTask` from `rt.gpa` and queues a socket request whose completion is
/// routed to `ConnectTask.handleMsg`. The outcome is delivered to `ctx.cb` as a `.userfd`
/// result. The returned task is destroyed by `handleMsg` once the outcome has been delivered,
/// so the pointer is only useful for `ConnectTask.cancel` while the connection is in flight.
pub fn tcpConnectToAddr(
    rt: *io.Ring,
    addr: std.net.Address,
    ctx: io.Context,
) Allocator.Error!*ConnectTask {
    const conn = try rt.gpa.create(ConnectTask);
    errdefer rt.gpa.destroy(conn);

    conn.* = .{
        .ctx = ctx,
        .addr = addr,
        .fd = null,
        // Assigned immediately below, once the socket request has been queued.
        .task = undefined,
    };

    conn.task = try rt.socket(
        conn.addr.any.family,
        posix.SOCK.STREAM | posix.SOCK.CLOEXEC,
        posix.IPPROTO.TCP,
        .{ .ptr = conn, .msg = @intFromEnum(ConnectTask.Msg.socket), .cb = ConnectTask.handleMsg },
    );

    return conn;
}

/// Starts a UDP "connection" to `addr`.
///
/// Identical to `tcpConnectToAddr` except that the socket is requested as
/// `SOCK.DGRAM` / `IPPROTO.UDP`. The outcome is delivered to `ctx.cb` as a `.userfd` result and
/// the returned task is destroyed by `ConnectTask.handleMsg` once that has happened.
pub fn udpConnectToAddr(
    rt: *io.Ring,
    addr: std.net.Address,
    ctx: io.Context,
) Allocator.Error!*ConnectTask {
    const conn = try rt.gpa.create(ConnectTask);
    errdefer rt.gpa.destroy(conn);

    conn.* = .{
        .ctx = ctx,
        .addr = addr,
        .fd = null,
        // Assigned immediately below, once the socket request has been queued.
        .task = undefined,
    };

    conn.task = try rt.socket(
        conn.addr.any.family,
        posix.SOCK.DGRAM | posix.SOCK.CLOEXEC,
        posix.IPPROTO.UDP,
        .{ .ptr = conn, .msg = @intFromEnum(ConnectTask.Msg.socket), .cb = ConnectTask.handleMsg },
    );

    return conn;
}

/// State for a two-step "create socket, then connect" operation.
///
/// Instances are created by `tcpConnectToAddr` / `udpConnectToAddr` and destroyed by
/// `handleMsg` after the user's callback has been invoked.
pub const ConnectTask = struct {
    /// The user's completion context; its callback receives the final `.userfd` result.
    ctx: io.Context,

    /// Address to connect to.
    addr: std.net.Address,
    /// The socket, once the socket request has completed successfully; null before that.
    fd: ?posix.fd_t,

    /// Task is the current task we are operating on. We store this to provide cancelation
    task: *io.Task,

    /// Identifies which step of the operation a completion belongs to.
    pub const Msg = enum {
        socket,
        connect,
    };

    /// Cancels the current task. Not guaranteed to actually cancel. User's callback will get an
    /// error.Canceled if cancelation was successful, otherwise the operation will complete as
    /// normal and this is essentially a no-op
    pub fn cancel(self: *ConnectTask, rt: *io.Ring) void {
        // Best effort by design: a failure to queue the cancelation is intentionally ignored.
        _ = self.task.cancel(rt, null, 0, io.noopCallback) catch {};
    }

    /// Completion handler for both steps of the operation.
    ///
    /// * `.socket`: on failure, reports the error to the user's callback and destroys the task.
    ///   On success, stores the fd and queues the connect request.
    /// * `.connect`: reports either the error or the connected fd to the user's callback and
    ///   destroys the task. On failure the socket is also queued for closing.
    ///
    /// Errors returned by the user's callback are propagated to the caller.
    pub fn handleMsg(rt: *io.Ring, task: io.Task) anyerror!void {
        const self = task.userdataCast(ConnectTask);
        const result = task.result.?;
        switch (task.msgToEnum(Msg)) {
            .socket => {
                assert(result == .socket);
                self.fd = result.socket catch |err| {
                    // No socket was created, so there is nothing to close; just report.
                    defer rt.gpa.destroy(self);
                    try self.ctx.cb(rt, .{
                        .userdata = self.ctx.ptr,
                        .msg = self.ctx.msg,
                        .result = .{ .userfd = err },
                        .callback = self.ctx.cb,
                        .req = .userfd,
                    });
                    return;
                };

                // NOTE(review): if queueing the connect fails here, neither `self` nor the
                // freshly created fd is released and the user's callback is never invoked.
                self.task = try rt.connect(
                    self.fd.?,
                    &self.addr.any,
                    self.addr.getOsSockLen(),
                    .{ .ptr = self, .msg = @intFromEnum(Msg.connect), .cb = ConnectTask.handleMsg },
                );
            },

            .connect => {
                assert(result == .connect);
                defer rt.gpa.destroy(self);

                _ = result.connect catch |err| {
                    // Report the failure first, but keep the callback's outcome instead of
                    // returning early with `try`: the socket must be queued for closing even
                    // when the callback itself fails, otherwise the fd would leak.
                    const callback_result = self.ctx.cb(rt, .{
                        .userdata = self.ctx.ptr,
                        .msg = self.ctx.msg,
                        .result = .{ .userfd = err },
                        .callback = self.ctx.cb,
                        .req = .userfd,
                    });
                    _ = try rt.close(self.fd.?, .{});
                    return callback_result;
                };

                // Connected: ownership of the fd passes to the user's callback.
                try self.ctx.cb(rt, .{
                    .userdata = self.ctx.ptr,
                    .msg = self.ctx.msg,
                    .result = .{ .userfd = self.fd.? },
                    .callback = self.ctx.cb,
                    .req = .userfd,
                });
            },
        }
    }
};

test {
    _ = dns;
}

test "tcp connect" {
    var rt: io.Ring = try .init(std.testing.allocator, 16);
    defer rt.deinit();

    const addr: std.net.Address = try .parseIp4("127.0.0.1", 80);

    {
        // Happy path
        const conn = try tcpConnectToAddr(&rt, addr, .{});
        errdefer std.testing.allocator.destroy(conn);

        const task1 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task1);
        try std.testing.expect(task1.req == .socket);
        try std.testing.expect(rt.submission_q.pop() == null);

        const fd: posix.fd_t = 7;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.socket),
            .result = .{ .socket = fd },
            .req = .userfd,
        });

        const task2 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task2);
        try std.testing.expect(task2.req == .connect);
        try std.testing.expect(rt.submission_q.pop() == null);

        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.connect),
            .result = .{ .connect = {} },
            .req = .userfd,
        });
        try std.testing.expect(rt.submission_q.pop() == null);
    }

    {
        // socket error
        const conn = try tcpConnectToAddr(&rt, addr, .{});
        errdefer std.testing.allocator.destroy(conn);

        const task1 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task1);

        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.socket),
            .result = .{ .socket = error.Canceled },
            .req = .userfd,
        });
        try std.testing.expect(rt.submission_q.pop() == null);
    }

    {
        // connect error
        const conn = try tcpConnectToAddr(&rt, addr, .{});
        errdefer std.testing.allocator.destroy(conn);

        const task1 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task1);
        try std.testing.expect(task1.req == .socket);
        try std.testing.expect(rt.submission_q.pop() == null);

        const fd: posix.fd_t = 7;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.socket),
            .result = .{ .socket = fd },
            .req = .userfd,
        });

        const task2 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task2);
        try std.testing.expect(task2.req == .connect);
        try std.testing.expect(rt.submission_q.pop() == null);

        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.connect),
            .result = .{ .connect = error.Canceled },
            .req = .noop,
        });
        const task3 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task3);
        try std.testing.expect(task3.req == .close);
        try std.testing.expect(rt.submission_q.pop() == null);
    }

    {
        // connect error while the user's callback itself fails: the callback's error is
        // propagated, and the socket must still be queued for closing
        const Failing = struct {
            fn callback(_: *io.Ring, _: io.Task) anyerror!void {
                return error.CallbackFailed;
            }
        };

        const conn = try tcpConnectToAddr(&rt, addr, .{ .cb = Failing.callback });

        const task1 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task1);
        try std.testing.expect(task1.req == .socket);
        try std.testing.expect(rt.submission_q.pop() == null);

        const fd: posix.fd_t = 7;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.socket),
            .result = .{ .socket = fd },
            .req = .userfd,
        });

        const task2 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task2);
        try std.testing.expect(task2.req == .connect);
        try std.testing.expect(rt.submission_q.pop() == null);

        try std.testing.expectError(error.CallbackFailed, ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.connect),
            .result = .{ .connect = error.Canceled },
            .req = .userfd,
        }));
        const task3 = rt.submission_q.pop().?;
        defer std.testing.allocator.destroy(task3);
        try std.testing.expect(task3.req == .close);
        try std.testing.expect(rt.submission_q.pop() == null);
    }
}