refactor: split monolithic db.zig into focused modules

- store/db.zig: sqlite connection and schema only
- store/posts.zig: post CRUD with cursor pagination
- store/follows.zig: follows cache operations
- store/backfill.zig: backfill completion tracking
- bsky/auth.zig: JWT verification (from atproto.zig)
- bsky/api.zig: bluesky API calls (from atproto.zig)
- feed/skeleton.zig: feed response JSON building
- feed/backfill.zig: backfill orchestration
- jetstream.zig: moved to top level (was stream/)

removes 800+ lines of tangled code from stream/db.zig

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+160
src/bsky/api.zig
···
const std = @import("std");
const Allocator = std.mem.Allocator;
const zat = @import("zat");

const log = std.log.scoped(.bsky_api);

// public AppView endpoint; no auth required for these read-only queries
const BSKY_PUBLIC_API = "https://public.api.bsky.app";

/// fetch all DIDs that a user follows (with pagination)
/// Pages through app.bsky.graph.getFollows 100 entries at a time until the
/// server stops returning a cursor. Returns a slice of alloc-duped DID
/// strings; caller owns the result and must release it with `freeFollows`.
/// Errors: InvalidNsid, ApiFailed (transport or non-OK status), InvalidJson,
/// plus allocator errors.
pub fn getFollows(alloc: Allocator, actor_did: []const u8) ![][]const u8 {
    var client = zat.XrpcClient.init(alloc, BSKY_PUBLIC_API);
    defer client.deinit();

    var result: std.ArrayList([]const u8) = .empty;
    // on error, every already-duped DID must be freed along with the list
    errdefer {
        for (result.items) |item| alloc.free(item);
        result.deinit(alloc);
    }

    // pagination cursor returned by the previous page; owned copy
    var cursor: ?[]const u8 = null;
    defer if (cursor) |c| alloc.free(c);

    const nsid = zat.Nsid.parse("app.bsky.graph.getFollows") orelse return error.InvalidNsid;

    while (true) {
        // params are rebuilt each page so the cursor value can change
        var params = std.StringHashMap([]const u8).init(alloc);
        defer params.deinit();

        try params.put("actor", actor_did);
        try params.put("limit", "100");
        if (cursor) |c| {
            try params.put("cursor", c);
        }

        var response = client.query(nsid, params) catch |err| {
            log.err("getFollows API error: {}", .{err});
            return error.ApiFailed;
        };
        defer response.deinit();

        if (!response.ok()) {
            return error.ApiFailed;
        }

        var parsed = response.json() catch return error.InvalidJson;
        defer parsed.deinit();

        // missing "follows" array means an empty/terminal page
        const follows = zat.json.getArray(parsed.value, "follows") orelse break;

        for (follows) |item| {
            const did = zat.json.getString(item, "did") orelse continue;
            // dupe: parsed JSON memory dies at end of this iteration
            try result.append(alloc, try alloc.dupe(u8, did));
        }

        // check for next page
        if (cursor) |c| alloc.free(c);
        cursor = null;

        if (zat.json.getString(parsed.value, "cursor")) |c| {
            if (c.len > 0) {
                cursor = try alloc.dupe(u8, c);
            }
        }

        if (cursor == null) break;
    }

    return try result.toOwnedSlice(alloc);
}

/// free a slice returned by `getFollows`
pub fn freeFollows(alloc: Allocator, follows: [][]const u8) void {
    for (follows) |did| alloc.free(did);
    alloc.free(follows);
}

/// post from getAuthorFeed
/// All string fields are alloc-owned; release with `freeAuthorPost` /
/// `freeAuthorFeed`.
pub const AuthorPost = struct {
    uri: []const u8,
    cid: []const u8,
    text: []const u8,
    embed_uri: ?[]const u8,
};

/// struct for extracting post data from feed items
/// Shape mirrors the "post" object inside each feed entry; unknown/absent
/// fields default so extraction tolerates sparse records.
const FeedPost = struct {
    uri: []const u8,
    cid: []const u8,
    record: struct {
        text: []const u8 = "",
        embed: ?struct {
            external: ?struct { uri: []const u8 } = null,
        } = null,
    },
};

/// fetch recent posts from an author's feed
/// `limit` is clamped to the API maximum of 100. Replies are excluded via
/// the posts_no_replies filter. Caller owns the returned slice; free with
/// `freeAuthorFeed`.
pub fn getAuthorFeed(alloc: Allocator, actor_did: []const u8, limit: usize) ![]AuthorPost {
    var client = zat.XrpcClient.init(alloc, BSKY_PUBLIC_API);
    defer client.deinit();

    var post_list: std.ArrayList(AuthorPost) = .empty;
    errdefer {
        for (post_list.items) |p| freeAuthorPost(alloc, p);
        post_list.deinit(alloc);
    }

    const nsid = zat.Nsid.parse("app.bsky.feed.getAuthorFeed") orelse return error.InvalidNsid;

    var params = std.StringHashMap([]const u8).init(alloc);
    defer params.deinit();

    // 3 digits max after clamping, so an 8-byte buffer always suffices
    var limit_buf: [8]u8 = undefined;
    const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{@min(limit, 100)}) catch return error.FormatError;

    try params.put("actor", actor_did);
    try params.put("limit", limit_str);
    try params.put("filter", "posts_no_replies");

    var response = client.query(nsid, params) catch |err| {
        log.err("getAuthorFeed API error for {s}: {}", .{ actor_did, err });
        return error.ApiFailed;
    };
    defer response.deinit();

    if (!response.ok()) {
        return error.ApiFailed;
    }

    var parsed = response.json() catch return error.InvalidJson;
    defer parsed.deinit();

    // no "feed" key: return whatever (nothing) we collected, not an error
    const feed = zat.json.getArray(parsed.value, "feed") orelse return try post_list.toOwnedSlice(alloc);

    for (feed) |item| {
        // skip entries whose "post" object doesn't match FeedPost
        // NOTE(review): ownership of the value returned by zat.json.extractAt
        // is unclear from here — if it allocates with `alloc`, the dupes
        // below leak it each iteration; confirm against zat documentation.
        const post = zat.json.extractAt(FeedPost, alloc, item, .{"post"}) catch continue;

        const embed_uri = if (post.record.embed) |e| if (e.external) |ext| ext.uri else null else null;

        try post_list.append(alloc, .{
            .uri = try alloc.dupe(u8, post.uri),
            .cid = try alloc.dupe(u8, post.cid),
            .text = try alloc.dupe(u8, post.record.text),
            .embed_uri = if (embed_uri) |eu| try alloc.dupe(u8, eu) else null,
        });
    }

    return try post_list.toOwnedSlice(alloc);
}

/// free one AuthorPost's owned strings
pub fn freeAuthorPost(alloc: Allocator, post: AuthorPost) void {
    alloc.free(post.uri);
    alloc.free(post.cid);
    alloc.free(post.text);
    if (post.embed_uri) |eu| alloc.free(eu);
}

/// free a slice returned by `getAuthorFeed`
pub fn freeAuthorFeed(alloc: Allocator, posts: []AuthorPost) void {
    for (posts) |p| freeAuthorPost(alloc, p);
    alloc.free(posts);
}
+72
src/bsky/auth.zig
···
const std = @import("std");
const mem = std.mem;
const Allocator = mem.Allocator;
const zat = @import("zat");

const log = std.log.scoped(.bsky_auth);

/// extract requester DID from Authorization header (claims-only, no signature verification)
/// Checks only expiry and audience — the token's signature is NOT checked,
/// so the returned identity can be forged. Intended as a fallback when
/// DID resolution for full verification is unavailable.
/// Returns an alloc-duped copy of the issuer DID (caller frees), or null
/// on any parse/claim/allocation failure.
pub fn getRequesterDid(alloc: Allocator, auth_header: ?[]const u8, service_did: []const u8) ?[]const u8 {
    const auth = auth_header orelse return null;
    if (!mem.startsWith(u8, auth, "Bearer ")) return null;

    const token = auth["Bearer ".len..];

    var jwt = zat.Jwt.parse(alloc, token) catch return null;
    defer jwt.deinit();

    if (jwt.isExpired()) return null;
    // NOTE(review): exact equality on aud — if clients send fragment-suffixed
    // audiences (e.g. "did:web:...#bsky_fg") this rejects them; confirm the
    // expected aud format for feed generator requests.
    if (!mem.eql(u8, jwt.payload.aud, service_did)) return null;

    return alloc.dupe(u8, jwt.payload.iss) catch null;
}

