atproto utils for zig zat.dev
atproto sdk zig
at main 749 lines 27 kB view raw
//! firehose codec - com.atproto.sync.subscribeRepos
//!
//! encode and decode AT Protocol firehose events over WebSocket. messages are
//! DAG-CBOR encoded (unlike jetstream, which is JSON). includes frame encoding/
//! decoding, CAR block packing, and CID creation for records.
//!
//! wire format per frame:
//!   [DAG-CBOR header: {op, t}] [DAG-CBOR payload: {seq, repo, ops, blocks, ...}]
//!
//! see: https://atproto.com/specs/event-stream

const std = @import("std");
const websocket = @import("websocket");
const cbor = @import("../repo/cbor.zig");
const car = @import("../repo/car.zig");
const sync = @import("sync.zig");

const mem = std.mem;
const Allocator = mem.Allocator;
const posix = std.posix;
const log = std.log.scoped(.zat);

pub const CommitAction = sync.CommitAction;
pub const AccountStatus = sync.AccountStatus;

/// hosts tried (in rotation) when `Options.hosts` is left at its default
pub const default_hosts = [_][]const u8{
    "bsky.network",
    "northamerica.firehose.network",
    "europe.firehose.network",
    "asia.firehose.network",
};

/// connection options for `FirehoseClient`
pub const Options = struct {
    /// hosts to rotate through on reconnect; must not be empty
    hosts: []const []const u8 = &default_hosts,
    /// sequence number to resume from (sent as `?cursor=`); null = no cursor
    cursor: ?i64 = null,
    max_message_size: usize = 5 * 1024 * 1024, // 5MB — firehose frames can be large
};

/// decoded firehose event
pub const Event = union(enum) {
    commit: CommitEvent,
    identity: IdentityEvent,
    account: AccountEvent,
    info: InfoEvent,

    /// sequence number of the event, or null for `#info` (which carries none)
    pub fn seq(self: Event) ?i64 {
        return switch (self) {
            .commit => |c| c.seq,
            .identity => |i| i.seq,
            .account => |a| a.seq,
            .info => null,
        };
    }
};

pub const CommitEvent = struct {
    seq: i64,
    repo: []const u8, // DID
    rev: []const u8, // TID — revision of the commit
    time: []const u8, // datetime — when event was received
    since: ?[]const u8 = null, // TID — rev of preceding commit (null = full repo export)
    commit: ?cbor.Cid = null, // CID of the commit object
    ops: []const RepoOp,
    blobs: []const cbor.Cid = &.{}, // new blobs referenced by records in this commit
    too_big: bool = false,
};

pub const RepoOp = struct {
    action: CommitAction,
    collection: []const u8,
    rkey: []const u8,
    cid: ?cbor.Cid = null, // CID of the record (null for deletes)
    record: ?cbor.Value = null, // decoded DAG-CBOR record from CAR block
};

pub const IdentityEvent = struct {
    seq: i64,
    did: []const u8,
    time: []const u8, // datetime — when event was received
    handle: ?[]const u8 = null,
};

pub const AccountEvent = struct {
    seq: i64,
    did: []const u8,
    time: []const u8, // datetime — when event was received
    active: bool = true,
    status: ?AccountStatus = null,
};

pub const InfoEvent = struct {
    name: ?[]const u8 = null,
    message: ?[]const u8 = null,
};

/// frame header from the wire
const FrameHeader = struct {
    op: i64,
    t: ?[]const u8 = null,
};

/// values of the header `op` field
pub const FrameOp = enum(i64) {
    message = 1,
    err = -1,
};

pub const DecodeError = error{
    InvalidFrame,
    InvalidHeader,
    UnexpectedEof,
    MissingField,
    UnknownOp,
    UnknownEventType,
} || cbor.DecodeError || car.CarError;

