this repo has no description

jetstream: properly handle ping, add example

rockorager.dev 414e2a79 6fac37f8

verified
Changed files
+129 -8
example
src
+23
build.zig
··· 22 22 23 23 const test_step = b.step("test", "Run unit tests"); 24 24 test_step.dependOn(&run_lib_unit_tests.step); 25 + 26 + // Example 27 + const example_mod = b.createModule(.{ 28 + .root_source_file = b.path("example/stream.zig"), 29 + .target = target, 30 + .optimize = optimize, 31 + }); 32 + example_mod.addImport("ourio", ourio_dep.module("ourio")); 33 + example_mod.addImport("stda", ourio_dep.module("stda")); 34 + example_mod.addImport("atproto", lib_mod); 35 + const exe = b.addExecutable(.{ 36 + .name = "stream", 37 + .root_module = example_mod, 38 + }); 39 + const run_cmd = b.addRunArtifact(exe); 40 + run_cmd.step.dependOn(b.getInstallStep()); 41 + 42 + if (b.args) |args| { 43 + run_cmd.addArgs(args); 44 + } 45 + 46 + const run_step = b.step("run", "Run the app"); 47 + run_step.dependOn(&run_cmd.step); 25 48 }
+65
example/stream.zig
··· 1 + const std = @import("std"); 2 + const builtin = @import("builtin"); 3 + const ourio = @import("ourio"); 4 + const atproto = @import("atproto"); 5 + const stda = @import("stda"); 6 + 7 + const dns = stda.net.dns; 8 + 9 + pub fn main() !void { 10 + var debug_allocator: std.heap.DebugAllocator(.{}) = .init; 11 + const gpa, const is_debug = gpa: { 12 + break :gpa switch (builtin.mode) { 13 + .Debug, .ReleaseSafe => .{ debug_allocator.allocator(), true }, 14 + .ReleaseFast, .ReleaseSmall => .{ std.heap.smp_allocator, false }, 15 + }; 16 + }; 17 + defer if (is_debug) { 18 + _ = debug_allocator.deinit(); 19 + }; 20 + 21 + var io: ourio.Ring = try .init(gpa, 64); 22 + defer io.deinit(); 23 + 24 + var bundle: std.crypto.Certificate.Bundle = .{}; 25 + try bundle.rescan(gpa); 26 + defer bundle.deinit(gpa); 27 + 28 + var resolver: dns.Resolver = undefined; 29 + try resolver.init(gpa, &io, .{}); 30 + try io.run(.until_done); 31 + 32 + var counter: Counter = .{}; 33 + 34 + var jetstream: atproto.Jetstream = undefined; 35 + try jetstream.init(gpa, &io, bundle, .{}, &resolver, .{ .cb = print, .msg = 1, .ptr = &counter }); 36 + 37 + _ = try io.timer(.{ .sec = 1 }, .{ .cb = print, .msg = 2, .ptr = &counter }); 38 + 39 + try io.run(.until_done); 40 + } 41 + 42 + const Counter = struct { 43 + tick: u8 = 0, 44 + count: usize = 0, 45 + }; 46 + 47 + fn print(io: *ourio.Ring, task: ourio.Task) anyerror!void { 48 + const counter = task.userdataCast(Counter); 49 + 50 + switch (task.msg) { 51 + 1 => { 52 + const result = task.result.?; 53 + _ = try result.userptr; 54 + counter.count += 1; 55 + }, 56 + 2 => { 57 + const writer = std.io.getStdOut().writer(); 58 + try writer.writeAll("\r\x1b[2K"); 59 + try writer.print("{d} events per second", .{counter.count}); 60 + counter.count = 0; 61 + _ = try io.timer(.{ .sec = 1 }, .{ .cb = print, .msg = 2, .ptr = counter }); 62 + }, 63 + else => {}, 64 + } 65 + }
+40 -8
src/jetstream.zig
··· 311 311 defer _ = arena.reset(.retain_capacity); 312 312 switch (data) { 313 313 .text => |s| { 314 - const event: Event = .{ .raw = s, .value = try json.parseFromSliceLeaky( 315 - Event, 314 + var event: Event = .{ .raw = s, .value = try json.parseFromSliceLeaky( 315 + json.Value, 316 316 arena.allocator(), 317 317 s, 318 318 .{ .allocate = .alloc_if_needed }, ··· 326 326 }; 327 327 try self.ctx.cb(io, task); 328 328 }, 329 - else => return error.UnsupportedOp, 329 + 330 + .close => { 331 + const task: ourio.Task = .{ 332 + .callback = self.ctx.cb, 333 + .userdata = self.ctx.ptr, 334 + .msg = self.ctx.msg, 335 + .result = .{ .userptr = error.ConnectionResetByPeer }, 336 + }; 337 + try self.ctx.cb(io, task); 338 + }, 339 + 340 + .ping => |v| { 341 + const byte1: FrameIterator.Byte1 = .{ 342 + .final = true, 343 + .reserved = 0, 344 + .opcode = .pong, 345 + }; 346 + const byte2: FrameIterator.Byte2 = .{ 347 + .len = @intCast(v.len), 348 + .mask = true, 349 + }; 350 + const mask: []const u8 = &.{ 0x12, 0x34, 0x56, 0x78 }; 351 + try self.state.conn.write(self.gpa, &.{ @bitCast(byte1), @bitCast(byte2) }); 352 + try self.state.conn.write(self.gpa, mask); 353 + // TODO: actually apply the mask. As of 2025-05-21 the jetstream doesn't send a 354 + // ping payload so it doesn't matter 355 + try self.state.conn.write(self.gpa, v); 356 + try self.state.conn.flush(self.gpa, io); 357 + }, 358 + 359 + else => { 360 + return error.UnsupportedOp; 361 + }, 330 362 } 331 363 } 332 364 ··· 364 396 const value = self.value.object.get("kind").?; 365 397 const k = std.meta.stringToEnum(KindEnum, value.string).?; 366 398 switch (k) { 367 - .commit => return .{ .commit = self.value.object.get("commit").? }, 368 - .identity => return .{ .identity = self.value.object.get("identity").? }, 369 - .account => return .{ .account = self.value.object.get("account").? }, 399 + .commit => return .{ .commit = .{ .value = self.value.object.get("commit").? } }, 400 + .identity => return .{ .identity = .{ .value = self.value.object.get("identity").? } }, 401 + .account => return .{ .account = .{ .value = self.value.object.get("account").? } }, 370 402 } 371 403 } 372 404 ··· 458 490 text: []const u8, 459 491 binary: []const u8, 460 492 close, 461 - ping, 493 + ping: []const u8, 462 494 pong, 463 495 }; 464 496 ··· 524 556 .text => return .{ .text = self.bytes[data_start..end] }, 525 557 .binary => return .{ .binary = self.bytes[data_start..end] }, 526 558 .close => return .close, 527 - .ping => return .ping, 559 + .ping => return .{ .ping = self.bytes[data_start..end] }, 528 560 .pong => return .pong, 529 561 else => return error.InvalidOpcode, 530 562 }
+1
src/root.zig
··· 6 6 7 7 pub const Did = did.Did; 8 8 pub const Jetstream = jetstream.Stream; 9 + pub const JetstreamEvent = jetstream.Event; 9 10 10 11 test { 11 12 _ = @import("did.zig");