at main 6.7 kB view raw
const std = @import("std");
const mem = std.mem;
const json = std.json;
const posix = std.posix;
const Allocator = mem.Allocator;
const websocket = @import("websocket");
const filter = @import("feed/filter.zig");
const stats = @import("server/stats.zig");
const posts = @import("store/posts.zig");

// turbostream: hydrated jetstream from graze.social
const TURBOSTREAM_HOST = "api.graze.social";
const TURBOSTREAM_PATH = "/app/api/v1/turbostream/turbostream";

pub const Post = struct {
    uri: []const u8,
    cid: []const u8,
    text: []const u8,
    did: []const u8,
    rkey: []const u8,
};

/// Runs the turbostream consumer forever: connect, read until the connection
/// ends, sleep, reconnect. The sleep starts at 1s and doubles up to 60s.
/// Whenever a websocket handshake succeeds the delay is reset to 1s, so a
/// connection that was healthy and later dropped is retried quickly instead of
/// inheriting the delay accumulated by earlier failures.
/// Never returns; errors are logged, not propagated.
pub fn consumer(allocator: Allocator) void {
    const max_backoff: u64 = 60;
    var backoff: u64 = 1;

    while (true) {
        var established = false;
        const result = connect(allocator, &established);

        // Reset before logging so the message reports the delay actually used.
        if (established) backoff = 1;

        result catch |err| {
            std.debug.print("turbostream error: {}, reconnecting in {}s...\n", .{ err, backoff });
        };

        posix.nanosleep(backoff, 0);
        backoff = @min(backoff * 2, max_backoff);
    }
}

/// Opens one TLS websocket connection to turbostream and blocks in the read
/// loop until it ends.
///
/// `established` is set to true right after the handshake succeeds, so the
/// caller can tell "never connected" apart from "connected, then dropped"
/// even when an error is returned.
///
/// The host defaults to TURBOSTREAM_HOST and can be overridden with the
/// TURBOSTREAM_HOST environment variable. Returns error.HostTooLong if the
/// Host header for that host does not fit the 256-byte buffer; otherwise
/// returns whatever error client init, handshake or the read loop produced.
fn connect(allocator: Allocator, established: *bool) !void {
    const host = posix.getenv("TURBOSTREAM_HOST") orelse TURBOSTREAM_HOST;
    const path = TURBOSTREAM_PATH;

    // Build the Host header up front: if an overridden host is too long,
    // fail instead of sending a malformed header line with the handshake.
    var host_header_buf: [256]u8 = undefined;
    const host_header = std.fmt.bufPrint(&host_header_buf, "Host: {s}\r\n", .{host}) catch
        return error.HostTooLong;

    std.debug.print("connecting to wss://{s}{s}\n", .{ host, path });

    var client = websocket.Client.init(allocator, .{
        .host = host,
        .port = 443,
        .tls = true,
        .max_size = 1024 * 1024 * 4, // 4MB for batched messages
    }) catch |err| {
        std.debug.print("websocket client init failed: {}\n", .{err});
        return err;
    };
    defer client.deinit();

    client.handshake(path, .{ .headers = host_header }) catch |err| {
        std.debug.print("websocket handshake failed: {}\n", .{err});
        return err;
    };
    established.* = true;

    std.debug.print("turbostream connected!\n", .{});
    stats.get().recordConnected();

    var handler = Handler{ .allocator = allocator };
    client.readLoop(&handler) catch |err| {
        std.debug.print("websocket read loop error: {}\n", .{err});
        return err;
    };
}