/// decode a raw WebSocket binary frame into a firehose Event.
///
/// nothing allocated while decoding is freed here, so callers are expected to
/// pass an arena allocator and drop the whole arena when done with the event
/// (this is what `WsHandler.serverMessage` does).
///
/// errors:
/// - `InvalidHeader`: header has no integer `op` or no string `t`
/// - `UnknownOp`: `op` is anything other than `FrameOp.message` (this
///   includes error frames, op = -1)
/// - `UnknownEventType`: `t` is not one of #commit/#identity/#account/#info
/// - `MissingField`: a required payload field is absent
pub fn decodeFrame(allocator: Allocator, data: []const u8) DecodeError!Event {
    // frame = [CBOR header] [CBOR payload] concatenated
    const header_result = try cbor.decode(allocator, data);
    const header_val = header_result.value;
    const payload_data = data[header_result.consumed..];

    // parse header. only message frames carry a typed payload; error frames
    // and ops this decoder does not know about are rejected rather than
    // being interpreted as a message.
    const op = header_val.getInt("op") orelse return error.InvalidHeader;
    if (op != @intFromEnum(FrameOp.message)) return error.UnknownOp;

    const t = header_val.getString("t") orelse return error.InvalidHeader;

    // decode payload
    const payload = try cbor.decodeAll(allocator, payload_data);

    if (mem.eql(u8, t, "#commit")) {
        return try decodeCommit(allocator, payload);
    } else if (mem.eql(u8, t, "#identity")) {
        return decodeIdentity(payload);
    } else if (mem.eql(u8, t, "#account")) {
        return decodeAccount(payload);
    } else if (mem.eql(u8, t, "#info")) {
        return .{ .info = .{
            .name = payload.getString("name"),
            .message = payload.getString("message"),
        } };
    }

    return error.UnknownEventType;
}

/// build a CommitEvent from a decoded `#commit` payload.
///
/// `seq`, `repo`, `rev` and `time` are required (`MissingField` otherwise).
/// everything else is best-effort: ops with a missing/unparseable action or a
/// path without a '/' are skipped, and a CAR or record that fails to decode
/// leaves `record` null instead of failing the whole event.
fn decodeCommit(allocator: Allocator, payload: cbor.Value) DecodeError!Event {
    const seq_val = payload.getInt("seq") orelse return error.MissingField;
    const repo = payload.getString("repo") orelse return error.MissingField;
    const rev = payload.getString("rev") orelse return error.MissingField;
    const time = payload.getString("time") orelse return error.MissingField;

    // parse commit CID (ignored if present but not a CID link)
    var commit_cid: ?cbor.Cid = null;
    if (payload.get("commit")) |commit_val| {
        switch (commit_val) {
            .cid => |c| commit_cid = c,
            else => {},
        }
    }

    // parse blobs array (array of CID links); non-CID entries are skipped
    var blobs: std.ArrayList(cbor.Cid) = .{};
    errdefer blobs.deinit(allocator);
    if (payload.getArray("blobs")) |blob_values| {
        for (blob_values) |blob_val| {
            switch (blob_val) {
                .cid => |c| try blobs.append(allocator, c),
                else => {},
            }
        }
    }

    // parse CAR blocks. a CAR that fails to parse is treated as absent, so
    // ops are still reported, just without decoded records.
    const blocks_bytes = payload.getBytes("blocks");
    var parsed_car: ?car.Car = null;
    if (blocks_bytes) |b| {
        parsed_car = car.read(allocator, b) catch null;
    }

    // parse ops
    const ops_array = payload.getArray("ops");
    var ops: std.ArrayList(RepoOp) = .{};
    errdefer ops.deinit(allocator);

    if (ops_array) |op_values| {
        for (op_values) |op_val| {
            const action_str = op_val.getString("action") orelse continue;
            const action = CommitAction.parse(action_str) orelse continue;
            const path = op_val.getString("path") orelse continue;

            // split path into collection/rkey at the first '/'
            const slash = mem.indexOfScalar(u8, path, '/') orelse continue;
            const collection = path[0..slash];
            const rkey = path[slash + 1 ..];

            // extract CID from op and look up record from CAR blocks
            var op_cid: ?cbor.Cid = null;
            var record: ?cbor.Value = null;
            if (op_val.get("cid")) |cid_val| {
                switch (cid_val) {
                    .cid => |cid| {
                        op_cid = cid;
                        if (parsed_car) |c| {
                            if (car.findBlock(c, cid.raw)) |block_data| {
                                record = cbor.decodeAll(allocator, block_data) catch null;
                            }
                        }
                    },
                    else => {},
                }
            }

            try ops.append(allocator, .{
                .action = action,
                .collection = collection,
                .rkey = rkey,
                .cid = op_cid,
                .record = record,
            });
        }
    }

    // take ownership of both lists before building the result so a failure
    // on the second one does not leak the first
    const owned_ops = try ops.toOwnedSlice(allocator);
    errdefer allocator.free(owned_ops);
    const owned_blobs = try blobs.toOwnedSlice(allocator);

    return .{ .commit = .{
        .seq = seq_val,
        .repo = repo,
        .rev = rev,
        .time = time,
        .since = payload.getString("since"),
        .commit = commit_cid,
        .ops = owned_ops,
        .blobs = owned_blobs,
        .too_big = payload.getBool("tooBig") orelse false,
    } };
}

