atproto utils for zig zat.dev
atproto sdk zig
at main 593 lines 21 kB view raw
//! jetstream client - AT Protocol event stream via WebSocket
//!
//! typed, reconnecting client for the Bluesky Jetstream service.
//! parses commit, identity, and account events into typed structs.
//!
//! see: https://github.com/bluesky-social/jetstream

const std = @import("std");
const websocket = @import("websocket");
const json_helpers = @import("../xrpc/json.zig");
const sync = @import("sync.zig");

const mem = std.mem;
const json = std.json;
const posix = std.posix;
const Allocator = mem.Allocator;
const log = std.log.scoped(.zat);

pub const CommitAction = sync.CommitAction;
pub const AccountStatus = sync.AccountStatus;

/// Jetstream instances used when `Options.hosts` is not overridden.
/// `JetstreamClient.subscribe` rotates through them in this order.
pub const default_hosts = [_][]const u8{
    "jetstream1.us-east.bsky.network",
    "jetstream2.us-east.bsky.network",
    "jetstream1.us-west.bsky.network",
    "jetstream2.us-west.bsky.network",
    "jetstream.waow.tech",
    "jetstream.fire.hose.cam",
    "jet.firehose.stream",
    "sfo.firehose.stream",
    "nyc.firehose.stream",
    "london.firehose.stream",
    "frankfurt.firehose.stream",
    "chennai.firehose.stream",
};

/// configuration for `JetstreamClient`.
pub const Options = struct {
    /// hosts to rotate through on (re)connect; must not be empty.
    hosts: []const []const u8 = &default_hosts,
    /// each entry becomes a `wantedCollections=` query parameter (sent unescaped).
    wanted_collections: []const []const u8 = &.{},
    /// each entry becomes a `wantedDids=` query parameter (sent unescaped).
    wanted_dids: []const []const u8 = &.{},
    /// initial `cursor` (a time_us value) to resume from; null sends no cursor.
    cursor: ?i64 = null,
    /// forwarded to the websocket client as `max_size`.
    max_message_size: usize = 1024 * 1024,
};

pub const Event = union(enum) {
    commit: CommitEvent,
    identity: IdentityEvent,
    account: AccountEvent,

    /// the event's `time_us`; the client uses it as its resume cursor.
    pub fn timeUs(self: Event) i64 {
        return switch (self) {
            inline else => |e| e.time_us,
        };
    }
};

pub const CommitEvent = struct {
    did: []const u8,
    time_us: i64,
    rev: ?[]const u8 = null,
    operation: CommitAction,
    collection: []const u8,
    rkey: []const u8,
    record: ?json.Value = null,
    cid: ?[]const u8 = null,
};

pub const IdentityEvent = struct {
    did: []const u8,
    time_us: i64,
    handle: ?[]const u8 = null,
    seq: ?i64 = null,
    time: ?[]const u8 = null,
};

pub const AccountEvent = struct {
    did: []const u8,
    time_us: i64,
    active: bool,
    status: ?AccountStatus = null,
    seq: ?i64 = null,
    time: ?[]const u8 = null,
};

/// parse a raw JSON payload into a typed Event.
///
/// `allocator` receives the whole JSON tree (object maps, arrays, and any
/// strings that needed unescaping). nothing in here frees it, so pass an
/// arena and deinit it once the Event is no longer needed.
/// strings that need no unescaping are slices of `payload` itself, so
/// `payload` must outlive the Event as well.
///
/// errors: MissingKind, MissingDid, MissingTimeUs, MissingOperation,
/// UnknownOperation, MissingCollection, MissingRkey, UnknownKind, plus any
/// std.json error for malformed input.
pub fn parseEvent(allocator: Allocator, payload: []const u8) !Event {
    // leaky variant: the caller's allocator owns every allocation directly,
    // instead of a nested std.json arena that nobody ever deinits.
    const root = try json.parseFromSliceLeaky(json.Value, allocator, payload, .{});

    const kind_str = json_helpers.getString(root, "kind") orelse return error.MissingKind;
    const did = json_helpers.getString(root, "did") orelse return error.MissingDid;
    const time_us = json_helpers.getInt(root, "time_us") orelse return error.MissingTimeUs;

    if (mem.eql(u8, kind_str, "commit")) {
        const op_str = json_helpers.getString(root, "commit.operation") orelse return error.MissingOperation;
        return .{ .commit = .{
            .did = did,
            .time_us = time_us,
            .operation = CommitAction.parse(op_str) orelse return error.UnknownOperation,
            .collection = json_helpers.getString(root, "commit.collection") orelse return error.MissingCollection,
            .rkey = json_helpers.getString(root, "commit.rkey") orelse return error.MissingRkey,
            .rev = json_helpers.getString(root, "commit.rev"),
            .cid = json_helpers.getString(root, "commit.cid"),
            .record = json_helpers.getPath(root, "commit.record"),
        } };
    } else if (mem.eql(u8, kind_str, "identity")) {
        return .{ .identity = .{
            .did = did,
            .time_us = time_us,
            .handle = json_helpers.getString(root, "identity.handle"),
            .seq = json_helpers.getInt(root, "identity.seq"),
            .time = json_helpers.getString(root, "identity.time"),
        } };
    } else if (mem.eql(u8, kind_str, "account")) {
        const status_str = json_helpers.getString(root, "account.status");
        return .{ .account = .{
            .did = did,
            .time_us = time_us,
            // a missing `active` flag is treated as active
            .active = json_helpers.getBool(root, "account.active") orelse true,
            .status = if (status_str) |s| AccountStatus.parse(s) else null,
            .seq = json_helpers.getInt(root, "account.seq"),
            .time = json_helpers.getString(root, "account.time"),
        } };
    }

    return error.UnknownKind;
}