/// Websocket message handler: filters turbostream events down to newly
/// created posts that pass `filter.matches`, adds them to the feed store,
/// and records stats. Also prints a periodic stats summary.
const Handler = struct {
    allocator: Allocator,
    msg_count: usize = 0,
    last_summary_time: i64 = 0,

    /// Minimum number of seconds between periodic stats summaries.
    const summary_interval_s = 30;

    pub fn serverMessage(self: *Handler, data: []const u8) !void {
        self.msg_count += 1;

        self.processRecord(data) catch |err| switch (err) {
            // Expected for most traffic (other collections, non-matching posts).
            error.NotAPost, error.NoMatch => {},
            else => std.debug.print("processing error: {}\n", .{err}),
        };

        // periodic summary (first one fires on the first message, since
        // last_summary_time starts at 0)
        const now = std.time.timestamp();
        if (now - self.last_summary_time >= summary_interval_s) {
            self.last_summary_time = now;
            const s = stats.get();
            const lag_ms = s.getPostLagMs();
            const lag_sec = @divTrunc(lag_ms, 1000);
            const status = s.getStatus();
            std.debug.print("turbostream: msgs={} matches={} lag={}s status={s}\n", .{
                s.getMessages(),
                s.getMatches(),
                lag_sec,
                status,
            });
        }
    }

    pub fn close(_: *Handler) void {
        std.debug.print("turbostream connection closed\n", .{});
    }

    /// Handles one raw turbostream payload.
    ///
    /// Returns error.NotAPost when the payload is not a well-formed post
    /// create commit, error.NoMatch when the post fails `filter.matches`,
    /// error.ParseError for invalid JSON, and error.OutOfMemory if parsing
    /// runs out of memory. A failure in `posts.add` is logged and swallowed.
    fn processRecord(self: *Handler, payload: []const u8) !void {
        const parsed = json.parseFromSlice(json.Value, self.allocator, payload, .{}) catch |err| switch (err) {
            error.OutOfMemory => return error.OutOfMemory,
            else => return error.ParseError,
        };
        defer parsed.deinit();

        const root = try asObject(parsed.value);

        // turbostream format: { at_uri, did, time_us, message: {...}, hydrated_metadata: {...} }
        const msg = try objectField(root, "message");

        // only commit events carry a record
        if (!mem.eql(u8, try stringField(msg, "kind"), "commit")) return error.NotAPost;
        const commit = try objectField(msg, "commit");

        // turbostream sends all collections; keep newly created posts only
        if (!mem.eql(u8, try stringField(commit, "collection"), "app.bsky.feed.post")) return error.NotAPost;
        if (!mem.eql(u8, try stringField(commit, "operation"), "create")) return error.NotAPost;

        // rkey is parsed as a TID to recover the post timestamp
        const rkey = try stringField(commit, "rkey");
        const post_time_ms = posts.parseTidTimestamp(rkey);

        // time_us from the message; 0 when absent or not an integer
        const time_us: i64 = if (msg.get("time_us")) |t| switch (t) {
            .integer => |us| us,
            else => 0,
        } else 0;
        stats.get().recordEvent(time_us, post_time_ms);

        const cid = try stringField(commit, "cid");
        const record = try objectField(commit, "record");

        // check if post matches filter criteria
        if (!filter.matches(record)) return error.NoMatch;

        // turbostream provides at_uri directly on the outer envelope
        const at_uri = try stringField(root, "at_uri");

        posts.add(at_uri, cid) catch |err| {
            std.debug.print("failed to add post: {}\n", .{err});
            return;
        };

        const s = stats.get();
        s.recordMatch();

        // track every platform detected in the post, not just the first
        const platforms = filter.detectAllPlatformsFromRecord(record);
        if (platforms.soundcloud) s.recordPlatform(.soundcloud);
        if (platforms.bandcamp) s.recordPlatform(.bandcamp);
        if (platforms.spotify) s.recordPlatform(.spotify);
        if (platforms.plyr) s.recordPlatform(.plyr);
        if (platforms.apple) s.recordPlatform(.apple);

        // track match types
        if (filter.isQuoteMatch(record)) s.recordQuoteMatch();
        if (platforms.count() >= 2) s.recordMultiPlatform();

        std.debug.print("added: {s}\n", .{at_uri});
    }

    /// Returns the object payload of `value`, or error.NotAPost if it is not a JSON object.
    fn asObject(value: json.Value) error{NotAPost}!json.ObjectMap {
        return switch (value) {
            .object => |obj| obj,
            else => error.NotAPost,
        };
    }

    /// Returns the object stored under `key`; error.NotAPost if missing or not an object.
    fn objectField(obj: json.ObjectMap, key: []const u8) error{NotAPost}!json.ObjectMap {
        return asObject(obj.get(key) orelse return error.NotAPost);
    }

    /// Returns the string stored under `key`; error.NotAPost if missing or not a string.
    fn stringField(obj: json.ObjectMap, key: []const u8) error{NotAPost}![]const u8 {
        return switch (obj.get(key) orelse return error.NotAPost) {
            .string => |str| str,
            else => error.NotAPost,
        };
    }
};