An asynchronous IO runtime

stda: add dns resolver

Includes:
- Timeouts
- Retries
- A, AAAA records (more to come as needed)
- /etc/resolv.conf parsing

rockorager.dev 8896ef65 658cb01c (verified)
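
For orientation, here is a rough sketch of how the new resolver is meant to be driven, pieced together from the API and tests in this change. The module names (`stda`, `ourio`), the `Lookup` wrapper, the hostname, and the ring size are illustrative assumptions, not part of the commit:

```zig
const std = @import("std");
const ourio = @import("ourio");
const stda = @import("stda"); // assumed module name for this package
const dns = stda.net.dns;

const Lookup = struct {
    resolver: dns.Resolver = undefined,

    const Msg = enum { resolver_ready, answers };

    fn onCompletion(io: *ourio.Ring, task: ourio.Task) anyerror!void {
        const self = task.userdataCast(Lookup);
        switch (task.msgToEnum(Msg)) {
            // Delivered once /etc/resolv.conf has been parsed (or the fallback
            // nameservers were installed); the resolver can now take queries.
            .resolver_ready => {
                _ = try task.result.?.userptr;
                try self.resolver.resolveQueries(io, &.{
                    .{ .host = "example.com" },
                    .{ .host = "example.com", .type = .AAAA },
                }, .{
                    .ptr = self,
                    .cb = onCompletion,
                    .msg = @intFromEnum(Msg.answers),
                });
            },
            // The raw DNS response arrives through the new userbytes result.
            .answers => {
                const bytes = try task.result.?.userbytes;
                const resp: dns.Response = .{ .bytes = bytes };
                var answers = try resp.answerIterator();
                while (answers.next()) |answer| {
                    switch (answer) {
                        .A => |ip| std.debug.print("A: {d}.{d}.{d}.{d}\n", .{ ip[0], ip[1], ip[2], ip[3] }),
                        .AAAA => |ip| std.debug.print("AAAA: {any}\n", .{ip}),
                    }
                }
            },
        }
    }
};

pub fn main() !void {
    const gpa = std.heap.page_allocator;

    var ring: ourio.Ring = try .init(gpa, 16);
    defer ring.deinit();

    var lookup: Lookup = .{};
    try lookup.resolver.init(gpa, &ring, .{
        .ptr = &lookup,
        .cb = Lookup.onCompletion,
        .msg = @intFromEnum(Lookup.Msg.resolver_ready),
    });
    defer lookup.resolver.deinit();

    try ring.run(.until_done);
}
```

The resolver signals readiness with a `userptr` result once `/etc/resolv.conf` has been parsed (or the fallback nameservers from `stda.Options` are installed), and each `resolveQueries` call reports back through the new `userbytes` result carrying the raw DNS response bytes.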
Changed files: +627 -6

build.zig (+2 -1)

```diff
         .target = target,
         .optimize = optimize,
     });
+    stda_mod.addImport("ourio", ourio);
+
     const tls_dep = b.dependency("tls", .{ .target = target, .optimize = optimize });
     stda_mod.addImport("tls", tls_dep.module("tls"));
-    stda_mod.addImport("ourio", ourio);

     return stda_mod;
 }
```
src/ourio.zig (+4)

```diff
···
     open,
     read,

+    /// userbytes is meant to send slices of bytes between Ring instances or callbacks
+    userbytes,
     /// userfd is meant to send file descriptors between Ring instances (using msgRing)
     userfd,
     /// usermsg is meant to send a u16 between runtime instances (using msgRing)
···
         buffer: []u8,
     },

+    userbytes,
     userfd,
     usermsg,
     userptr,
···
     open: ResultError!posix.fd_t,
     read: ResultError!usize,

+    userbytes: anyerror![]const u8,
     userfd: anyerror!posix.fd_t,
     usermsg: u16,
     userptr: anyerror!?*anyopaque,
```
src/ourio/Kqueue.zig (+5 -2)

```diff
···
         },

         // user* fields are never seen by the runtime, only for internal message passing
-        .userfd, .usermsg, .userptr => unreachable,
+        .userbytes, .userfd, .usermsg, .userptr => unreachable,

         .write => |req| {
             self.in_flight.push(task);
···
         .open,
         .socket,
         .statx,
+        .userbytes,
         .userfd,
         .usermsg,
         .userptr,
···
         .open,
         .socket,
         .statx,
+        .userbytes,
         .userfd,
         .usermsg,
         .userptr,
···
         .socket => .{ .socket = error.Canceled },
         .statx => .{ .statx = error.Canceled },
         .timer => .{ .timer = error.Canceled },
-        .userfd, .usermsg, .userptr => unreachable,
+        .userbytes, .userfd, .usermsg, .userptr => unreachable,
         .write => .{ .write = error.Canceled },
         .writev => .{ .writev = error.Canceled },
     };
···
         .socket,
         .statx,
         .timer,
+        .userbytes,
         .userfd,
         .usermsg,
         .userptr,
```
src/ourio/Mock.zig (+2)

```diff
···
     write_cb: ?*const fn (*io.Task) io.Result = null,
     writev_cb: ?*const fn (*io.Task) io.Result = null,

+    userbytes_cb: ?*const fn (*io.Task) io.Result = null,
     userfd_cb: ?*const fn (*io.Task) io.Result = null,
     usermsg_cb: ?*const fn (*io.Task) io.Result = null,
     userptr_cb: ?*const fn (*io.Task) io.Result = null,
···
     while (self.completions.pop()) |task| {
         try task.callback(rt, task.*);
         rt.free_q.push(task);
+        if (task.deadline) |dl| rt.free_q.push(dl);
     }
 }
```
src/ourio/Uring.zig (+2 -2)

```diff
···
         },

         // user* is only sent internally between rings and higher level wrappers
-        .userfd, .usermsg, .userptr => unreachable,
+        .userbytes, .userfd, .usermsg, .userptr => unreachable,
     }
 }
···
         .usermsg => .{ .usermsg = @intCast(cqe.res) },

         // userfd should never reach the runtime
-        .userfd, .userptr => unreachable,
+        .userbytes, .userfd, .userptr => unreachable,
     };

     defer {
```
src/stda.zig (+13)

```diff
+const std = @import("std");
+
 pub const net = @import("stda/net.zig");
 pub const tls = @import("stda/tls.zig");
+
+const root = @import("root");
+pub const options: Options = if (@hasDecl(root, "stda_options")) root.stda_options else .{};
+
+pub const Options = struct {
+    /// nameservers to be used if system nameservers can't be located
+    nameservers: [2]std.net.Address = .{
+        std.net.Address.initIp4(.{ 1, 1, 1, 1 }, 53),
+        std.net.Address.initIp4(.{ 1, 0, 0, 1 }, 53),
+    },
+};

 test {
     _ = net;
```
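The new `Options` hook lets an application override the fallback nameservers from its root module. A sketch of what that might look like (the package is assumed to be imported as `stda`; the Quad9 addresses are only example values):

```zig
// In the application's root source file (e.g. main.zig). Picked up via
// @hasDecl(root, "stda_options") and only consulted when /etc/resolv.conf
// cannot be read.
const std = @import("std");
const stda = @import("stda");

pub const stda_options: stda.Options = .{
    .nameservers = .{
        std.net.Address.initIp4(.{ 9, 9, 9, 9 }, 53),
        std.net.Address.initIp4(.{ 149, 112, 112, 112 }, 53),
    },
};
```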
src/stda/net.zig (+31 -1)

```diff
···

 const assert = std.debug.assert;

+pub const dns = @import("net/dns.zig");
+
 pub fn tcpConnectToHost(
     rt: *io.Ring,
     host: []const u8,
···

     conn.* = .{
         .ctx = ctx,
-
         .addr = addr,
         .fd = null,
         .task = undefined,
···
         conn.addr.any.family,
         posix.SOCK.STREAM | posix.SOCK.CLOEXEC,
         posix.IPPROTO.TCP,
+        .{ .ptr = conn, .msg = @intFromEnum(ConnectTask.Msg.socket), .cb = ConnectTask.handleMsg },
+    );
+
+    return conn;
+}
+
+pub fn udpConnectToAddr(
+    rt: *io.Ring,
+    addr: std.net.Address,
+    ctx: io.Context,
+) Allocator.Error!*ConnectTask {
+    const conn = try rt.gpa.create(ConnectTask);
+    errdefer rt.gpa.destroy(conn);
+
+    conn.* = .{
+        .ctx = ctx,
+        .addr = addr,
+        .fd = null,
+        .task = undefined,
+    };
+
+    conn.task = try rt.socket(
+        conn.addr.any.family,
+        posix.SOCK.DGRAM | posix.SOCK.CLOEXEC,
+        posix.IPPROTO.UDP,
         .{ .ptr = conn, .msg = @intFromEnum(ConnectTask.Msg.socket), .cb = ConnectTask.handleMsg },
     );
···
         }
     }
 };
+
+test {
+    _ = dns;
+}

 test "tcp connect" {
     var rt: io.Ring = try .init(std.testing.allocator, 16);
```
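The new `udpConnectToAddr` mirrors `tcpConnectToHost`: the connected datagram socket is handed back through a `userfd` result. A minimal caller might look like this (the callback name, the module import, and the assumption that every `io.Context` field has a default are mine, based on the patterns above):

```zig
const std = @import("std");
const ourio = @import("ourio");
const net = @import("stda").net; // assumed module name

// Hypothetical completion handler: the ConnectTask reports the connected
// UDP socket through the userfd result.
fn onUdpReady(io: *ourio.Ring, task: ourio.Task) anyerror!void {
    _ = io;
    const fd = try task.result.?.userfd;
    std.debug.print("udp socket ready: {d}\n", .{fd});
}

fn startDnsProbe(ring: *ourio.Ring) !void {
    const addr = try std.net.Address.parseIp("1.1.1.1", 53);
    _ = try net.udpConnectToAddr(ring, addr, .{ .cb = onUdpReady });
}
```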
src/stda/net/dns.zig (new file, +568)

```zig
const std = @import("std");
const stda = @import("../../stda.zig");
const ourio = @import("ourio");

const Allocator = std.mem.Allocator;
const Context = ourio.Context;
const Ring = ourio.Ring;
const Task = ourio.Task;
const assert = std.debug.assert;
const net = stda.net;
const posix = std.posix;

const default_dns = &stda.options.nameservers;

pub const Resolver = struct {
    gpa: Allocator,
    ctx: Context,
    config: Config = .{},

    const Msg = enum { open_resolv, read_resolv };

    /// initialize a Resolver instance. When the resolver is complete with initialization, a userptr
    /// result type will be delivered to ctx. The resolver will then be ready to resolve DNS queries
    pub fn init(
        self: *Resolver,
        gpa: Allocator,
        io: *Ring,
        ctx: Context,
    ) Allocator.Error!void {
        self.* = .{
            .gpa = gpa,
            .ctx = ctx,
        };

        _ = try io.open("/etc/resolv.conf", .{ .CLOEXEC = true }, 0, .{
            .cb = Resolver.onCompletion,
            .ptr = self,
            .msg = @intFromEnum(Msg.open_resolv),
        });
    }

    pub fn deinit(self: *Resolver) void {
        self.gpa.free(self.config.nameservers);
    }

    pub fn resolveQueries(self: *Resolver, io: *Ring, queries: []const Question, ctx: ourio.Context) !void {
        assert(self.config.nameservers.len > 0);

        const conn = try self.gpa.create(Connection);
        conn.* = .{ .gpa = self.gpa, .ctx = ctx, .config = self.config };
        try conn.writeQuestions(queries);

        try conn.tryNext(io);
    }

    pub fn onCompletion(io: *Ring, task: Task) anyerror!void {
        const self = task.userdataCast(Resolver);
        const msg = task.msgToEnum(Resolver.Msg);
        const result = task.result.?;

        switch (msg) {
            .open_resolv => {
                const fd = result.open catch {
                    self.config.nameservers = try self.gpa.dupe(std.net.Address, default_dns);
                    const t: Task = .{
                        .callback = self.ctx.cb,
                        .msg = self.ctx.msg,
                        .userdata = self.ctx.ptr,
                        .result = .{ .userptr = self },
                    };
                    try self.ctx.cb(io, t);
                    return;
                };

                const buffer = try self.gpa.alloc(u8, 4096);
                errdefer self.gpa.free(buffer);

                _ = try io.read(fd, buffer, .{
                    .cb = Resolver.onCompletion,
                    .ptr = self,
                    .msg = @intFromEnum(Resolver.Msg.read_resolv),
                });
            },

            .read_resolv => {
                const buffer = task.req.read.buffer;
                defer self.gpa.free(buffer);

                _ = try io.close(task.req.read.fd, .{});

                const n = result.read catch |err| {
                    const t: Task = .{
                        .callback = self.ctx.cb,
                        .msg = self.ctx.msg,
                        .userdata = self.ctx.ptr,
                        .result = .{ .userptr = err },
                    };
                    try self.ctx.cb(io, t);
                    return;
                };

                if (n >= buffer.len) {
                    @panic("TODO: more to read");
                }

                var line_iter = std.mem.splitScalar(u8, buffer[0..n], '\n');
                var addresses: std.ArrayListUnmanaged(std.net.Address) = .empty;
                defer addresses.deinit(self.gpa);

                while (line_iter.next()) |line| {
                    if (line.len == 0 or line[0] == ';' or line[0] == '#') continue;

                    var iter = std.mem.splitAny(u8, line, &std.ascii.whitespace);
                    const key = iter.first();

                    if (std.mem.eql(u8, key, "nameserver")) {
                        const addr = try std.net.Address.parseIp(iter.rest(), 53);
                        try addresses.append(self.gpa, addr);
                        continue;
                    }

                    if (std.mem.eql(u8, key, "options")) {
                        while (iter.next()) |opt| {
                            if (std.mem.startsWith(u8, opt, "timeout:")) {
                                const timeout = std.fmt.parseInt(u5, opt[8..], 10) catch 30;
                                self.config.timeout_s = @min(30, timeout);
                                continue;
                            }

                            if (std.mem.startsWith(u8, opt, "attempts:")) {
                                const attempts = std.fmt.parseInt(u3, opt[9..], 10) catch 5;
                                self.config.attempts = @max(@min(5, attempts), 1);
                                continue;
                            }

                            if (std.mem.eql(u8, opt, "edns0")) {
                                self.config.edns0 = true;
                                continue;
                            }
                        }
                    }
                }

                self.config.nameservers = try addresses.toOwnedSlice(self.gpa);

                const t: Task = .{
                    .callback = self.ctx.cb,
                    .msg = self.ctx.msg,
                    .userdata = self.ctx.ptr,
                    .result = .{ .userptr = self },
                };
                try self.ctx.cb(io, t);
            },
        }
    }
};

pub const Config = struct {
    nameservers: []const std.net.Address = &.{},

    /// timeout_s is silently capped to 30 according to man resolv.conf
    timeout_s: u5 = 30,

    /// attempts is capped at 5
    attempts: u3 = 5,

    edns0: bool = false,
};

pub const Header = packed struct(u96) {
    id: u16 = 0,

    flags1: packed struct(u8) {
        recursion_desired: bool = true,
        truncated: bool = false,
        authoritative_answer: bool = false,
        opcode: enum(u4) {
            query = 0,
            inverse_query = 1,
            server_status_request = 2,
        } = .query,
        is_response: bool = false,
    } = .{},

    flags2: packed struct(u8) {
        response_code: enum(u4) {
            success = 0,
            format_error = 1,
            server_failure = 2,
            name_error = 3,
            not_implemented = 4,
            refuse = 5,
        } = .success,
        z: u3 = 0,
        recursion_available: bool = false,
    } = .{},

    question_count: u16 = 0,

    answer_count: u16 = 0,

    authority_count: u16 = 0,

    additional_count: u16 = 0,

    pub fn asBytes(self: Header) [12]u8 {
        var bytes: [12]u8 = undefined;
        var fbs = std.io.fixedBufferStream(&bytes);
        fbs.writer().writeInt(u16, self.id, .big) catch unreachable;

        fbs.writer().writeByte(@bitCast(self.flags1)) catch unreachable;
        fbs.writer().writeByte(@bitCast(self.flags2)) catch unreachable;

        fbs.writer().writeInt(u16, self.question_count, .big) catch unreachable;
        fbs.writer().writeInt(u16, self.answer_count, .big) catch unreachable;
        fbs.writer().writeInt(u16, self.authority_count, .big) catch unreachable;
        fbs.writer().writeInt(u16, self.additional_count, .big) catch unreachable;
        assert(fbs.pos == 12);
        return bytes;
    }
};

pub const Question = struct {
    host: []const u8,
    type: ResourceType = .A,
    class: enum(u16) {
        IN = 1,
        // CS = 2,
        // CH = 3,
        // HS = 4,
        // WILDCARD = 255,
    } = .IN,
};

pub const ResourceType = enum(u16) {
    A = 1,
    // NS = 2,
    // MD = 3,
    // MF = 4,
    // CNAME = 5,
    // SOA = 6,
    // MB = 7,
    // MG = 8,
    // MR = 9,
    // NULL = 10,
    // WKS = 11,
    // PTR = 12,
    // HINFO = 13,
    // MINFO = 14,
    // MX = 15,
    // TXT = 16,
    AAAA = 28,
    // SRV = 33,
    // OPT = 41,
};

pub const Answer = union(ResourceType) {
    A: [4]u8,
    AAAA: [16]u8,
};

pub const Response = struct {
    bytes: []const u8,

    pub fn header(self: Response) Header {
        assert(self.bytes.len >= 12);
        const readInt = std.mem.readInt;

        return .{
            .id = readInt(u16, self.bytes[0..2], .big),
            .flags1 = @bitCast(self.bytes[2]),
            .flags2 = @bitCast(self.bytes[3]),
            .question_count = readInt(u16, self.bytes[4..6], .big),
            .answer_count = readInt(u16, self.bytes[6..8], .big),
            .authority_count = readInt(u16, self.bytes[8..10], .big),
            .additional_count = readInt(u16, self.bytes[10..12], .big),
        };
    }

    pub const AnswerIterator = struct {
        bytes: []const u8,
        /// offset into bytes
        offset: usize = 0,

        count: usize,
        /// number of answers we have returned
        idx: usize = 0,

        pub fn next(self: *AnswerIterator) ?Answer {
            if (self.idx >= self.count or self.offset >= self.bytes.len) return null;
            defer self.idx += 1;

            // Read the name
            const b = self.bytes[self.offset];
            if (b & 0b1100_0000 == 0) {
                // Encoded name. Get past this
                self.offset = std.mem.indexOfScalar(u8, self.bytes[self.idx..], 0x00) orelse
                    return null;
            } else {
                // Name is pointer, we can advance 2 bytes
                self.offset += 2;
            }

            const typ: ResourceType = @enumFromInt(std.mem.readInt(
                u16,
                self.bytes[self.offset..][0..2],
                .big,
            ));
            self.offset += 2;
            const class = std.mem.readInt(u16, self.bytes[self.offset..][0..2], .big);
            assert(class == 1);
            self.offset += 2;
            const ttl = std.mem.readInt(u32, self.bytes[self.offset..][0..4], .big);
            _ = ttl;
            self.offset += 4;
            const rd_len = std.mem.readInt(u16, self.bytes[self.offset..][0..2], .big);
            self.offset += 2;
            defer self.offset += rd_len;

            switch (typ) {
                .A => {
                    assert(rd_len == 4);
                    return .{ .A = .{
                        self.bytes[self.offset],
                        self.bytes[self.offset + 1],
                        self.bytes[self.offset + 2],
                        self.bytes[self.offset + 3],
                    } };
                },

                .AAAA => {
                    assert(rd_len == 16);
                    return .{ .AAAA = .{
                        self.bytes[self.offset],
                        self.bytes[self.offset + 1],
                        self.bytes[self.offset + 2],
                        self.bytes[self.offset + 3],
                        self.bytes[self.offset + 4],
                        self.bytes[self.offset + 5],
                        self.bytes[self.offset + 6],
                        self.bytes[self.offset + 7],
                        self.bytes[self.offset + 8],
                        self.bytes[self.offset + 9],
                        self.bytes[self.offset + 10],
                        self.bytes[self.offset + 11],
                        self.bytes[self.offset + 12],
                        self.bytes[self.offset + 13],
                        self.bytes[self.offset + 14],
                        self.bytes[self.offset + 15],
                    } };
                },
            }
        }
    };

    pub fn answerIterator(self: Response) !AnswerIterator {
        const h = self.header();

        var offset: usize = 12;

        var q: u16 = 0;
        while (q < h.question_count) {
            offset = std.mem.indexOfScalarPos(u8, self.bytes, offset, 0x00) orelse
                return error.InvalidResponse;
            offset += 4; // 2 bytes for type, 2 bytes for class
            q += 1;
        }

        return .{
            .bytes = self.bytes[offset..],
            .count = h.answer_count,
        };
    }
};

pub const Connection = struct {
    gpa: Allocator,
    ctx: Context,
    config: Config,

    nameserver: u8 = 0,
    attempt: u5 = 0,

    read_buffer: [2048]u8 = undefined,
    write_buffer: std.ArrayListUnmanaged(u8) = .empty,
    deadline: i64 = 0,

    const Msg = enum { connect, recv };

    pub fn tryNext(self: *Connection, io: *Ring) !void {
        self.deadline = std.time.timestamp() + self.config.timeout_s;

        if (self.attempt < self.config.attempts) {
            const addr = self.config.nameservers[self.nameserver];
            self.attempt += 1;

            _ = try net.udpConnectToAddr(io, addr, .{
                .cb = Connection.onCompletion,
                .msg = @intFromEnum(Connection.Msg.connect),
                .ptr = self,
            });

            return;
        }

        self.attempt = 0;

        if (self.nameserver < self.config.nameservers.len) {
            const addr = self.config.nameservers[self.nameserver];
            self.nameserver += 1;

            _ = try net.udpConnectToAddr(io, addr, .{
                .cb = Connection.onCompletion,
                .msg = @intFromEnum(Connection.Msg.connect),
                .ptr = self,
            });
            return;
        }

        defer self.gpa.destroy(self);
        try self.sendResult(io, .{ .userbytes = error.Timeout });
    }

    pub fn onCompletion(io: *Ring, task: Task) anyerror!void {
        const self = task.userdataCast(Connection);
        const msg = task.msgToEnum(Connection.Msg);
        const result = task.result.?;

        switch (msg) {
            .connect => {
                const fd = result.userfd catch return self.tryNext(io);

                const recv_task = try io.recv(fd, &self.read_buffer, .{
                    .cb = Connection.onCompletion,
                    .ptr = self,
                    .msg = @intFromEnum(Connection.Msg.recv),
                });
                try recv_task.setDeadline(io, .{ .sec = self.deadline });

                const write_task = try io.write(fd, self.write_buffer.items, .{});
                try write_task.setDeadline(io, .{ .sec = self.deadline });
            },

            .recv => {
                const n = result.recv catch {
                    _ = try io.close(task.req.recv.fd, .{});
                    return self.tryNext(io);
                };

                if (n == 0) {
                    _ = try io.close(task.req.recv.fd, .{});
                    return self.tryNext(io);
                }

                try self.sendResult(io, .{ .userbytes = self.read_buffer[0..n] });
                _ = try io.close(task.req.recv.fd, .{});
                self.gpa.destroy(self);
            },
        }
    }

    fn sendResult(self: *Connection, io: *Ring, result: ourio.Result) !void {
        defer self.write_buffer.deinit(self.gpa);
        const task: ourio.Task = .{
            .callback = self.ctx.cb,
            .userdata = self.ctx.ptr,
            .msg = self.ctx.msg,
            .result = result,
        };
        try self.ctx.cb(io, task);
    }

    fn writeQuestions(self: *Connection, queries: []const Question) !void {
        const header: Header = .{ .question_count = @intCast(queries.len) };
        var writer = self.write_buffer.writer(self.gpa);
        try writer.writeAll(&header.asBytes());

        for (queries) |query| {
            var iter = std.mem.splitScalar(u8, query.host, '.');
            while (iter.next()) |val| {
                const len: u8 = @intCast(val.len);
                try writer.writeByte(len);
                try writer.writeAll(val);
            }
            try writer.writeByte(0x00);
            try writer.writeInt(u16, @intFromEnum(query.type), .big);
            try writer.writeInt(u16, @intFromEnum(query.class), .big);
        }
    }
};

test "Resolver" {
    const Anon = struct {
        fn onOpen(_: *Task) ourio.Result {
            return .{ .open = 1 };
        }

        fn onRead(task: *Task) ourio.Result {
            const @"resolv.conf" =
                \\nameserver 1.1.1.1
                \\nameserver 1.0.0.1
                \\options timeout:10 attempts:3
            ;

            @memcpy(task.req.read.buffer[0..@"resolv.conf".len], @"resolv.conf");
            return .{ .read = @"resolv.conf".len };
        }

        fn onClose(_: *Task) ourio.Result {
            return .{ .close = {} };
        }

        fn onSocket(_: *Task) ourio.Result {
            return .{ .socket = 1 };
        }

        fn onConnect(_: *Task) ourio.Result {
            return .{ .connect = {} };
        }

        fn onRecv(_: *Task) ourio.Result {
            return .{ .recv = 1 };
        }

        fn onWrite(task: *Task) ourio.Result {
            return .{ .write = task.req.write.buffer.len };
        }
    };

    var io: ourio.Ring = try .initMock(std.testing.allocator, 16);
    defer io.deinit();

    io.backend.mock = .{
        .open_cb = Anon.onOpen,
        .read_cb = Anon.onRead,
        .close_cb = Anon.onClose,
        .socket_cb = Anon.onSocket,
        .connect_cb = Anon.onConnect,
        .recv_cb = Anon.onRecv,
        .write_cb = Anon.onWrite,
    };

    var resolver: Resolver = undefined;
    try resolver.init(std.testing.allocator, &io, .{});
    defer resolver.deinit();

    try std.testing.expectEqual(0, resolver.config.nameservers.len);
    try std.testing.expectEqual(5, resolver.config.attempts);
    try std.testing.expectEqual(30, resolver.config.timeout_s);

    try io.run(.until_done);

    try resolver.resolveQueries(&io, &.{
        .{ .host = "timculverhouse.com" },
        .{ .host = "timculverhouse.com", .type = .AAAA },
    }, .{});
    try io.run(.until_done);
    try std.testing.expectEqual(2, resolver.config.nameservers.len);
    try std.testing.expectEqual(3, resolver.config.attempts);
    try std.testing.expectEqual(10, resolver.config.timeout_s);
}

test "Header roundtrip" {
    const header: Header = .{ .question_count = 1 };
    const bytes = header.asBytes();
    const response: Response = .{ .bytes = &bytes };
    const resp_header = response.header();
    try std.testing.expectEqual(header, resp_header);
}
```
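To make the wire format concrete, here is an illustrative test (not part of the commit) of the bytes `writeQuestions` emits for a single A query. It is written as if appended to dns.zig, reusing that file's top-level imports, and assumes `Connection` can be built directly with a default `Context` and `Config`, as `resolveQueries` does:

```zig
test "writeQuestions wire format (illustrative)" {
    var conn: Connection = .{ .gpa = std.testing.allocator, .ctx = .{}, .config = .{} };
    defer conn.write_buffer.deinit(std.testing.allocator);

    try conn.writeQuestions(&.{.{ .host = "example.com" }});

    try std.testing.expectEqualSlices(u8, &[_]u8{
        0x00, 0x00, // id
        0x01, // flags1: recursion desired
        0x00, // flags2
        0x00, 0x01, // question count
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // answer/authority/additional counts
        7, 'e', 'x', 'a', 'm', 'p', 'l', 'e', 3, 'c', 'o', 'm', 0x00, // QNAME as length-prefixed labels
        0x00, 0x01, // QTYPE A
        0x00, 0x01, // QCLASS IN
    }, conn.write_buffer.items);
}
```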