pub const JetstreamClient = struct {
    allocator: Allocator,
    options: Options,
    /// time_us of the most recently delivered event (initially `options.cursor`);
    /// sent as the `cursor` query parameter on every (re)connect.
    last_time_us: ?i64 = null,

    pub fn init(allocator: Allocator, options: Options) JetstreamClient {
        return .{
            .allocator = allocator,
            .options = options,
            .last_time_us = options.cursor,
        };
    }

    pub fn deinit(_: *JetstreamClient) void {}

    /// subscribe with a user-provided handler.
    /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void
    /// optional: fn onError(*@TypeOf(handler), anyerror) void
    /// optional: fn onConnect(*@TypeOf(handler), []const u8) void — called with host on connect
    /// each Event passed to onEvent lives in a per-message arena that is freed
    /// right after onEvent returns; copy anything you need to keep.
    ///
    /// blocks forever, reconnecting after every disconnect or error and rotating
    /// to the next host each time. the delay between attempts doubles (1s, capped
    /// at 60s) while attempts keep failing, and drops back to 1s once a connection
    /// delivers events. on a host switch the cursor is rewound by 10s, at most once
    /// per received cursor, to cover lag between instances.
    /// logs an error and returns immediately if `options.hosts` is empty.
    pub fn subscribe(self: *JetstreamClient, handler: anytype) void {
        const hosts = self.options.hosts;
        // the modulo below would divide by zero (a panic, or UB in ReleaseFast)
        if (hosts.len == 0) {
            log.err("jetstream: no hosts configured, not subscribing", .{});
            return;
        }

        const max_backoff: u64 = 60;
        const host_switch_rewind_us: i64 = 10 * std.time.us_per_s;

        var backoff: u64 = 1;
        var host_index: usize = 0;
        var prev_host_index: ?usize = null;
        // the cursor value produced by our most recent rewind. if no event has
        // arrived since, last_time_us still equals it and we must not rewind
        // again; otherwise every failed attempt during an outage would push
        // the cursor another 10s into the past.
        var rewound_cursor: ?i64 = null;

        while (true) : (host_index = (host_index + 1) % hosts.len) {
            const host = hosts[host_index];

            // rewind cursor by 10s on host switch (different instances may lag)
            if (prev_host_index) |prev| {
                if (prev != host_index) {
                    if (self.last_time_us) |t| {
                        const already_rewound = if (rewound_cursor) |r| r == t else false;
                        if (!already_rewound) {
                            const rewound = t - host_switch_rewind_us;
                            self.last_time_us = rewound;
                            rewound_cursor = rewound;
                        }
                    }
                }
            }

            log.info("connecting to host {d}/{d}: {s}", .{ host_index + 1, hosts.len, host });

            const cursor_before = self.last_time_us;
            self.connectAndRead(host, handler) catch |err| {
                if (comptime @hasDecl(@TypeOf(handler.*), "onError")) {
                    handler.onError(err);
                } else {
                    log.err("jetstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
                }
            };

            // the connection delivered events, so it was healthy: back off from scratch
            if (!std.meta.eql(self.last_time_us, cursor_before)) backoff = 1;

            prev_host_index = host_index;
            posix.nanosleep(backoff, 0);
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// open one websocket connection to `host`, announce it via onConnect, and
    /// pump messages into `handler` until the connection ends or fails.
    fn connectAndRead(self: *JetstreamClient, host: []const u8, handler: anytype) !void {
        var path_buf: [2048]u8 = undefined;
        const path = try self.buildSubscribePath(&path_buf);

        // a DNS name is at most 253 bytes; leave room for "Host: " and CRLF.
        var host_header_buf: [512]u8 = undefined;
        // propagate overflow instead of falling back to the bare host, which
        // would inject a malformed header line into the handshake.
        const host_header = try std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host});

        log.info("connecting to wss://{s}{s}", .{ host, path });

        var client = try websocket.Client.init(self.allocator, .{
            .host = host,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
        });
        defer client.deinit();

        try client.handshake(path, .{ .headers = host_header });
        configureKeepalive(&client);

        log.info("jetstream connected to {s}", .{host});

        if (comptime @hasDecl(@TypeOf(handler.*), "onConnect")) {
            handler.onConnect(host);
        }

        var ws_handler = WsHandler(@TypeOf(handler.*)){
            .allocator = self.allocator,
            .handler = handler,
            .client_state = self,
        };
        try client.readLoop(&ws_handler);
    }

    /// write `/subscribe` plus query parameters (collections, dids, then the
    /// current cursor) into `buf`; fails if they do not fit.
    fn buildSubscribePath(self: *JetstreamClient, buf: *[2048]u8) ![]const u8 {
        var w: std.Io.Writer = .fixed(buf);

        try w.writeAll("/subscribe");

        var has_param = false;

        for (self.options.wanted_collections) |col| {
            try w.writeByte(if (!has_param) '?' else '&');
            try w.writeAll("wantedCollections=");
            try w.writeAll(col);
            has_param = true;
        }

        for (self.options.wanted_dids) |did| {
            try w.writeByte(if (!has_param) '?' else '&');
            try w.writeAll("wantedDids=");
            try w.writeAll(did);
            has_param = true;
        }

        if (self.last_time_us) |cursor| {
            try w.writeByte(if (!has_param) '?' else '&');
            try w.print("cursor={d}", .{cursor});
        }

        return w.buffered();
    }
};

/// adapter handed to `websocket.Client.readLoop`: parses each server message
/// into an Event, records its time_us as the client's resume cursor, and
/// forwards it to the user handler. unparseable messages are logged at debug
/// level and dropped.
fn WsHandler(comptime H: type) type {
    return struct {
        allocator: Allocator,
        handler: *H,
        client_state: *JetstreamClient,

        const Self = @This();

        pub fn serverMessage(self: *Self, data: []const u8) !void {
            // per-message arena: the Event is only valid until onEvent returns
            var arena = std.heap.ArenaAllocator.init(self.allocator);
            defer arena.deinit();

            const event = parseEvent(arena.allocator(), data) catch |err| {
                log.debug("message parse error: {s}", .{@errorName(err)});
                return;
            };

            self.client_state.last_time_us = event.timeUs();
            self.handler.onEvent(event);
        }

        pub fn close(_: *Self) void {
            log.info("jetstream connection closed", .{});
        }
    };
}

/// enable TCP keepalive so reads don't block forever when a peer
/// disappears without FIN/RST (network partition, crash, power loss).
/// detection time: 10s idle + 5s × 2 probes = 20s
/// (the idle time is only configured on linux and macos).
fn configureKeepalive(client: *websocket.Client) void {
    const builtin = @import("builtin");
    const fd = client.stream.stream.handle;
    const tcp: i32 = @intCast(posix.IPPROTO.TCP);

    // without SO_KEEPALIVE the TCP-level tuning below is pointless, so stop here.
    setSockOptInt(fd, posix.SOL.SOCKET, posix.SO.KEEPALIVE, 1, "SO_KEEPALIVE") catch return;

    // the remaining options are independent refinements: a failure in one is
    // logged by the helper and does not prevent applying the others.
    if (builtin.os.tag == .linux) {
        setSockOptInt(fd, tcp, posix.TCP.KEEPIDLE, 10, "TCP_KEEPIDLE") catch {};
    } else if (builtin.os.tag == .macos) {
        setSockOptInt(fd, tcp, posix.TCP.KEEPALIVE, 10, "TCP_KEEPALIVE") catch {};
    }
    setSockOptInt(fd, tcp, posix.TCP.KEEPINTVL, 5, "TCP_KEEPINTVL") catch {};
    setSockOptInt(fd, tcp, posix.TCP.KEEPCNT, 2, "TCP_KEEPCNT") catch {};
}

/// set an i32-valued socket option; on failure, log it (debug level, same
/// `.zat` scope as the rest of this file) and return the error.
fn setSockOptInt(fd: posix.socket_t, level: i32, name: u32, value: i32, label: []const u8) !void {
    posix.setsockopt(fd, level, name, &std.mem.toBytes(value)) catch |err| {
        std.log.scoped(.zat).debug("keepalive: setting {s} failed: {s}", .{ label, @errorName(err) });
        return err;
    };
}

// === tests ===

test "parse commit event" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "commit",
        \\  "commit": {
        \\    "rev": "3mbspmpaidl2a",
        \\    "operation": "create",
        \\    "collection": "app.bsky.feed.post",
        \\    "rkey": "xyz789",
        \\    "cid": "bafyreitest",
        \\    "record": {
        \\      "text": "hello world",
        \\      "$type": "app.bsky.feed.post"
        \\    }
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const event = try parseEvent(arena.allocator(), payload);
    const commit = event.commit;

    try std.testing.expectEqualStrings("did:plc:abc123", commit.did);
    try std.testing.expectEqual(@as(i64, 1700000000000), commit.time_us);
    try std.testing.expectEqualStrings("3mbspmpaidl2a", commit.rev.?);
    try std.testing.expectEqual(CommitAction.create, commit.operation);
    try std.testing.expectEqualStrings("app.bsky.feed.post", commit.collection);
    try std.testing.expectEqualStrings("xyz789", commit.rkey);
    try std.testing.expectEqualStrings("bafyreitest", commit.cid.?);
    try std.testing.expect(commit.record != null);
    try std.testing.expectEqualStrings("hello world", json_helpers.getString(commit.record.?, "text").?);
}

test "parse identity event" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "identity",
        \\  "identity": {
        \\    "handle": "alice.bsky.social",
        \\    "seq": 42,
        \\    "time": "2024-01-01T00:00:00Z"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const event = try parseEvent(arena.allocator(), payload);
    const identity = event.identity;

    try std.testing.expectEqualStrings("did:plc:abc123", identity.did);
    try std.testing.expectEqual(@as(i64, 1700000000000), identity.time_us);
    try std.testing.expectEqualStrings("alice.bsky.social", identity.handle.?);
    try std.testing.expectEqual(@as(i64, 42), identity.seq.?);
    try std.testing.expectEqualStrings("2024-01-01T00:00:00Z", identity.time.?);
}

test "parse account event" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "account",
        \\  "account": {
        \\    "active": false,
        \\    "status": "suspended",
        \\    "seq": 99,
        \\    "time": "2024-01-01T00:00:00Z"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const event = try parseEvent(arena.allocator(), payload);
    const account = event.account;

    try std.testing.expectEqualStrings("did:plc:abc123", account.did);
    try std.testing.expectEqual(@as(i64, 1700000000000), account.time_us);
    try std.testing.expectEqual(false, account.active);
    try std.testing.expectEqual(AccountStatus.suspended, account.status.?);
    try std.testing.expectEqual(@as(i64, 99), account.seq.?);
    try std.testing.expectEqualStrings("2024-01-01T00:00:00Z", account.time.?);
}

test "parse unknown kind returns error" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "unknown_kind"
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    try std.testing.expectError(error.UnknownKind, parseEvent(arena.allocator(), payload));
}