/// build an IdentityEvent from a decoded `#identity` payload.
/// `seq`, `did` and `time` are required; `handle` is optional.
fn decodeIdentity(payload: cbor.Value) DecodeError!Event {
    return .{ .identity = .{
        .seq = payload.getInt("seq") orelse return error.MissingField,
        .did = payload.getString("did") orelse return error.MissingField,
        .time = payload.getString("time") orelse return error.MissingField,
        .handle = payload.getString("handle"),
    } };
}

/// build an AccountEvent from a decoded `#account` payload.
/// `seq`, `did` and `time` are required; `active` defaults to true when
/// absent and `status` is whatever `AccountStatus.parse` returns for the
/// optional `status` string.
fn decodeAccount(payload: cbor.Value) DecodeError!Event {
    const status_str = payload.getString("status");
    return .{ .account = .{
        .seq = payload.getInt("seq") orelse return error.MissingField,
        .did = payload.getString("did") orelse return error.MissingField,
        .time = payload.getString("time") orelse return error.MissingField,
        .active = payload.getBool("active") orelse true,
        .status = if (status_str) |s| AccountStatus.parse(s) else null,
    } };
}

// === encoder ===

/// encode a firehose Event into a wire frame: [DAG-CBOR header] [DAG-CBOR payload].
///
/// the header is always `{op: 1, t: "#<event type>"}`. caller owns the
/// returned slice and frees it with `allocator`. event sequence numbers must
/// be non-negative: they are written as CBOR unsigned integers.
pub fn encodeFrame(allocator: Allocator, event: Event) ![]u8 {
    var list: std.ArrayList(u8) = .{};
    errdefer list.deinit(allocator);
    const writer = list.writer(allocator);

    const tag = switch (event) {
        .commit => "#commit",
        .identity => "#identity",
        .account => "#account",
        .info => "#info",
    };

    // encode header: {op: 1, t: "#..."}
    const header: cbor.Value = .{ .map = &.{
        .{ .key = "op", .value = .{ .unsigned = 1 } },
        .{ .key = "t", .value = .{ .text = tag } },
    } };
    try cbor.encode(allocator, writer, header);

    // encode payload based on event type
    switch (event) {
        .commit => |c| try encodeCommitPayload(allocator, writer, c),
        .identity => |i| try encodeIdentityPayload(allocator, writer, i),
        .account => |a| try encodeAccountPayload(allocator, writer, a),
        .info => |inf| try encodeInfoPayload(allocator, writer, inf),
    }

    return try list.toOwnedSlice(allocator);
}

