atproto utils for zig
zat.dev
atproto
sdk
zig
1//! firehose codec - com.atproto.sync.subscribeRepos
2//!
3//! encode and decode AT Protocol firehose events over WebSocket. messages are
4//! DAG-CBOR encoded (unlike jetstream, which is JSON). includes frame encoding/
5//! decoding, CAR block packing, and CID creation for records.
6//!
7//! wire format per frame:
8//! [DAG-CBOR header: {op, t}] [DAG-CBOR payload: {seq, repo, ops, blocks, ...}]
9//!
10//! see: https://atproto.com/specs/event-stream
11
12const std = @import("std");
13const websocket = @import("websocket");
14const cbor = @import("../repo/cbor.zig");
15const car = @import("../repo/car.zig");
16const sync = @import("sync.zig");
17
18const mem = std.mem;
19const Allocator = mem.Allocator;
20const posix = std.posix;
21const log = std.log.scoped(.zat);
22
23pub const CommitAction = sync.CommitAction;
24pub const AccountStatus = sync.AccountStatus;
25
26pub const default_hosts = [_][]const u8{
27 "bsky.network",
28 "northamerica.firehose.network",
29 "europe.firehose.network",
30 "asia.firehose.network",
31};
32
33pub const Options = struct {
34 hosts: []const []const u8 = &default_hosts,
35 cursor: ?i64 = null,
36 max_message_size: usize = 5 * 1024 * 1024, // 5MB — firehose frames can be large
37};
38
39/// decoded firehose event
40pub const Event = union(enum) {
41 commit: CommitEvent,
42 identity: IdentityEvent,
43 account: AccountEvent,
44 info: InfoEvent,
45
46 pub fn seq(self: Event) ?i64 {
47 return switch (self) {
48 .commit => |c| c.seq,
49 .identity => |i| i.seq,
50 .account => |a| a.seq,
51 .info => null,
52 };
53 }
54};
55
56pub const CommitEvent = struct {
57 seq: i64,
58 repo: []const u8, // DID
59 rev: []const u8, // TID — revision of the commit
60 time: []const u8, // datetime — when event was received
61 since: ?[]const u8 = null, // TID — rev of preceding commit (null = full repo export)
62 commit: ?cbor.Cid = null, // CID of the commit object
63 ops: []const RepoOp,
64 blobs: []const cbor.Cid = &.{}, // new blobs referenced by records in this commit
65 too_big: bool = false,
66};
67
68pub const RepoOp = struct {
69 action: CommitAction,
70 collection: []const u8,
71 rkey: []const u8,
72 cid: ?cbor.Cid = null, // CID of the record (null for deletes)
73 record: ?cbor.Value = null, // decoded DAG-CBOR record from CAR block
74};
75
76pub const IdentityEvent = struct {
77 seq: i64,
78 did: []const u8,
79 time: []const u8, // datetime — when event was received
80 handle: ?[]const u8 = null,
81};
82
83pub const AccountEvent = struct {
84 seq: i64,
85 did: []const u8,
86 time: []const u8, // datetime — when event was received
87 active: bool = true,
88 status: ?AccountStatus = null,
89};
90
91pub const InfoEvent = struct {
92 name: ?[]const u8 = null,
93 message: ?[]const u8 = null,
94};
95
96/// frame header from the wire
97const FrameHeader = struct {
98 op: i64,
99 t: ?[]const u8 = null,
100};
101
102pub const FrameOp = enum(i64) {
103 message = 1,
104 err = -1,
105};
106
107pub const DecodeError = error{
108 InvalidFrame,
109 InvalidHeader,
110 UnexpectedEof,
111 MissingField,
112 UnknownOp,
113 UnknownEventType,
114} || cbor.DecodeError || car.CarError;
115
116/// decode a raw WebSocket binary frame into a firehose Event
117pub fn decodeFrame(allocator: Allocator, data: []const u8) DecodeError!Event {
118 // frame = [CBOR header] [CBOR payload] concatenated
119 const header_result = try cbor.decode(allocator, data);
120 const header_val = header_result.value;
121 const payload_data = data[header_result.consumed..];
122
123 // parse header
124 const op = header_val.getInt("op") orelse return error.InvalidHeader;
125 if (op == -1) return error.UnknownOp; // error frame
126
127 const t = header_val.getString("t") orelse return error.InvalidHeader;
128
129 // decode payload
130 const payload = try cbor.decodeAll(allocator, payload_data);
131
132 if (mem.eql(u8, t, "#commit")) {
133 return try decodeCommit(allocator, payload);
134 } else if (mem.eql(u8, t, "#identity")) {
135 return decodeIdentity(payload);
136 } else if (mem.eql(u8, t, "#account")) {
137 return decodeAccount(payload);
138 } else if (mem.eql(u8, t, "#info")) {
139 return .{ .info = .{
140 .name = payload.getString("name"),
141 .message = payload.getString("message"),
142 } };
143 }
144
145 return error.UnknownEventType;
146}
147
148fn decodeCommit(allocator: Allocator, payload: cbor.Value) DecodeError!Event {
149 const seq_val = payload.getInt("seq") orelse return error.MissingField;
150 const repo = payload.getString("repo") orelse return error.MissingField;
151 const rev = payload.getString("rev") orelse return error.MissingField;
152 const time = payload.getString("time") orelse return error.MissingField;
153
154 // parse commit CID
155 var commit_cid: ?cbor.Cid = null;
156 if (payload.get("commit")) |commit_val| {
157 switch (commit_val) {
158 .cid => |c| commit_cid = c,
159 else => {},
160 }
161 }
162
163 // parse blobs array (array of CID links)
164 var blobs: std.ArrayList(cbor.Cid) = .{};
165 if (payload.getArray("blobs")) |blob_values| {
166 for (blob_values) |blob_val| {
167 switch (blob_val) {
168 .cid => |c| try blobs.append(allocator, c),
169 else => {},
170 }
171 }
172 }
173
174 // parse CAR blocks
175 const blocks_bytes = payload.getBytes("blocks");
176 var parsed_car: ?car.Car = null;
177 if (blocks_bytes) |b| {
178 parsed_car = car.read(allocator, b) catch null;
179 }
180
181 // parse ops
182 const ops_array = payload.getArray("ops");
183 var ops: std.ArrayList(RepoOp) = .{};
184
185 if (ops_array) |op_values| {
186 for (op_values) |op_val| {
187 const action_str = op_val.getString("action") orelse continue;
188 const action = CommitAction.parse(action_str) orelse continue;
189 const path = op_val.getString("path") orelse continue;
190
191 // split path into collection/rkey
192 const slash = mem.indexOfScalar(u8, path, '/') orelse continue;
193 const collection = path[0..slash];
194 const rkey = path[slash + 1 ..];
195
196 // extract CID from op and look up record from CAR blocks
197 var op_cid: ?cbor.Cid = null;
198 var record: ?cbor.Value = null;
199 if (op_val.get("cid")) |cid_val| {
200 switch (cid_val) {
201 .cid => |cid| {
202 op_cid = cid;
203 if (parsed_car) |c| {
204 if (car.findBlock(c, cid.raw)) |block_data| {
205 record = cbor.decodeAll(allocator, block_data) catch null;
206 }
207 }
208 },
209 else => {},
210 }
211 }
212
213 try ops.append(allocator, .{
214 .action = action,
215 .collection = collection,
216 .rkey = rkey,
217 .cid = op_cid,
218 .record = record,
219 });
220 }
221 }
222
223 return .{ .commit = .{
224 .seq = seq_val,
225 .repo = repo,
226 .rev = rev,
227 .time = time,
228 .since = payload.getString("since"),
229 .commit = commit_cid,
230 .ops = try ops.toOwnedSlice(allocator),
231 .blobs = try blobs.toOwnedSlice(allocator),
232 .too_big = payload.getBool("tooBig") orelse false,
233 } };
234}
235
236fn decodeIdentity(payload: cbor.Value) DecodeError!Event {
237 return .{ .identity = .{
238 .seq = payload.getInt("seq") orelse return error.MissingField,
239 .did = payload.getString("did") orelse return error.MissingField,
240 .time = payload.getString("time") orelse return error.MissingField,
241 .handle = payload.getString("handle"),
242 } };
243}
244
245fn decodeAccount(payload: cbor.Value) DecodeError!Event {
246 const status_str = payload.getString("status");
247 return .{ .account = .{
248 .seq = payload.getInt("seq") orelse return error.MissingField,
249 .did = payload.getString("did") orelse return error.MissingField,
250 .time = payload.getString("time") orelse return error.MissingField,
251 .active = payload.getBool("active") orelse true,
252 .status = if (status_str) |s| AccountStatus.parse(s) else null,
253 } };
254}
255
256// === encoder ===
257
258/// encode a firehose Event into a wire frame: [DAG-CBOR header] [DAG-CBOR payload]
259pub fn encodeFrame(allocator: Allocator, event: Event) ![]u8 {
260 var list: std.ArrayList(u8) = .{};
261 errdefer list.deinit(allocator);
262 const writer = list.writer(allocator);
263
264 const tag = switch (event) {
265 .commit => "#commit",
266 .identity => "#identity",
267 .account => "#account",
268 .info => "#info",
269 };
270
271 // encode header: {op: 1, t: "#..."}
272 const header: cbor.Value = .{ .map = &.{
273 .{ .key = "op", .value = .{ .unsigned = 1 } },
274 .{ .key = "t", .value = .{ .text = tag } },
275 } };
276 try cbor.encode(allocator, writer, header);
277
278 // encode payload based on event type
279 switch (event) {
280 .commit => |c| try encodeCommitPayload(allocator, writer, c),
281 .identity => |i| try encodeIdentityPayload(allocator, writer, i),
282 .account => |a| try encodeAccountPayload(allocator, writer, a),
283 .info => |inf| try encodeInfoPayload(allocator, writer, inf),
284 }
285
286 return try list.toOwnedSlice(allocator);
287}
288
289fn encodeCommitPayload(allocator: Allocator, writer: anytype, commit: CommitEvent) !void {
290 // build ops array and CAR blocks simultaneously
291 var op_values: std.ArrayList(cbor.Value) = .{};
292 defer op_values.deinit(allocator);
293 var car_blocks: std.ArrayList(car.Block) = .{};
294 defer car_blocks.deinit(allocator);
295 var root_cids: std.ArrayList(cbor.Cid) = .{};
296 defer root_cids.deinit(allocator);
297
298 for (commit.ops) |op| {
299 const action_str: []const u8 = @tagName(op.action);
300 const path = try std.fmt.allocPrint(allocator, "{s}/{s}", .{ op.collection, op.rkey });
301
302 if (op.record) |record| {
303 // encode record, create CID, add to CAR blocks
304 const record_bytes = try cbor.encodeAlloc(allocator, record);
305 const cid = try cbor.Cid.forDagCbor(allocator, record_bytes);
306
307 try car_blocks.append(allocator, .{
308 .cid_raw = cid.raw,
309 .data = record_bytes,
310 });
311
312 if (root_cids.items.len == 0) {
313 try root_cids.append(allocator, cid);
314 }
315
316 try op_values.append(allocator, .{ .map = @constCast(&[_]cbor.Value.MapEntry{
317 .{ .key = "action", .value = .{ .text = action_str } },
318 .{ .key = "cid", .value = .{ .cid = cid } },
319 .{ .key = "path", .value = .{ .text = path } },
320 }) });
321 } else {
322 try op_values.append(allocator, .{ .map = @constCast(&[_]cbor.Value.MapEntry{
323 .{ .key = "action", .value = .{ .text = action_str } },
324 .{ .key = "path", .value = .{ .text = path } },
325 }) });
326 }
327 }
328
329 // build CAR file from blocks
330 const car_data = car.Car{
331 .roots = root_cids.items,
332 .blocks = car_blocks.items,
333 };
334 const blocks_bytes = try car.writeAlloc(allocator, car_data);
335
336 // build blobs array
337 var blob_values: std.ArrayList(cbor.Value) = .{};
338 defer blob_values.deinit(allocator);
339 for (commit.blobs) |blob| {
340 try blob_values.append(allocator, .{ .cid = blob });
341 }
342
343 // build payload entries
344 var entries: std.ArrayList(cbor.Value.MapEntry) = .{};
345 defer entries.deinit(allocator);
346
347 try entries.append(allocator, .{ .key = "blocks", .value = .{ .bytes = blocks_bytes } });
348 if (commit.commit) |c| {
349 try entries.append(allocator, .{ .key = "commit", .value = .{ .cid = c } });
350 }
351 try entries.append(allocator, .{ .key = "blobs", .value = .{ .array = blob_values.items } });
352 try entries.append(allocator, .{ .key = "ops", .value = .{ .array = op_values.items } });
353 try entries.append(allocator, .{ .key = "repo", .value = .{ .text = commit.repo } });
354 try entries.append(allocator, .{ .key = "rev", .value = .{ .text = commit.rev } });
355 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(commit.seq) } });
356 if (commit.since) |s| {
357 try entries.append(allocator, .{ .key = "since", .value = .{ .text = s } });
358 }
359 try entries.append(allocator, .{ .key = "time", .value = .{ .text = commit.time } });
360 if (commit.too_big) {
361 try entries.append(allocator, .{ .key = "tooBig", .value = .{ .boolean = true } });
362 }
363
364 try cbor.encode(allocator, writer, .{ .map = entries.items });
365}
366
367fn encodeIdentityPayload(allocator: Allocator, writer: anytype, identity: IdentityEvent) !void {
368 var entries: std.ArrayList(cbor.Value.MapEntry) = .{};
369 defer entries.deinit(allocator);
370
371 try entries.append(allocator, .{ .key = "did", .value = .{ .text = identity.did } });
372 if (identity.handle) |h| {
373 try entries.append(allocator, .{ .key = "handle", .value = .{ .text = h } });
374 }
375 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(identity.seq) } });
376 try entries.append(allocator, .{ .key = "time", .value = .{ .text = identity.time } });
377
378 try cbor.encode(allocator, writer, .{ .map = entries.items });
379}
380
381fn encodeAccountPayload(allocator: Allocator, writer: anytype, account: AccountEvent) !void {
382 var entries: std.ArrayList(cbor.Value.MapEntry) = .{};
383 defer entries.deinit(allocator);
384
385 if (!account.active) {
386 try entries.append(allocator, .{ .key = "active", .value = .{ .boolean = false } });
387 }
388 try entries.append(allocator, .{ .key = "did", .value = .{ .text = account.did } });
389 try entries.append(allocator, .{ .key = "seq", .value = .{ .unsigned = @intCast(account.seq) } });
390 if (account.status) |s| {
391 try entries.append(allocator, .{ .key = "status", .value = .{ .text = @tagName(s) } });
392 }
393 try entries.append(allocator, .{ .key = "time", .value = .{ .text = account.time } });
394
395 try cbor.encode(allocator, writer, .{ .map = entries.items });
396}
397
398fn encodeInfoPayload(allocator: Allocator, writer: anytype, info: InfoEvent) !void {
399 var entries: std.ArrayList(cbor.Value.MapEntry) = .{};
400 defer entries.deinit(allocator);
401
402 if (info.message) |m| {
403 try entries.append(allocator, .{ .key = "message", .value = .{ .text = m } });
404 }
405 if (info.name) |n| {
406 try entries.append(allocator, .{ .key = "name", .value = .{ .text = n } });
407 }
408
409 try cbor.encode(allocator, writer, .{ .map = entries.items });
410}
411
412pub const FirehoseClient = struct {
413 allocator: Allocator,
414 options: Options,
415 last_seq: ?i64 = null,
416
417 pub fn init(allocator: Allocator, options: Options) FirehoseClient {
418 return .{
419 .allocator = allocator,
420 .options = options,
421 .last_seq = if (options.cursor) |c| c else null,
422 };
423 }
424
425 pub fn deinit(_: *FirehoseClient) void {}
426
427 /// subscribe with a user-provided handler.
428 /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void
429 /// optional: fn onError(*@TypeOf(handler), anyerror) void
430 /// blocks forever — reconnects with exponential backoff on disconnect.
431 /// rotates through hosts on each reconnect attempt.
432 pub fn subscribe(self: *FirehoseClient, handler: anytype) void {
433 var backoff: u64 = 1;
434 var host_index: usize = 0;
435 const max_backoff: u64 = 60;
436 var prev_host_index: usize = 0;
437
438 while (true) {
439 const host = self.options.hosts[host_index % self.options.hosts.len];
440 const effective_index = host_index % self.options.hosts.len;
441
442 // reset backoff on host switch (fresh host deserves a fresh chance)
443 if (host_index > 0 and effective_index != prev_host_index) {
444 backoff = 1;
445 }
446
447 log.info("connecting to host {d}/{d}: {s}", .{ effective_index + 1, self.options.hosts.len, host });
448
449 self.connectAndRead(host, handler) catch |err| {
450 if (comptime @hasDecl(@TypeOf(handler.*), "onError")) {
451 handler.onError(err);
452 } else {
453 log.err("firehose error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
454 }
455 };
456
457 prev_host_index = effective_index;
458 host_index += 1;
459 posix.nanosleep(backoff, 0);
460 backoff = @min(backoff * 2, max_backoff);
461 }
462 }
463
464 fn connectAndRead(self: *FirehoseClient, host: []const u8, handler: anytype) !void {
465 var path_buf: [256]u8 = undefined;
466 var w: std.Io.Writer = .fixed(&path_buf);
467
468 try w.writeAll("/xrpc/com.atproto.sync.subscribeRepos");
469 if (self.last_seq) |cursor| {
470 try w.print("?cursor={d}", .{cursor});
471 }
472 const path = w.buffered();
473
474 log.info("connecting to wss://{s}{s}", .{ host, path });
475
476 var client = try websocket.Client.init(self.allocator, .{
477 .host = host,
478 .port = 443,
479 .tls = true,
480 .max_size = self.options.max_message_size,
481 });
482 defer client.deinit();
483
484 var host_header_buf: [256]u8 = undefined;
485 const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch host;
486
487 try client.handshake(path, .{ .headers = host_header });
488 configureKeepalive(&client);
489
490 log.info("firehose connected to {s}", .{host});
491
492 var ws_handler = WsHandler(@TypeOf(handler.*)){
493 .allocator = self.allocator,
494 .handler = handler,
495 .client_state = self,
496 };
497 try client.readLoop(&ws_handler);
498 }
499};
500
501fn WsHandler(comptime H: type) type {
502 return struct {
503 allocator: Allocator,
504 handler: *H,
505 client_state: *FirehoseClient,
506
507 const Self = @This();
508
509 pub fn serverMessage(self: *Self, data: []const u8) !void {
510 var arena = std.heap.ArenaAllocator.init(self.allocator);
511 defer arena.deinit();
512
513 const event = decodeFrame(arena.allocator(), data) catch |err| {
514 log.debug("frame decode error: {s}", .{@errorName(err)});
515 return;
516 };
517
518 if (event.seq()) |s| {
519 self.client_state.last_seq = s;
520 }
521
522 self.handler.onEvent(event);
523 }
524
525 pub fn close(_: *Self) void {
526 log.info("firehose connection closed", .{});
527 }
528 };
529}
530
531/// enable TCP keepalive so reads don't block forever when a peer
532/// disappears without FIN/RST (network partition, crash, power loss).
533/// detection time: 10s idle + 5s × 2 probes = 20s.
534fn configureKeepalive(client: *websocket.Client) void {
535 const fd = client.stream.stream.handle;
536 const builtin = @import("builtin");
537 posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(i32, 1))) catch return;
538 const tcp: i32 = @intCast(posix.IPPROTO.TCP);
539 if (builtin.os.tag == .linux) {
540 posix.setsockopt(fd, tcp, posix.TCP.KEEPIDLE, &std.mem.toBytes(@as(i32, 10))) catch return;
541 } else if (builtin.os.tag == .macos) {
542 posix.setsockopt(fd, tcp, posix.TCP.KEEPALIVE, &std.mem.toBytes(@as(i32, 10))) catch return;
543 }
544 posix.setsockopt(fd, tcp, posix.TCP.KEEPINTVL, &std.mem.toBytes(@as(i32, 5))) catch return;
545 posix.setsockopt(fd, tcp, posix.TCP.KEEPCNT, &std.mem.toBytes(@as(i32, 2))) catch return;
546}
547
548// === tests ===
549
550test "decode frame header" {
551 var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
552 defer arena.deinit();
553 const alloc = arena.allocator();
554
555 // simulate a frame: header {op: 1, t: "#info"} + payload {name: "OutdatedCursor"}
556 const header_bytes = [_]u8{
557 0xa2, // map(2)
558 0x62, 'o', 'p', 0x01, // "op": 1
559 0x61, 't', 0x65, '#', 'i', 'n', 'f', 'o', // "t": "#info"
560 };
561 const payload_bytes = [_]u8{
562 0xa1, // map(1)
563 0x64, 'n', 'a', 'm', 'e', // "name"
564 0x6e, 'O', 'u', 't', 'd', 'a', 't', 'e', 'd', 'C', 'u', 'r', 's', 'o', 'r', // "OutdatedCursor"
565 };
566
567 var frame: [header_bytes.len + payload_bytes.len]u8 = undefined;
568 @memcpy(frame[0..header_bytes.len], &header_bytes);
569 @memcpy(frame[header_bytes.len..], &payload_bytes);
570
571 const event = try decodeFrame(alloc, &frame);
572 const info = event.info;
573 try std.testing.expectEqualStrings("OutdatedCursor", info.name.?);
574}
575
576test "decode identity frame" {
577 var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
578 defer arena.deinit();
579 const alloc = arena.allocator();
580
581 // build frame via encoder for cleaner test
582 const original = Event{ .identity = .{
583 .seq = 42,
584 .did = "did:plc:test",
585 .time = "2024-01-15T10:30:00Z",
586 } };
587 const frame = try encodeFrame(alloc, original);
588
589 const event = try decodeFrame(alloc, frame);
590 const identity = event.identity;
591 try std.testing.expectEqual(@as(i64, 42), identity.seq);
592 try std.testing.expectEqualStrings("did:plc:test", identity.did);
593 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", identity.time);
594}
595
596test "Event.seq works" {
597 const info_event = Event{ .info = .{ .name = "test" } };
598 try std.testing.expect(info_event.seq() == null);
599
600 const identity_event = Event{ .identity = .{
601 .seq = 42,
602 .did = "did:plc:test",
603 .time = "2024-01-15T10:30:00Z",
604 } };
605 try std.testing.expectEqual(@as(i64, 42), identity_event.seq().?);
606}
607
608// === encoder tests ===
609
610test "encode → decode info frame" {
611 var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
612 defer arena.deinit();
613 const alloc = arena.allocator();
614
615 const original = Event{ .info = .{
616 .name = "OutdatedCursor",
617 .message = "cursor is behind",
618 } };
619
620 const frame = try encodeFrame(alloc, original);
621 const decoded = try decodeFrame(alloc, frame);
622
623 try std.testing.expectEqualStrings("OutdatedCursor", decoded.info.name.?);
624 try std.testing.expectEqualStrings("cursor is behind", decoded.info.message.?);
625}
626
627test "encode → decode identity frame" {
628 var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
629 defer arena.deinit();
630 const alloc = arena.allocator();
631
632 const original = Event{ .identity = .{
633 .seq = 42,
634 .did = "did:plc:test123",
635 .time = "2024-01-15T10:30:00Z",
636 .handle = "alice.bsky.social",
637 } };
638
639 const frame = try encodeFrame(alloc, original);
640 const decoded = try decodeFrame(alloc, frame);
641
642 const id = decoded.identity;
643 try std.testing.expectEqual(@as(i64, 42), id.seq);
644 try std.testing.expectEqualStrings("did:plc:test123", id.did);
645 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", id.time);
646 try std.testing.expectEqualStrings("alice.bsky.social", id.handle.?);
647}
648
649test "encode → decode account frame" {
650 var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
651 defer arena.deinit();
652 const alloc = arena.allocator();
653
654 const original = Event{ .account = .{
655 .seq = 100,
656 .did = "did:plc:suspended",
657 .time = "2024-01-15T10:30:00Z",
658 .active = false,
659 .status = .suspended,
660 } };
661
662 const frame = try encodeFrame(alloc, original);
663 const decoded = try decodeFrame(alloc, frame);
664
665 const acct = decoded.account;
666 try std.testing.expectEqual(@as(i64, 100), acct.seq);
667 try std.testing.expectEqualStrings("did:plc:suspended", acct.did);
668 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", acct.time);
669 try std.testing.expectEqual(false, acct.active);
670 try std.testing.expectEqual(AccountStatus.suspended, acct.status.?);
671}
672
673test "encode → decode commit frame with record" {
674 var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
675 defer arena.deinit();
676 const alloc = arena.allocator();
677
678 const record: cbor.Value = .{ .map = &.{
679 .{ .key = "$type", .value = .{ .text = "app.bsky.feed.post" } },
680 .{ .key = "text", .value = .{ .text = "hello firehose" } },
681 } };
682
683 const original = Event{ .commit = .{
684 .seq = 999,
685 .repo = "did:plc:poster",
686 .rev = "3k2abc000000",
687 .time = "2024-01-15T10:30:00Z",
688 .since = "3k2abd000000",
689 .ops = &.{.{
690 .action = .create,
691 .collection = "app.bsky.feed.post",
692 .rkey = "3k2abc",
693 .record = record,
694 }},
695 } };
696
697 const frame = try encodeFrame(alloc, original);
698 const decoded = try decodeFrame(alloc, frame);
699
700 const commit = decoded.commit;
701 try std.testing.expectEqual(@as(i64, 999), commit.seq);
702 try std.testing.expectEqualStrings("did:plc:poster", commit.repo);
703 try std.testing.expectEqualStrings("3k2abc000000", commit.rev);
704 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", commit.time);
705 try std.testing.expectEqualStrings("3k2abd000000", commit.since.?);
706 try std.testing.expectEqual(@as(usize, 0), commit.blobs.len);
707 try std.testing.expectEqual(@as(usize, 1), commit.ops.len);
708
709 const op = commit.ops[0];
710 try std.testing.expectEqual(CommitAction.create, op.action);
711 try std.testing.expectEqualStrings("app.bsky.feed.post", op.collection);
712 try std.testing.expectEqualStrings("3k2abc", op.rkey);
713 try std.testing.expect(op.cid != null);
714
715 // record should be decoded from the CAR blocks
716 const rec = op.record.?;
717 try std.testing.expectEqualStrings("hello firehose", rec.getString("text").?);
718 try std.testing.expectEqualStrings("app.bsky.feed.post", rec.getString("$type").?);
719}
720
721test "encode → decode commit with delete (no record)" {
722 var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
723 defer arena.deinit();
724 const alloc = arena.allocator();
725
726 const original = Event{ .commit = .{
727 .seq = 500,
728 .repo = "did:plc:deleter",
729 .rev = "3k2xyz000000",
730 .time = "2024-01-15T10:30:00Z",
731 .ops = &.{.{
732 .action = .delete,
733 .collection = "app.bsky.feed.post",
734 .rkey = "abc123",
735 .record = null,
736 }},
737 } };
738
739 const frame = try encodeFrame(alloc, original);
740 const decoded = try decodeFrame(alloc, frame);
741
742 try std.testing.expectEqual(@as(i64, 500), decoded.commit.seq);
743 try std.testing.expectEqualStrings("3k2xyz000000", decoded.commit.rev);
744 try std.testing.expectEqualStrings("2024-01-15T10:30:00Z", decoded.commit.time);
745 try std.testing.expectEqual(@as(usize, 1), decoded.commit.ops.len);
746 try std.testing.expectEqual(CommitAction.delete, decoded.commit.ops[0].action);
747 try std.testing.expect(decoded.commit.ops[0].cid == null);
748 try std.testing.expect(decoded.commit.ops[0].record == null);
749}