test "parse commit with unknown operation returns error" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "commit",
        \\  "commit": {
        \\    "operation": "archive",
        \\    "collection": "app.bsky.feed.post",
        \\    "rkey": "xyz789"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    try std.testing.expectError(error.UnknownOperation, parseEvent(arena.allocator(), payload));
}

test "cursor tracking via time_us" {
    const payloads = [_][]const u8{
        \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"1"}}
        ,
        \\{"did":"did:plc:b","time_us":200,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"2"}}
        ,
    };

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const e1 = try parseEvent(arena.allocator(), payloads[0]);
    const e2 = try parseEvent(arena.allocator(), payloads[1]);

    try std.testing.expect(e1.timeUs() > 0);
    try std.testing.expect(e2.timeUs() > e1.timeUs());
}

test "Event.timeUs works for all variants" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const commit = try parseEvent(arena.allocator(),
        \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"x","rkey":"1"}}
    );
    const identity = try parseEvent(arena.allocator(),
        \\{"did":"did:plc:a","time_us":200,"kind":"identity","identity":{}}
    );
    const account = try parseEvent(arena.allocator(),
        \\{"did":"did:plc:a","time_us":300,"kind":"account","account":{"active":true}}
    );

    try std.testing.expectEqual(@as(i64, 100), commit.timeUs());
    try std.testing.expectEqual(@as(i64, 200), identity.timeUs());
    try std.testing.expectEqual(@as(i64, 300), account.timeUs());
}

