add observability, redesign dashboard, update docs

- add /stats endpoint with lag, uptime, post count
- redesign dashboard: feeds prominent, status bar with live indicator
- add periodic jetstream logging (every 30s)
- track actual post creation time from TID for accurate lag
- deduplicate parseTidTimestamp (now in db.zig)
- update readme and docs for two-feed setup
- add scripts/feed_status.py CLI for checking feed health

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+17 -3
README.md
··· 1 1 # music-atmosphere-feed 2 2 3 - a bluesky feed that surfaces posts with music links. 3 + bluesky feeds for music links. 4 + 5 + ## feeds 6 + 7 + - **[music atmosphere](https://bsky.app/profile/did:plc:vs3hnzq2daqbszxlysywzy54/feed/music-atmosphere)** - all music posts 8 + - **[music (following)](https://bsky.app/profile/did:plc:vs3hnzq2daqbszxlysywzy54/feed/music-following)** - only from people you follow 9 + 10 + ## how it works 11 + 12 + connects to [jetstream](https://docs.bsky.app/blog/jetstream) and indexes posts containing links to soundcloud, bandcamp, spotify, or plyr.fm. excludes posts with nsfw labels. 13 + 14 + the following feed uses the viewer's JWT to identify them, fetches their follows via the AT Protocol API, and filters posts accordingly. 15 + 16 + edit `src/filter.zig` to change inclusion criteria. 17 + 18 + ## dashboard 4 19 5 - connects to [jetstream](https://docs.bsky.app/blog/jetstream), includes posts containing links (in facets) to soundcloud, bandcamp, or plyr.fm. excludes posts with nsfw labels. edit `src/filter.zig` to change the criteria. 20 + https://zig-bsky-feed.fly.dev/ 6 21 7 22 <details> 8 23 <summary>running locally</summary> ··· 23 38 ```bash 24 39 fly launch --no-deploy 25 40 fly volumes create feed_data --region ord --size 1 26 - fly secrets set JETSTREAM_HOST=jetstream2.us-east.bsky.network 27 41 fly secrets set PUBLISHER_DID=did:plc:your-did 28 42 fly secrets set FEED_HOSTNAME=your-app.fly.dev 29 43 fly deploy
+9 -1
docs/social-graph-filtering-research.md
··· 61 61 62 62 ## status 63 63 64 - parked for now. feed works fine showing all music posts. revisit if we want to add personalization. 64 + **implemented** - we added a "music (following)" feed that filters to N=1 (direct follows only). 65 + 66 + implementation: 67 + - extract viewer DID from JWT in authorization header 68 + - fetch viewer's follows via `app.bsky.graph.getFollows` (paginated) 69 + - store `author_did` on each post in sqlite 70 + - query with `WHERE author_did IN (...)` for the viewer's follows 71 + 72 + this avoids the complexity of N>=2 while providing useful personalization.
+122
scripts/feed_status.py
#!/usr/bin/env python3
"""check feed status and lag"""

import argparse
import json
import urllib.parse
import urllib.request
from datetime import datetime, timezone

FEED_URL = "https://zig-bsky-feed.fly.dev"
BSKY_API = "https://public.api.bsky.app"

# the feed is reported as BEHIND once its newest post is older than this
LIVE_THRESHOLD_SECONDS = 60

# maximum number of characters of post text shown by `recent`
PREVIEW_CHARS = 50


def fetch_json(url):
    """GET `url` and return the decoded JSON body (10s timeout)."""
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read())


def get_feed_posts(feed_name="music-atmosphere", limit=5):
    """Return the post URIs from the feed skeleton, in the order served."""
    feed_uri = f"at://did:plc:vs3hnzq2daqbszxlysywzy54/app.bsky.feed.generator/{feed_name}"
    # the AT-URI contains ':' and '/', so encode it instead of pasting it raw
    query = urllib.parse.urlencode({"feed": feed_uri, "limit": limit})
    data = fetch_json(f"{FEED_URL}/xrpc/app.bsky.feed.getFeedSkeleton?{query}")
    return [p["post"] for p in data.get("feed", [])]


def get_post_details(uri):
    """Return the hydrated post view for `uri`, or None if the API returns no post."""
    query = urllib.parse.urlencode({"uris": uri})
    data = fetch_json(f"{BSKY_API}/xrpc/app.bsky.feed.getPosts?{query}")
    if data.get("posts"):
        return data["posts"][0]
    return None