/// write the `#commit` payload map to `writer`.
///
/// every op that carries a record has the record DAG-CBOR encoded, a CID
/// computed over those bytes, and the bytes added as a block of the embedded
/// CAR; the first such CID becomes the CAR's only root. ops without a record
/// are written with just `action` and `path`. the CID stored in `RepoOp.cid`
/// is not consulted — it is always recomputed from the record.
///
/// `commit.seq` must be non-negative (it is cast to an unsigned integer).
fn encodeCommitPayload(allocator: Allocator, writer: anytype, commit: CommitEvent) !void {
    // all intermediate data (paths, encoded records, per-op map entries, the
    // CAR bytes) must stay alive until the final cbor.encode call below and
    // is useless afterwards, so it lives in a scratch arena. in particular
    // the per-op map entries must NOT be stack temporaries: the op values
    // referencing them outlive the loop iteration that creates them.
    var arena = std.heap.ArenaAllocator.init(allocator);
    defer arena.deinit();
    const scratch = arena.allocator();

    // build ops array and CAR blocks simultaneously
    var op_values: std.ArrayList(cbor.Value) = .{};
    var car_blocks: std.ArrayList(car.Block) = .{};
    var root_cids: std.ArrayList(cbor.Cid) = .{};

    for (commit.ops) |op| {
        const action_str: []const u8 = @tagName(op.action);
        const path = try std.fmt.allocPrint(scratch, "{s}/{s}", .{ op.collection, op.rkey });

        const op_entries = if (op.record) |record| with_record: {
            // encode record, create CID, add to CAR blocks
            const record_bytes = try cbor.encodeAlloc(scratch, record);
            const cid = try cbor.Cid.forDagCbor(scratch, record_bytes);

            try car_blocks.append(scratch, .{
                .cid_raw = cid.raw,
                .data = record_bytes,
            });

            // only the first record's CID is used as CAR root
            if (root_cids.items.len == 0) {
                try root_cids.append(scratch, cid);
            }

            break :with_record try scratch.dupe(cbor.Value.MapEntry, &[_]cbor.Value.MapEntry{
                .{ .key = "action", .value = .{ .text = action_str } },
                .{ .key = "cid", .value = .{ .cid = cid } },
                .{ .key = "path", .value = .{ .text = path } },
            });
        } else try scratch.dupe(cbor.Value.MapEntry, &[_]cbor.Value.MapEntry{
            .{ .key = "action", .value = .{ .text = action_str } },
            .{ .key = "path", .value = .{ .text = path } },
        });

        try op_values.append(scratch, .{ .map = op_entries });
    }

    // build CAR file from blocks
    const car_data = car.Car{
        .roots = root_cids.items,
        .blocks = car_blocks.items,
    };
    const blocks_bytes = try car.writeAlloc(scratch, car_data);

    // build blobs array
    var blob_values: std.ArrayList(cbor.Value) = .{};
    for (commit.blobs) |blob| {
        try blob_values.append(scratch, .{ .cid = blob });
    }

    // build payload entries; optional fields are omitted when unset
    var entries: std.ArrayList(cbor.Value.MapEntry) = .{};

    try entries.append(scratch, .{ .key = "blocks", .value = .{ .bytes = blocks_bytes } });
    if (commit.commit) |c| {
        try entries.append(scratch, .{ .key = "commit", .value = .{ .cid = c } });
    }
    try entries.append(scratch, .{ .key = "blobs", .value = .{ .array = blob_values.items } });
    try entries.append(scratch, .{ .key = "ops", .value = .{ .array = op_values.items } });
    try entries.append(scratch, .{ .key = "repo", .value = .{ .text = commit.repo } });
    try entries.append(scratch, .{ .key = "rev", .value = .{ .text = commit.rev } });
    try entries.append(scratch, .{ .key = "seq", .value = .{ .unsigned = @intCast(commit.seq) } });
    if (commit.since) |s| {
        try entries.append(scratch, .{ .key = "since", .value = .{ .text = s } });
    }
    try entries.append(scratch, .{ .key = "time", .value = .{ .text = commit.time } });
    if (commit.too_big) {
        try entries.append(scratch, .{ .key = "tooBig", .value = .{ .boolean = true } });
    }

    try cbor.encode(scratch, writer, .{ .map = entries.items });
}

/// write the `#identity` payload map to `writer`. `handle` is omitted when
/// null. `identity.seq` must be non-negative (cast to an unsigned integer).
fn encodeIdentityPayload(allocator: Allocator, writer: anytype, identity: IdentityEvent) !void {
    var entries: std.ArrayList(cbor.Value.MapEntry) = .{};
    defer entries.deinit(allocator);

    try entries.append(allocator, .{ .key = "did", .value = .{ .text = identity.did } });
    if (identity.handle) |h| {
        try entries.append(allocator, .{ .key = "handle", .value = .{ .text = h } });
    }
    try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(identity.seq) } });
    try entries.append(allocator, .{ .key = "time", .value = .{ .text = identity.time } });

    try cbor.encode(allocator, writer, .{ .map = entries.items });
}

