zig library for atproto applications
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: use async dns resolution

rockorager.dev 14724178 07e32988

verified
+43 -6
+2 -2
build.zig.zon
··· 8 8 9 9 .dependencies = .{ 10 10 .ourio = .{ 11 - .url = "/home/tim/repos/github.com/rockorager/ourio", 12 - .hash = "ourio-0.0.0-_s-z0SsPAgB0SDzLiMiEKWY9MrHCrdVEPL01OQF4uVcm", 11 + .url = "git+https://github.com/rockorager/ourio#54a2eae0fe28d5258b0982ea66b77e3d340e376c", 12 + .hash = "ourio-0.0.0-_s-z0cQPAgAbjkkQLaiiDRRRYUZlSyYLQhXwZ_J32dj2", 13 13 }, 14 14 }, 15 15
+41 -4
src/jetstream.zig
··· 3 3 const ourio = @import("ourio"); 4 4 5 5 const Allocator = std.mem.Allocator; 6 + const dns = stda.net.dns; 6 7 const posix = std.posix; 7 8 8 9 /// A stream of events from the jetstream ··· 18 19 challenge: []const u8, 19 20 20 21 state: union(enum) { 22 + dns, 21 23 connect: *stda.net.ConnectTask, 22 24 handshake: *stda.tls.Client.HandshakeTask, 23 25 conn: *stda.tls.Client, ··· 33 35 const guid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 34 36 35 37 const Msg = enum { 38 + dns, 36 39 connect, 37 40 handshake, 38 41 upgrade, ··· 62 65 io: *ourio.Ring, 63 66 bundle: std.crypto.Certificate.Bundle, 64 67 opts: Options, 68 + resolver: *dns.Resolver, 65 69 ctx: ourio.Context, 66 70 ) !void { 67 71 var seed: u64 = undefined; ··· 69 73 var prng = std.Random.DefaultPrng.init(seed); 70 74 const idx = prng.random().int(u2); 71 75 const host = hosts[idx]; 72 - const task = try stda.net.tcpConnectToHost(io, host, 443, .{ 76 + const question: dns.Question = .{ 77 + .host = host, 78 + .type = .A, 79 + }; 80 + 81 + try resolver.resolveQuery(io, question, .{ 73 82 .ptr = self, 74 - .msg = @intFromEnum(Msg.connect), 83 + .msg = @intFromEnum(Msg.dns), 75 84 .cb = Stream.onCompletion, 76 85 }); 77 86 ··· 123 132 .gpa = gpa, 124 133 .fd = -1, 125 134 .bundle = bundle, 126 - .state = .{ .connect = task }, 135 + .state = .dns, 127 136 .challenge = "", 128 137 .host = idx, 129 138 .path = path, ··· 135 144 self.gpa.free(self.path); 136 145 self.gpa.free(self.challenge); 137 146 switch (self.state) { 147 + .dns => {}, 138 148 .connect => {}, 139 149 .handshake => {}, 140 150 .conn => |conn| { ··· 159 169 const result = task.result.?; 160 170 161 171 switch (task.msgToEnum(Msg)) { 172 + .dns => { 173 + const bytes = try result.userbytes; 174 + const resp: dns.Response = .{ .bytes = bytes }; 175 + var iter = try resp.answerIterator(); 176 + while (iter.next()) |answer| { 177 + switch (answer) { 178 + .A => |ip4| { 179 + const addr = std.net.Ip4Address.init(ip4, 443); 180 + const conn_task = try stda.net.tcpConnectToAddr(io, .{ .in = addr }, .{ 181 + .ptr = self, 182 + .msg = @intFromEnum(Msg.connect), 183 + .cb = Stream.onCompletion, 184 + }); 185 + self.state = .{ .connect = conn_task }; 186 + }, 187 + 188 + .CNAME => {}, 189 + 190 + else => return error.Unexpected, 191 + } 192 + } 193 + }, 194 + 162 195 .connect => { 163 196 self.fd = result.userfd catch |err| return self.reportError(io, err); 164 197 const hs_task = try stda.tls.Client.init( ··· 372 405 var io: ourio.Ring = try .init(gpa, 16); 373 406 defer io.deinit(); 374 407 408 + var resolver: stda.net.dns.Resolver = undefined; 409 + try resolver.init(gpa, &io, .{}); 410 + try io.run(.until_done); 411 + 375 412 var stream: Stream = undefined; 376 - try stream.init(gpa, &io, bundle, .{}, .{ .cb = testHandler }); 413 + try stream.init(gpa, &io, bundle, .{}, &resolver, .{ .cb = testHandler }); 377 414 defer stream.deinit(); 378 415 379 416 try io.run(.until_done);