test "build subscribe path" {
    var client = JetstreamClient.init(std.testing.allocator, .{
        .wanted_collections = &.{"app.bsky.feed.post"},
    });

    var buf: [2048]u8 = undefined;
    const path = try client.buildSubscribePath(&buf);
    try std.testing.expectEqualStrings("/subscribe?wantedCollections=app.bsky.feed.post", path);
}

test "build subscribe path with multiple params" {
    var client = JetstreamClient.init(std.testing.allocator, .{
        .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.feed.like" },
        .wanted_dids = &.{"did:plc:abc123"},
        .cursor = 1700000000000,
    });

    var buf: [2048]u8 = undefined;
    const path = try client.buildSubscribePath(&buf);
    try std.testing.expectEqualStrings(
        "/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedDids=did:plc:abc123&cursor=1700000000000",
        path,
    );
}

test "build subscribe path no params" {
    var client = JetstreamClient.init(std.testing.allocator, .{});

    var buf: [2048]u8 = undefined;
    const path = try client.buildSubscribePath(&buf);
    try std.testing.expectEqualStrings("/subscribe", path);
}

test "build subscribe path with cursor only" {
    var client = JetstreamClient.init(std.testing.allocator, .{ .cursor = 5 });

    var buf: [2048]u8 = undefined;
    const path = try client.buildSubscribePath(&buf);
    try std.testing.expectEqualStrings("/subscribe?cursor=5", path);
}