/// write the `#account` payload map to `writer`. `active` is only written
/// when false (the decoder defaults a missing `active` to true) and `status`
/// is omitted when null. `account.seq` must be non-negative.
fn encodeAccountPayload(allocator: Allocator, writer: anytype, account: AccountEvent) !void {
    var entries: std.ArrayList(cbor.Value.MapEntry) = .{};
    defer entries.deinit(allocator);

    if (!account.active) {
        try entries.append(allocator, .{ .key = "active", .value = .{ .boolean = false } });
    }
    try entries.append(allocator, .{ .key = "did", .value = .{ .text = account.did } });
    try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(account.seq) } });
    if (account.status) |s| {
        try entries.append(allocator, .{ .key = "status", .value = .{ .text = @tagName(s) } });
    }
    try entries.append(allocator, .{ .key = "time", .value = .{ .text = account.time } });

    try cbor.encode(allocator, writer, .{ .map = entries.items });
}

/// write the `#info` payload map to `writer`; null fields are omitted, so the
/// map may be empty.
fn encodeInfoPayload(allocator: Allocator, writer: anytype, info: InfoEvent) !void {
    var entries: std.ArrayList(cbor.Value.MapEntry) = .{};
    defer entries.deinit(allocator);

    if (info.message) |m| {
        try entries.append(allocator, .{ .key = "message", .value = .{ .text = m } });
    }
    if (info.name) |n| {
        try entries.append(allocator, .{ .key = "name", .value = .{ .text = n } });
    }

    try cbor.encode(allocator, writer, .{ .map = entries.items });
}

