A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.

Updated tests

Add unit tests for the firehose subscriber

Adds unit tests for the firehose subscriber to test the label events and
reconnection logic.

Skywatch a9ac82d2 916753e2

Changed files
+222 -1
.claude
tests
+2 -1
.claude/settings.local.json
··· 4 4 "Bash(git add:*)", 5 5 "Bash(git commit:*)", 6 6 "mcp__git-mcp-server__git_push", 7 - "mcp__git-mcp-server__git_log" 7 + "mcp__git-mcp-server__git_log", 8 + "mcp__git-mcp-server__git_add" 8 9 ], 9 10 "deny": [], 10 11 "ask": []
+220
tests/unit/subscriber.test.ts
··· 1 + import { describe, test, expect, beforeEach, afterEach, mock } from "bun:test"; 2 + import { FirehoseSubscriber } from "../../src/firehose/subscriber.js"; 3 + import { EventEmitter } from "events"; 4 + import WebSocket from "ws"; 5 + 6 + // Mock WebSocket class 7 + class MockWebSocket extends EventEmitter { 8 + constructor(url: string) { 9 + super(); 10 + } 11 + close() {} 12 + } 13 + 14 + // Mock the entire 'ws' module 15 + mock.module("ws", () => ({ 16 + default: MockWebSocket, 17 + })); 18 + 19 + describe("FirehoseSubscriber", () => { 20 + let subscriber: FirehoseSubscriber; 21 + let mockWsInstance: MockWebSocket; 22 + 23 + beforeEach(() => { 24 + subscriber = new FirehoseSubscriber(); 25 + // Mock the connect method to control the WebSocket instance 26 + (subscriber as any).connect = () => { 27 + const url = new URL("ws://localhost:1234"); 28 + if ((subscriber as any).cursor !== null) { 29 + url.searchParams.set("cursor", (subscriber as any).cursor.toString()); 30 + } 31 + const ws = new WebSocket(url.toString()); 32 + (subscriber as any).ws = ws; 33 + mockWsInstance = ws as any; 34 + 35 + ws.on("open", () => { 36 + subscriber.emit("connected"); 37 + }); 38 + ws.on("message", async (data: Buffer) => { 39 + try { 40 + const message = JSON.parse(data.toString()); 41 + if (message.seq) { 42 + await (subscriber as any).saveCursor(message.seq); 43 + } 44 + if (message.t === "#labels") { 45 + for (const label of message.labels) { 46 + subscriber.emit("label", label); 47 + } 48 + } 49 + } catch (error) { 50 + subscriber.emit("error", error); 51 + } 52 + }); 53 + ws.on("close", () => { 54 + (subscriber as any).ws = null; 55 + subscriber.emit("disconnected"); 56 + if ((subscriber as any).shouldReconnect) { 57 + (subscriber as any).scheduleReconnect(); 58 + } 59 + }); 60 + ws.on("error", (error) => { 61 + subscriber.emit("error", error); 62 + }); 63 + }; 64 + }); 65 + 66 + afterEach(() => { 67 + subscriber.stop(); 68 + }); 69 + 70 + test("should attempt to connect on start", async (done) => { 71 + subscriber.on("connected", () => { 72 + done(); 73 + }); 74 + await subscriber.start(); 75 + mockWsInstance.emit("open"); 76 + }); 77 + 78 + test("should emit label event on label for post", async (done) => { 79 + subscriber.on("label", (label) => { 80 + expect(label.uri).toBe("at://did:plc:user/app.bsky.feed.post/123"); 81 + done(); 82 + }); 83 + await subscriber.start(); 84 + mockWsInstance.emit( 85 + "message", 86 + Buffer.from( 87 + JSON.stringify({ 88 + op: 1, 89 + t: "#labels", 90 + labels: [ 91 + { 92 + src: "did:plc:labeler", 93 + uri: "at://did:plc:user/app.bsky.feed.post/123", 94 + val: "spam", 95 + cts: "2025-01-15T12:00:00Z", 96 + }, 97 + ], 98 + }) 99 + ) 100 + ); 101 + }); 102 + 103 + test("should emit label event on label for profile", async (done) => { 104 + subscriber.on("label", (label) => { 105 + expect(label.uri).toBe("did:plc:user"); 106 + done(); 107 + }); 108 + await subscriber.start(); 109 + mockWsInstance.emit( 110 + "message", 111 + Buffer.from( 112 + JSON.stringify({ 113 + op: 1, 114 + t: "#labels", 115 + labels: [ 116 + { 117 + src: "did:plc:labeler", 118 + uri: "did:plc:user", 119 + val: "spam", 120 + cts: "2025-01-15T12:00:00Z", 121 + }, 122 + ], 123 + }) 124 + ) 125 + ); 126 + }); 127 + 128 + test("should handle multiple labels in one message", async () => { 129 + let labelCount = 0; 130 + subscriber.on("label", () => { 131 + labelCount++; 132 + }); 133 + await subscriber.start(); 134 + mockWsInstance.emit( 135 + "message", 136 + Buffer.from( 137 + JSON.stringify({ 138 + op: 1, 139 + t: "#labels", 140 + labels: [ 141 + { 142 + src: "did:plc:labeler", 143 + uri: "at://did:plc:user/app.bsky.feed.post/123", 144 + val: "spam", 145 + cts: "2025-01-15T12:00:00Z", 146 + }, 147 + { 148 + src: "did:plc:labeler", 149 + uri: "did:plc:user", 150 + val: "spam", 151 + cts: "2025-01-15T12:00:00Z", 152 + }, 153 + ], 154 + }) 155 + ) 156 + ); 157 + expect(labelCount).toBe(2); 158 + }); 159 + 160 + test("should attempt to reconnect on close", (done) => { 161 + let connectAttempts = 0; 162 + (subscriber as any).connect = () => { 163 + connectAttempts++; 164 + if (connectAttempts > 1) { 165 + done(); 166 + } 167 + const url = new URL("ws://localhost:1234"); 168 + const ws = new WebSocket(url.toString()); 169 + (subscriber as any).ws = ws; 170 + mockWsInstance = ws as any; 171 + ws.on("close", () => { 172 + (subscriber as any).ws = null; 173 + subscriber.emit("disconnected"); 174 + if ((subscriber as any).shouldReconnect) { 175 + (subscriber as any).scheduleReconnect(); 176 + } 177 + }); 178 + }; 179 + 180 + subscriber.start(); 181 + mockWsInstance.emit("close"); 182 + }); 183 + 184 + test("should stop reconnecting after stop() is called", async (done) => { 185 + let connectAttempts = 0; 186 + (subscriber as any).connect = () => { 187 + connectAttempts++; 188 + const url = new URL("ws://localhost:1234"); 189 + const ws = new WebSocket(url.toString()); 190 + (subscriber as any).ws = ws; 191 + mockWsInstance = ws as any; 192 + ws.on("close", () => { 193 + (subscriber as any).ws = null; 194 + subscriber.emit("disconnected"); 195 + if ((subscriber as any).shouldReconnect) { 196 + (subscriber as any).scheduleReconnect(); 197 + } 198 + }); 199 + }; 200 + 201 + await subscriber.start(); 202 + subscriber.stop(); 203 + mockWsInstance.emit("close"); 204 + 205 + setTimeout(() => { 206 + expect(connectAttempts).toBe(1); 207 + done(); 208 + }, 2000); 209 + }); 210 + 211 + test("should increase backoff delay on multiple reconnects", () => { 212 + subscriber.start(); 213 + (subscriber as any).reconnectAttempts = 0; 214 + (subscriber as any).scheduleReconnect(); 215 + const initialBackoff = (subscriber as any).reconnectAttempts; 216 + (subscriber as any).scheduleReconnect(); 217 + const secondBackoff = (subscriber as any).reconnectAttempts; 218 + expect(secondBackoff).toBeGreaterThan(initialBackoff); 219 + }); 220 + });