bsky feeds about music
music-atmosphere-feed.plyr.fm/
bsky
feed
zig
1const std = @import("std");
2const mem = std.mem;
3const json = std.json;
4const posix = std.posix;
5const Allocator = mem.Allocator;
6const websocket = @import("websocket");
7const filter = @import("feed/filter.zig");
8const stats = @import("server/stats.zig");
9const posts = @import("store/posts.zig");
10
// turbostream: hydrated jetstream from graze.social

/// Default websocket host. `connect` uses this value unless the
/// `TURBOSTREAM_HOST` environment variable is set.
const TURBOSTREAM_HOST = "api.graze.social";
/// Request path passed to the websocket handshake in `connect`.
const TURBOSTREAM_PATH = "/app/api/v1/turbostream/turbostream";
14
/// Plain record of string slices describing a single post.
///
/// NOTE(review): nothing in the visible part of this file constructs a
/// `Post`; the per-field notes below are inferred from the field names and
/// should be confirmed against the callers.
pub const Post = struct {
    /// presumably the post's AT-URI — TODO confirm
    uri: []const u8,
    /// presumably the content identifier of the post record — TODO confirm
    cid: []const u8,
    /// presumably the post body text — TODO confirm
    text: []const u8,
    /// presumably the author's DID — TODO confirm
    did: []const u8,
    /// presumably the record key (TID) of the post — TODO confirm
    rkey: []const u8,
};
22
/// Run the turbostream consumer forever, reconnecting with exponential backoff.
///
/// Each iteration calls `connect`, which blocks for the lifetime of one
/// websocket session. When it returns (cleanly or with an error) the loop
/// sleeps for the current backoff and then tries again.
///
/// The backoff starts at 1 second and doubles after every attempt, capped at
/// 60 seconds. It is reset to 1 second whenever a session lasted at least
/// `stable_session_secs`, so that a past outage does not permanently slow
/// down reconnects after long healthy sessions.
///
/// This function never returns.
pub fn consumer(allocator: Allocator) void {
    const initial_backoff: u64 = 1;
    const max_backoff: u64 = 60;
    // A session that stayed up at least this long counts as healthy.
    const stable_session_secs: i64 = 60;

    var backoff: u64 = initial_backoff;

    while (true) {
        const started_at = std.time.timestamp();
        const result = connect(allocator);
        const session_secs = std.time.timestamp() - started_at;

        // Reset before logging so the printed delay is the one actually used.
        if (session_secs >= stable_session_secs) backoff = initial_backoff;

        result catch |err| {
            std.debug.print("turbostream error: {}, reconnecting in {}s...\n", .{ err, backoff });
        };

        posix.nanosleep(backoff, 0);
        backoff = @min(backoff * 2, max_backoff);
    }
}
35
/// Open one websocket session to turbostream and process messages until it ends.
///
/// The host is taken from the `TURBOSTREAM_HOST` environment variable and
/// falls back to the `TURBOSTREAM_HOST` constant; the connection uses TLS on
/// port 443 and requests `TURBOSTREAM_PATH`.
///
/// Blocks inside `client.readLoop` for as long as the session is alive.
///
/// Errors:
/// - `error.HostHeaderTooLong` if the host does not fit in the Host header
///   buffer (previously the bare host name was sent as the raw header text,
///   producing a malformed request).
/// - whatever `websocket.Client.init`, `handshake` or `readLoop` return; each
///   is logged here before being propagated.
///
/// The client is deinitialized on every exit path after a successful init.
fn connect(allocator: Allocator) !void {
    const host = posix.getenv("TURBOSTREAM_HOST") orelse TURBOSTREAM_HOST;
    const path = TURBOSTREAM_PATH;

    // Build the Host header before touching the network so that a bad
    // configuration fails fast instead of after a TLS connection is opened.
    var host_header_buf: [256]u8 = undefined;
    const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch {
        std.debug.print("turbostream host too long for Host header: {s}\n", .{host});
        return error.HostHeaderTooLong;
    };

    std.debug.print("connecting to wss://{s}{s}\n", .{ host, path });

    var client = websocket.Client.init(allocator, .{
        .host = host,
        .port = 443,
        .tls = true,
        .max_size = 1024 * 1024 * 4, // 4MB for batched messages
    }) catch |err| {
        std.debug.print("websocket client init failed: {}\n", .{err});
        return err;
    };
    defer client.deinit();

    client.handshake(path, .{ .headers = host_header }) catch |err| {
        std.debug.print("websocket handshake failed: {}\n", .{err});
        return err;
    };

    std.debug.print("turbostream connected!\n", .{});
    stats.get().recordConnected();

    // The handler lives on this stack frame; readLoop blocks until the
    // session ends, so the pointer stays valid for the whole loop.
    var handler = Handler{ .allocator = allocator };
    client.readLoop(&handler) catch |err| {
        std.debug.print("websocket read loop error: {}\n", .{err});
        return err;
    };
}
70
/// Callback object handed to `client.readLoop` for one turbostream session.
///
/// `serverMessage` is invoked with each incoming payload and `close` when the
/// connection ends; the remaining functions are private helpers.
const Handler = struct {
    /// Allocator used to parse each incoming JSON payload.
    allocator: Allocator,
    /// Number of payloads passed to `serverMessage` on this handler.
    msg_count: usize = 0,
    /// Unix time (seconds) of the last summary line. Starts at 0, so the
    /// first message always triggers a summary.
    last_summary_time: i64 = 0,

    /// Minimum number of seconds between two summary log lines.
    const summary_interval_secs: i64 = 30;

    /// Handle one websocket payload: try to ingest it as a post, then emit
    /// the periodic summary if it is due.
    ///
    /// Processing failures never propagate; `NotAPost` and `NoMatch` are the
    /// expected outcomes for most traffic and are not logged.
    pub fn serverMessage(self: *Handler, data: []const u8) !void {
        self.msg_count += 1;

        self.processRecord(data) catch |err| {
            const expected = err == error.NotAPost or err == error.NoMatch;
            if (!expected) {
                std.debug.print("processing error: {}\n", .{err});
            }
        };

        self.logSummaryIfDue(std.time.timestamp());
    }

    /// Log that the connection was closed.
    pub fn close(_: *Handler) void {
        std.debug.print("turbostream connection closed\n", .{});
    }

    /// Print a one-line stats summary when at least
    /// `summary_interval_secs` have passed since the previous one.
    fn logSummaryIfDue(self: *Handler, now: i64) void {
        if (now - self.last_summary_time < summary_interval_secs) return;
        self.last_summary_time = now;

        const s = stats.get();
        const lag_sec = @divTrunc(s.getPostLagMs(), 1000);
        const status = s.getStatus();
        std.debug.print("turbostream: msgs={} matches={} lag={}s status={s}\n", .{
            s.getMessages(),
            s.getMatches(),
            lag_sec,
            status,
        });
    }

    /// Return `obj[key]` if it is present and a JSON object, else null.
    fn objectField(obj: json.ObjectMap, key: []const u8) ?json.ObjectMap {
        const value = obj.get(key) orelse return null;
        return switch (value) {
            .object => |inner| inner,
            else => null,
        };
    }

    /// Return `obj[key]` if it is present and a JSON string, else null.
    fn stringField(obj: json.ObjectMap, key: []const u8) ?[]const u8 {
        const value = obj.get(key) orelse return null;
        return switch (value) {
            .string => |text| text,
            else => null,
        };
    }

    /// Parse one turbostream payload and add it to the feed when it is a
    /// newly created post that passes the filter.
    ///
    /// Returns `error.ParseError` when the payload is not valid JSON,
    /// `error.NotAPost` when it is not a well-formed `app.bsky.feed.post`
    /// create commit, and `error.NoMatch` when the filter rejects it.
    /// A failure to store the post is logged and treated as success.
    fn processRecord(self: *Handler, payload: []const u8) !void {
        const parsed = json.parseFromSlice(json.Value, self.allocator, payload, .{}) catch {
            return error.ParseError;
        };
        defer parsed.deinit();

        // turbostream format: { at_uri, did, time_us, message: {...}, hydrated_metadata: {...} }
        const envelope = switch (parsed.value) {
            .object => |obj| obj,
            else => return error.NotAPost,
        };
        const event = objectField(envelope, "message") orelse return error.NotAPost;

        // only commit events are of interest
        const kind = stringField(event, "kind") orelse return error.NotAPost;
        if (!mem.eql(u8, kind, "commit")) return error.NotAPost;

        const commit = objectField(event, "commit") orelse return error.NotAPost;

        // turbostream sends all collections; keep posts only
        const collection = stringField(commit, "collection") orelse return error.NotAPost;
        if (!mem.eql(u8, collection, "app.bsky.feed.post")) return error.NotAPost;

        // creates only
        const operation = stringField(commit, "operation") orelse return error.NotAPost;
        if (!mem.eql(u8, operation, "create")) return error.NotAPost;

        // the rkey is a TID, from which the post timestamp is derived
        const rkey = stringField(commit, "rkey") orelse return error.NotAPost;
        const post_time_ms = posts.parseTidTimestamp(rkey);

        // event time from the message; 0 when absent or not an integer
        const time_us: i64 = blk: {
            const raw = event.get("time_us") orelse break :blk 0;
            break :blk switch (raw) {
                .integer => |n| n,
                else => 0,
            };
        };
        // every post-create event is counted, whether or not it matches below
        stats.get().recordEvent(time_us, post_time_ms);

        const cid = stringField(commit, "cid") orelse return error.NotAPost;
        const record = objectField(commit, "record") orelse return error.NotAPost;

        if (!filter.matches(record)) return error.NoMatch;

        // turbostream provides the at_uri directly on the outer object
        const at_uri = stringField(envelope, "at_uri") orelse return error.NotAPost;

        posts.add(at_uri, cid) catch |err| {
            std.debug.print("failed to add post: {}\n", .{err});
            return;
        };

        recordMatchStats(record);

        std.debug.print("added: {s}\n", .{at_uri});
    }

    /// Update match counters for a post that was just added to the feed.
    fn recordMatchStats(record: json.ObjectMap) void {
        const s = stats.get();
        s.recordMatch();

        // one counter per platform detected in the post
        const platforms = filter.detectAllPlatformsFromRecord(record);
        if (platforms.soundcloud) s.recordPlatform(.soundcloud);
        if (platforms.bandcamp) s.recordPlatform(.bandcamp);
        if (platforms.spotify) s.recordPlatform(.spotify);
        if (platforms.plyr) s.recordPlatform(.plyr);
        if (platforms.apple) s.recordPlatform(.apple);

        // match-type counters
        if (filter.isQuoteMatch(record)) s.recordQuoteMatch();
        if (platforms.count() >= 2) s.recordMultiPlatform();
    }
};