/// reconnecting firehose subscriber.
///
/// remembers the sequence number of the last decoded event in `last_seq` and
/// sends it as the cursor on the next (re)connect.
pub const FirehoseClient = struct {
    allocator: Allocator,
    options: Options,
    last_seq: ?i64 = null,

    /// create a client; `options.cursor` (if any) seeds `last_seq`
    pub fn init(allocator: Allocator, options: Options) FirehoseClient {
        return .{
            .allocator = allocator,
            .options = options,
            .last_seq = options.cursor,
        };
    }

    /// no-op: the client holds no resources between connections
    pub fn deinit(_: *FirehoseClient) void {}

    /// subscribe with a user-provided handler.
    /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void
    /// optional: fn onError(*@TypeOf(handler), anyerror) void
    /// blocks forever — reconnects with exponential backoff on disconnect.
    /// rotates through hosts on each reconnect attempt.
    ///
    /// returns immediately (after logging an error) only when
    /// `options.hosts` is empty, since there is nothing to connect to.
    pub fn subscribe(self: *FirehoseClient, handler: anytype) void {
        const hosts = self.options.hosts;
        if (hosts.len == 0) {
            // without this guard the modulo below would divide by zero
            log.err("firehose: no hosts configured, nothing to subscribe to", .{});
            return;
        }

        var backoff: u64 = 1;
        var host_index: usize = 0;
        const max_backoff: u64 = 60;
        var prev_host_index: usize = 0;

        while (true) {
            const effective_index = host_index % hosts.len;
            const host = hosts[effective_index];

            // reset backoff on host switch (fresh host deserves a fresh chance).
            // with a single host the index never changes, so backoff keeps
            // doubling up to max_backoff.
            if (host_index > 0 and effective_index != prev_host_index) {
                backoff = 1;
            }

            log.info("connecting to host {d}/{d}: {s}", .{ effective_index + 1, hosts.len, host });

            self.connectAndRead(host, handler) catch |err| {
                if (comptime @hasDecl(@TypeOf(handler.*), "onError")) {
                    handler.onError(err);
                } else {
                    log.err("firehose error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
                }
            };

            prev_host_index = effective_index;
            host_index += 1;
            posix.nanosleep(backoff, 0);
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// open a TLS WebSocket to `host` on port 443, request subscribeRepos
    /// (with `?cursor=<last_seq>` when a sequence number is known) and feed
    /// incoming frames to `handler` until the read loop ends or fails.
    fn connectAndRead(self: *FirehoseClient, host: []const u8, handler: anytype) !void {
        var path_buf: [256]u8 = undefined;
        var w: std.Io.Writer = .fixed(&path_buf);

        try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos");
        if (self.last_seq) |cursor| {
            try w.print("?cursor={d}", .{cursor});
        }
        const path = w.buffered();

        log.info("connecting to wss://{s}{s}", .{ host, path });

        var client = try websocket.Client.init(self.allocator, .{
            .host = host,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
        });
        defer client.deinit();

        // a host name too long for the buffer is an error: falling back to
        // the bare host name would send a malformed header block
        var host_header_buf: [256]u8 = undefined;
        const host_header = try std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host});

        try client.handshake(path, .{ .headers = host_header });
        configureKeepalive(&client);

        log.info("firehose connected to {s}", .{host});

        var ws_handler = WsHandler(@TypeOf(handler.*)){
            .allocator = self.allocator,
            .handler = handler,
            .client_state = self,
        };
        try client.readLoop(&ws_handler);
    }
};

/// adapter between the websocket read loop and a user handler of type `H`:
/// decodes each server message and forwards the resulting Event.
fn WsHandler(comptime H: type) type {
    return struct {
        allocator: Allocator,
        handler: *H,
        client_state: *FirehoseClient,

        const Self = @This();

        /// decode one frame and pass it to `handler.onEvent`.
        ///
        /// the event lives in an arena that is freed when this function
        /// returns, so the handler must copy anything it wants to keep.
        /// frames that fail to decode are logged at debug level and dropped.
        pub fn serverMessage(self: *Self, data: []const u8) !void {
            var arena = std.heap.ArenaAllocator.init(self.allocator);
            defer arena.deinit();

            const event = decodeFrame(arena.allocator(), data) catch |err| {
                log.debug("frame decode error: {s}", .{@errorName(err)});
                return;
            };

            // remember the position so a reconnect can resume from it
            if (event.seq()) |s| {
                self.client_state.last_seq = s;
            }

            self.handler.onEvent(event);
        }

        pub fn close(_: *Self) void {
            log.info("firehose connection closed", .{});
        }
    };
}

/// enable TCP keepalive so reads don't block forever when a peer
/// disappears without FIN/RST (network partition, crash, power loss).
/// detection time: 10s idle + 5s × 2 probes = 20s.
///
/// best-effort: the first setsockopt that fails ends configuration silently
/// and the connection is used with whatever was applied up to that point.
fn configureKeepalive(client: *websocket.Client) void {
    const fd = client.stream.stream.handle;
    const builtin = @import("builtin");
    posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(i32, 1))) catch return;
    const tcp: i32 = @intCast(posix.IPPROTO.TCP);
    // the idle-time option has a different name per OS
    if (builtin.os.tag == .linux) {
        posix.setsockopt(fd, tcp, posix.TCP.KEEPIDLE, &std.mem.toBytes(@as(i32, 10))) catch return;
    } else if (builtin.os.tag == .macos) {
        posix.setsockopt(fd, tcp, posix.TCP.KEEPALIVE, &std.mem.toBytes(@as(i32, 10))) catch return;
    }
    posix.setsockopt(fd, tcp, posix.TCP.KEEPINTVL, &std.mem.toBytes(@as(i32, 5))) catch return;
    posix.setsockopt(fd, tcp, posix.TCP.KEEPCNT, &std.mem.toBytes(@as(i32, 2))) catch return;
}

// === tests ===

