atproto utils for zig
zat.dev
atproto
sdk
zig
//! jetstream client - AT Protocol event stream via WebSocket
//!
//! typed, reconnecting client for the Bluesky Jetstream service.
//! parses commit, identity, and account events into typed structs.
//!
//! see: https://github.com/bluesky-social/jetstream
7
8const std = @import("std");
9const websocket = @import("websocket");
10const json_helpers = @import("../xrpc/json.zig");
11const sync = @import("sync.zig");
12
13const mem = std.mem;
14const json = std.json;
15const posix = std.posix;
16const Allocator = mem.Allocator;
17const log = std.log.scoped(.zat);
18
19pub const CommitAction = sync.CommitAction;
20pub const AccountStatus = sync.AccountStatus;
21
/// hosts used when `Options.hosts` is left at its default.
/// `subscribe` starts at the first entry and moves down the list on each reconnect.
pub const default_hosts: [12][]const u8 = .{
    "jetstream1.us-east.bsky.network",
    "jetstream2.us-east.bsky.network",
    "jetstream1.us-west.bsky.network",
    "jetstream2.us-west.bsky.network",
    "jetstream.waow.tech",
    "jetstream.fire.hose.cam",
    "jet.firehose.stream",
    "sfo.firehose.stream",
    "nyc.firehose.stream",
    "london.firehose.stream",
    "frankfurt.firehose.stream",
    "chennai.firehose.stream",
};
36
/// connection settings for `JetstreamClient`.
pub const Options = struct {
    /// hosts to rotate through; `subscribe` advances to the next one after every attempt.
    hosts: []const []const u8 = &default_hosts,
    /// each entry is sent as a `wantedCollections=` query parameter.
    wanted_collections: []const []const u8 = &.{},
    /// each entry is sent as a `wantedDids=` query parameter.
    wanted_dids: []const []const u8 = &.{},
    /// starting `cursor=` value; when null, no cursor is sent until an event has been received.
    cursor: ?i64 = null,
    /// passed to the websocket client as its `max_size` (1 MiB by default).
    max_message_size: usize = 1 << 20,
};
44
/// a decoded jetstream message; the active variant mirrors the payload's `kind`.
pub const Event = union(enum) {
    commit: CommitEvent,
    identity: IdentityEvent,
    account: AccountEvent,

    /// `time_us` of whichever variant is active.
    pub fn timeUs(self: Event) i64 {
        switch (self) {
            .commit => |commit| return commit.time_us,
            .identity => |identity| return identity.time_us,
            .account => |account| return account.time_us,
        }
    }
};
56
/// payload of a `kind: "commit"` message.
/// `did` and `time_us` come from the top level of the message; the rest from its `commit` object.
pub const CommitEvent = struct {
    did: []const u8,
    time_us: i64,
    /// `commit.rev`, when present.
    rev: ?[]const u8 = null,
    /// parsed from `commit.operation`.
    operation: CommitAction,
    collection: []const u8,
    rkey: []const u8,
    /// raw `commit.record` JSON value, when present.
    record: ?json.Value = null,
    /// `commit.cid`, when present.
    cid: ?[]const u8 = null,
};
67
/// payload of a `kind: "identity"` message.
/// `did` and `time_us` come from the top level of the message; the optional
/// fields come from its `identity` object and stay null when absent.
pub const IdentityEvent = struct {
    did: []const u8,
    time_us: i64,
    handle: ?[]const u8 = null,
    seq: ?i64 = null,
    time: ?[]const u8 = null,
};
75
/// payload of a `kind: "account"` message.
/// `did` and `time_us` come from the top level of the message; the remaining
/// fields come from its `account` object.
pub const AccountEvent = struct {
    did: []const u8,
    time_us: i64,
    active: bool,
    /// parsed from `account.status`; null when the field is absent.
    status: ?AccountStatus = null,
    seq: ?i64 = null,
    time: ?[]const u8 = null,
};
84
/// parse a raw JSON payload into a typed Event.
///
/// `allocator` is used for JSON structural data (ObjectMaps for record fields
/// and anything else the JSON parser needs to allocate). nothing is freed
/// here: pass an arena (or similar) and release it once the Event is no longer
/// needed.
/// string slices in the returned Event may reference the source `payload`
/// bytes, so keep both `payload` and allocator-owned memory alive while using
/// the Event.
///
/// errors: any JSON parse error (including OutOfMemory), `MissingKind`,
/// `MissingDid`, `MissingTimeUs`, `UnknownKind`, and for commits
/// `MissingOperation`, `UnknownOperation`, `MissingCollection`, `MissingRkey`.
pub fn parseEvent(allocator: Allocator, payload: []const u8) !Event {
    // the Leaky variant allocates directly from `allocator`. `parseFromSlice`
    // would wrap the result in its own heap-allocated arena that this function
    // has no way to hand back to the caller for `deinit`.
    const root = try json.parseFromSliceLeaky(json.Value, allocator, payload, .{});

    const kind_str = json_helpers.getString(root, "kind") orelse return error.MissingKind;
    const did = json_helpers.getString(root, "did") orelse return error.MissingDid;
    const time_us = json_helpers.getInt(root, "time_us") orelse return error.MissingTimeUs;

    if (mem.eql(u8, kind_str, "commit")) {
        const op_str = json_helpers.getString(root, "commit.operation") orelse return error.MissingOperation;
        return .{ .commit = .{
            .did = did,
            .time_us = time_us,
            .operation = CommitAction.parse(op_str) orelse return error.UnknownOperation,
            .collection = json_helpers.getString(root, "commit.collection") orelse return error.MissingCollection,
            .rkey = json_helpers.getString(root, "commit.rkey") orelse return error.MissingRkey,
            .rev = json_helpers.getString(root, "commit.rev"),
            .cid = json_helpers.getString(root, "commit.cid"),
            .record = json_helpers.getPath(root, "commit.record"),
        } };
    }

    if (mem.eql(u8, kind_str, "identity")) {
        return .{ .identity = .{
            .did = did,
            .time_us = time_us,
            .handle = json_helpers.getString(root, "identity.handle"),
            .seq = json_helpers.getInt(root, "identity.seq"),
            .time = json_helpers.getString(root, "identity.time"),
        } };
    }

    if (mem.eql(u8, kind_str, "account")) {
        const status_str = json_helpers.getString(root, "account.status");
        return .{ .account = .{
            .did = did,
            .time_us = time_us,
            // a missing `account.active` is treated as active.
            .active = json_helpers.getBool(root, "account.active") orelse true,
            .status = if (status_str) |s| AccountStatus.parse(s) else null,
            .seq = json_helpers.getInt(root, "account.seq"),
            .time = json_helpers.getString(root, "account.time"),
        } };
    }

    return error.UnknownKind;
}
131
/// reconnecting jetstream subscriber.
///
/// holds the connection options plus the cursor (`last_time_us`) that is sent
/// as `cursor=` on every (re)connect so the stream resumes where it left off.
pub const JetstreamClient = struct {
    allocator: Allocator,
    options: Options,
    /// `time_us` of the most recently parsed event (or the initial cursor).
    last_time_us: ?i64 = null,

    /// create a client; the starting cursor is taken from `options.cursor`.
    pub fn init(allocator: Allocator, options: Options) JetstreamClient {
        return .{
            .allocator = allocator,
            .options = options,
            .last_time_us = options.cursor,
        };
    }

    /// currently a no-op; kept so callers can follow the init/deinit convention.
    pub fn deinit(_: *JetstreamClient) void {}

    /// subscribe with a user-provided handler.
    /// handler must implement: fn onEvent(*@TypeOf(handler), Event) void
    /// optional: fn onError(*@TypeOf(handler), anyerror) void
    /// optional: fn onConnect(*@TypeOf(handler), []const u8) void — called with host on connect
    /// blocks forever — reconnects with exponential backoff on disconnect.
    /// rotates through hosts on each reconnect attempt.
    /// the only way this returns is when `options.hosts` is empty (logged as an error).
    pub fn subscribe(self: *JetstreamClient, handler: anytype) void {
        const hosts = self.options.hosts;
        if (hosts.len == 0) {
            // indexing modulo zero hosts would panic; there is nothing to connect to.
            log.err("jetstream: no hosts configured, cannot subscribe", .{});
            return;
        }

        const max_backoff: u64 = 60;
        const rewind_us: i64 = 10_000_000;

        var backoff: u64 = 1;
        var host_index: usize = 0;
        var prev_host_index: usize = 0;
        // cursor value produced by the most recent rewind; if the cursor still
        // equals it, no event has arrived since and rewinding again would only
        // push the cursor further back on every failed attempt.
        var rewound_cursor: ?i64 = null;

        while (true) {
            const host = hosts[host_index];

            // rewind cursor by 10s on host switch (different instances may lag)
            if (host_index != prev_host_index) {
                if (self.last_time_us) |t| {
                    const already_rewound = if (rewound_cursor) |r| r == t else false;
                    if (!already_rewound) {
                        const rewound = t -| rewind_us;
                        self.last_time_us = rewound;
                        rewound_cursor = rewound;
                    }
                }
                backoff = 1;
            }

            log.info("connecting to host {d}/{d}: {s}", .{ host_index + 1, hosts.len, host });

            self.connectAndRead(host, handler) catch |err| {
                if (comptime @hasDecl(@TypeOf(handler.*), "onError")) {
                    handler.onError(err);
                } else {
                    log.err("jetstream error: {s}, reconnecting in {d}s...", .{ @errorName(err), backoff });
                }
            };

            prev_host_index = host_index;
            host_index = (host_index + 1) % hosts.len;
            posix.nanosleep(backoff, 0);
            backoff = @min(backoff * 2, max_backoff);
        }
    }

    /// open one TLS websocket connection to `host` and run its read loop until
    /// the connection ends. returns an error when the path or Host header does
    /// not fit its buffer, or when connect/handshake/read fails.
    fn connectAndRead(self: *JetstreamClient, host: []const u8, handler: anytype) !void {
        var path_buf: [2048]u8 = undefined;
        const path = try self.buildSubscribePath(&path_buf);

        // format the header before opening a socket: a host that does not fit
        // must fail here instead of being sent as a malformed header block.
        var host_header_buf: [256]u8 = undefined;
        const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch
            return error.HostNameTooLong;

        log.info("connecting to wss://{s}{s}", .{ host, path });

        var client = try websocket.Client.init(self.allocator, .{
            .host = host,
            .port = 443,
            .tls = true,
            .max_size = self.options.max_message_size,
        });
        defer client.deinit();

        try client.handshake(path, .{ .headers = host_header });
        configureKeepalive(&client);

        log.info("jetstream connected to {s}", .{host});

        if (comptime @hasDecl(@TypeOf(handler.*), "onConnect")) {
            handler.onConnect(host);
        }

        var ws_handler = WsHandler(@TypeOf(handler.*)){
            .allocator = self.allocator,
            .handler = handler,
            .client_state = self,
        };
        try client.readLoop(&ws_handler);
    }

    /// write `/subscribe` plus the query string into `buf` and return the
    /// written slice. parameters are emitted in the order: every
    /// `wantedCollections`, every `wantedDids`, then `cursor` (when set).
    /// values are written verbatim (no percent-encoding).
    /// fails if the result does not fit in `buf`.
    fn buildSubscribePath(self: *JetstreamClient, buf: *[2048]u8) ![]const u8 {
        var w: std.Io.Writer = .fixed(buf);

        try w.writeAll("/subscribe");

        // '?' introduces the first parameter, '&' every later one.
        var separator: u8 = '?';

        for (self.options.wanted_collections) |col| {
            try w.writeByte(separator);
            try w.writeAll("wantedCollections=");
            try w.writeAll(col);
            separator = '&';
        }

        for (self.options.wanted_dids) |did| {
            try w.writeByte(separator);
            try w.writeAll("wantedDids=");
            try w.writeAll(did);
            separator = '&';
        }

        if (self.last_time_us) |cursor| {
            try w.writeByte(separator);
            try w.print("cursor={d}", .{cursor});
        }

        return w.buffered();
    }
};
251
/// adapter between the websocket read loop and a user handler of type `H`.
/// each incoming message is parsed, the client's cursor is advanced, and the
/// event is passed to `H.onEvent`.
fn WsHandler(comptime H: type) type {
    return struct {
        const Self = @This();

        allocator: Allocator,
        handler: *H,
        client_state: *JetstreamClient,

        /// handle one message. the event's allocator-owned memory lives in a
        /// per-message arena that is freed when this returns, so the handler
        /// must copy anything it wants to keep. messages that fail to parse
        /// are logged at debug level and dropped.
        pub fn serverMessage(self: *Self, data: []const u8) !void {
            var scratch = std.heap.ArenaAllocator.init(self.allocator);
            defer scratch.deinit();

            if (parseEvent(scratch.allocator(), data)) |event| {
                // advance the cursor before handing the event to the user.
                self.client_state.last_time_us = event.timeUs();
                self.handler.onEvent(event);
            } else |err| {
                log.debug("message parse error: {s}", .{@errorName(err)});
            }
        }

        /// called by the websocket client when the connection closes.
        pub fn close(_: *Self) void {
            log.info("jetstream connection closed", .{});
        }
    };
}
278
/// enable TCP keepalive so reads don't block forever when a peer
/// disappears without FIN/RST (network partition, crash, power loss).
/// detection time: 10s idle + 5s × 2 probes = 20s.
/// best effort: the first option that fails to apply ends the setup silently.
fn configureKeepalive(client: *websocket.Client) void {
    const builtin = @import("builtin");

    const fd = client.stream.stream.handle;
    const tcp_level: i32 = @intCast(posix.IPPROTO.TCP);

    const enabled = std.mem.toBytes(@as(i32, 1));
    const idle_secs = std.mem.toBytes(@as(i32, 10));
    const interval_secs = std.mem.toBytes(@as(i32, 5));
    const probe_count = std.mem.toBytes(@as(i32, 2));

    posix.setsockopt(fd, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &enabled) catch return;

    // the idle-time option has a different name per OS; other targets keep their default.
    switch (builtin.os.tag) {
        .linux => posix.setsockopt(fd, tcp_level, posix.TCP.KEEPIDLE, &idle_secs) catch return,
        .macos => posix.setsockopt(fd, tcp_level, posix.TCP.KEEPALIVE, &idle_secs) catch return,
        else => {},
    }

    posix.setsockopt(fd, tcp_level, posix.TCP.KEEPINTVL, &interval_secs) catch return;
    posix.setsockopt(fd, tcp_level, posix.TCP.KEEPCNT, &probe_count) catch return;
}
295
296// === tests ===
297
test "parse commit event" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();

    // a create commit carrying every optional field.
    const event = try parseEvent(arena.allocator(),
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "commit",
        \\ "commit": {
        \\ "rev": "3mbspmpaidl2a",
        \\ "operation": "create",
        \\ "collection": "app.bsky.feed.post",
        \\ "rkey": "xyz789",
        \\ "cid": "bafyreitest",
        \\ "record": {
        \\ "text": "hello world",
        \\ "$type": "app.bsky.feed.post"
        \\ }
        \\ }
        \\}
    );
    const commit = event.commit;

    try testing.expectEqualStrings("did:plc:abc123", commit.did);
    try testing.expectEqual(@as(i64, 1700000000000), commit.time_us);
    try testing.expectEqualStrings("3mbspmpaidl2a", commit.rev.?);
    try testing.expectEqual(CommitAction.create, commit.operation);
    try testing.expectEqualStrings("app.bsky.feed.post", commit.collection);
    try testing.expectEqualStrings("xyz789", commit.rkey);
    try testing.expectEqualStrings("bafyreitest", commit.cid.?);
    try testing.expect(commit.record != null);
    try testing.expectEqualStrings("hello world", json_helpers.getString(commit.record.?, "text").?);
}
334
test "parse identity event" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();

    // identity message with every optional field populated.
    const event = try parseEvent(arena.allocator(),
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "identity",
        \\ "identity": {
        \\ "handle": "alice.bsky.social",
        \\ "seq": 42,
        \\ "time": "2024-01-01T00:00:00Z"
        \\ }
        \\}
    );
    const identity = event.identity;

    try testing.expectEqualStrings("did:plc:abc123", identity.did);
    try testing.expectEqual(@as(i64, 1700000000000), identity.time_us);
    try testing.expectEqualStrings("alice.bsky.social", identity.handle.?);
    try testing.expectEqual(@as(i64, 42), identity.seq.?);
    try testing.expectEqualStrings("2024-01-01T00:00:00Z", identity.time.?);
}
361
test "parse account event" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();

    // a suspended (inactive) account with every optional field populated.
    const event = try parseEvent(arena.allocator(),
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "account",
        \\ "account": {
        \\ "active": false,
        \\ "status": "suspended",
        \\ "seq": 99,
        \\ "time": "2024-01-01T00:00:00Z"
        \\ }
        \\}
    );
    const account = event.account;

    try testing.expectEqualStrings("did:plc:abc123", account.did);
    try testing.expectEqual(@as(i64, 1700000000000), account.time_us);
    try testing.expectEqual(false, account.active);
    try testing.expectEqual(AccountStatus.suspended, account.status.?);
    try testing.expectEqual(@as(i64, 99), account.seq.?);
    try testing.expectEqualStrings("2024-01-01T00:00:00Z", account.time.?);
}
390
test "parse unknown kind returns error" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    // did and time_us are present, so the failure must come from the kind itself.
    const result = parseEvent(arena.allocator(),
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "unknown_kind"
        \\}
    );

    try std.testing.expectError(error.UnknownKind, result);
}
405
test "parse commit with unknown operation returns error" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    // "archive" is expected not to be a recognised commit operation.
    const result = parseEvent(arena.allocator(),
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "commit",
        \\ "commit": {
        \\ "operation": "archive",
        \\ "collection": "app.bsky.feed.post",
        \\ "rkey": "xyz789"
        \\ }
        \\}
    );

    try std.testing.expectError(error.UnknownOperation, result);
}
425
test "cursor tracking via time_us" {
    const earlier =
        \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"1"}}
    ;
    const later =
        \\{"did":"did:plc:b","time_us":200,"kind":"commit","commit":{"operation":"create","collection":"app.bsky.feed.post","rkey":"2"}}
    ;

    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const first = try parseEvent(alloc, earlier);
    const second = try parseEvent(alloc, later);

    // timestamps are positive and strictly increasing across the two events.
    try std.testing.expect(first.timeUs() > 0);
    try std.testing.expect(second.timeUs() > first.timeUs());
}
443
test "Event.timeUs works for all variants" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    // one payload per variant, each with a distinct time_us.
    const cases = [_]struct { payload: []const u8, want: i64 }{
        .{
            .payload =
            \\{"did":"did:plc:a","time_us":100,"kind":"commit","commit":{"operation":"create","collection":"x","rkey":"1"}}
            ,
            .want = 100,
        },
        .{
            .payload =
            \\{"did":"did:plc:a","time_us":200,"kind":"identity","identity":{}}
            ,
            .want = 200,
        },
        .{
            .payload =
            \\{"did":"did:plc:a","time_us":300,"kind":"account","account":{"active":true}}
            ,
            .want = 300,
        },
    };

    for (cases) |case| {
        const event = try parseEvent(arena.allocator(), case.payload);
        try std.testing.expectEqual(case.want, event.timeUs());
    }
}
462
test "build subscribe path" {
    const options: Options = .{
        .wanted_collections = &.{"app.bsky.feed.post"},
    };
    var client = JetstreamClient.init(std.testing.allocator, options);

    // a single collection filter becomes the first (and only) query parameter.
    var path_storage: [2048]u8 = undefined;
    const got = try client.buildSubscribePath(&path_storage);
    try std.testing.expectEqualStrings("/subscribe?wantedCollections=app.bsky.feed.post", got);
}
472
test "build subscribe path with multiple params" {
    const options: Options = .{
        .wanted_collections = &.{ "app.bsky.feed.post", "app.bsky.feed.like" },
        .wanted_dids = &.{"did:plc:abc123"},
        .cursor = 1700000000000,
    };
    var client = JetstreamClient.init(std.testing.allocator, options);

    // collections come first, then DIDs, then the cursor.
    const want = "/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like&wantedDids=did:plc:abc123&cursor=1700000000000";

    var path_storage: [2048]u8 = undefined;
    const got = try client.buildSubscribePath(&path_storage);
    try std.testing.expectEqualStrings(want, got);
}
487
test "build subscribe path no params" {
    const options: Options = .{};
    var client = JetstreamClient.init(std.testing.allocator, options);

    // with no filters and no cursor, the path has no query string at all.
    var path_storage: [2048]u8 = undefined;
    const got = try client.buildSubscribePath(&path_storage);
    try std.testing.expectEqualStrings("/subscribe", got);
}
495
test "parse commit event with delete operation" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();

    // this delete carries no rev, cid, or record.
    const event = try parseEvent(arena.allocator(),
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "commit",
        \\ "commit": {
        \\ "operation": "delete",
        \\ "collection": "app.bsky.feed.post",
        \\ "rkey": "xyz789"
        \\ }
        \\}
    );
    const commit = event.commit;

    try testing.expectEqual(CommitAction.delete, commit.operation);
    try testing.expect(commit.record == null);
    try testing.expect(commit.rev == null);
    try testing.expect(commit.cid == null);
}
520
test "parse identity event with minimal fields" {
    const testing = std.testing;

    var arena = std.heap.ArenaAllocator.init(testing.allocator);
    defer arena.deinit();

    // an empty identity object leaves every optional field null.
    const event = try parseEvent(arena.allocator(),
        \\{
        \\ "did": "did:plc:abc123",
        \\ "time_us": 1700000000000,
        \\ "kind": "identity",
        \\ "identity": {}
        \\}
    );
    const identity = event.identity;

    try testing.expectEqualStrings("did:plc:abc123", identity.did);
    try testing.expect(identity.handle == null);
    try testing.expect(identity.seq == null);
    try testing.expect(identity.time == null);
}
541
test "parse missing did returns error" {
    var arena = std.heap.ArenaAllocator.init(std.testing.allocator);
    defer arena.deinit();

    // kind is present, so the missing did is the first problem reported.
    const result = parseEvent(arena.allocator(),
        \\{
        \\ "time_us": 1700000000000,
        \\ "kind": "commit"
        \\}
    );

    try std.testing.expectError(error.MissingDid, result);
}
555
test "default hosts contains known jetstream instances" {
    try std.testing.expectEqual(@as(usize, 12), default_hosts.len);

    // spot-check the head of the list and its final entry.
    const expected = [_]struct { index: usize, name: []const u8 }{
        .{ .index = 0, .name = "jetstream1.us-east.bsky.network" },
        .{ .index = 1, .name = "jetstream2.us-east.bsky.network" },
        .{ .index = 2, .name = "jetstream1.us-west.bsky.network" },
        .{ .index = 3, .name = "jetstream2.us-west.bsky.network" },
        .{ .index = 4, .name = "jetstream.waow.tech" },
        .{ .index = 5, .name = "jetstream.fire.hose.cam" },
        .{ .index = 6, .name = "jet.firehose.stream" },
        .{ .index = 11, .name = "chennai.firehose.stream" },
    };
    for (expected) |entry| {
        try std.testing.expectEqualStrings(entry.name, default_hosts[entry.index]);
    }
}
567
test "round-robin cycles through hosts" {
    const hosts = [_][]const u8{ "host-a", "host-b", "host-c" };
    const expected_cycle = [_][]const u8{ "host-a", "host-b", "host-c" };

    // simulate the index logic from subscribe() over three full rotations
    var attempt: usize = 0;
    while (attempt < 9) : (attempt += 1) {
        const picked = hosts[attempt % hosts.len];
        try std.testing.expectEqualStrings(expected_cycle[attempt % 3], picked);
    }
}
582
test "options default hosts are used" {
    // a default-constructed Options points at the built-in host list.
    const defaults: Options = .{};
    const hosts = defaults.hosts;

    try std.testing.expectEqual(@as(usize, 12), hosts.len);
    try std.testing.expectEqualStrings("jetstream1.us-east.bsky.network", hosts[0]);
}
588
test "options custom single host" {
    // an explicit host list replaces the defaults entirely.
    const custom: Options = .{ .hosts = &.{"my-custom-host.example.com"} };
    const hosts = custom.hosts;

    try std.testing.expectEqual(@as(usize, 1), hosts.len);
    try std.testing.expectEqualStrings("my-custom-host.example.com", hosts[0]);
}