/// extract requester DID with full signature verification
/// Order matters for security: parse -> expiry -> audience -> resolve the
/// issuer's DID document -> verify signature against its signing key.
/// Returns an alloc-duped copy of the issuer DID (caller frees), or null
/// on any failure (each failure path is logged at debug level).
pub fn getRequesterDidVerified(alloc: Allocator, auth_header: ?[]const u8, service_did: []const u8) ?[]const u8 {
    const auth = auth_header orelse return null;
    if (!mem.startsWith(u8, auth, "Bearer ")) return null;

    const token = auth["Bearer ".len..];

    var jwt = zat.Jwt.parse(alloc, token) catch |err| {
        log.debug("jwt parse failed: {}", .{err});
        return null;
    };
    defer jwt.deinit();

    if (jwt.isExpired()) {
        log.debug("jwt expired", .{});
        return null;
    }
    if (!mem.eql(u8, jwt.payload.aud, service_did)) {
        log.debug("jwt audience mismatch", .{});
        return null;
    }

    // resolve issuer's DID document to get signing key
    const did = zat.Did.parse(jwt.payload.iss) orelse {
        log.debug("invalid issuer DID: {s}", .{jwt.payload.iss});
        return null;
    };

    // NOTE(review): resolution hits the network on every request here —
    // presumably zat caches internally; verify, or consider caching DID docs.
    var resolver = zat.DidResolver.init(alloc);
    defer resolver.deinit();

    var doc = resolver.resolve(did) catch |err| {
        log.debug("DID resolution failed: {}", .{err});
        return null;
    };
    defer doc.deinit();

    const signing_key = doc.signingKey() orelse {
        log.debug("no signing key in DID document", .{});
        return null;
    };

    jwt.verify(signing_key.public_key_multibase) catch |err| {
        log.debug("jwt signature verification failed: {}", .{err});
        return null;
    };

    return alloc.dupe(u8, jwt.payload.iss) catch null;
}
+48
src/feed/backfill.zig
···
const std = @import("std");
const Allocator = std.mem.Allocator;
const api = @import("../bsky/api.zig");
const posts = @import("../store/posts.zig");
const backfill_store = @import("../store/backfill.zig");
const filter = @import("filter.zig");

const log = std.log.scoped(.backfill);

/// Fetch one account's recent posts and store the ones with music links.
/// No-op if the account was already backfilled; marks it complete on success
/// so it is not re-fetched on later requests.
fn runBackfill(alloc: Allocator, did: []const u8) void {
    if (backfill_store.isComplete(did)) return;

    log.debug("backfilling {s}...", .{did});

    const fetched = api.getAuthorFeed(alloc, did, 50) catch |err| {
        log.debug("backfill failed for {s}: {}", .{ did, err });
        return;
    };
    defer api.freeAuthorFeed(alloc, fetched);

    var stored: usize = 0;
    for (fetched) |post| {
        const in_text = filter.textContainsMusicLink(post.text);
        const in_embed = if (post.embed_uri) |eu| filter.textContainsMusicLink(eu) else false;
        if (!in_text and !in_embed) continue;
        posts.add(post.uri, post.cid) catch continue;
        stored += 1;
    }

    backfill_store.markComplete(did);
    log.debug("backfilled {s}: {d} music posts from {d} total", .{ did, stored, fetched.len });
}

/// trigger backfill for a list of DIDs (called after fetching follows)
/// Processes at most 10 not-yet-backfilled accounts per call to bound the
/// latency added to a single feed request.
pub fn trigger(alloc: Allocator, dids: []const []const u8) void {
    const max_per_request = 10;
    var started: usize = 0;

    for (dids) |did| {
        if (started >= max_per_request) break;
        if (backfill_store.isComplete(did)) continue;
        runBackfill(alloc, did);
        started += 1;
    }
}
+64
src/feed/skeleton.zig
···
const std = @import("std");
const Allocator = std.mem.Allocator;
const posts = @import("../store/posts.zig");

/// build a feed skeleton response JSON
/// Emits {"cursor":"<ts>::<cid>","feed":[{"post":"<uri>"},...]} using the
/// literal cursor "eof" when there is no next page. Caller owns the slice.
pub fn build(
    alloc: Allocator,
    post_list: []const posts.Post,
    next_cursor: ?posts.Cursor,
) ![]const u8 {
    var out: std.ArrayList(u8) = .empty;
    errdefer out.deinit(alloc);
    const writer = out.writer(alloc);

    try writer.writeAll("{\"cursor\":");

    if (next_cursor) |c| {
        // 128 bytes comfortably fits "<i64>::<cid>" for real CIDs
        var scratch: [128]u8 = undefined;
        try writer.print("\"{s}\"", .{c.format(&scratch)});
    } else {
        try writer.writeAll("\"eof\"");
    }

    try writer.writeAll(",\"feed\":[");

    var first = true;
    for (post_list) |post| {
        if (!first) try writer.writeAll(",");
        first = false;
        try writer.print("{{\"post\":\"{s}\"}}", .{post.uri});
    }

    try writer.writeAll("]}");

    return try out.toOwnedSlice(alloc);
}

/// build an empty/EOF response
/// Caller owns the returned slice.
pub fn empty(alloc: Allocator) ![]const u8 {
    return try alloc.dupe(u8, "{\"cursor\":\"eof\",\"feed\":[]}");
}

test "build empty feed" {
    const alloc = std.testing.allocator;
    const json = try build(alloc, &.{}, null);
    defer alloc.free(json);
    try std.testing.expectEqualStrings("{\"cursor\":\"eof\",\"feed\":[]}", json);
}