test "decode frame header" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // simulate a frame: header {op: 1, t: "#info"} + payload {name: "OutdatedCursor"}
    const header_bytes = [_]u8{
        0xa2, // map(2)
        0x62, 'o', 'p', 0x01, // "op": 1
        0x61, 't', 0x65, '#', 'i', 'n', 'f', 'o', // "t": "#info"
    };
    const payload_bytes = [_]u8{
        0xa1, // map(1)
        0x64, 'n', 'a', 'm', 'e', // "name"
        0x6e, 'O', 'u', 't', 'd', 'a', 't', 'e', 'd', 'C', 'u', 'r', 's', 'o', 'r', // "OutdatedCursor"
    };

    var frame: [header_bytes.len + payload_bytes.len]u8 = undefined;
    @memcpy(frame[0..header_bytes.len], &header_bytes);
    @memcpy(frame[header_bytes.len..], &payload_bytes);

    const event = try decodeFrame(alloc, &frame);
    const info = event.info;
    try std.testing.expectEqualStrings("OutdatedCursor", info.name.?);
}

test "decode rejects frames whose op is not a message" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // header {op: 2, t: "#info"} + payload {name: "x"}
    const frame = [_]u8{
        0xa2, // map(2)
        0x62, 'o', 'p', 0x02, // "op": 2
        0x61, 't', 0x65, '#', 'i', 'n', 'f', 'o', // "t": "#info"
        0xa1, // map(1)
        0x64, 'n', 'a', 'm', 'e', // "name"
        0x61, 'x', // "x"
    };

    try std.testing.expectError(error.UnknownOp, decodeFrame(alloc, &frame));
}

test "decode identity frame" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    // build frame via encoder for cleaner test
    const original = Event{ .identity = .{
        .seq = 42,
        .did = "did:plc:test",
        .time = "2024-01-15T10:30:00Z",
    } };
    const frame = try encodeFrame(alloc, original);

    const event = try decodeFrame(alloc, frame);
    const identity = event.identity;
    try std.testing.expectEqual(@as(i64, 42), identity.seq);
    try std.testing.expectEqualStrings("did:plc:test", identity.did);
    try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", identity.time);
}

test "Event.seq works" {
    const info_event = Event{ .info = .{ .name = "test" } };
    try std.testing.expect(info_event.seq() == null);

    const identity_event = Event{ .identity = .{
        .seq = 42,
        .did = "did:plc:test",
        .time = "2024-01-15T10:30:00Z",
    } };
    try std.testing.expectEqual(@as(i64, 42), identity_event.seq().?);
}

// === encoder tests ===

test "encode → decode info frame" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const original = Event{ .info = .{
        .name = "OutdatedCursor",
        .message = "cursor is behind",
    } };

    const frame = try encodeFrame(alloc, original);
    const decoded = try decodeFrame(alloc, frame);

    try std.testing.expectEqualStrings("OutdatedCursor", decoded.info.name.?);
    try std.testing.expectEqualStrings("cursor is behind", decoded.info.message.?);
}

test "encode → decode identity frame" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const original = Event{ .identity = .{
        .seq = 42,
        .did = "did:plc:test123",
        .time = "2024-01-15T10:30:00Z",
        .handle = "alice.bsky.social",
    } };

    const frame = try encodeFrame(alloc, original);
    const decoded = try decodeFrame(alloc, frame);

    const id = decoded.identity;
    try std.testing.expectEqual(@as(i64, 42), id.seq);
    try std.testing.expectEqualStrings("did:plc:test123", id.did);
    try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", id.time);
    try std.testing.expectEqualStrings("alice.bsky.social", id.handle.?);
}

test "encode → decode account frame" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const original = Event{ .account = .{
        .seq = 100,
        .did = "did:plc:suspended",
        .time = "2024-01-15T10:30:00Z",
        .active = false,
        .status = .suspended,
    } };

    const frame = try encodeFrame(alloc, original);
    const decoded = try decodeFrame(alloc, frame);

    const acct = decoded.account;
    try std.testing.expectEqual(@as(i64, 100), acct.seq);
    try std.testing.expectEqualStrings("did:plc:suspended", acct.did);
    try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", acct.time);
    try std.testing.expectEqual(false, acct.active);
    try std.testing.expectEqual(AccountStatus.suspended, acct.status.?);
}

