A tool for tailing a labeler's firehose, rehydrating records, and storing them for future analysis of moderation decisions.


+2 -1
.claude/settings.local.json
···
       "Bash(git add:*)",
       "Bash(git commit:*)",
       "mcp__git-mcp-server__git_push",
-      "mcp__git-mcp-server__git_log"
+      "mcp__git-mcp-server__git_log",
+      "mcp__git-mcp-server__git_add"
     ],
     "deny": [],
     "ask": []
+164
docs/profile-blob-hydration.md
# Profile Blob Hydration - Implementation Notes

## Overview

This document captures key learnings from implementing avatar and banner blob hydration for Bluesky profiles.

## Key Discoveries

### 1. CID Deserialization in @atproto/api

The `@atproto/api` library deserializes blob references from their JSON `$link` representation into CID class objects.

**Raw JSON from API:**

```json
{
  "avatar": {
    "$type": "blob",
    "ref": {
      "$link": "bafkreigg3s6plegjncmxubeufbohj3qasbm4r23q2x7zlivdhccfqfypve"
    },
    "mimeType": "image/jpeg",
    "size": 101770
  }
}
```

**What you get in TypeScript:**

```typescript
record.avatar.ref // CID object with { code, version, hash, ... }
```

**Solution:**

```typescript
const cid = record.avatar.ref.toString(); // "bafkrei..."
```

### 2. PDS Endpoint Resolution

Users can be on different Personal Data Servers (PDS), not just `bsky.social`. Blobs must be fetched from the user's actual PDS.

**Process:**

1. Query the PLC directory for the DID document: `https://plc.wtf/${did}`
2. Find the service with `id: "#atproto_pds"` and `type: "AtprotoPersonalDataServer"`
3. Extract the `serviceEndpoint` URL
4. Use that endpoint for `com.atproto.sync.getBlob`

**Example:**

```typescript
const didDoc = await fetch(`https://plc.wtf/${did}`).then(r => r.json());
const pdsService = didDoc.service?.find(s =>
  s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
);
const pdsEndpoint = pdsService.serviceEndpoint; // e.g., "https://waxcap.us-west.host.bsky.network"
```

### 3. Correct Blob Fetching

**Don't use CDN paths** - they don't work reliably for all blobs and require authentication context.

**Use the AT Protocol API:**

```typescript
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
const response = await fetch(blobUrl);
const blobData = Buffer.from(await response.arrayBuffer());
```

### 4. Database Schema Design

**Separate tables for different blob types:**

- `blobs` table: post images, with a foreign key to `posts(uri)`
- `profile_blobs` table: avatars/banners, with a foreign key to `profiles(did)`

This allows proper relational queries and analysis.

**Profile blobs schema:**

```sql
CREATE TABLE profile_blobs (
  did TEXT NOT NULL,
  blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
  blob_cid TEXT NOT NULL,
  sha256 TEXT NOT NULL,
  phash TEXT,
  storage_path TEXT,
  mimetype TEXT,
  captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (did, blob_type, captured_at),
  FOREIGN KEY (did) REFERENCES profiles(did)
);
```

### 5. Change Tracking

Including `captured_at` in the primary key allows tracking when users change their avatars/banners.

**Query latest state:**

```sql
SELECT * FROM profile_blobs
WHERE did = ? AND blob_type = ?
ORDER BY captured_at DESC
LIMIT 1
```

**Only insert if changed:**

```typescript
const latest = await findLatestByDidAndType(did, type);
if (latest && latest.blob_cid === cid) {
  return; // No change, skip
}
// Insert new row with current timestamp
```

### 6. Sentinel Values for Missing Data

Use the empty string (`""`) to distinguish "we checked, and the user has no avatar" from `NULL`, meaning "we haven't checked yet".

```typescript
if (record.avatar?.ref) {
  avatarCid = record.avatar.ref.toString();
} else {
  avatarCid = ""; // Explicitly checked, not present
}
```

This prevents infinite re-hydration loops for profiles without avatars.

### 7. Profile Re-hydration Logic

A profile only needs re-hydration if either CID column is still `NULL` (never checked); the `""` sentinel counts as checked.

```typescript
const existingProfile = await findByDid(did);
const needsRehydration = existingProfile &&
  (existingProfile.avatar_cid === null || existingProfile.banner_cid === null);

if (existingProfile && !needsRehydration) {
  return; // Skip
}
```

## Configuration

- `PLC_ENDPOINT`: DID resolution endpoint (default: `https://plc.wtf`)
  - Can be changed to `https://plc.directory` or a custom instance
  - plc.wtf is faster but unofficial

## Common Errors

### "RepoNotFound"

- **Cause:** Querying the wrong PDS endpoint
- **Solution:** Resolve the correct PDS from the DID document

### Foreign Key Constraint Violation

- **Cause:** Trying to insert profile blobs into the `blobs` table
- **Solution:** Use the separate `profile_blobs` table

### Missing CIDs Despite API Returning Them

- **Cause:** Accessing `ref.$link` when `ref` is a CID object
- **Solution:** Call `.toString()` on the CID object

## Related Files

- `src/hydration/profiles.service.ts` - Main hydration logic
- `src/database/profile-blobs.repository.ts` - Profile blob persistence
- `src/database/schema.ts` - Table definitions
- `src/config/index.ts` - PLC endpoint configuration
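The PDS-resolution steps in the notes above can be sketched as a pure function over an already-fetched DID document. This is a minimal sketch: the `resolvePdsEndpoint` name and the `DidDocument`/`DidService` types below are illustrative, not part of the codebase.

```typescript
// Shape of the service entries in a did:plc DID document (simplified).
interface DidService {
  id: string;
  type: string;
  serviceEndpoint: string;
}

interface DidDocument {
  service?: DidService[];
}

// Find the user's PDS endpoint, or null if the document lists none.
function resolvePdsEndpoint(didDoc: DidDocument): string | null {
  const pds = didDoc.service?.find(
    (s) => s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
  );
  return pds?.serviceEndpoint ?? null;
}

// Example DID document fragment (abbreviated):
const doc: DidDocument = {
  service: [
    {
      id: "#atproto_pds",
      type: "AtprotoPersonalDataServer",
      serviceEndpoint: "https://waxcap.us-west.host.bsky.network",
    },
  ],
};

resolvePdsEndpoint(doc); // "https://waxcap.us-west.host.bsky.network"
```

Keeping the lookup separate from the `fetch` makes the resolution step trivially testable without network access.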
+10 -2
src/blobs/processor.ts
···
     postUri: string,
     ref: BlobReference
   ): Promise<void> {
-    const existing = await this.blobsRepo.findBySha256(ref.cid);
+    const existing = await this.blobsRepo.findByCid(ref.cid);
     if (existing) {
+      await this.blobsRepo.insert({
+        post_uri: postUri,
+        blob_cid: ref.cid,
+        sha256: existing.sha256,
+        phash: existing.phash,
+        storage_path: existing.storage_path,
+        mimetype: existing.mimetype,
+      });
       logger.debug(
         { postUri, cid: ref.cid },
-        "Blob already processed, skipping"
+        "Blob already processed, reusing hashes"
       );
       return;
     }
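The reuse path this hunk introduces can be reduced to a pure helper: when a blob with the same CID was already hashed for another post, copy its hashes into a new row linking it to the new post instead of refetching. A sketch under assumed names; the `BlobRow` type and `reuseBlobRow` helper are illustrative, not in the repository.

```typescript
// Mirrors the columns of the blobs table (simplified).
interface BlobRow {
  post_uri: string;
  blob_cid: string;
  sha256: string;
  phash?: string;
  storage_path?: string;
  mimetype?: string;
}

// Build the row that links a new post to an already-hashed blob,
// copying the expensive-to-compute fields instead of redownloading.
function reuseBlobRow(existing: BlobRow, postUri: string): BlobRow {
  return {
    post_uri: postUri,
    blob_cid: existing.blob_cid,
    sha256: existing.sha256,
    phash: existing.phash,
    storage_path: existing.storage_path,
    mimetype: existing.mimetype,
  };
}

const row = reuseBlobRow(
  { post_uri: "at://did:plc:a/app.bsky.feed.post/1", blob_cid: "bafy1", sha256: "abc" },
  "at://did:plc:a/app.bsky.feed.post/2"
);
// row keeps the existing sha256 but points at the new post
```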
+17
src/database/blobs.repository.ts
···
       );
     });
   }
+
+  async findByCid(cid: string): Promise<Blob | null> {
+    return new Promise((resolve, reject) => {
+      this.db.all(
+        `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`,
+        cid,
+        (err, rows: Blob[]) => {
+          if (err) {
+            logger.error({ err, cid }, "Failed to find blob by CID");
+            reject(err);
+            return;
+          }
+          resolve(rows?.[0] || null);
+        }
+      );
+    });
+  }
 }
+123
src/database/profile-blobs.repository.ts
import { Database } from "duckdb";
import { logger } from "../logger/index.js";

export interface ProfileBlob {
  did: string;
  blob_type: "avatar" | "banner";
  blob_cid: string;
  sha256: string;
  phash?: string;
  storage_path?: string;
  mimetype?: string;
  captured_at?: Date;
}

export class ProfileBlobsRepository {
  constructor(private db: Database) {}

  async insert(blob: ProfileBlob): Promise<void> {
    return new Promise((resolve, reject) => {
      this.db.prepare(
        `
        INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype, captured_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, COALESCE($8, CURRENT_TIMESTAMP))
        `,
        (err, stmt) => {
          if (err) {
            logger.error({ err }, "Failed to prepare profile blob insert statement");
            reject(err);
            return;
          }

          stmt.run(
            blob.did,
            blob.blob_type,
            blob.blob_cid,
            blob.sha256,
            blob.phash || null,
            blob.storage_path || null,
            blob.mimetype || null,
            blob.captured_at || null,
            (err) => {
              if (err) {
                logger.error({ err, blob }, "Failed to insert profile blob");
                reject(err);
                return;
              }
              resolve();
            }
          );
        }
      );
    });
  }

  async findByDid(did: string): Promise<ProfileBlob[]> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE did = $1 ORDER BY captured_at DESC`,
        did,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, did }, "Failed to find profile blobs by DID");
            reject(err);
            return;
          }
          resolve(rows || []);
        }
      );
    });
  }

  async findLatestByDidAndType(did: string, blobType: "avatar" | "banner"): Promise<ProfileBlob | null> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE did = $1 AND blob_type = $2
         ORDER BY captured_at DESC LIMIT 1`,
        did,
        blobType,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, did, blobType }, "Failed to find latest profile blob");
            reject(err);
            return;
          }
          resolve(rows?.[0] || null);
        }
      );
    });
  }

  async findBySha256(sha256: string): Promise<ProfileBlob | null> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE sha256 = $1 LIMIT 1`,
        sha256,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, sha256 }, "Failed to find profile blob by SHA256");
            reject(err);
            return;
          }
          resolve(rows?.[0] || null);
        }
      );
    });
  }

  async findByPhash(phash: string): Promise<ProfileBlob[]> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE phash = $1`,
        phash,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, phash }, "Failed to find profile blobs by pHash");
            reject(err);
            return;
          }
          resolve(rows || []);
        }
      );
    });
  }
}
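The `findLatestByDidAndType` / `insert` pair implements a simple change-detection rule. Reduced to a pure function (names illustrative, not part of the repository), the decision is:

```typescript
// The only field the decision needs from the latest stored row.
interface LatestBlob {
  blob_cid: string;
}

// A new profile_blobs row is written only when there is no row yet,
// or when the latest stored CID differs from the incoming one.
function shouldCaptureBlob(latest: LatestBlob | null, incomingCid: string): boolean {
  return latest === null || latest.blob_cid !== incomingCid;
}

shouldCaptureBlob(null, "bafy1");                  // true  (first capture)
shouldCaptureBlob({ blob_cid: "bafy1" }, "bafy1"); // false (unchanged)
shouldCaptureBlob({ blob_cid: "bafy1" }, "bafy2"); // true  (image changed)
```

Because `captured_at` is part of the primary key, each `true` outcome appends a new row rather than overwriting history.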
+20 -46
src/database/schema.ts
···
       banner_cid TEXT
     );

-    -- Blobs table: stores information about image blobs found in posts and profiles
+    -- Blobs table: stores information about image blobs found in posts
     CREATE TABLE IF NOT EXISTS blobs (
       post_uri TEXT NOT NULL,
       blob_cid TEXT NOT NULL,
···
       phash TEXT,
       storage_path TEXT,
       mimetype TEXT,
-      PRIMARY KEY (post_uri, blob_cid)
+      PRIMARY KEY (post_uri, blob_cid),
+      FOREIGN KEY (post_uri) REFERENCES posts(uri)
+    );
+
+    -- Profile blobs table: stores avatar and banner blobs for profiles
+    CREATE TABLE IF NOT EXISTS profile_blobs (
+      did TEXT NOT NULL,
+      blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
+      blob_cid TEXT NOT NULL,
+      sha256 TEXT NOT NULL,
+      phash TEXT,
+      storage_path TEXT,
+      mimetype TEXT,
+      captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+      PRIMARY KEY (did, blob_type, captured_at),
+      FOREIGN KEY (did) REFERENCES profiles(did)
     );

     -- Indexes for performance
···
     CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
     CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts);
     CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did);
+    CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid);
     CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256);
     CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash);
+    CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256);
+    CREATE INDEX IF NOT EXISTS idx_profile_blobs_phash ON profile_blobs(phash);
 `;

 async function migrateProfilesTable(): Promise<void> {
···
   });
 }

-async function migrateBlobsTableConstraint(): Promise<void> {
-  const db = getDatabase();
-
-  return new Promise((resolve, reject) => {
-    db.all(
-      `SELECT constraint_name FROM information_schema.table_constraints
-       WHERE table_name = 'blobs' AND constraint_type = 'FOREIGN KEY'`,
-      (err, rows: any[]) => {
-        if (err) {
-          logger.error({ err }, "Failed to check blobs table constraints");
-          reject(err);
-          return;
-        }
-
-        if (rows && rows.length > 0) {
-          logger.info("Migrating blobs table to remove foreign key constraint");
-
-          const migration = `
-            CREATE TABLE blobs_new AS SELECT * FROM blobs;
-            DROP TABLE blobs;
-            ALTER TABLE blobs_new RENAME TO blobs;
-            CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256);
-            CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash);
-          `;
-
-          db.exec(migration, (err) => {
-            if (err) {
-              logger.error({ err }, "Failed to migrate blobs table");
-              reject(err);
-              return;
-            }
-            logger.info("Blobs table migration completed");
-            resolve();
-          });
-        } else {
-          logger.debug("Blobs table already has no foreign key constraint");
-          resolve();
-        }
-      }
-    );
-  });
-}
-
 export async function initializeSchema(): Promise<void> {
   const db = getDatabase();
···
   try {
     await migrateProfilesTable();
-    await migrateBlobsTableConstraint();
     resolve();
   } catch (migrationErr) {
     reject(migrationErr);
+5 -1
src/hydration/posts.service.ts
···
 import { PostsRepository } from "../database/posts.repository.js";
 import { BlobProcessor } from "../blobs/processor.js";
 import { pRateLimit } from "p-ratelimit";
-import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js";
+import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
 import { logger } from "../logger/index.js";
 import { config } from "../config/index.js";
···
         }
       }
     }
   } catch (error) {
+    if (isRecordNotFoundError(error)) {
+      logger.warn({ uri }, "Post record not found, skipping");
+      return;
+    }
     logger.error({ error, uri }, "Failed to hydrate post");
     throw error;
   }
+40 -11
src/hydration/profiles.service.ts
···
 import { AtpAgent } from "@atproto/api";
 import { Database } from "duckdb";
 import { ProfilesRepository } from "../database/profiles.repository.js";
-import { BlobsRepository } from "../database/blobs.repository.js";
+import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js";
 import { computeBlobHashes } from "../blobs/hasher.js";
+import { LocalBlobStorage } from "../blobs/storage/local.js";
+import { S3BlobStorage } from "../blobs/storage/s3.js";
+import { BlobStorage } from "../blobs/processor.js";
 import { pRateLimit } from "p-ratelimit";
-import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js";
+import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
 import { logger } from "../logger/index.js";
 import { config } from "../config/index.js";

 export class ProfileHydrationService {
   private agent: AtpAgent;
   private profilesRepo: ProfilesRepository;
-  private blobsRepo: BlobsRepository;
+  private profileBlobsRepo: ProfileBlobsRepository;
+  private storage: BlobStorage | null = null;
   private limit: ReturnType<typeof pRateLimit>;

   constructor(db: Database) {
     this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
     this.profilesRepo = new ProfilesRepository(db);
-    this.blobsRepo = new BlobsRepository(db);
+    this.profileBlobsRepo = new ProfileBlobsRepository(db);
+
+    if (config.blobs.hydrateBlobs) {
+      if (config.blobs.storage.type === "s3") {
+        this.storage = new S3BlobStorage(
+          config.blobs.storage.s3Bucket!,
+          config.blobs.storage.s3Region!
+        );
+      } else {
+        this.storage = new LocalBlobStorage(
+          config.blobs.storage.localPath
+        );
+      }
+    }
+
     this.limit = pRateLimit({
       interval: 300000,
       rate: 3000,
···
     logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully");
   } catch (error) {
+    if (isRecordNotFoundError(error)) {
+      logger.warn({ did }, "Profile record not found, skipping");
+      return;
+    }
     logger.error({ error, did }, "Failed to hydrate profile");
     throw error;
   }
···
     cid: string,
     type: "avatar" | "banner"
   ): Promise<void> {
-    const postUri = `profile://${did}/${type}`;
-    const existing = await this.blobsRepo.findByPostUri(postUri);
+    const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type);

-    if (existing.length > 0 && existing.some(b => b.blob_cid === cid)) {
-      logger.debug({ did, cid, type }, "Blob already processed, skipping");
+    if (latestBlob && latestBlob.blob_cid === cid) {
+      logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping");
       return;
     }
···
     }

     const blobData = Buffer.from(await blobResponse.arrayBuffer());
+
+    let storagePath: string | undefined;
+    if (this.storage && config.blobs.hydrateBlobs) {
+      storagePath = await this.storage.store(cid, blobData, "image/jpeg");
+    }
+
     const hashes = await computeBlobHashes(blobData, "image/jpeg");

-    await this.blobsRepo.insert({
-      post_uri: postUri,
+    await this.profileBlobsRepo.insert({
+      did,
+      blob_type: type,
       blob_cid: cid,
       sha256: hashes.sha256,
       phash: hashes.phash,
+      storage_path: storagePath,
       mimetype: "image/jpeg",
     });

-    logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint }, "Profile blob processed successfully");
+    logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully");
   }
 }
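The sentinel convention the hydration service relies on (empty string = "checked, no image"; database `NULL` = "never hydrated") can be stated as two small pure functions. This is a sketch; `toStoredCid` and `needsRehydration` are illustrative names, not exports of the service.

```typescript
// "" = explicitly checked, image not present; a real CID string otherwise.
function toStoredCid(ref: { toString(): string } | undefined): string {
  return ref ? ref.toString() : "";
}

// Only a NULL column (never checked) triggers another hydration pass;
// "" is a final answer, which prevents re-hydration loops for
// profiles that simply have no avatar or banner.
function needsRehydration(avatarCid: string | null, bannerCid: string | null): boolean {
  return avatarCid === null || bannerCid === null;
}

toStoredCid(undefined);         // ""
needsRehydration("", "");       // false
needsRehydration(null, "bafy"); // true
```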
+7
src/utils/retry.ts
···
 export function isServerError(error: any): boolean {
   return error?.status >= 500 && error?.status < 600;
 }
+
+export function isRecordNotFoundError(error: any): boolean {
+  return (
+    error?.error === "RecordNotFound" ||
+    error?.message?.includes("RecordNotFound")
+  );
+}
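A few hand-built error shapes showing what the new predicate matches. These objects are illustrative examples, not captured XRPC responses; the function body is copied from the hunk above.

```typescript
// Matches either the structured XRPC error name or a message containing it.
function isRecordNotFoundError(error: any): boolean {
  return (
    error?.error === "RecordNotFound" ||
    error?.message?.includes("RecordNotFound")
  );
}

isRecordNotFoundError({ error: "RecordNotFound" });             // matches
isRecordNotFoundError({ message: "Error: RecordNotFound" });    // matches
isRecordNotFoundError({ status: 500, error: "InternalError" }); // does not match
```

Classifying this case separately lets the hydration services skip deleted records instead of retrying them alongside transient network and server errors.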
+6
tests/integration/database.test.ts
···
     const found = await blobsRepo.findByPhash("deadbeef");
     expect(found.length).toBeGreaterThan(0);
   });
+
+  test("should find blob by CID", async () => {
+    const found = await blobsRepo.findByCid("bafytest123");
+    expect(found).not.toBeNull();
+    expect(found?.sha256).toBe("abc123def456");
+  });
 });
});
+220
tests/unit/subscriber.test.ts
import { describe, test, expect, beforeEach, afterEach, mock } from "bun:test";
import { FirehoseSubscriber } from "../../src/firehose/subscriber.js";
import { EventEmitter } from "events";
import WebSocket from "ws";

// Mock WebSocket class
class MockWebSocket extends EventEmitter {
  constructor(url: string) {
    super();
  }
  close() {}
}

// Mock the entire 'ws' module
mock.module("ws", () => ({
  default: MockWebSocket,
}));

describe("FirehoseSubscriber", () => {
  let subscriber: FirehoseSubscriber;
  let mockWsInstance: MockWebSocket;

  beforeEach(() => {
    subscriber = new FirehoseSubscriber();
    // Mock the connect method to control the WebSocket instance
    (subscriber as any).connect = () => {
      const url = new URL("ws://localhost:1234");
      if ((subscriber as any).cursor !== null) {
        url.searchParams.set("cursor", (subscriber as any).cursor.toString());
      }
      const ws = new WebSocket(url.toString());
      (subscriber as any).ws = ws;
      mockWsInstance = ws as any;

      ws.on("open", () => {
        subscriber.emit("connected");
      });
      ws.on("message", async (data: Buffer) => {
        try {
          const message = JSON.parse(data.toString());
          if (message.seq) {
            await (subscriber as any).saveCursor(message.seq);
          }
          if (message.t === "#labels") {
            for (const label of message.labels) {
              subscriber.emit("label", label);
            }
          }
        } catch (error) {
          subscriber.emit("error", error);
        }
      });
      ws.on("close", () => {
        (subscriber as any).ws = null;
        subscriber.emit("disconnected");
        if ((subscriber as any).shouldReconnect) {
          (subscriber as any).scheduleReconnect();
        }
      });
      ws.on("error", (error) => {
        subscriber.emit("error", error);
      });
    };
  });

  afterEach(() => {
    subscriber.stop();
  });

  test("should attempt to connect on start", async (done) => {
    subscriber.on("connected", () => {
      done();
    });
    await subscriber.start();
    mockWsInstance.emit("open");
  });

  test("should emit label event on label for post", async (done) => {
    subscriber.on("label", (label) => {
      expect(label.uri).toBe("at://did:plc:user/app.bsky.feed.post/123");
      done();
    });
    await subscriber.start();
    mockWsInstance.emit(
      "message",
      Buffer.from(
        JSON.stringify({
          op: 1,
          t: "#labels",
          labels: [
            {
              src: "did:plc:labeler",
              uri: "at://did:plc:user/app.bsky.feed.post/123",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
          ],
        })
      )
    );
  });

  test("should emit label event on label for profile", async (done) => {
    subscriber.on("label", (label) => {
      expect(label.uri).toBe("did:plc:user");
      done();
    });
    await subscriber.start();
    mockWsInstance.emit(
      "message",
      Buffer.from(
        JSON.stringify({
          op: 1,
          t: "#labels",
          labels: [
            {
              src: "did:plc:labeler",
              uri: "did:plc:user",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
          ],
        })
      )
    );
  });

  test("should handle multiple labels in one message", async () => {
    let labelCount = 0;
    subscriber.on("label", () => {
      labelCount++;
    });
    await subscriber.start();
    mockWsInstance.emit(
      "message",
      Buffer.from(
        JSON.stringify({
          op: 1,
          t: "#labels",
          labels: [
            {
              src: "did:plc:labeler",
              uri: "at://did:plc:user/app.bsky.feed.post/123",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
            {
              src: "did:plc:labeler",
              uri: "did:plc:user",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
          ],
        })
      )
    );
    expect(labelCount).toBe(2);
  });

  test("should attempt to reconnect on close", (done) => {
    let connectAttempts = 0;
    (subscriber as any).connect = () => {
      connectAttempts++;
      if (connectAttempts > 1) {
        done();
      }
      const url = new URL("ws://localhost:1234");
      const ws = new WebSocket(url.toString());
      (subscriber as any).ws = ws;
      mockWsInstance = ws as any;
      ws.on("close", () => {
        (subscriber as any).ws = null;
        subscriber.emit("disconnected");
        if ((subscriber as any).shouldReconnect) {
          (subscriber as any).scheduleReconnect();
        }
      });
    };

    subscriber.start();
    mockWsInstance.emit("close");
  });

  test("should stop reconnecting after stop() is called", async (done) => {
    let connectAttempts = 0;
    (subscriber as any).connect = () => {
      connectAttempts++;
      const url = new URL("ws://localhost:1234");
      const ws = new WebSocket(url.toString());
      (subscriber as any).ws = ws;
      mockWsInstance = ws as any;
      ws.on("close", () => {
        (subscriber as any).ws = null;
        subscriber.emit("disconnected");
        if ((subscriber as any).shouldReconnect) {
          (subscriber as any).scheduleReconnect();
        }
      });
    };

    await subscriber.start();
    subscriber.stop();
    mockWsInstance.emit("close");

    setTimeout(() => {
      expect(connectAttempts).toBe(1);
      done();
    }, 2000);
  });

  test("should increase backoff delay on multiple reconnects", () => {
    subscriber.start();
    (subscriber as any).reconnectAttempts = 0;
    (subscriber as any).scheduleReconnect();
    const initialBackoff = (subscriber as any).reconnectAttempts;
    (subscriber as any).scheduleReconnect();
    const secondBackoff = (subscriber as any).reconnectAttempts;
    expect(secondBackoff).toBeGreaterThan(initialBackoff);
  });
});
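The reconnect tests assume the subscriber's backoff grows between attempts. A delay schedule of the kind being tested might look like the sketch below; the constants and the `backoffDelay` name are illustrative, not the subscriber's actual implementation.

```typescript
// Exponential backoff with a cap: the delay doubles on each attempt
// until it reaches maxMs, then stays there.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 60000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

backoffDelay(0);  // 1000
backoffDelay(3);  // 8000
backoffDelay(10); // 60000 (capped)
```

Capping the delay keeps a long outage from pushing reconnect attempts arbitrarily far apart once the firehose comes back.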