test "build feed with posts" {
    const alloc = std.testing.allocator;

    const sample = [_]posts.Post{
        .{ .uri = "at://did:plc:abc/app.bsky.feed.post/123", .cid = "cid1", .indexed_at = 1000 },
        .{ .uri = "at://did:plc:xyz/app.bsky.feed.post/456", .cid = "cid2", .indexed_at = 900 },
    };
    const page_cursor = posts.Cursor{ .timestamp = 900, .cid = "cid2" };

    const json = try build(alloc, &sample, page_cursor);
    defer alloc.free(json);

    try std.testing.expect(std.mem.indexOf(u8, json, "\"cursor\":\"900::cid2\"") != null);
    try std.testing.expect(std.mem.indexOf(u8, json, "at://did:plc:abc/app.bsky.feed.post/123") != null);
    try std.testing.expect(std.mem.indexOf(u8, json, "at://did:plc:xyz/app.bsky.feed.post/456") != null);
}
+4 -2
src/main.zig
··· 4 const Thread = std.Thread; 5 6 const http = @import("server/http.zig"); 7 - const db = @import("stream/db.zig"); 8 - const jetstream = @import("stream/jetstream.zig"); 9 const stats = @import("server/stats.zig"); 10 11 const log = std.log.scoped(.main); ··· 78 79 test { 80 _ = @import("feed/filter.zig"); 81 _ = @import("server/stats.zig"); 82 }
··· 4 const Thread = std.Thread; 5 6 const http = @import("server/http.zig"); 7 + const db = @import("store/db.zig"); 8 + const jetstream = @import("jetstream.zig"); 9 const stats = @import("server/stats.zig"); 10 11 const log = std.log.scoped(.main); ··· 78 79 test { 80 _ = @import("feed/filter.zig"); 81 + _ = @import("feed/skeleton.zig"); 82 _ = @import("server/stats.zig"); 83 + _ = @import("store/posts.zig"); 84 }
+2 -2
src/server/dashboard.zig
··· 1 const std = @import("std"); 2 const stats = @import("stats.zig"); 3 - const db = @import("../stream/db.zig"); 4 5 fn formatNumber(buf: []u8, n: u64) []const u8 { 6 var temp: [32]u8 = undefined; ··· 25 26 pub fn render(alloc: std.mem.Allocator) ![]const u8 { 27 const s = stats.get(); 28 - const post_count = db.getPostCount(); 29 const lag_ms = s.getPostLagMs(); 30 const lag_sec = @divTrunc(lag_ms, 1000); 31 const status = s.getStatus();
··· 1 const std = @import("std"); 2 const stats = @import("stats.zig"); 3 + const posts = @import("../store/posts.zig"); 4 5 fn formatNumber(buf: []u8, n: u64) []const u8 { 6 var temp: [32]u8 = undefined; ··· 25 26 pub fn render(alloc: std.mem.Allocator) ![]const u8 { 27 const s = stats.get(); 28 + const post_count = posts.count(); 29 const lag_ms = s.getPostLagMs(); 30 const lag_sec = @divTrunc(lag_ms, 1000); 31 const status = s.getStatus();
+62 -10
src/server/http.zig
··· 4 const mem = std.mem; 5 const config = @import("../feed/config.zig"); 6 const dashboard = @import("dashboard.zig"); 7 - const atproto = @import("../stream/atproto.zig"); 8 const stats = @import("stats.zig"); 9 - const db = @import("../stream/db.zig"); 10 11 const HTTP_BUF_SIZE = 8192; 12 ··· 122 123 // use verified version for full signature verification 124 // falls back to claims-only if DID resolution fails 125 - var requester_did = atproto.getRequesterDidVerified(alloc, auth_header, service_did); 126 if (requester_did == null) { 127 // fallback to claims-only verification 128 - requester_did = atproto.getRequesterDid(alloc, auth_header, service_did); 129 } 130 131 // parse query params ··· 150 std.debug.print("feed request: {s} (anonymous)\n", .{@tagName(feed_type)}); 151 } 152 153 - const cursor = parseQueryParam(alloc, target, "cursor") catch null; 154 const limit_str = parseQueryParam(alloc, target, "limit") catch "20"; 155 const limit = std.fmt.parseInt(usize, limit_str, 10) catch 20; 156 157 - const result = db.getPosts(alloc, feed_type, requester_did, cursor, limit) catch |err| { 158 std.debug.print("feed error: {}\n", .{err}); 159 - try sendJson(request, .internal_server_error, 160 - \\{"error":"internal error"} 161 - ); 162 return; 163 }; 164 ··· 232 const last_event_ago = if (last_event_at > 0) @divTrunc(now_ms - last_event_at, 1000) else 0; 233 const last_match_ago = if (last_match_at > 0) @divTrunc(now_ms - last_match_at, 1000) else 0; 234 235 - const post_count = db.getPostCount(); 236 const status = s.getStatus(); 237 238 var buf: std.ArrayList(u8) = .{};
··· 4 const mem = std.mem; 5 const config = @import("../feed/config.zig"); 6 const dashboard = @import("dashboard.zig"); 7 + const auth = @import("../bsky/auth.zig"); 8 + const api = @import("../bsky/api.zig"); 9 const stats = @import("stats.zig"); 10 + const posts = @import("../store/posts.zig"); 11 + const skeleton = @import("../feed/skeleton.zig"); 12 + const backfill = @import("../feed/backfill.zig"); 13 14 const HTTP_BUF_SIZE = 8192; 15 ··· 125 126 // use verified version for full signature verification 127 // falls back to claims-only if DID resolution fails 128 + var requester_did = auth.getRequesterDidVerified(alloc, auth_header, service_did); 129 if (requester_did == null) { 130 // fallback to claims-only verification 131 + requester_did = auth.getRequesterDid(alloc, auth_header, service_did); 132 } 133 134 // parse query params ··· 153 std.debug.print("feed request: {s} (anonymous)\n", .{@tagName(feed_type)}); 154 } 155 156 + const cursor_str = parseQueryParam(alloc, target, "cursor") catch null; 157 const limit_str = parseQueryParam(alloc, target, "limit") catch "20"; 158 const limit = std.fmt.parseInt(usize, limit_str, 10) catch 20; 159 160 + // handle eof cursor 161 + if (cursor_str) |c| { 162 + if (mem.eql(u8, c, "eof")) { 163 + const empty_response = skeleton.empty(alloc) catch { 164 + try sendJson(request, .internal_server_error, "{\"error\":\"internal error\"}"); 165 + return; 166 + }; 167 + try sendJson(request, .ok, empty_response); 168 + return; 169 + } 170 + } 171 + 172 + // parse cursor 173 + const cursor = if (cursor_str) |c| posts.Cursor.parse(c) else null; 174 + 175 + // get author filter for following feed 176 + var author_filter: ?[][]const u8 = null; 177 + defer if (author_filter) |f| api.freeFollows(alloc, f); 178 + 179 + if (feed_type == .following) { 180 + if (requester_did) |did| { 181 + // fetch follows 182 + author_filter = api.getFollows(alloc, did) catch |err| blk: { 183 + std.debug.print("failed to fetch follows: {}\n", .{err}); 
184 + break :blk null; 185 + }; 186 + 187 + // trigger backfill for followed accounts 188 + if (author_filter) |follows| { 189 + backfill.trigger(alloc, follows); 190 + } 191 + } else { 192 + // anonymous request to following feed - return empty 193 + const empty_response = skeleton.empty(alloc) catch { 194 + try sendJson(request, .internal_server_error, "{\"error\":\"internal error\"}"); 195 + return; 196 + }; 197 + try sendJson(request, .ok, empty_response); 198 + return; 199 + } 200 + } 201 + 202 + // query posts 203 + const query_result = posts.query(alloc, cursor, limit, author_filter) catch |err| { 204 std.debug.print("feed error: {}\n", .{err}); 205 + try sendJson(request, .internal_server_error, "{\"error\":\"internal error\"}"); 206 + return; 207 + }; 208 + defer posts.freeQueryResult(alloc, query_result.posts); 209 + 210 + // build response 211 + const result = skeleton.build(alloc, query_result.posts, query_result.next_cursor) catch |err| { 212 + std.debug.print("skeleton build error: {}\n", .{err}); 213 + try sendJson(request, .internal_server_error, "{\"error\":\"internal error\"}"); 214 return; 215 }; 216 ··· 284 const last_event_ago = if (last_event_at > 0) @divTrunc(now_ms - last_event_at, 1000) else 0; 285 const last_match_ago = if (last_match_at > 0) @divTrunc(now_ms - last_match_at, 1000) else 0; 286 287 + const post_count = posts.count(); 288 const status = s.getStatus(); 289 290 var buf: std.ArrayList(u8) = .{};
+30
src/store/backfill.zig
···
const std = @import("std");
const db = @import("db.zig");

/// check if an account has been backfilled
/// Returns false on any database error (treated as "not backfilled").
pub fn isComplete(did: []const u8) bool {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return false;

    var found = conn.rows(
        "SELECT 1 FROM backfilled_accounts WHERE did = ?",
        .{did},
    ) catch return false;
    defer found.deinit();

    return found.next() != null;
}

/// mark an account as backfilled
/// Best-effort: insert failures are silently ignored, so a failed mark just
/// means the account may be backfilled again later.
pub fn markComplete(did: []const u8) void {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return;

    const completed_at = std.time.milliTimestamp();
    conn.exec(
        "INSERT OR REPLACE INTO backfilled_accounts (did, backfilled_at) VALUES (?, ?)",
        .{ did, completed_at },
    ) catch {};
}
+105
src/store/db.zig
···
const std = @import("std");
const posix = std.posix;
const zqlite = @import("zqlite");

// single process-wide connection, guarded by `mutex`; callers go through
// lock()/unlock() or withLock()
var conn: ?zqlite.Conn = null;
var mutex: std.Thread.Mutex = .{};

// backing storage for the NUL-terminated database path
var path_buf: [256]u8 = undefined;

/// Open (creating if needed) the sqlite database at $DATABASE_PATH
/// (default /data/feed.db), enable WAL, and create the schema.
/// Errors: PathTooLong if $DATABASE_PATH exceeds the path buffer, or any
/// zqlite open/schema error.
pub fn init() !void {
    const path_env = posix.getenv("DATABASE_PATH") orelse "/data/feed.db";

    // bounds check BEFORE copying: we need path_env.len bytes plus one for
    // the NUL terminator; without this an oversized env var overflows path_buf
    if (path_env.len >= path_buf.len) {
        std.debug.print("DATABASE_PATH too long: {d} bytes\n", .{path_env.len});
        return error.PathTooLong;
    }

    @memcpy(path_buf[0..path_env.len], path_env);
    path_buf[path_env.len] = 0;
    const path: [*:0]const u8 = path_buf[0..path_env.len :0];

    std.debug.print("opening database: {s}\n", .{path_env});

    const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite;
    conn = zqlite.open(path, flags) catch |err| {
        std.debug.print("failed to open database: {}\n", .{err});
        return err;
    };

    // enable WAL mode for better concurrency; best-effort (pragmas can fail
    // harmlessly on some filesystems)
    _ = conn.?.exec("PRAGMA journal_mode=WAL", .{}) catch {};
    _ = conn.?.exec("PRAGMA busy_timeout=5000", .{}) catch {};

    // create tables
    try initSchema();

    std.debug.print("database initialized\n", .{});
}

/// Create all tables and indexes (idempotent: IF NOT EXISTS throughout).
/// Index creation is best-effort; table creation failures propagate.
fn initSchema() !void {
    const c = conn orelse return error.NotInitialized;

    c.exec(
        \\CREATE TABLE IF NOT EXISTS posts (
        \\  uri TEXT PRIMARY KEY,
        \\  cid TEXT NOT NULL,
        \\  indexed_at INTEGER NOT NULL,
        \\  author_did TEXT
        \\)
    , .{}) catch |err| {
        std.debug.print("failed to create posts table: {}\n", .{err});
        return err;
    };

    // supports the reverse-chronological feed query
    c.exec(
        "CREATE INDEX IF NOT EXISTS idx_posts_indexed_at ON posts(indexed_at DESC)",
        .{},
    ) catch {};

    // supports the author_did IN (...) filter for the following feed
    c.exec(
        "CREATE INDEX IF NOT EXISTS idx_posts_author ON posts(author_did)",
        .{},
    ) catch {};

    c.exec(
        \\CREATE TABLE IF NOT EXISTS follows_cache (
        \\  requester_did TEXT NOT NULL,
        \\  follows_did TEXT NOT NULL,
        \\  cached_at INTEGER NOT NULL,
        \\  PRIMARY KEY (requester_did, follows_did)
        \\)
    , .{}) catch |err| {
        std.debug.print("failed to create follows_cache table: {}\n", .{err});
        return err;
    };

    c.exec(
        "CREATE INDEX IF NOT EXISTS idx_follows_cache_requester ON follows_cache(requester_did, cached_at)",
        .{},
    ) catch {};

    c.exec(
        \\CREATE TABLE IF NOT EXISTS backfilled_accounts (
        \\  did TEXT PRIMARY KEY,
        \\  backfilled_at INTEGER NOT NULL
        \\)
    , .{}) catch |err| {
        std.debug.print("failed to create backfilled_accounts table: {}\n", .{err});
        return err;
    };
}

/// Current connection, or null before init(). Callers must hold the lock
/// while using it.
pub fn getConn() ?zqlite.Conn {
    return conn;
}

/// Acquire the global database mutex. Pair with unlock().
pub fn lock() void {
    mutex.lock();
}

/// Release the global database mutex.
pub fn unlock() void {
    mutex.unlock();
}

/// execute with mutex held
pub fn withLock(comptime func: fn (?zqlite.Conn) anyerror!void) !void {
    mutex.lock();
    defer mutex.unlock();
    try func(conn);
}
+72
src/store/follows.zig
···
const std = @import("std");
const db = @import("db.zig");

/// cache follows for a requester (replaces existing)
/// Best-effort: individual insert failures are ignored, so the cache may be
/// partial after an error.
pub fn cache(requester_did: []const u8, follows: []const []const u8) !void {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return error.NotInitialized;
    const now = std.time.milliTimestamp();

    // clear old entries for this requester
    conn.exec(
        "DELETE FROM follows_cache WHERE requester_did = ?",
        .{requester_did},
    ) catch {};

    // insert new follows
    for (follows) |did| {
        conn.exec(
            "INSERT INTO follows_cache (requester_did, follows_did, cached_at) VALUES (?, ?, ?)",
            .{ requester_did, did, now },
        ) catch {};
    }
}

/// get cached follows as slice of DIDs (caller owns memory)
/// Free with `freeFollows`. Errors: NotInitialized, QueryFailed, or
/// allocator errors.
pub fn get(alloc: std.mem.Allocator, requester_did: []const u8) ![][]const u8 {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return error.NotInitialized;

    // unmanaged ArrayList (.empty + per-call allocator), matching the style
    // used by the other store/bsky modules
    var result: std.ArrayList([]const u8) = .empty;
    errdefer {
        for (result.items) |item| alloc.free(item);
        result.deinit(alloc);
    }

    var rows = conn.rows(
        "SELECT follows_did FROM follows_cache WHERE requester_did = ?",
        .{requester_did},
    ) catch return error.QueryFailed;
    defer rows.deinit();

    while (rows.next()) |row| {
        const did = try alloc.dupe(u8, row.text(0));
        // if append fails, the dupe above is not yet tracked by `result`
        errdefer alloc.free(did);
        try result.append(alloc, did);
    }

    return try result.toOwnedSlice(alloc);
}

/// free a slice returned by `get`
pub fn freeFollows(alloc: std.mem.Allocator, follows: [][]const u8) void {
    for (follows) |did| alloc.free(did);
    alloc.free(follows);
}

/// check if we have cached follows for a requester
/// Returns false on any database error.
pub fn hasCached(requester_did: []const u8) bool {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return false;
    var rows = conn.rows(
        "SELECT 1 FROM follows_cache WHERE requester_did = ? LIMIT 1",
        .{requester_did},
    ) catch return false;
    defer rows.deinit();

    return rows.next() != null;
}
+190
src/store/posts.zig
···
const std = @import("std");
const zat = @import("zat");
const db = @import("db.zig");

/// Insert a post, deriving its timestamp from the TID rkey in the URI.
pub fn add(uri: []const u8, cid: []const u8) !void {
    try addWithTimestamp(uri, cid, null);
}

/// Insert a post with an explicit timestamp (ms). Falls back to the TID in
/// the URI, then to the current wall clock. Duplicate URIs are ignored
/// (INSERT OR IGNORE).
pub fn addWithTimestamp(uri: []const u8, cid: []const u8, timestamp: ?i64) !void {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return error.NotInitialized;

    const ts = timestamp orelse parseTimestampFromUri(uri) orelse std.time.milliTimestamp();
    const author_did = extractDidFromUri(uri);

    conn.exec(
        "INSERT OR IGNORE INTO posts (uri, cid, indexed_at, author_did) VALUES (?, ?, ?, ?)",
        .{ uri, cid, ts, author_did },
    ) catch |err| {
        std.debug.print("failed to insert post: {}\n", .{err});
        return err;
    };
}

/// Total number of stored posts; 0 on any failure.
pub fn count() usize {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return 0;
    var rows = conn.rows("SELECT COUNT(*) FROM posts", .{}) catch return 0;
    defer rows.deinit();

    if (rows.next()) |row| {
        return @intCast(row.int(0));
    }
    return 0;
}

/// row from posts table
pub const Post = struct {
    uri: []const u8,
    cid: []const u8,
    indexed_at: i64,
};

/// cursor for pagination, serialized as "<timestamp>::<cid>"
pub const Cursor = struct {
    timestamp: i64,
    cid: []const u8,

    /// Parse "ts::cid"; returns null on malformed input. The returned cid
    /// slice aliases `cursor_str` (no copy).
    pub fn parse(cursor_str: []const u8) ?Cursor {
        const sep = std.mem.indexOf(u8, cursor_str, "::") orelse return null;
        const ts = std.fmt.parseInt(i64, cursor_str[0..sep], 10) catch return null;
        return .{
            .timestamp = ts,
            .cid = cursor_str[sep + 2 ..],
        };
    }

    /// Format into `buf`; returns an empty slice if buf is too small.
    pub fn format(self: Cursor, buf: []u8) []const u8 {
        return std.fmt.bufPrint(buf, "{d}::{s}", .{ self.timestamp, self.cid }) catch buf[0..0];
    }
};

/// Append `s` as a single-quoted SQL string literal with embedded quotes
/// doubled. Needed where we build SQL text instead of binding parameters.
fn appendSqlString(w: anytype, s: []const u8) !void {
    try w.writeByte('\'');
    for (s) |ch| {
        if (ch == '\'') {
            try w.writeAll("''");
        } else {
            try w.writeByte(ch);
        }
    }
    try w.writeByte('\'');
}

/// query posts with cursor-based pagination
/// Returns up to `limit` posts newest-first, strictly after `cursor`
/// (keyset pagination on (indexed_at DESC, cid DESC)). When `author_filter`
/// is non-null, only posts by those DIDs are returned (empty filter ->
/// empty result). Free `posts` with `freeQueryResult`; note that
/// `next_cursor.cid` aliases the last post's cid and is only valid until
/// the posts slice is freed.
pub fn query(
    alloc: std.mem.Allocator,
    cursor: ?Cursor,
    limit: usize,
    author_filter: ?[]const []const u8,
) !struct { posts: []Post, next_cursor: ?Cursor } {
    db.lock();
    defer db.unlock();

    const conn = db.getConn() orelse return error.NotInitialized;

    // no cursor: start from "after the newest possible post"
    const cursor_ts = if (cursor) |c| c.timestamp else std.math.maxInt(i64);
    const cursor_cid = if (cursor) |c| c.cid else "";

    var post_list: std.ArrayList(Post) = .empty;
    errdefer post_list.deinit(alloc);

    var next_cursor: ?Cursor = null;

    if (author_filter) |authors| {
        if (authors.len == 0) {
            return .{ .posts = try post_list.toOwnedSlice(alloc), .next_cursor = null };
        }

        // build IN clause query; the IN list length varies at runtime so we
        // cannot use comptime parameter binding here. cursor_cid comes from
        // the client's cursor query param and the DIDs from an external API,
        // so both MUST go through appendSqlString to prevent SQL injection.
        var query_buf: std.ArrayList(u8) = .empty;
        defer query_buf.deinit(alloc);
        const qw = query_buf.writer(alloc);

        try qw.writeAll("SELECT uri, cid, indexed_at FROM posts WHERE author_did IN (");

        for (authors, 0..) |did, i| {
            if (i > 0) try qw.writeAll(",");
            try appendSqlString(qw, did);
        }

        try qw.print(
            ") AND (indexed_at < {d} OR (indexed_at = {d} AND cid < ",
            .{ cursor_ts, cursor_ts },
        );
        try appendSqlString(qw, cursor_cid);
        try qw.print(
            ")) ORDER BY indexed_at DESC, cid DESC LIMIT {d}",
            .{limit},
        );

        // NUL-terminate for the sqlite C API
        try query_buf.append(alloc, 0);
        const query_z: [:0]const u8 = query_buf.items[0 .. query_buf.items.len - 1 :0];

        var rows = conn.rows(query_z, .{}) catch return error.QueryFailed;
        defer rows.deinit();

        while (rows.next()) |row| {
            const uri = try alloc.dupe(u8, row.text(0));
            const cid = try alloc.dupe(u8, row.text(1));
            const ts = row.int(2);

            try post_list.append(alloc, .{ .uri = uri, .cid = cid, .indexed_at = ts });
            // keeps pointing at the last (oldest) row's values
            next_cursor = .{ .timestamp = ts, .cid = cid };
        }
    } else {
        // fixed parameter count: use proper binding
        var rows = conn.rows(
            \\SELECT uri, cid, indexed_at FROM posts
            \\WHERE indexed_at < ? OR (indexed_at = ? AND cid < ?)
            \\ORDER BY indexed_at DESC, cid DESC
            \\LIMIT ?
        , .{ cursor_ts, cursor_ts, cursor_cid, limit }) catch return error.QueryFailed;
        defer rows.deinit();

        while (rows.next()) |row| {
            const uri = try alloc.dupe(u8, row.text(0));
            const cid = try alloc.dupe(u8, row.text(1));
            const ts = row.int(2);

            try post_list.append(alloc, .{ .uri = uri, .cid = cid, .indexed_at = ts });
            next_cursor = .{ .timestamp = ts, .cid = cid };
        }
    }

    return .{ .posts = try post_list.toOwnedSlice(alloc), .next_cursor = next_cursor };
}

/// Free a posts slice returned by `query` (invalidates the associated
/// next_cursor's cid as well).
pub fn freeQueryResult(alloc: std.mem.Allocator, posts: []Post) void {
    for (posts) |p| {
        alloc.free(p.uri);
        alloc.free(p.cid);
    }
    alloc.free(posts);
}

// helpers

/// parse TID (rkey) to get timestamp in milliseconds
pub fn parseTidTimestamp(rkey: []const u8) i64 {
    const tid = zat.Tid.parse(rkey) orelse return 0;
    return @intCast(tid.timestamp() / 1000); // microseconds -> milliseconds
}

// derive a millisecond timestamp from the TID rkey embedded in an at:// URI
fn parseTimestampFromUri(uri: []const u8) ?i64 {
    const at_uri = zat.AtUri.parse(uri) orelse return null;
    const rkey = at_uri.rkey() orelse return null;
    const tid = zat.Tid.parse(rkey) orelse return null;
    const ts: i64 = @intCast(tid.timestamp() / 1000);
    return if (ts > 0) ts else null;
}

// authority segment of an at:// URI is the author's DID
fn extractDidFromUri(uri: []const u8) ?[]const u8 {
    const at_uri = zat.AtUri.parse(uri) orelse return null;
    return at_uri.authority();
}

// tests

test "Cursor.parse" {
    const c = Cursor.parse("1234567890::abc123") orelse unreachable;
    try std.testing.expectEqual(1234567890, c.timestamp);
    try std.testing.expectEqualStrings("abc123", c.cid);

    try std.testing.expect(Cursor.parse("invalid") == null);
    try std.testing.expect(Cursor.parse("notanumber::cid") == null);
}

test "Cursor.format" {
    var buf: [64]u8 = undefined;
    const c = Cursor{ .timestamp = 1234567890, .cid = "abc123" };
    const formatted = c.format(&buf);
    try std.testing.expectEqualStrings("1234567890::abc123", formatted);
}
-228
src/stream/atproto.zig
··· 1 - const std = @import("std"); 2 - const mem = std.mem; 3 - const Allocator = mem.Allocator; 4 - const zat = @import("zat"); 5 - 6 - // ============================================================================= 7 - // atproto utilities 8 - // 9 - // JWT verification and API helpers using zat SDK. 10 - // ============================================================================= 11 - 12 - const log = std.log.scoped(.atproto); 13 - 14 - /// extract requester DID from an HTTP Authorization header (claims-only verification). 15 - pub fn getRequesterDid(allocator: Allocator, auth_header: ?[]const u8, service_did: []const u8) ?[]const u8 { 16 - const auth = auth_header orelse return null; 17 - if (!mem.startsWith(u8, auth, "Bearer ")) return null; 18 - 19 - const token = auth["Bearer ".len..]; 20 - 21 - var jwt = zat.Jwt.parse(allocator, token) catch return null; 22 - defer jwt.deinit(); 23 - 24 - // check claims 25 - if (jwt.isExpired()) return null; 26 - if (!mem.eql(u8, jwt.payload.aud, service_did)) return null; 27 - 28 - return allocator.dupe(u8, jwt.payload.iss) catch null; 29 - } 30 - 31 - /// extract requester DID with full signature verification. 
32 - pub fn getRequesterDidVerified(allocator: Allocator, auth_header: ?[]const u8, service_did: []const u8) ?[]const u8 { 33 - const auth = auth_header orelse return null; 34 - if (!mem.startsWith(u8, auth, "Bearer ")) return null; 35 - 36 - const token = auth["Bearer ".len..]; 37 - 38 - var jwt = zat.Jwt.parse(allocator, token) catch |err| { 39 - log.debug("jwt parse failed: {}", .{err}); 40 - return null; 41 - }; 42 - defer jwt.deinit(); 43 - 44 - // check claims 45 - if (jwt.isExpired()) { 46 - log.debug("jwt expired", .{}); 47 - return null; 48 - } 49 - if (!mem.eql(u8, jwt.payload.aud, service_did)) { 50 - log.debug("jwt audience mismatch", .{}); 51 - return null; 52 - } 53 - 54 - // resolve issuer's DID document to get signing key 55 - const did = zat.Did.parse(jwt.payload.iss) orelse { 56 - log.debug("invalid issuer DID: {s}", .{jwt.payload.iss}); 57 - return null; 58 - }; 59 - 60 - var resolver = zat.DidResolver.init(allocator); 61 - defer resolver.deinit(); 62 - 63 - var doc = resolver.resolve(did) catch |err| { 64 - log.debug("DID resolution failed: {}", .{err}); 65 - return null; 66 - }; 67 - defer doc.deinit(); 68 - 69 - const signing_key = doc.signingKey() orelse { 70 - log.debug("no signing key in DID document", .{}); 71 - return null; 72 - }; 73 - 74 - // verify signature 75 - jwt.verify(signing_key.public_key_multibase) catch |err| { 76 - log.debug("jwt signature verification failed: {}", .{err}); 77 - return null; 78 - }; 79 - 80 - return allocator.dupe(u8, jwt.payload.iss) catch null; 81 - } 82 - 83 - // ----------------------------------------------------------------------------- 84 - // social graph API (using zat.XrpcClient) 85 - // ----------------------------------------------------------------------------- 86 - 87 - const BSKY_PUBLIC_API = "https://public.api.bsky.app"; 88 - 89 - /// fetch all DIDs that a user follows (with pagination). 90 - /// returns newline-separated DIDs. 
91 - pub fn getFollows(allocator: Allocator, actor_did: []const u8) ![]u8 { 92 - var client = zat.XrpcClient.init(allocator, BSKY_PUBLIC_API); 93 - defer client.deinit(); 94 - 95 - var result: std.ArrayList(u8) = .empty; 96 - errdefer result.deinit(allocator); 97 - 98 - var cursor: ?[]const u8 = null; 99 - defer if (cursor) |c| allocator.free(c); 100 - 101 - const nsid = zat.Nsid.parse("app.bsky.graph.getFollows") orelse return error.InvalidNsid; 102 - 103 - while (true) { 104 - var params = std.StringHashMap([]const u8).init(allocator); 105 - defer params.deinit(); 106 - 107 - try params.put("actor", actor_did); 108 - try params.put("limit", "100"); 109 - if (cursor) |c| { 110 - try params.put("cursor", c); 111 - } 112 - 113 - var response = client.query(nsid, params) catch |err| { 114 - log.err("getFollows API error: {}", .{err}); 115 - return error.ApiFailed; 116 - }; 117 - defer response.deinit(); 118 - 119 - if (!response.ok()) { 120 - return error.ApiFailed; 121 - } 122 - 123 - var parsed = response.json() catch return error.InvalidJson; 124 - defer parsed.deinit(); 125 - 126 - // extract follows array using zat.json helpers 127 - const follows = zat.json.getArray(parsed.value, "follows") orelse break; 128 - 129 - for (follows) |item| { 130 - const did = zat.json.getString(item, "did") orelse continue; 131 - try result.appendSlice(allocator, did); 132 - try result.append(allocator, '\n'); 133 - } 134 - 135 - // check for next page 136 - if (cursor) |c| allocator.free(c); 137 - cursor = null; 138 - 139 - if (zat.json.getString(parsed.value, "cursor")) |c| { 140 - if (c.len > 0) { 141 - cursor = try allocator.dupe(u8, c); 142 - } 143 - } 144 - 145 - if (cursor == null) break; 146 - } 147 - 148 - return try result.toOwnedSlice(allocator); 149 - } 150 - 151 - /// a post from getAuthorFeed 152 - pub const AuthorPost = struct { 153 - uri: []const u8, 154 - cid: []const u8, 155 - text: []const u8, 156 - embed_uri: ?[]const u8, 157 - }; 158 - 159 - /// struct for 
extracting post data from feed items 160 - const FeedPost = struct { 161 - uri: []const u8, 162 - cid: []const u8, 163 - record: struct { 164 - text: []const u8 = "", 165 - embed: ?struct { 166 - external: ?struct { uri: []const u8 } = null, 167 - } = null, 168 - }, 169 - }; 170 - 171 - /// fetch recent posts from an author's feed. 172 - pub fn getAuthorFeed(allocator: Allocator, actor_did: []const u8, limit: usize) ![]AuthorPost { 173 - var client = zat.XrpcClient.init(allocator, BSKY_PUBLIC_API); 174 - defer client.deinit(); 175 - 176 - var posts: std.ArrayList(AuthorPost) = .empty; 177 - errdefer { 178 - for (posts.items) |p| { 179 - allocator.free(p.uri); 180 - allocator.free(p.cid); 181 - allocator.free(p.text); 182 - if (p.embed_uri) |eu| allocator.free(eu); 183 - } 184 - posts.deinit(allocator); 185 - } 186 - 187 - const nsid = zat.Nsid.parse("app.bsky.feed.getAuthorFeed") orelse return error.InvalidNsid; 188 - 189 - var params = std.StringHashMap([]const u8).init(allocator); 190 - defer params.deinit(); 191 - 192 - var limit_buf: [8]u8 = undefined; 193 - const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{@min(limit, 100)}) catch return error.FormatError; 194 - 195 - try params.put("actor", actor_did); 196 - try params.put("limit", limit_str); 197 - try params.put("filter", "posts_no_replies"); 198 - 199 - var response = client.query(nsid, params) catch |err| { 200 - log.err("getAuthorFeed API error for {s}: {}", .{ actor_did, err }); 201 - return error.ApiFailed; 202 - }; 203 - defer response.deinit(); 204 - 205 - if (!response.ok()) { 206 - return error.ApiFailed; 207 - } 208 - 209 - var parsed = response.json() catch return error.InvalidJson; 210 - defer parsed.deinit(); 211 - 212 - const feed = zat.json.getArray(parsed.value, "feed") orelse return try posts.toOwnedSlice(allocator); 213 - 214 - for (feed) |item| { 215 - const post = zat.json.extractAt(FeedPost, allocator, item, .{"post"}) catch continue; 216 - 217 - const embed_uri = if 
(post.record.embed) |e| if (e.external) |ext| ext.uri else null else null; 218 - 219 - try posts.append(allocator, .{ 220 - .uri = try allocator.dupe(u8, post.uri), 221 - .cid = try allocator.dupe(u8, post.cid), 222 - .text = try allocator.dupe(u8, post.record.text), 223 - .embed_uri = if (embed_uri) |eu| try allocator.dupe(u8, eu) else null, 224 - }); 225 - } 226 - 227 - return try posts.toOwnedSlice(allocator); 228 - }
···
-400
src/stream/db.zig
··· 1 - const std = @import("std"); 2 - const posix = std.posix; 3 - const zqlite = @import("zqlite"); 4 - const zat = @import("zat"); 5 - const config = @import("../feed/config.zig"); 6 - const atproto = @import("atproto.zig"); 7 - 8 - var db: ?zqlite.Conn = null; 9 - var mutex: std.Thread.Mutex = .{}; 10 - 11 - // static buffer for db path (env vars return slices, zqlite needs sentinel) 12 - var db_path_buf: [256]u8 = undefined; 13 - 14 - pub fn init() !void { 15 - const db_path_env = posix.getenv("DATABASE_PATH") orelse "/data/feed.db"; 16 - 17 - // copy to null-terminated buffer 18 - @memcpy(db_path_buf[0..db_path_env.len], db_path_env); 19 - db_path_buf[db_path_env.len] = 0; 20 - const db_path: [*:0]const u8 = db_path_buf[0..db_path_env.len :0]; 21 - 22 - std.debug.print("opening database: {s}\n", .{db_path_env}); 23 - 24 - const flags = zqlite.OpenFlags.Create | zqlite.OpenFlags.ReadWrite; 25 - db = zqlite.open(db_path, flags) catch |err| { 26 - std.debug.print("failed to open database: {}\n", .{err}); 27 - return err; 28 - }; 29 - 30 - // enable WAL mode for better concurrency 31 - _ = db.?.exec("PRAGMA journal_mode=WAL", .{}) catch {}; 32 - _ = db.?.exec("PRAGMA busy_timeout=5000", .{}) catch {}; 33 - 34 - // create tables 35 - db.?.exec( 36 - \\CREATE TABLE IF NOT EXISTS posts ( 37 - \\ uri TEXT PRIMARY KEY, 38 - \\ cid TEXT NOT NULL, 39 - \\ indexed_at INTEGER NOT NULL 40 - \\) 41 - , .{}) catch |err| { 42 - std.debug.print("failed to create posts table: {}\n", .{err}); 43 - return err; 44 - }; 45 - 46 - db.?.exec( 47 - "CREATE INDEX IF NOT EXISTS idx_posts_indexed_at ON posts(indexed_at DESC)", 48 - .{}, 49 - ) catch {}; 50 - 51 - // follows cache table 52 - db.?.exec( 53 - \\CREATE TABLE IF NOT EXISTS follows_cache ( 54 - \\ requester_did TEXT NOT NULL, 55 - \\ follows_did TEXT NOT NULL, 56 - \\ cached_at INTEGER NOT NULL, 57 - \\ PRIMARY KEY (requester_did, follows_did) 58 - \\) 59 - , .{}) catch |err| { 60 - std.debug.print("failed to create 
follows_cache table: {}\n", .{err}); 61 - return err; 62 - }; 63 - 64 - db.?.exec( 65 - "CREATE INDEX IF NOT EXISTS idx_follows_cache_requester ON follows_cache(requester_did, cached_at)", 66 - .{}, 67 - ) catch {}; 68 - 69 - // backfilled accounts table - track which accounts we've fetched historical posts from 70 - db.?.exec( 71 - \\CREATE TABLE IF NOT EXISTS backfilled_accounts ( 72 - \\ did TEXT PRIMARY KEY, 73 - \\ backfilled_at INTEGER NOT NULL 74 - \\) 75 - , .{}) catch |err| { 76 - std.debug.print("failed to create backfilled_accounts table: {}\n", .{err}); 77 - return err; 78 - }; 79 - 80 - std.debug.print("database initialized\n", .{}); 81 - } 82 - 83 - pub fn addPost(uri: []const u8, cid: []const u8) !void { 84 - try addPostWithTimestamp(uri, cid, null); 85 - } 86 - 87 - /// add a post with explicit timestamp (for backfill) or current time 88 - pub fn addPostWithTimestamp(uri: []const u8, cid: []const u8, timestamp: ?i64) !void { 89 - mutex.lock(); 90 - defer mutex.unlock(); 91 - 92 - const conn = db orelse return error.NotInitialized; 93 - 94 - // use provided timestamp, or try to parse from URI, or fall back to now 95 - const ts = timestamp orelse parseTimestampFromUri(uri) orelse std.time.milliTimestamp(); 96 - 97 - // extract author DID from URI 98 - const author_did = extractDidFromUri(uri); 99 - 100 - conn.exec( 101 - "INSERT OR IGNORE INTO posts (uri, cid, indexed_at, author_did) VALUES (?, ?, ?, ?)", 102 - .{ uri, cid, ts, author_did }, 103 - ) catch |err| { 104 - std.debug.print("failed to insert post: {}\n", .{err}); 105 - return err; 106 - }; 107 - } 108 - 109 - /// parse TID to get timestamp in milliseconds 110 - pub fn parseTidTimestamp(rkey: []const u8) i64 { 111 - const tid = zat.Tid.parse(rkey) orelse return 0; 112 - return @intCast(tid.timestamp() / 1000); // microseconds -> milliseconds 113 - } 114 - 115 - /// parse TID from AT URI to get post timestamp 116 - fn parseTimestampFromUri(uri: []const u8) ?i64 { 117 - const at_uri = 
zat.AtUri.parse(uri) orelse return null; 118 - const rkey = at_uri.rkey() orelse return null; 119 - const ts = parseTidTimestamp(rkey); 120 - return if (ts > 0) ts else null; 121 - } 122 - 123 - pub fn getPosts( 124 - alloc: std.mem.Allocator, 125 - feed_type: config.FeedType, 126 - requester_did: ?[]const u8, 127 - cursor: ?[]const u8, 128 - limit: usize, 129 - ) ![]const u8 { 130 - // for following feed, fetch follows first (before mutex) 131 - var follows_list: ?[]const u8 = null; 132 - defer if (follows_list) |f| alloc.free(f); 133 - 134 - if (feed_type == .following) { 135 - if (requester_did) |did| { 136 - follows_list = getFollowsList(alloc, did) catch null; 137 - } else { 138 - return try alloc.dupe(u8, "{\"cursor\":\"eof\",\"feed\":[]}"); 139 - } 140 - } 141 - 142 - mutex.lock(); 143 - defer mutex.unlock(); 144 - 145 - const conn = db orelse return error.NotInitialized; 146 - 147 - var buf: std.ArrayList(u8) = .{}; 148 - defer buf.deinit(alloc); 149 - const w = buf.writer(alloc); 150 - 151 - const CURSOR_EOF = "eof"; 152 - 153 - if (cursor) |c| { 154 - if (std.mem.eql(u8, c, CURSOR_EOF)) { 155 - try w.writeAll("{\"cursor\":\"eof\",\"feed\":[]}"); 156 - return try alloc.dupe(u8, buf.items); 157 - } 158 - } 159 - 160 - // parse cursor (format: timestamp::cid) 161 - var cursor_ts: i64 = std.math.maxInt(i64); 162 - var cursor_cid: []const u8 = ""; 163 - if (cursor) |c| { 164 - if (std.mem.indexOf(u8, c, "::")) |sep| { 165 - cursor_ts = std.fmt.parseInt(i64, c[0..sep], 10) catch std.math.maxInt(i64); 166 - cursor_cid = c[sep + 2 ..]; 167 - } 168 - } 169 - 170 - // build and execute query 171 - var posts_json: std.ArrayList(u8) = .{}; 172 - defer posts_json.deinit(alloc); 173 - const pw = posts_json.writer(alloc); 174 - 175 - var cursor_buf: [128]u8 = undefined; 176 - var cursor_len: usize = 0; 177 - var count: usize = 0; 178 - 179 - if (feed_type == .following) { 180 - // for following feed: query by author_did using IN clause 181 - if (follows_list) |follows| 
{ 182 - if (follows.len == 0) { 183 - try w.writeAll("{\"cursor\":\"eof\",\"feed\":[]}"); 184 - return try alloc.dupe(u8, buf.items); 185 - } 186 - 187 - // build query with IN clause 188 - var query_buf: std.ArrayList(u8) = .{}; 189 - defer query_buf.deinit(alloc); 190 - const qw = query_buf.writer(alloc); 191 - 192 - try qw.writeAll("SELECT uri, cid, indexed_at FROM posts WHERE author_did IN ("); 193 - 194 - var first = true; 195 - var it = std.mem.splitScalar(u8, follows, '\n'); 196 - while (it.next()) |did| { 197 - if (did.len == 0) continue; 198 - if (!first) try qw.writeAll(","); 199 - try qw.print("'{s}'", .{did}); 200 - first = false; 201 - } 202 - 203 - try qw.print(") AND (indexed_at < {d} OR (indexed_at = {d} AND cid < '{s}')) ORDER BY indexed_at DESC, cid DESC LIMIT {d}", .{ cursor_ts, cursor_ts, cursor_cid, limit }); 204 - 205 - // need null-terminated string for zqlite 206 - try query_buf.append(alloc, 0); 207 - const query_z: [:0]const u8 = query_buf.items[0 .. query_buf.items.len - 1 :0]; 208 - 209 - var rows = conn.rows(query_z, .{}) catch |err| { 210 - std.debug.print("following query failed: {}\n", .{err}); 211 - return error.QueryFailed; 212 - }; 213 - defer rows.deinit(); 214 - 215 - while (rows.next()) |row| { 216 - const uri = row.text(0); 217 - const cid = row.text(1); 218 - const ts = row.int(2); 219 - 220 - if (count > 0) try pw.writeAll(","); 221 - try pw.print("{{\"post\":\"{s}\"}}", .{uri}); 222 - cursor_len = (std.fmt.bufPrint(&cursor_buf, "{d}::{s}", .{ ts, cid }) catch &cursor_buf).len; 223 - count += 1; 224 - } 225 - 226 - std.debug.print("following feed: returning {d} posts\n", .{count}); 227 - } 228 - } else { 229 - // for all feed: simple query 230 - var rows = conn.rows( 231 - \\SELECT uri, cid, indexed_at FROM posts 232 - \\WHERE indexed_at < ? OR (indexed_at = ? AND cid < ?) 233 - \\ORDER BY indexed_at DESC, cid DESC 234 - \\LIMIT ? 
235 - , .{ cursor_ts, cursor_ts, cursor_cid, limit }) catch |err| { 236 - std.debug.print("query failed: {}\n", .{err}); 237 - return error.QueryFailed; 238 - }; 239 - defer rows.deinit(); 240 - 241 - while (rows.next()) |row| { 242 - const uri = row.text(0); 243 - const cid = row.text(1); 244 - const ts = row.int(2); 245 - 246 - if (count > 0) try pw.writeAll(","); 247 - try pw.print("{{\"post\":\"{s}\"}}", .{uri}); 248 - cursor_len = (std.fmt.bufPrint(&cursor_buf, "{d}::{s}", .{ ts, cid }) catch &cursor_buf).len; 249 - count += 1; 250 - } 251 - } 252 - 253 - // build final response 254 - try w.writeAll("{\"cursor\":"); 255 - if (count > 0) { 256 - try w.print("\"{s}\"", .{cursor_buf[0..cursor_len]}); 257 - } else { 258 - try w.writeAll("\"eof\""); 259 - } 260 - 261 - try w.writeAll(",\"feed\":["); 262 - try w.writeAll(posts_json.items); 263 - try w.writeAll("]}"); 264 - 265 - return try alloc.dupe(u8, buf.items); 266 - } 267 - 268 - /// extract DID from post URI (at://did:plc:xxx/app.bsky.feed.post/yyy) 269 - fn extractDidFromUri(uri: []const u8) ?[]const u8 { 270 - const at_uri = zat.AtUri.parse(uri) orelse return null; 271 - return at_uri.authority(); 272 - } 273 - 274 - /// get follows as newline-separated string (caller owns returned memory) 275 - fn getFollowsList(alloc: std.mem.Allocator, requester_did: []const u8) ![]const u8 { 276 - std.debug.print("fetching follows for {s}...\n", .{requester_did}); 277 - const follows = atproto.getFollows(alloc, requester_did) catch |err| { 278 - std.debug.print("failed to fetch follows: {}\n", .{err}); 279 - return error.FetchFailed; 280 - }; 281 - 282 - // count follows for logging 283 - var count: usize = 0; 284 - var it = std.mem.splitScalar(u8, follows, '\n'); 285 - while (it.next()) |did| { 286 - if (did.len > 0) count += 1; 287 - } 288 - std.debug.print("fetched {d} follows for {s}\n", .{ count, requester_did }); 289 - 290 - // trigger backfill in background 291 - var dids_to_backfill: std.ArrayList([]const u8) = 
.empty; 292 - defer dids_to_backfill.deinit(alloc); 293 - var it2 = std.mem.splitScalar(u8, follows, '\n'); 294 - while (it2.next()) |did| { 295 - if (did.len > 0) { 296 - dids_to_backfill.append(alloc, did) catch continue; 297 - } 298 - } 299 - if (dids_to_backfill.items.len > 0) { 300 - triggerBackfill(alloc, dids_to_backfill.items); 301 - } 302 - 303 - return follows; 304 - } 305 - 306 - pub fn getPostCount() usize { 307 - mutex.lock(); 308 - defer mutex.unlock(); 309 - 310 - const conn = db orelse return 0; 311 - var rows = conn.rows("SELECT COUNT(*) FROM posts", .{}) catch return 0; 312 - defer rows.deinit(); 313 - 314 - if (rows.next()) |row| { 315 - return @intCast(row.int(0)); 316 - } 317 - return 0; 318 - } 319 - 320 - // ----------------------------------------------------------------------------- 321 - // lazy backfill 322 - // ----------------------------------------------------------------------------- 323 - 324 - const filter = @import("../feed/filter.zig"); 325 - 326 - /// check if an account has been backfilled 327 - fn isBackfilled(did: []const u8) bool { 328 - mutex.lock(); 329 - defer mutex.unlock(); 330 - 331 - const conn = db orelse return false; 332 - var rows = conn.rows( 333 - "SELECT 1 FROM backfilled_accounts WHERE did = ?", 334 - .{did}, 335 - ) catch return false; 336 - defer rows.deinit(); 337 - 338 - return rows.next() != null; 339 - } 340 - 341 - /// mark an account as backfilled 342 - fn markBackfilled(did: []const u8) void { 343 - mutex.lock(); 344 - defer mutex.unlock(); 345 - 346 - const conn = db orelse return; 347 - const now = std.time.milliTimestamp(); 348 - conn.exec( 349 - "INSERT OR REPLACE INTO backfilled_accounts (did, backfilled_at) VALUES (?, ?)", 350 - .{ did, now }, 351 - ) catch {}; 352 - } 353 - 354 - /// backfill posts from a single account 355 - fn backfillAccount(alloc: std.mem.Allocator, did: []const u8) void { 356 - if (isBackfilled(did)) return; 357 - 358 - std.debug.print("backfilling {s}...\n", .{did}); 359 
- 360 - const posts = atproto.getAuthorFeed(alloc, did, 50) catch |err| { 361 - std.debug.print("backfill failed for {s}: {}\n", .{ did, err }); 362 - return; 363 - }; 364 - defer { 365 - for (posts) |p| { 366 - alloc.free(p.uri); 367 - alloc.free(p.cid); 368 - alloc.free(p.text); 369 - if (p.embed_uri) |eu| alloc.free(eu); 370 - } 371 - alloc.free(posts); 372 - } 373 - 374 - var added: usize = 0; 375 - for (posts) |post| { 376 - const text_match = filter.textContainsMusicLink(post.text); 377 - const embed_match = if (post.embed_uri) |eu| filter.textContainsMusicLink(eu) else false; 378 - if (text_match or embed_match) { 379 - addPost(post.uri, post.cid) catch continue; 380 - added += 1; 381 - } 382 - } 383 - 384 - markBackfilled(did); 385 - std.debug.print("backfilled {s}: {d} music posts from {d} total\n", .{ did, added, posts.len }); 386 - } 387 - 388 - /// trigger backfill for a list of DIDs (called after fetching follows) 389 - pub fn triggerBackfill(alloc: std.mem.Allocator, dids: []const []const u8) void { 390 - var backfilled: usize = 0; 391 - const max_per_request = 10; // limit how many we backfill per request 392 - 393 - for (dids) |did| { 394 - if (backfilled >= max_per_request) break; 395 - if (!isBackfilled(did)) { 396 - backfillAccount(alloc, did); 397 - backfilled += 1; 398 - } 399 - } 400 - }
···
+5 -5
src/stream/jetstream.zig src/jetstream.zig
··· 4 const posix = std.posix; 5 const Allocator = mem.Allocator; 6 const websocket = @import("websocket"); 7 - const filter = @import("../feed/filter.zig"); 8 - const stats = @import("../server/stats.zig"); 9 - const db = @import("db.zig"); 10 11 // turbostream: hydrated jetstream from graze.social 12 const TURBOSTREAM_HOST = "api.graze.social"; ··· 136 if (rkey_val != .string) return error.NotAPost; 137 const rkey = rkey_val.string; 138 139 - const post_time_ms = db.parseTidTimestamp(rkey); 140 141 // get time_us from message 142 const time_us: i64 = if (msg.get("time_us")) |t| (if (t == .integer) t.integer else 0) else 0; ··· 158 if (at_uri != .string) return error.NotAPost; 159 160 // add to feed 161 - db.addPost(at_uri.string, cid_val.string) catch |err| { 162 std.debug.print("failed to add post: {}\n", .{err}); 163 return; 164 };
··· 4 const posix = std.posix; 5 const Allocator = mem.Allocator; 6 const websocket = @import("websocket"); 7 + const filter = @import("feed/filter.zig"); 8 + const stats = @import("server/stats.zig"); 9 + const posts = @import("store/posts.zig"); 10 11 // turbostream: hydrated jetstream from graze.social 12 const TURBOSTREAM_HOST = "api.graze.social"; ··· 136 if (rkey_val != .string) return error.NotAPost; 137 const rkey = rkey_val.string; 138 139 + const post_time_ms = posts.parseTidTimestamp(rkey); 140 141 // get time_us from message 142 const time_us: i64 = if (msg.get("time_us")) |t| (if (t == .integer) t.integer else 0) else 0; ··· 158 if (at_uri != .string) return error.NotAPost; 159 160 // add to feed 161 + posts.add(at_uri.string, cid_val.string) catch |err| { 162 std.debug.print("failed to add post: {}\n", .{err}); 163 return; 164 };