test "encode → decode commit frame with record" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const record: cbor.Value = .{ .map = &.{
        .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } },
        .{ .key = "text", .value = .{ .text = "hello firehose" } },
    } };

    const original = Event{ .commit = .{
        .seq = 999,
        .repo = "did:plc:poster",
        .rev = "3k2abc000000",
        .time = "2024-01-15T10:30:00Z",
        .since = "3k2abd000000",
        .ops = &.{.{
            .action = .create,
            .collection = "app.bsky.feed.post",
            .rkey = "3k2abc",
            .record = record,
        }},
    } };

    const frame = try encodeFrame(alloc, original);
    const decoded = try decodeFrame(alloc, frame);

    const commit = decoded.commit;
    try std.testing.expectEqual(@as(i64, 999), commit.seq);
    try std.testing.expectEqualStrings("did:plc:poster", commit.repo);
    try std.testing.expectEqualStrings("3k2abc000000", commit.rev);
    try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", commit.time);
    try std.testing.expectEqualStrings("3k2abd000000", commit.since.?);
    try std.testing.expectEqual(@as(usize, 0), commit.blobs.len);
    try std.testing.expectEqual(@as(usize, 1), commit.ops.len);

    const op = commit.ops[0];
    try std.testing.expectEqual(CommitAction.create, op.action);
    try std.testing.expectEqualStrings("app.bsky.feed.post", op.collection);
    try std.testing.expectEqualStrings("3k2abc", op.rkey);
    try std.testing.expect(op.cid != null);

    // record should be decoded from the CAR blocks
    const rec = op.record.?;
    try std.testing.expectEqualStrings("hello firehose", rec.getString("text").?);
    try std.testing.expectEqualStrings("app.bsky.feed.post", rec.getString("$type").?);
}

test "encode → decode commit with multiple records" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const first: cbor.Value = .{ .map = &.{
        .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } },
        .{ .key = "text", .value = .{ .text = "first" } },
    } };
    const second: cbor.Value = .{ .map = &.{
        .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } },
        .{ .key = "text", .value = .{ .text = "second" } },
    } };

    const original = Event{ .commit = .{
        .seq = 7,
        .repo = "did:plc:multi",
        .rev = "3k2abc000000",
        .time = "2024-01-15T10:30:00Z",
        .ops = &.{
            .{
                .action = .create,
                .collection = "app.bsky.feed.post",
                .rkey = "one",
                .record = first,
            },
            .{
                .action = .create,
                .collection = "app.bsky.feed.post",
                .rkey = "two",
                .record = second,
            },
        },
    } };

    const frame = try encodeFrame(alloc, original);
    const decoded = try decodeFrame(alloc, frame);

    // each op must keep its own path and record (no aliasing between ops)
    const ops = decoded.commit.ops;
    try std.testing.expectEqual(@as(usize, 2), ops.len);
    try std.testing.expectEqualStrings("one", ops[0].rkey);
    try std.testing.expectEqualStrings("first", ops[0].record.?.getString("text").?);
    try std.testing.expectEqualStrings("two", ops[1].rkey);
    try std.testing.expectEqualStrings("second", ops[1].record.?.getString("text").?);
}

test "encode → decode commit with delete (no record)" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const original = Event{ .commit = .{
        .seq = 500,
        .repo = "did:plc:deleter",
        .rev = "3k2xyz000000",
        .time = "2024-01-15T10:30:00Z",
        .ops = &.{.{
            .action = .delete,
            .collection = "app.bsky.feed.post",
            .rkey = "abc123",
            .record = null,
        }},
    } };

    const frame = try encodeFrame(alloc, original);
    const decoded = try decodeFrame(alloc, frame);

    try std.testing.expectEqual(@as(i64, 500), decoded.commit.seq);
    try std.testing.expectEqualStrings("3k2xyz000000", decoded.commit.rev);
    try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", decoded.commit.time);
    try std.testing.expectEqual(@as(usize, 1), decoded.commit.ops.len);
    try std.testing.expectEqual(CommitAction.delete, decoded.commit.ops[0].action);
    try std.testing.expect(decoded.commit.ops[0].cid == null);
    try std.testing.expect(decoded.commit.ops[0].record == null);
}