An asynchronous IO runtime
1const std = @import("std");
2const io = @import("ourio");
3
4const posix = std.posix;
5const Allocator = std.mem.Allocator;
6const Uri = std.Uri;
7
8const assert = std.debug.assert;
9
10pub const dns = @import("net/dns.zig");
11
/// Resolves `host`/`port` and starts an asynchronous TCP connect to the first address the
/// resolver returns. The returned task is allocated with `rt.gpa`.
///
/// Fails with `error.AddressNotFound` when resolution yields no addresses; resolver and
/// allocation errors are propagated unchanged.
pub fn tcpConnectToHost(
    rt: *io.Ring,
    host: []const u8,
    port: u16,
    ctx: io.Context,
) !*ConnectTask {
    // TODO: getAddressList could be rewritten to be async. It accesses the filesystem and could
    // make a DNS request
    const resolved = try std.net.getAddressList(rt.gpa, host, port);
    defer resolved.deinit();

    // Only the first resolved address is ever attempted.
    if (resolved.addrs.len == 0) return error.AddressNotFound;

    return tcpConnectToAddr(rt, resolved.addrs[0], ctx);
}
29
/// Starts an asynchronous TCP connect to `addr`.
///
/// Allocates a `ConnectTask` from `rt.gpa` and queues the socket-creation request on `rt`;
/// `ConnectTask.handleMsg` is registered as the callback for that request. The allocation is
/// released again if the request cannot be queued.
pub fn tcpConnectToAddr(
    rt: *io.Ring,
    addr: std.net.Address,
    ctx: io.Context,
) Allocator.Error!*ConnectTask {
    const gpa = rt.gpa;

    const pending = try gpa.create(ConnectTask);
    errdefer gpa.destroy(pending);

    // `task` is filled in below, once the socket request has been queued.
    pending.* = .{
        .ctx = ctx,
        .addr = addr,
        .fd = null,
        .task = undefined,
    };

    const socket_type = posix.SOCK.STREAM | posix.SOCK.CLOEXEC;
    pending.task = try rt.socket(
        pending.addr.any.family,
        socket_type,
        posix.IPPROTO.TCP,
        .{ .ptr = pending, .msg = @intFromEnum(ConnectTask.Msg.socket), .cb = ConnectTask.handleMsg },
    );

    return pending;
}
54
/// Starts an asynchronous UDP "connect" to `addr`.
///
/// Allocates a `ConnectTask` from `rt.gpa` and queues creation of a datagram socket on `rt`;
/// `ConnectTask.handleMsg` is registered as the callback for that request. The allocation is
/// released again if the request cannot be queued.
pub fn udpConnectToAddr(
    rt: *io.Ring,
    addr: std.net.Address,
    ctx: io.Context,
) Allocator.Error!*ConnectTask {
    const gpa = rt.gpa;

    const pending = try gpa.create(ConnectTask);
    errdefer gpa.destroy(pending);

    // `task` is filled in below, once the socket request has been queued.
    pending.* = .{
        .ctx = ctx,
        .addr = addr,
        .fd = null,
        .task = undefined,
    };

    const socket_type = posix.SOCK.DGRAM | posix.SOCK.CLOEXEC;
    pending.task = try rt.socket(
        pending.addr.any.family,
        socket_type,
        posix.IPPROTO.UDP,
        .{ .ptr = pending, .msg = @intFromEnum(ConnectTask.Msg.socket), .cb = ConnectTask.handleMsg },
    );

    return pending;
}
79
/// State for one in-flight connection attempt.
///
/// The attempt runs in two steps driven by `handleMsg`: a socket is created, then that socket
/// is connected to `addr`. The task frees itself (with `rt.gpa`) once it has reported a final
/// outcome to the user's callback, so the pointer must not be used after that callback ran.
pub const ConnectTask = struct {
    /// The user's context. Its callback receives the final outcome as a `userfd` result.
    ctx: io.Context,

    /// Address being connected to.
    addr: std.net.Address,
    /// The socket, once it has been created; `null` before that.
    fd: ?posix.fd_t,

    /// Task is the current task we are operating on. We store this to provide cancelation
    task: *io.Task,

    /// Identifies which step of the attempt a completion belongs to.
    pub const Msg = enum {
        socket,
        connect,
    };

    /// Cancels the current task. Not guaranteed to actually cancel. User's callback will get an
    /// error.Canceled if cancelation was successful, otherwise the operation will complete as
    /// normal and this is essentially a no-op
    pub fn cancel(self: *ConnectTask, rt: *io.Ring) void {
        // Best effort by design: if the cancel request itself cannot be issued, the operation
        // simply completes as normal.
        _ = self.task.cancel(rt, null, 0, io.noopCallback) catch {};
    }

    /// Completion handler for both steps of the attempt.
    ///
    /// * `.socket`: on failure the error is forwarded to the user and the task is freed; on
    ///   success the fd is stored and the connect is queued.
    /// * `.connect`: the outcome (error or connected fd) is forwarded to the user and the task
    ///   is freed; on failure the socket is also queued for closing.
    ///
    /// Errors returned by the user's callback or by queueing follow-up requests are propagated.
    pub fn handleMsg(rt: *io.Ring, task: io.Task) anyerror!void {
        const self = task.userdataCast(ConnectTask);
        const result = task.result.?;
        switch (task.msgToEnum(Msg)) {
            .socket => {
                assert(result == .socket);
                const fd = result.socket catch |err| {
                    defer rt.gpa.destroy(self);
                    return self.complete(rt, err);
                };
                self.fd = fd;

                self.task = rt.connect(
                    fd,
                    &self.addr.any,
                    self.addr.getOsSockLen(),
                    .{ .ptr = self, .msg = @intFromEnum(Msg.connect), .cb = ConnectTask.handleMsg },
                ) catch |err| {
                    // The connect could not be queued, so no further message will ever reach
                    // this task. Release the socket and the task here instead of leaking both.
                    defer rt.gpa.destroy(self);
                    if (rt.close(fd, .{})) |_| {} else |close_err| {
                        std.log.warn(
                            "ConnectTask: could not queue close for fd {any}: {s}",
                            .{ fd, @errorName(close_err) },
                        );
                    }
                    return err;
                };
            },

            .connect => {
                assert(result == .connect);
                defer rt.gpa.destroy(self);
                const fd = self.fd.?;

                _ = result.connect catch |err| {
                    // Notify first, then close (same order as before), but queue the close even
                    // when the user's callback fails so the socket is never leaked.
                    const notified = self.complete(rt, err);
                    _ = try rt.close(fd, .{});
                    return notified;
                };

                try self.complete(rt, fd);
            },
        }
    }

    /// Hands the final outcome to the user's callback as a `userfd` result. `outcome` is either
    /// the connected fd or the error that ended the attempt.
    fn complete(self: *const ConnectTask, rt: *io.Ring, outcome: anytype) anyerror!void {
        try self.ctx.cb(rt, .{
            .userdata = self.ctx.ptr,
            .msg = self.ctx.msg,
            .result = .{ .userfd = outcome },
            .callback = self.ctx.cb,
            .req = .userfd,
        });
    }
};
154
// Reference the `dns` import from a test block.
// NOTE(review): presumably this is what pulls the dns module's own tests into this file's
// test run; confirm against the build setup.
test {
    _ = dns;
}
158
// Drives ConnectTask by hand: each scenario pops the queued request from the ring's submission
// queue and feeds a synthetic completion straight into `ConnectTask.handleMsg`.
//
// `handleMsg` frees the ConnectTask when it reports a final outcome, so every scenario tracks
// whether the test still owns `conn`. Without that, an expectation failing after the final
// message would make the errdefer free `conn` a second time.
test "tcp connect" {
    const gpa = std.testing.allocator;

    var rt: io.Ring = try .init(gpa, 16);
    defer rt.deinit();

    const addr: std.net.Address = try .parseIp4("127.0.0.1", 80);

    {
        // Happy path
        const conn = try tcpConnectToAddr(&rt, addr, .{});
        var conn_owned = true;
        errdefer if (conn_owned) gpa.destroy(conn);

        const task1 = rt.submission_q.pop().?;
        defer gpa.destroy(task1);
        try std.testing.expect(task1.req == .socket);
        try std.testing.expect(rt.submission_q.pop() == null);

        const fd: posix.fd_t = 7;
        // If this call fails, `conn` may already have been released; a reported leak on an
        // already failing test is preferable to a double free.
        conn_owned = false;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.socket),
            .result = .{ .socket = fd },
            .req = .userfd,
        });
        // The socket step succeeded, so `conn` is still alive and waiting for the connect.
        conn_owned = true;

        const task2 = rt.submission_q.pop().?;
        defer gpa.destroy(task2);
        try std.testing.expect(task2.req == .connect);
        try std.testing.expect(rt.submission_q.pop() == null);

        // Final message: `handleMsg` frees `conn` from here on.
        conn_owned = false;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.connect),
            .result = .{ .connect = {} },
            .req = .userfd,
        });
        try std.testing.expect(rt.submission_q.pop() == null);
    }

    {
        // socket error
        const conn = try tcpConnectToAddr(&rt, addr, .{});
        var conn_owned = true;
        errdefer if (conn_owned) gpa.destroy(conn);

        const task1 = rt.submission_q.pop().?;
        defer gpa.destroy(task1);

        // Final message: a socket error ends the attempt and frees `conn`.
        conn_owned = false;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.socket),
            .result = .{ .socket = error.Canceled },
            .req = .userfd,
        });
        try std.testing.expect(rt.submission_q.pop() == null);
    }

    {
        // connect error
        const conn = try tcpConnectToAddr(&rt, addr, .{});
        var conn_owned = true;
        errdefer if (conn_owned) gpa.destroy(conn);

        const task1 = rt.submission_q.pop().?;
        defer gpa.destroy(task1);
        try std.testing.expect(task1.req == .socket);
        try std.testing.expect(rt.submission_q.pop() == null);

        const fd: posix.fd_t = 7;
        // See the happy path: do not risk a double free if this call fails.
        conn_owned = false;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.socket),
            .result = .{ .socket = fd },
            .req = .userfd,
        });
        conn_owned = true;

        const task2 = rt.submission_q.pop().?;
        defer gpa.destroy(task2);
        try std.testing.expect(task2.req == .connect);
        try std.testing.expect(rt.submission_q.pop() == null);

        // Final message: `handleMsg` frees `conn` from here on.
        conn_owned = false;
        try ConnectTask.handleMsg(&rt, .{
            .userdata = conn,
            .msg = @intFromEnum(ConnectTask.Msg.connect),
            .result = .{ .connect = error.Canceled },
            .req = .noop,
        });
        // A failed connect must queue a close for the socket it created.
        const task3 = rt.submission_q.pop().?;
        defer gpa.destroy(task3);
        try std.testing.expect(task3.req == .close);
        try std.testing.expect(rt.submission_q.pop() == null);
    }
}
248}