test "parse commit event with delete operation" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "commit",
        \\  "commit": {
        \\    "operation": "delete",
        \\    "collection": "app.bsky.feed.post",
        \\    "rkey": "xyz789"
        \\  }
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const commit = (try parseEvent(arena.allocator(), payload)).commit;

    try std.testing.expectEqual(CommitAction.delete, commit.operation);
    try std.testing.expect(commit.record == null);
    try std.testing.expect(commit.rev == null);
    try std.testing.expect(commit.cid == null);
}

test "parse identity event with minimal fields" {
    const payload =
        \\{
        \\  "did": "did:plc:abc123",
        \\  "time_us": 1700000000000,
        \\  "kind": "identity",
        \\  "identity": {}
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    const identity = (try parseEvent(arena.allocator(), payload)).identity;

    try std.testing.expectEqualStrings("did:plc:abc123", identity.did);
    try std.testing.expect(identity.handle == null);
    try std.testing.expect(identity.seq == null);
    try std.testing.expect(identity.time == null);
}

test "parse missing did returns error" {
    const payload =
        \\{
        \\  "time_us": 1700000000000,
        \\  "kind": "commit"
        \\}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    try std.testing.expectError(error.MissingDid, parseEvent(arena.allocator(), payload));
}

test "parse missing kind returns error" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    try std.testing.expectError(error.MissingKind, parseEvent(arena.allocator(),
        \\{"did":"did:plc:abc123","time_us":1700000000000}
    ));
}

test "default hosts contains known jetstream instances" {
    try std.testing.expectEqual(@as(usize, 12), default_hosts.len);
    try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", default_hosts[0]);
    try std.testing.expectEqualStrings("jetstream2.us-east.bsky.network", default_hosts[1]);
    try std.testing.expectEqualStrings("jetstream1.us-west.bsky.network", default_hosts[2]);
    try std.testing.expectEqualStrings("jetstream2.us-west.bsky.network", default_hosts[3]);
    try std.testing.expectEqualStrings("jetstream.waow.tech", default_hosts[4]);
    try std.testing.expectEqualStrings("jetstream.fire.hose.cam", default_hosts[5]);
    try std.testing.expectEqualStrings("jet.firehose.stream", default_hosts[6]);
    try std.testing.expectEqualStrings("chennai.firehose.stream", default_hosts[11]);
}

test "round-robin cycles through hosts" {
    const hosts = [_][]const u8{ "host-a", "host-b", "host-c" };
    // simulate the index logic from subscribe()
    for (0..9) |i| {
        const host = hosts[i % hosts.len];
        const expected: []const u8 = switch (i % 3) {
            0 => "host-a",
            1 => "host-b",
            2 => "host-c",
            else => unreachable,
        };
        try std.testing.expectEqualStrings(expected, host);
    }
}

test "options default hosts are used" {
    const opts = Options{};
    try std.testing.expectEqual(@as(usize, 12), opts.hosts.len);
    try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", opts.hosts[0]);
}

test "options custom single host" {
    const opts = Options{ .hosts = &.{"my-custom-host.example.com"} };
    try std.testing.expectEqual(@as(usize, 1), opts.hosts.len);
    try std.testing.expectEqualStrings("my-custom-host.example.com", opts.hosts[0]);
}