def parse_time(ts):
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    A trailing "Z" is accepted. A timestamp without any offset is treated as
    UTC so it can be subtracted from an aware "now" without a TypeError.
    """
    parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def format_ago(td):
    """Render a timedelta as "42s", "3m 5s" or "2h 14m"."""
    secs = int(td.total_seconds())
    if secs < 60:
        return f"{secs}s"
    mins = secs // 60
    if mins < 60:
        return f"{mins}m {secs % 60}s"
    hours = mins // 60
    return f"{hours}h {mins % 60}m"


def format_count(value):
    """Render a counter with thousands separators, or "?" when it is missing.

    The "," format spec is only valid for numbers; applying it to a "?"
    placeholder string raises ValueError, so non-numeric values are mapped
    to the placeholder here instead.
    """
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return f"{value:,}"
    return "?"


def cmd_status(args):
    """Print the /stats summary, the three newest posts, and a LIVE/BEHIND verdict."""
    now = datetime.now(timezone.utc)
    print(f"current time: {now.strftime('%H:%M:%S')} UTC")
    print()

    # get stats
    try:
        stats = fetch_json(f"{FEED_URL}/stats")
        print(f"jetstream status: {stats.get('status', 'unknown')}")
        print(f"event lag: {stats.get('lag_seconds', '?')}s")
        print(f"messages: {format_count(stats.get('messages_total'))}")
        print(f"matches: {format_count(stats.get('matches_total'))}")
        print(f"posts in db: {format_count(stats.get('posts_in_db'))}")
        print()
    except Exception as e:
        # best effort: the feed check below is still useful without stats
        print(f"stats error: {e}")
        print()

    # get latest posts
    print("latest posts:")
    posts = get_feed_posts(args.feed, limit=3)
    newest_lag = None
    for index, uri in enumerate(posts):
        post = get_post_details(uri)
        if not post:
            print(f"  {uri} - could not fetch")
            continue
        created = parse_time(post["record"]["createdAt"])
        lag = now - created
        if index == 0:
            # remember the head of the feed so it is not fetched a second time
            newest_lag = lag
        rkey = uri.split("/")[-1]
        handle = post.get("author", {}).get("handle", "?")
        print(f"  {rkey} by @{handle}")
        print(f"    created: {created.strftime('%H:%M:%S')} ({format_ago(lag)} ago)")

    if newest_lag is not None:
        print()
        print(f"actual feed lag: {format_ago(newest_lag)}")
        if newest_lag.total_seconds() > LIVE_THRESHOLD_SECONDS:
            print("STATUS: BEHIND")
        else:
            print("STATUS: LIVE")


def cmd_recent(args):
    """show recent posts with their timestamps"""
    posts = get_feed_posts(args.feed, limit=args.limit)
    now = datetime.now(timezone.utc)

    for uri in posts:
        post = get_post_details(uri)
        if not post:
            continue
        created = parse_time(post["record"]["createdAt"])
        lag = now - created
        rkey = uri.split("/")[-1]
        handle = post.get("author", {}).get("handle", "?")
        text = post.get("record", {}).get("text", "").replace("\n", " ")
        # only mark the preview as truncated when something was cut off
        preview = text[:PREVIEW_CHARS] + ("..." if len(text) > PREVIEW_CHARS else "")
        print(f"{rkey} | {format_ago(lag):>8} ago | @{handle}: {preview}")


def main():
    """Parse the command line and dispatch; `status` is the default command."""
    parser = argparse.ArgumentParser(description="feed status checker")
    parser.add_argument("--feed", default="music-atmosphere", help="feed name")

    subs = parser.add_subparsers(dest="cmd")

    subs.add_parser("status", help="show feed status")

    recent_p = subs.add_parser("recent", help="show recent posts")
    recent_p.add_argument("-n", "--limit", type=int, default=10)

    args = parser.parse_args()

    if args.cmd == "recent":
        cmd_recent(args)
    else:
        cmd_status(args)


if __name__ == "__main__":
    main()
+97 -79
src/dashboard.zig
··· 1 1 const std = @import("std"); 2 + const posix = std.posix; 2 3 const stats = @import("stats.zig"); 3 4 const db = @import("db.zig"); 5 + 6 + const DEFAULT_JETSTREAM = "jetstream2.us-east.bsky.network"; 4 7 5 8 fn formatNumber(buf: []u8, n: u64) []const u8 { 6 9 var temp: [32]u8 = undefined; ··· 26 29 pub fn render(alloc: std.mem.Allocator) ![]const u8 { 27 30 const s = stats.get(); 28 31 const post_count = db.getPostCount(); 32 + const lag_ms = s.getPostLagMs(); 33 + const lag_sec = @divTrunc(lag_ms, 1000); 34 + const status = s.getStatus(); 35 + const jetstream_host = posix.getenv("JETSTREAM_HOST") orelse DEFAULT_JETSTREAM; 29 36 30 37 var buf: std.ArrayList(u8) = .{}; 31 38 const w = buf.writer(alloc); 32 39 33 40 var posts_buf: [32]u8 = undefined; 34 - var msgs_buf: [32]u8 = undefined; 35 41 const posts_fmt = formatNumber(&posts_buf, post_count); 36 - const msgs_fmt = formatNumber(&msgs_buf, s.getMessages()); 42 + 43 + // status color 44 + const status_color = if (std.mem.eql(u8, status, "live")) "#22c55e" else "#ef4444"; 37 45 38 46 try w.writeAll( 39 47 \\<!DOCTYPE html> ··· 53 61 \\ font-size: 13px; 54 62 \\ line-height: 1.6; 55 63 \\ } 56 - \\ .container { max-width: 440px; margin: 0 auto; } 64 + \\ .container { max-width: 420px; margin: 0 auto; } 57 65 \\ a { color: #7c3aed; text-decoration: none; } 58 66 \\ a:hover { color: #a78bfa; } 59 - \\ h1 { 60 - \\ font-size: 14px; 61 - \\ font-weight: normal; 62 - \\ color: #ccc; 63 - \\ margin-bottom: 0.5rem; 67 + \\ h1 { font-size: 15px; font-weight: normal; color: #ccc; } 68 + \\ .tagline { color: #555; margin-bottom: 2rem; } 69 + \\ .feeds { margin-bottom: 2rem; } 70 + \\ .feed-item { 71 + \\ display: flex; 72 + \\ justify-content: space-between; 73 + \\ padding: 0.6rem 0; 74 + \\ border-bottom: 1px solid #1a1a1a; 64 75 \\ } 65 - \\ .desc { 66 - \\ color: #555; 76 + \\ .feed-item:last-child { border-bottom: none; } 77 + \\ .feed-name { color: #ccc; } 78 + \\ .feed-desc { color: #555; font-size: 11px; } 79 
+ \\ .status-bar { 80 + \\ display: flex; 81 + \\ align-items: center; 82 + \\ gap: 0.5rem; 83 + \\ padding: 0.75rem; 84 + \\ background: #111; 85 + \\ border-radius: 4px; 67 86 \\ margin-bottom: 2rem; 87 + \\ font-size: 12px; 68 88 \\ } 89 + \\ .status-dot { 90 + \\ width: 8px; 91 + \\ height: 8px; 92 + \\ border-radius: 50%; 93 + \\ } 94 + \\ .status-text { color: #888; } 95 + \\ .status-text span { color: #ccc; } 69 96 \\ .stats { 70 97 \\ display: flex; 71 98 \\ gap: 2rem; 72 99 \\ margin-bottom: 2rem; 73 100 \\ } 74 - \\ .stat-value { 75 - \\ font-size: 20px; 76 - \\ color: #fff; 77 - \\ } 78 - \\ .stat-label { 79 - \\ font-size: 11px; 80 - \\ color: #555; 81 - \\ } 82 - \\ .section { 83 - \\ margin-bottom: 1.5rem; 84 - \\ } 85 - \\ .section-title { 86 - \\ font-size: 11px; 87 - \\ color: #444; 88 - \\ margin-bottom: 0.5rem; 89 - \\ } 101 + \\ .stat-value { font-size: 18px; color: #fff; } 102 + \\ .stat-label { font-size: 11px; color: #444; } 90 103 \\ .criteria { 91 - \\ color: #666; 104 + \\ margin-bottom: 1.5rem; 105 + \\ font-size: 12px; 92 106 \\ } 93 - \\ .criteria li { 94 - \\ margin-left: 1rem; 95 - \\ padding: 0.15rem 0; 96 - \\ } 107 + \\ .criteria-title { color: #444; margin-bottom: 0.25rem; } 108 + \\ .criteria-list { color: #666; } 97 109 \\ footer { 98 - \\ margin-top: 2rem; 99 110 \\ padding-top: 1rem; 100 111 \\ border-top: 1px solid #1a1a1a; 101 112 \\ font-size: 11px; 102 113 \\ color: #444; 114 + \\ display: flex; 115 + \\ justify-content: space-between; 103 116 \\ } 117 + \\ footer a { color: #555; } 104 118 \\ </style> 105 119 \\</head> 106 120 \\<body> 107 121 \\ <div class="container"> 108 122 \\ <h1>music atmosphere</h1> 109 - \\ <p class="desc">a bluesky feed of posts containing music links</p> 123 + \\ <p class="tagline">bluesky feeds for music links</p> 110 124 \\ 111 - \\ <div class="stats"> 112 - \\ <div> 113 - \\ <div class="stat-value" id="uptime">--</div> 114 - \\ <div class="stat-label">uptime</div> 125 + \\ <div class="feeds"> 
126 + \\ <div class="feed-item"> 127 + \\ <div> 128 + \\ <a href="https://bsky.app/profile/did:plc:vs3hnzq2daqbszxlysywzy54/feed/music-atmosphere" class="feed-name">music atmosphere</a> 129 + \\ <div class="feed-desc">all music posts</div> 130 + \\ </div> 115 131 \\ </div> 116 - \\ <div> 117 - \\ <div class="stat-value"> 132 + \\ <div class="feed-item"> 133 + \\ <div> 134 + \\ <a href="https://bsky.app/profile/did:plc:vs3hnzq2daqbszxlysywzy54/feed/music-following" class="feed-name">music (following)</a> 135 + \\ <div class="feed-desc">only from people you follow</div> 136 + \\ </div> 137 + \\ </div> 138 + \\ </div> 139 + \\ 140 + \\ <div class="status-bar"> 141 + \\ <div class="status-dot" style="background: 118 142 ); 119 - 120 - try w.writeAll(posts_fmt); 143 + try w.writeAll(status_color); 121 144 try w.writeAll( 122 - \\</div> 123 - \\ <div class="stat-label">posts</div> 124 - \\ </div> 125 - \\ <div> 126 - \\ <div class="stat-value"> 145 + \\"></div> 146 + \\ <div class="status-text"> 147 + ); 148 + try w.writeAll(status); 149 + try w.writeAll( 150 + \\ &middot; <span> 127 151 ); 128 - 129 - try w.writeAll(msgs_fmt); 152 + try w.print("{d}s", .{lag_sec}); 130 153 try w.writeAll( 131 - \\</div> 132 - \\ <div class="stat-label">firehose messages</div> 154 + \\</span> lag &middot; <span id="uptime">--</span> uptime &middot; <span> 155 + ); 156 + try w.writeAll(posts_fmt); 157 + try w.writeAll( 158 + \\</span> posts 133 159 \\ </div> 134 160 \\ </div> 135 161 \\ 136 - \\ <div class="section"> 137 - \\ <div class="section-title">included if post contains a link to</div> 138 - \\ <ul class="criteria"> 139 - \\ <li>soundcloud.com</li> 140 - \\ <li>bandcamp.com</li> 141 - \\ <li>plyr.fm</li> 142 - \\ <li>open.spotify.com</li> 143 - \\ </ul> 144 - \\ </div> 145 - \\ 146 - \\ <div class="section"> 147 - \\ <div class="section-title">excluded if post has <a href="https://docs.bsky.app/docs/advanced-guides/moderation">labels</a></div> 148 - \\ <ul class="criteria"> 149 - 
\\ <li>porn, sexual, nudity, nsfl, gore</li> 150 - \\ </ul> 162 + \\ <div class="criteria"> 163 + \\ <div class="criteria-title">included</div> 164 + \\ <div class="criteria-list">soundcloud, bandcamp, spotify, plyr.fm links</div> 151 165 \\ </div> 152 166 \\ 153 - \\ <div class="section"> 154 - \\ <div class="section-title">feeds</div> 155 - \\ <ul class="criteria"> 156 - \\ <li><a href="https://bsky.app/profile/did:plc:vs3hnzq2daqbszxlysywzy54/feed/music-atmosphere">music atmosphere</a> - all music posts</li> 157 - \\ <li><a href="https://bsky.app/profile/did:plc:vs3hnzq2daqbszxlysywzy54/feed/music-following">music (following)</a> - from people you follow</li> 158 - \\ </ul> 167 + \\ <div class="criteria"> 168 + \\ <div class="criteria-title">excluded</div> 169 + \\ <div class="criteria-list">posts with nsfw <a href="https://docs.bsky.app/docs/advanced-guides/moderation">labels</a></div> 159 170 \\ </div> 160 171 \\ 161 172 \\ <footer> 162 173 \\ <a href="https://tangled.sh/@zzstoatzz.io/music-atmosphere-feed">source</a> 174 + \\ <span>relay: <a href="https:// 175 + ); 176 + try w.writeAll(jetstream_host); 177 + try w.writeAll( 178 + \\" target="_blank"> 179 + ); 180 + try w.writeAll(jetstream_host); 181 + try w.writeAll( 182 + \\</a></span> 163 183 \\ </footer> 164 184 \\ </div> 165 185 \\ ··· 167 187 \\ const startedAt = 168 188 ); 169 189 170 - // virtual start time so JS calculates cumulative uptime correctly 171 190 const now = std.time.timestamp(); 172 191 const virtual_start = now - s.totalUptime(); 173 192 try w.print("{d}", .{virtual_start}); 174 193 try w.writeAll( 175 194 \\ * 1000; 176 - \\ function formatAge(ms) { 195 + \\ function fmt(ms) { 177 196 \\ const s = Math.floor(ms / 1000); 178 197 \\ const d = Math.floor(s / 86400); 179 198 \\ const h = Math.floor((s % 86400) / 3600); 180 199 \\ const m = Math.floor((s % 3600) / 60); 181 200 \\ if (d > 0) return d + 'd ' + h + 'h'; 182 201 \\ if (h > 0) return h + 'h ' + m + 'm'; 183 - \\ return m + 'm ' 
+ (s % 60) + 's'; 202 + \\ return m + 'm'; 184 203 \\ } 185 - \\ function update() { 186 - \\ document.getElementById('uptime').textContent = formatAge(Date.now() - startedAt); 187 - \\ } 188 - \\ update(); 189 - \\ setInterval(update, 1000); 204 + \\ setInterval(() => { 205 + \\ document.getElementById('uptime').textContent = fmt(Date.now() - startedAt); 206 + \\ }, 1000); 207 + \\ document.getElementById('uptime').textContent = fmt(Date.now() - startedAt); 190 208 \\ </script> 191 209 \\</body> 192 210 \\</html>
+106 -72
src/db.zig
··· 94 94 // use provided timestamp, or try to parse from URI, or fall back to now 95 95 const ts = timestamp orelse parseTimestampFromUri(uri) orelse std.time.milliTimestamp(); 96 96 97 + // extract author DID from URI 98 + const author_did = extractDidFromUri(uri); 99 + 97 100 conn.exec( 98 - "INSERT OR IGNORE INTO posts (uri, cid, indexed_at) VALUES (?, ?, ?)", 99 - .{ uri, cid, ts }, 101 + "INSERT OR IGNORE INTO posts (uri, cid, indexed_at, author_did) VALUES (?, ?, ?, ?)", 102 + .{ uri, cid, ts, author_did }, 100 103 ) catch |err| { 101 104 std.debug.print("failed to insert post: {}\n", .{err}); 102 105 return err; 103 106 }; 104 107 } 105 108 106 - /// parse TID from AT URI to get post timestamp 107 - /// URI format: at://did:plc:xxx/app.bsky.feed.post/3mbj55b3pfs2l 108 - /// TID is base32-sortable, first 53 bits are microseconds since epoch 109 - fn parseTimestampFromUri(uri: []const u8) ?i64 { 110 - // find last / to get rkey 111 - const last_slash = std.mem.lastIndexOf(u8, uri, "/") orelse return null; 112 - const rkey = uri[last_slash + 1 ..]; 113 - if (rkey.len != 13) return null; // TIDs are 13 chars 109 + /// parse TID (base32-sortable) to get timestamp in milliseconds 110 + /// TID is 13 chars, first 53 bits are microseconds since epoch 111 + pub fn parseTidTimestamp(rkey: []const u8) i64 { 112 + if (rkey.len != 13) return 0; 114 113 115 - // decode base32-sortable (charset: 234567abcdefghijklmnopqrstuvwxyz) 116 114 const charset = "234567abcdefghijklmnopqrstuvwxyz"; 117 115 var value: u64 = 0; 118 116 for (rkey) |c| { 119 - const idx = std.mem.indexOf(u8, charset, &[_]u8{c}) orelse return null; 117 + const idx = std.mem.indexOf(u8, charset, &[_]u8{c}) orelse return 0; 120 118 value = value * 32 + idx; 121 119 } 122 120 ··· 125 123 return @intCast(micros / 1000); 126 124 } 127 125 126 + /// parse TID from AT URI to get post timestamp 127 + fn parseTimestampFromUri(uri: []const u8) ?i64 { 128 + const last_slash = std.mem.lastIndexOf(u8, uri, "/") 
orelse return null; 129 + const rkey = uri[last_slash + 1 ..]; 130 + const ts = parseTidTimestamp(rkey); 131 + return if (ts > 0) ts else null; 132 + } 133 + 128 134 pub fn getPosts( 129 135 alloc: std.mem.Allocator, 130 136 feed_type: config.FeedType, ··· 132 138 cursor: ?[]const u8, 133 139 limit: usize, 134 140 ) ![]const u8 { 135 - // for following feed, we need the requester's follows 136 - var follows_set: ?std.StringHashMap(void) = null; 137 - defer if (follows_set) |*set| set.deinit(); 141 + // for following feed, fetch follows first (before mutex) 142 + var follows_list: ?[]const u8 = null; 143 + defer if (follows_list) |f| alloc.free(f); 138 144 139 145 if (feed_type == .following) { 140 146 if (requester_did) |did| { 141 - follows_set = try getFollowsSet(alloc, did); 147 + follows_list = getFollowsList(alloc, did) catch null; 142 148 } else { 143 - // no auth = empty feed for following type 144 149 return try alloc.dupe(u8, "{\"cursor\":\"eof\",\"feed\":[]}"); 145 150 } 146 151 } ··· 156 161 157 162 const CURSOR_EOF = "eof"; 158 163 159 - // handle EOF cursor 160 164 if (cursor) |c| { 161 165 if (std.mem.eql(u8, c, CURSOR_EOF)) { 162 - try w.writeAll( 163 - \\{"cursor":"eof","feed":[]} 164 - ); 166 + try w.writeAll("{\"cursor\":\"eof\",\"feed\":[]}"); 165 167 return try alloc.dupe(u8, buf.items); 166 168 } 167 169 } ··· 176 178 } 177 179 } 178 180 179 - // query more posts than needed if filtering, since some may be excluded 180 - const query_limit = if (feed_type == .following) limit * 10 else limit; 181 - 182 - // query posts 183 - var rows = conn.rows( 184 - \\SELECT uri, cid, indexed_at FROM posts 185 - \\WHERE indexed_at < ? OR (indexed_at = ? AND cid < ?) 186 - \\ORDER BY indexed_at DESC, cid DESC 187 - \\LIMIT ? 
188 - , .{ cursor_ts, cursor_ts, cursor_cid, query_limit }) catch |err| { 189 - std.debug.print("query failed: {}\n", .{err}); 190 - return error.QueryFailed; 191 - }; 192 - defer rows.deinit(); 193 - 194 - // collect posts and build cursor 181 + // build and execute query 195 182 var posts_json: std.ArrayList(u8) = .{}; 196 183 defer posts_json.deinit(alloc); 197 184 const pw = posts_json.writer(alloc); ··· 200 187 var cursor_len: usize = 0; 201 188 var count: usize = 0; 202 189 203 - while (rows.next()) |row| { 204 - const uri = row.text(0); 205 - const cid = row.text(1); 206 - const ts = row.int(2); 190 + if (feed_type == .following) { 191 + // for following feed: query by author_did using IN clause 192 + if (follows_list) |follows| { 193 + if (follows.len == 0) { 194 + try w.writeAll("{\"cursor\":\"eof\",\"feed\":[]}"); 195 + return try alloc.dupe(u8, buf.items); 196 + } 207 197 208 - // for following feed, filter by author 209 - if (feed_type == .following) { 210 - if (follows_set) |set| { 211 - const author_did = extractDidFromUri(uri) orelse continue; 212 - if (!set.contains(author_did)) continue; 198 + // build query with IN clause 199 + var query_buf: std.ArrayList(u8) = .{}; 200 + defer query_buf.deinit(alloc); 201 + const qw = query_buf.writer(alloc); 202 + 203 + try qw.writeAll("SELECT uri, cid, indexed_at FROM posts WHERE author_did IN ("); 204 + 205 + var first = true; 206 + var it = std.mem.splitScalar(u8, follows, '\n'); 207 + while (it.next()) |did| { 208 + if (did.len == 0) continue; 209 + if (!first) try qw.writeAll(","); 210 + try qw.print("'{s}'", .{did}); 211 + first = false; 213 212 } 213 + 214 + try qw.print(") AND (indexed_at < {d} OR (indexed_at = {d} AND cid < '{s}')) ORDER BY indexed_at DESC, cid DESC LIMIT {d}", .{ cursor_ts, cursor_ts, cursor_cid, limit }); 215 + 216 + // need null-terminated string for zqlite 217 + try query_buf.append(alloc, 0); 218 + const query_z: [:0]const u8 = query_buf.items[0 .. 
query_buf.items.len - 1 :0]; 219 + 220 + var rows = conn.rows(query_z, .{}) catch |err| { 221 + std.debug.print("following query failed: {}\n", .{err}); 222 + return error.QueryFailed; 223 + }; 224 + defer rows.deinit(); 225 + 226 + while (rows.next()) |row| { 227 + const uri = row.text(0); 228 + const cid = row.text(1); 229 + const ts = row.int(2); 230 + 231 + if (count > 0) try pw.writeAll(","); 232 + try pw.print("{{\"post\":\"{s}\"}}", .{uri}); 233 + cursor_len = (std.fmt.bufPrint(&cursor_buf, "{d}::{s}", .{ ts, cid }) catch &cursor_buf).len; 234 + count += 1; 235 + } 236 + 237 + std.debug.print("following feed: returning {d} posts\n", .{count}); 214 238 } 239 + } else { 240 + // for all feed: simple query 241 + var rows = conn.rows( 242 + \\SELECT uri, cid, indexed_at FROM posts 243 + \\WHERE indexed_at < ? OR (indexed_at = ? AND cid < ?) 244 + \\ORDER BY indexed_at DESC, cid DESC 245 + \\LIMIT ? 246 + , .{ cursor_ts, cursor_ts, cursor_cid, limit }) catch |err| { 247 + std.debug.print("query failed: {}\n", .{err}); 248 + return error.QueryFailed; 249 + }; 250 + defer rows.deinit(); 215 251 216 - if (count > 0) try pw.writeAll(","); 217 - try pw.print("{{\"post\":\"{s}\"}}", .{uri}); 252 + while (rows.next()) |row| { 253 + const uri = row.text(0); 254 + const cid = row.text(1); 255 + const ts = row.int(2); 218 256 219 - // build cursor from last row (overwritten each iteration) 220 - cursor_len = (std.fmt.bufPrint(&cursor_buf, "{d}::{s}", .{ ts, cid }) catch &cursor_buf).len; 221 - count += 1; 222 - 223 - // stop when we have enough 224 - if (count >= limit) break; 257 + if (count > 0) try pw.writeAll(","); 258 + try pw.print("{{\"post\":\"{s}\"}}", .{uri}); 259 + cursor_len = (std.fmt.bufPrint(&cursor_buf, "{d}::{s}", .{ ts, cid }) catch &cursor_buf).len; 260 + count += 1; 261 + } 225 262 } 226 263 227 - // now build final response 264 + // build final response 228 265 try w.writeAll("{\"cursor\":"); 229 266 if (count > 0) { 230 267 try w.print("\"{s}\"", 
.{cursor_buf[0..cursor_len]}); ··· 247 284 return rest[0..end]; 248 285 } 249 286 250 - /// get follows set for a DID 251 - fn getFollowsSet(alloc: std.mem.Allocator, requester_did: []const u8) !std.StringHashMap(void) { 252 - var set = std.StringHashMap(void).init(alloc); 253 - errdefer set.deinit(); 254 - 287 + /// get follows as newline-separated string (caller owns returned memory) 288 + fn getFollowsList(alloc: std.mem.Allocator, requester_did: []const u8) ![]const u8 { 255 289 std.debug.print("fetching follows for {s}...\n", .{requester_did}); 256 290 const follows = atproto.getFollows(alloc, requester_did) catch |err| { 257 291 std.debug.print("failed to fetch follows: {}\n", .{err}); 258 - return set; 292 + return error.FetchFailed; 259 293 }; 260 - defer alloc.free(follows); 261 294 295 + // count follows for logging 296 + var count: usize = 0; 262 297 var it = std.mem.splitScalar(u8, follows, '\n'); 263 298 while (it.next()) |did| { 264 - if (did.len == 0) continue; 265 - const owned = try alloc.dupe(u8, did); 266 - try set.put(owned, {}); 299 + if (did.len > 0) count += 1; 267 300 } 301 + std.debug.print("fetched {d} follows for {s}\n", .{ count, requester_did }); 268 302 269 - std.debug.print("fetched {d} follows for {s}\n", .{ set.count(), requester_did }); 270 - 271 - // trigger backfill for follows we haven't seen before 303 + // trigger backfill in background 272 304 var dids_to_backfill: std.ArrayList([]const u8) = .empty; 273 305 defer dids_to_backfill.deinit(alloc); 274 - var key_it = set.keyIterator(); 275 - while (key_it.next()) |key| { 276 - dids_to_backfill.append(alloc, key.*) catch continue; 306 + var it2 = std.mem.splitScalar(u8, follows, '\n'); 307 + while (it2.next()) |did| { 308 + if (did.len > 0) { 309 + dids_to_backfill.append(alloc, did) catch continue; 310 + } 277 311 } 278 312 if (dids_to_backfill.items.len > 0) { 279 313 triggerBackfill(alloc, dids_to_backfill.items); 280 314 } 281 315 282 - return set; 316 + return follows; 283 
317 } 284 318 285 319 pub fn getPostCount() usize {
+46
src/http.zig
··· 6 6 const config = @import("config.zig"); 7 7 const dashboard = @import("dashboard.zig"); 8 8 const atproto = @import("atproto.zig"); 9 + const stats = @import("stats.zig"); 10 + const db = @import("db.zig"); 9 11 10 12 const HTTP_BUF_SIZE = 8192; 11 13 ··· 56 58 try sendJson(request, .ok, 57 59 \\{"status":"ok"} 58 60 ); 61 + } else if (mem.eql(u8, target, "/stats")) { 62 + try handleStats(request); 59 63 } else { 60 64 try sendJson(request, .not_found, 61 65 \\{"error":"not found"} ··· 210 214 .{ .name = "access-control-allow-headers", .value = "content-type" }, 211 215 }, 212 216 }); 217 + } 218 + 219 + fn handleStats(request: *http.Server.Request) !void { 220 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 221 + defer arena.deinit(); 222 + const alloc = arena.allocator(); 223 + 224 + const s = stats.get(); 225 + const now_ms = std.time.milliTimestamp(); 226 + 227 + const lag_ms = s.getPostLagMs(); 228 + const connected_at = s.getConnectedAt(); 229 + const last_event_at = s.getLastEventReceivedAt(); 230 + const last_match_at = s.getLastMatchTime(); 231 + 232 + const connected_ago = if (connected_at > 0) @divTrunc(now_ms - connected_at, 1000) else 0; 233 + const last_event_ago = if (last_event_at > 0) @divTrunc(now_ms - last_event_at, 1000) else 0; 234 + const last_match_ago = if (last_match_at > 0) @divTrunc(now_ms - last_match_at, 1000) else 0; 235 + 236 + const post_count = db.getPostCount(); 237 + const status = s.getStatus(); 238 + 239 + var buf: std.ArrayList(u8) = .{}; 240 + defer buf.deinit(alloc); 241 + const w = buf.writer(alloc); 242 + 243 + try w.print( 244 + \\{{"status":"{s}","lag_ms":{},"lag_seconds":{},"connected_seconds_ago":{},"last_event_seconds_ago":{},"last_match_seconds_ago":{},"messages_total":{},"matches_total":{},"posts_in_db":{},"uptime_seconds":{}}} 245 + , .{ 246 + status, 247 + lag_ms, 248 + @divTrunc(lag_ms, 1000), 249 + connected_ago, 250 + last_event_ago, 251 + last_match_ago, 252 + s.getMessages(), 253 + 
s.getMatches(), 254 + post_count, 255 + s.totalUptime(), 256 + }); 257 + 258 + try sendJson(request, .ok, buf.items); 213 259 } 214 260 215 261 fn handleDashboard(request: *http.Server.Request) !void {
+32 -10
src/jetstream.zig
··· 7 7 const feed = @import("feed.zig"); 8 8 const filter = @import("filter.zig"); 9 9 const stats = @import("stats.zig"); 10 + const db = @import("db.zig"); 10 11 11 12 const JETSTREAM_HOST = "jetstream2.us-east.bsky.network"; 12 13 ··· 57 58 }; 58 59 59 60 std.debug.print("jetstream connected!\n", .{}); 61 + stats.get().recordConnected(); 60 62 61 63 var handler = Handler{ .allocator = allocator }; 62 64 client.readLoop(&handler) catch |err| { ··· 68 70 69 71 const Handler = struct { 70 72 allocator: Allocator, 71 - log_interval: usize = 0, 73 + msg_count: usize = 0, 74 + last_summary_time: i64 = 0, 72 75 73 76 pub fn serverMessage(self: *Handler, data: []const u8) !void { 74 - const s = stats.get(); 75 - s.recordMessage(); 76 - 77 - self.log_interval += 1; 78 - if (self.log_interval % 10000 == 0) { 79 - std.debug.print("jetstream: processed {} messages\n", .{s.getMessages()}); 80 - } 77 + self.msg_count += 1; 81 78 82 79 self.processMessage(data) catch |err| { 83 80 if (err != error.NotAPost and err != error.NoMatch) { 84 81 std.debug.print("message processing error: {}\n", .{err}); 85 82 } 86 83 }; 84 + 85 + // periodic summary every 30 seconds 86 + const now = std.time.timestamp(); 87 + if (now - self.last_summary_time >= 30) { 88 + self.last_summary_time = now; 89 + const s = stats.get(); 90 + const lag_ms = s.getPostLagMs(); 91 + const lag_sec = @divTrunc(lag_ms, 1000); 92 + const status = s.getStatus(); 93 + std.debug.print("jetstream: msgs={} matches={} lag={}s status={s}\n", .{ 94 + s.getMessages(), 95 + s.getMatches(), 96 + lag_sec, 97 + status, 98 + }); 99 + } 87 100 } 88 101 89 102 pub fn close(_: *Handler) void { ··· 116 129 const operation = commit.object.get("operation") orelse return error.NotAPost; 117 130 if (operation != .string or !mem.eql(u8, operation.string, "create")) return error.NotAPost; 118 131 119 - // get rkey 132 + // get rkey and parse TID to get post creation time 120 133 const rkey_val = commit.object.get("rkey") orelse return 
error.NotAPost; 121 134 if (rkey_val != .string) return error.NotAPost; 135 + const rkey = rkey_val.string; 136 + 137 + // parse TID from rkey to get actual post creation timestamp 138 + const post_time_ms = db.parseTidTimestamp(rkey); 139 + 140 + // get time_us from event for event tracking 141 + const time_us: i64 = if (root.get("time_us")) |t| (if (t == .integer) t.integer else 0) else 0; 142 + stats.get().recordEvent(time_us, post_time_ms); 122 143 123 144 // get cid 124 145 const cid_val = commit.object.get("cid") orelse return error.NotAPost; ··· 133 154 134 155 // construct uri 135 156 var uri_buf: [256]u8 = undefined; 136 - const uri = std.fmt.bufPrint(&uri_buf, "at://{s}/app.bsky.feed.post/{s}", .{ did_val.string, rkey_val.string }) catch return error.UriTooLong; 157 + const uri = std.fmt.bufPrint(&uri_buf, "at://{s}/app.bsky.feed.post/{s}", .{ did_val.string, rkey }) catch return error.UriTooLong; 137 158 138 159 // add to feed 139 160 feed.addPost(uri, cid_val.string) catch |err| { ··· 141 162 return; 142 163 }; 143 164 165 + stats.get().recordMatch(); 144 166 std.debug.print("added: {s}\n", .{uri}); 145 167 } 146 168 };
+72
src/stats.zig
··· 10 10 started_at: i64, 11 11 prior_uptime: u64 = 0, 12 12 messages: Atomic(u64), 13 + matches: Atomic(u64), 14 + last_event_time_us: Atomic(i64), 15 + last_event_received_at: Atomic(i64), 16 + last_match_time: Atomic(i64), 17 + connected_at: Atomic(i64), 18 + last_post_time_ms: Atomic(i64), // actual post creation time from TID 19 + 20 + // for lag trend tracking 21 + prev_lag_ms: Atomic(i64), 13 22 14 23 pub fn init() Stats { 15 24 var self = Stats{ 16 25 .started_at = std.time.timestamp(), 17 26 .messages = Atomic(u64).init(0), 27 + .matches = Atomic(u64).init(0), 28 + .last_event_time_us = Atomic(i64).init(0), 29 + .last_event_received_at = Atomic(i64).init(0), 30 + .last_match_time = Atomic(i64).init(0), 31 + .connected_at = Atomic(i64).init(0), 32 + .last_post_time_ms = Atomic(i64).init(0), 33 + .prev_lag_ms = Atomic(i64).init(0), 18 34 }; 19 35 self.load(); 20 36 return self; ··· 72 88 73 89 pub fn getMessages(self: *const Stats) u64 { 74 90 return self.messages.load(.monotonic); 91 + } 92 + 93 + pub fn recordEvent(self: *Stats, time_us: i64, post_time_ms: i64) void { 94 + _ = self.messages.fetchAdd(1, .monotonic); 95 + self.last_event_time_us.store(time_us, .monotonic); 96 + self.last_event_received_at.store(std.time.milliTimestamp(), .monotonic); 97 + // track post creation time (from TID) for actual lag 98 + if (post_time_ms > 0) { 99 + self.last_post_time_ms.store(post_time_ms, .monotonic); 100 + } 101 + } 102 + 103 + pub fn getPostLagMs(self: *Stats) i64 { 104 + const post_time = self.last_post_time_ms.load(.monotonic); 105 + if (post_time == 0) return 0; 106 + const now_ms = std.time.milliTimestamp(); 107 + return now_ms - post_time; 108 + } 109 + 110 + pub fn recordMatch(self: *Stats) void { 111 + _ = self.matches.fetchAdd(1, .monotonic); 112 + self.last_match_time.store(std.time.milliTimestamp(), .monotonic); 113 + } 114 + 115 + pub fn recordConnected(self: *Stats) void { 116 + self.connected_at.store(std.time.milliTimestamp(), .monotonic); 117 + 
} 118 + 119 + pub fn getLagMs(self: *Stats) i64 { 120 + const event_time_us = self.last_event_time_us.load(.monotonic); 121 + if (event_time_us == 0) return 0; 122 + const event_time_ms = @divTrunc(event_time_us, 1000); 123 + const now_ms = std.time.milliTimestamp(); 124 + return now_ms - event_time_ms; 125 + } 126 + 127 + pub fn getMatches(self: *const Stats) u64 { 128 + return self.matches.load(.monotonic); 129 + } 130 + 131 + pub fn getLastEventReceivedAt(self: *const Stats) i64 { 132 + return self.last_event_received_at.load(.monotonic); 133 + } 134 + 135 + pub fn getLastMatchTime(self: *const Stats) i64 { 136 + return self.last_match_time.load(.monotonic); 137 + } 138 + 139 + pub fn getConnectedAt(self: *const Stats) i64 { 140 + return self.connected_at.load(.monotonic); 141 + } 142 + 143 + pub fn getStatus(self: *Stats) []const u8 { 144 + const lag = self.getPostLagMs(); 145 + if (lag > 60000) return "catching_up"; 146 + return "live"; 75 147 } 76 148 }; 77 149