A tool for tailing a labeler's firehose, rehydrating, and storing records for future analysis of moderation decisions.


+4 -1
.claude/settings.local.json
··· 2 2 "permissions": { 3 3 "allow": [ 4 4 "Bash(git add:*)", 5 - "Bash(git commit:*)" 5 + "Bash(git commit:*)", 6 + "mcp__git-mcp-server__git_push", 7 + "mcp__git-mcp-server__git_log", 8 + "mcp__git-mcp-server__git_add" 6 9 ], 7 10 "deny": [], 8 11 "ask": []
+3
.env.example
··· 6 6 PDS=bsky.social 7 7 WSS_URL=wss://your-labeler-service.com/xrpc/com.atproto.label.subscribeLabels 8 8 9 + # PLC Directory (for DID resolution) 10 + PLC_ENDPOINT=https://plc.wtf 11 + 9 12 # Blob & Image Handling 10 13 HYDRATE_BLOBS=false # Set to true to download images/videos 11 14 BLOB_STORAGE_TYPE=local # 'local' or 's3'
+21
LICENSE
··· 1 + MIT License 2 + 3 + Copyright (c) 2025 scarnecchia 4 + 5 + Permission is hereby granted, free of charge, to any person obtaining a copy 6 + of this software and associated documentation files (the "Software"), to deal 7 + in the Software without restriction, including without limitation the rights 8 + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 + copies of the Software, and to permit persons to whom the Software is 10 + furnished to do so, subject to the following conditions: 11 + 12 + The above copyright notice and this permission notice shall be included in all 13 + copies or substantial portions of the Software. 14 + 15 + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 21 + SOFTWARE.
+32 -13
README.md
··· 6 6 7 7 - **Real-time Label Capture**: Subscribe to any Bluesky labeler's firehose via WebSocket 8 8 - **Automatic Content Hydration**: Fetch full post records and user profiles for labeled content 9 + - **Blob Processing**: SHA-256 and perceptual hashing for images/videos with optional download 9 10 - **Intelligent Filtering**: Optionally filter labels by type to capture only what you need 11 + - **Rate Limiting**: Respects Bluesky API limits (3000 req/5min) with p-ratelimit 12 + - **Retry Logic**: Automatic retry with exponential backoff for transient failures 10 13 - **Cursor Persistence**: Resume from where you left off after restarts 11 - - **Automatic Reconnection**: Exponential backoff reconnection (1s-30s) for stability 14 + - **Automatic Reconnection**: Exponential backoff reconnection (1s-60s) for stability 12 15 - **DuckDB Storage**: Embedded analytics database optimized for ML pipelines 13 16 - **Docker Ready**: Containerized deployment with volume persistence 14 17 - **Type-Safe**: Full TypeScript implementation with Zod validation ··· 18 21 ``` 19 22 Firehose → Label Event → Filter → Store Label → Hydration Queue 20 23 ↓ ↓ 21 - DuckDB ← [Post/Profile Fetch] 24 + DuckDB ← [Post/Profile Fetch] → Blob Processing 25 + ↓ 26 + Hash + Store 22 27 ``` 23 28 24 29 ### Components 25 30 26 31 - **Firehose Subscriber**: WebSocket client with DAG-CBOR decoding 27 32 - **Label Filter**: Configurable allow-list for label types 28 - - **Hydration Services**: Automatic post and profile data fetching 33 + - **Hydration Services**: Automatic post and profile data fetching with rate limiting 34 + - **Blob Processor**: SHA-256 and perceptual hash computation with optional download 29 35 - **Hydration Queue**: Async queue with deduplication 36 + - **Rate Limiter**: p-ratelimit enforcing 3000 requests per 5 minutes 37 + - **Retry Logic**: Exponential backoff for transient failures 30 38 - **Repository Layer**: Clean database abstraction for all entities 31 39 32 40 ## Quick Start ··· 60 68 WSS_URL=wss://your-labeler.com/xrpc/com.atproto.label.subscribeLabels 61 69 62 70 # Optional: Filter specific labels 63 - CAPTURE_LABELS=spam,hate-speech,csam 71 + CAPTURE_LABELS=spam,hate-speech 64 72 65 73 # Logging 66 74 LOG_LEVEL=info ··· 104 112 - `CAPTURE_LABELS`: Comma-separated list of label values to capture 105 113 - `DB_PATH`: Path to DuckDB database file (default: `./data/skywatch.duckdb`) 106 114 - `LOG_LEVEL`: Logging level (default: `info`) 107 - - `HYDRATE_BLOBS`: Enable blob download (default: `false`) - Phase 4 108 - - `BLOB_STORAGE_TYPE`: Storage backend for blobs (`local` or `s3`) - Phase 4 109 - - `BLOB_STORAGE_PATH`: Local path for blob storage - Phase 4 115 + - `HYDRATE_BLOBS`: Enable blob download (default: `false`) 116 + - `BLOB_STORAGE_TYPE`: Storage backend for blobs (`local` or `s3`) 117 + - `BLOB_STORAGE_PATH`: Local path for blob storage (default: `./data/blobs`) 110 118 111 - ### S3 Configuration (Phase 4, Optional) 119 + ### S3 Configuration (Optional) 112 120 113 121 - `S3_BUCKET`: S3 bucket name 114 122 - `S3_REGION`: AWS region ··· 150 158 - `display_name`: Display name 151 159 - `description`: Bio/description 152 160 153 - ### Blobs Table (Phase 4) 161 + ### Blobs Table 154 162 Image and video blob metadata. 
155 163 156 164 - `post_uri`: Associated post URI ··· 165 173 Filter labels by providing a comma-separated list in `CAPTURE_LABELS`: 166 174 167 175 ```env 168 - CAPTURE_LABELS=spam,hate-speech,csam,scam 176 + CAPTURE_LABELS=spam,hate-speech,scam 169 177 ``` 170 178 171 179 If not set, all labels are captured. ··· 188 196 - `Received label`: Label captured and stored 189 197 - `Post hydrated successfully`: Post data fetched 190 198 - `Profile hydrated successfully`: Profile data fetched 199 + - `Blob processed`: Blob hashed and optionally stored 200 + 201 + ## Rate Limiting 202 + 203 + The service implements p-ratelimit to respect Bluesky's API limits: 204 + - **Limit**: 3000 requests per 5 minutes per IP address 205 + - **Concurrency**: Up to 48 concurrent requests 206 + - **Backoff**: Automatic delays when approaching limits 207 + - **Retry Logic**: Exponential backoff for rate limit errors (1s-10s) 191 208 192 209 ## Development 193 210 ··· 196 213 ``` 197 214 skywatch-tail/ 198 215 ├── src/ 216 + │ ├── blobs/ # Blob processing and storage 199 217 │ ├── config/ # Environment validation 200 218 │ ├── database/ # Schema and repositories 201 219 │ ├── firehose/ # WebSocket subscriber 202 220 │ ├── hydration/ # Content hydration services 203 221 │ ├── logger/ # Pino logger setup 222 + │ ├── utils/ # Retry logic and helpers 204 223 │ └── index.ts # Main entry point 205 224 ├── tests/ 206 225 │ ├── integration/ # Database integration tests ··· 244 263 - [x] Phase 1: Core infrastructure (Docker, config, database, logging) 245 264 - [x] Phase 2: Firehose connection and label capture 246 265 - [x] Phase 3: Content hydration (posts and profiles) 247 - - [ ] Phase 4: Blob processing (image/video hashing and storage) 248 - - [ ] Phase 5: Rate limiting and optimization 266 + - [x] Phase 4: Blob processing (image/video hashing and storage) 267 + - [x] Phase 5: Rate limiting and optimization 249 268 - [ ] Phase 6: Comprehensive testing 250 269 - [ ] Phase 7: Documentation 251 270 252 271 ## Safety Features 253 272 254 273 ### Blob Hydration 255 - By default, `HYDRATE_BLOBS` is `false`. This prevents accidental download of potentially harmful content (CSAM, graphic violence, etc.) while still capturing cryptographic and perceptual hashes for ML training. 274 + By default, `HYDRATE_BLOBS` is `false`. This prevents accidental download of potentially harmful and/or unlawful content (CSAM, graphic violence, etc.) while still capturing cryptographic and perceptual hashes. 256 275 257 276 Only enable blob download if: 258 277 1. You understand the legal and safety implications
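
The README's rate-limiting numbers map one-to-one onto p-ratelimit's options, which is how the hydration services below wire it up. A minimal sketch of a limiter configured to those values (the wrapped endpoint is illustrative):

```typescript
import { pRateLimit } from "p-ratelimit";

// 3000 requests per 5-minute window, at most 48 in flight at once,
// and give up if a call would sit in the queue longer than 60 s.
const limit = pRateLimit({
  interval: 300_000, // 5 minutes, in ms
  rate: 3000,        // requests per interval
  concurrency: 48,   // simultaneous requests
  maxDelay: 60_000,  // max time a call may wait for a slot
});

// Wrap any outbound API call so it waits for a slot before firing.
const res = await limit(() =>
  fetch("https://bsky.social/xrpc/com.atproto.server.describeServer")
);
console.log(res.status);
```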
+164
docs/profile-blob-hydration.md
··· 1 + # Profile Blob Hydration - Implementation Notes 2 + 3 + ## Overview 4 + 5 + This document captures key learnings from implementing avatar and banner blob hydration for Bluesky profiles. 6 + 7 + ## Key Discoveries 8 + 9 + ### 1. CID Deserialization in @atproto/api 10 + 11 + The `@atproto/api` library deserializes blob references from their JSON `$link` representation into CID class objects. 12 + 13 + **Raw JSON from API:** 14 + ```json 15 + { 16 + "avatar": { 17 + "$type": "blob", 18 + "ref": { 19 + "$link": "bafkreigg3s6plegjncmxubeufbohj3qasbm4r23q2x7zlivdhccfqfypve" 20 + }, 21 + "mimeType": "image/jpeg", 22 + "size": 101770 23 + } 24 + } 25 + ``` 26 + 27 + **What you get in TypeScript:** 28 + ```typescript 29 + record.avatar.ref // CID object with { code, version, hash, ... } 30 + ``` 31 + 32 + **Solution:** 33 + ```typescript 34 + const cid = record.avatar.ref.toString(); // "bafkrei..." 35 + ``` 36 + 37 + ### 2. PDS Endpoint Resolution 38 + 39 + Users can be on different Personal Data Servers (PDS), not just `bsky.social`. Blobs must be fetched from the user's actual PDS. 40 + 41 + **Process:** 42 + 1. Query PLC directory for DID document: `https://plc.wtf/${did}` 43 + 2. Find service with `id: "#atproto_pds"` and `type: "AtprotoPersonalDataServer"` 44 + 3. Extract `serviceEndpoint` URL 45 + 4. Use that endpoint for `com.atproto.sync.getBlob` 46 + 47 + **Example:** 48 + ```typescript 49 + const didDoc = await fetch(`https://plc.wtf/${did}`).then(r => r.json()); 50 + const pdsService = didDoc.service?.find(s => 51 + s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer" 52 + ); 53 + const pdsEndpoint = pdsService.serviceEndpoint; // e.g., "https://waxcap.us-west.host.bsky.network" 54 + ``` 55 + 56 + ### 3. Correct Blob Fetching 57 + 58 + **Don't use CDN paths** - they don't work reliably for all blobs and require authentication context. 59 + 60 + **Use the AT Protocol API:** 61 + ```typescript 62 + const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`; 63 + const response = await fetch(blobUrl); 64 + const blobData = Buffer.from(await response.arrayBuffer()); 65 + ``` 66 + 67 + ### 4. Database Schema Design 68 + 69 + **Separate tables for different blob types:** 70 + 71 + - `blobs` table: Post images with FK to `posts(uri)` 72 + - `profile_blobs` table: Avatars/banners with FK to `profiles(did)` 73 + 74 + This allows proper relational queries and analysis. 75 + 76 + **Profile blobs schema:** 77 + ```sql 78 + CREATE TABLE profile_blobs ( 79 + did TEXT NOT NULL, 80 + blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')), 81 + blob_cid TEXT NOT NULL, 82 + sha256 TEXT NOT NULL, 83 + phash TEXT, 84 + storage_path TEXT, 85 + mimetype TEXT, 86 + captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 87 + PRIMARY KEY (did, blob_type, captured_at), 88 + FOREIGN KEY (did) REFERENCES profiles(did) 89 + ); 90 + ``` 91 + 92 + ### 5. Change Tracking 93 + 94 + Including `captured_at` in the primary key allows tracking when users change their avatars/banners. 95 + 96 + **Query latest state:** 97 + ```sql 98 + SELECT * FROM profile_blobs 99 + WHERE did = ? AND blob_type = ? 100 + ORDER BY captured_at DESC 101 + LIMIT 1 102 + ``` 103 + 104 + **Only insert if changed:** 105 + ```typescript 106 + const latest = await findLatestByDidAndType(did, type); 107 + if (latest && latest.blob_cid === cid) { 108 + return; // No change, skip 109 + } 110 + // Insert new row with current timestamp 111 + ``` 112 + 113 + ### 6. 
Sentinel Values for Missing Data 114 + 115 + Use the empty string (`""`) for "we checked; the user has no avatar", as distinct from NULL, which means "we haven't checked yet". 116 + 117 + ```typescript 118 + if (record.avatar?.ref) { 119 + avatarCid = record.avatar.ref.toString(); 120 + } else { 121 + avatarCid = ""; // Explicitly checked, not present 122 + } 123 + ``` 124 + 125 + This prevents infinite re-hydration loops for profiles without avatars. 126 + 127 + ### 7. Profile Re-hydration Logic 128 + 129 + ```typescript 130 + const existingProfile = await findByDid(did); 131 + const needsRehydration = existingProfile && 132 + (existingProfile.avatar_cid === null || existingProfile.banner_cid === null); 133 + 134 + if (existingProfile && !needsRehydration) { 135 + return; // Skip 136 + } 137 + ``` 138 + 139 + ## Configuration 140 + 141 + - `PLC_ENDPOINT`: DID resolution endpoint (default: `https://plc.wtf`) 142 + - Can be changed to `https://plc.directory` or a custom instance 143 + - plc.wtf is faster but unofficial 144 + 145 + ## Common Errors 146 + 147 + ### "RepoNotFound" 148 + - **Cause:** Querying wrong PDS endpoint 149 + - **Solution:** Resolve correct PDS from DID document 150 + 151 + ### Foreign Key Constraint Violation 152 + - **Cause:** Trying to insert profile blobs into `blobs` table 153 + - **Solution:** Use separate `profile_blobs` table 154 + 155 + ### Missing CIDs Despite API Returning Them 156 + - **Cause:** Trying to access `ref.$link` when ref is a CID object 157 + - **Solution:** Call `.toString()` on the CID object 158 + 159 + ## Related Files 160 + 161 + - `src/hydration/profiles.service.ts` - Main hydration logic 162 + - `src/database/profile-blobs.repository.ts` - Profile blob persistence 163 + - `src/database/schema.ts` - Table definitions 164 + - `src/config/index.ts` - PLC endpoint configuration
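
Taken together, discoveries 1-3 amount to a short pipeline. A minimal end-to-end sketch under those assumptions (the helper name is ours; error handling is pared down):

```typescript
// Hypothetical helper combining the steps above: CID extraction (discovery 1),
// PDS resolution via the PLC directory (discovery 2), and blob retrieval
// through com.atproto.sync.getBlob (discovery 3).
async function fetchProfileBlob(did: string, ref: { toString(): string }): Promise<Buffer> {
  const cid = ref.toString(); // ref is a CID object, not { $link: ... }

  const didDoc = await fetch(`https://plc.wtf/${did}`).then((r) => r.json());
  const pds = didDoc.service?.find(
    (s: any) => s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
  )?.serviceEndpoint;
  if (!pds) throw new Error(`No PDS endpoint in DID document for ${did}`);

  const res = await fetch(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`);
  if (!res.ok) throw new Error(`getBlob failed with status ${res.status}`);
  return Buffer.from(await res.arrayBuffer());
}
```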
-17
src/agent.ts
··· 1 - import { setGlobalDispatcher, Agent as Agent } from "undici"; 2 - setGlobalDispatcher(new Agent({ connect: { timeout: 20_000 } })); 3 - import { BSKY_HANDLE, BSKY_PASSWORD, PDS } from "./config.js"; 4 - import { AtpAgent } from "@atproto/api"; 5 - 6 - export const agent = new AtpAgent({ 7 - service: `https://${PDS}`, 8 - }); 9 - export const login = () => 10 - agent.login({ 11 - identifier: BSKY_HANDLE, 12 - password: BSKY_PASSWORD, 13 - }); 14 - 15 - export const isLoggedIn = login() 16 - .then(() => true) 17 - .catch(() => false);
+68
src/blobs/hasher.ts
··· 1 + import crypto from "crypto"; 2 + import sharp from "sharp"; 3 + import { logger } from "../logger/index.js"; 4 + 5 + export async function computeSha256(buffer: Buffer): Promise<string> { 6 + return crypto.createHash("sha256").update(buffer).digest("hex"); 7 + } 8 + 9 + export async function computePerceptualHash(buffer: Buffer): Promise<string> { 10 + try { 11 + const image = sharp(buffer); 12 + const metadata = await image.metadata(); 13 + 14 + if (!metadata.width || !metadata.height) { 15 + throw new Error("Invalid image metadata"); 16 + } 17 + 18 + const resized = await image 19 + .resize(8, 8, { fit: "fill" }) 20 + .grayscale() 21 + .raw() 22 + .toBuffer(); 23 + 24 + const pixels = new Uint8Array(resized); 25 + const avg = 26 + pixels.reduce((sum, val) => sum + val, 0) / pixels.length; 27 + 28 + let hash = ""; 29 + for (let i = 0; i < pixels.length; i++) { 30 + hash += pixels[i] > avg ? "1" : "0"; 31 + } 32 + 33 + return BigInt("0b" + hash).toString(16).padStart(16, "0"); 34 + } catch (error) { 35 + logger.error({ error }, "Failed to compute perceptual hash"); 36 + throw error; 37 + } 38 + } 39 + 40 + export interface BlobHashes { 41 + sha256: string; 42 + phash?: string; 43 + } 44 + 45 + export async function computeBlobHashes( 46 + buffer: Buffer, 47 + mimetype?: string 48 + ): Promise<BlobHashes> { 49 + const sha256 = await computeSha256(buffer); 50 + 51 + if ( 52 + mimetype?.startsWith("image/") && 53 + !mimetype.includes("svg") 54 + ) { 55 + try { 56 + const phash = await computePerceptualHash(buffer); 57 + return { sha256, phash }; 58 + } catch (error) { 59 + logger.warn( 60 + { error, mimetype }, 61 + "Failed to compute pHash, returning SHA256 only" 62 + ); 63 + return { sha256 }; 64 + } 65 + } 66 + 67 + return { sha256 }; 68 + }
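
A usage sketch for the hasher, plus a Hamming-distance helper of the kind typically paired with 64-bit average hashes (the fixture path and the helper are ours, not part of the module):

```typescript
import { readFile } from "fs/promises";
import { computeBlobHashes } from "./src/blobs/hasher.js";

// SHA-256 is always computed; pHash only for non-SVG image mimetypes.
const data = await readFile("./fixtures/sample.jpg"); // illustrative path
const { sha256, phash } = await computeBlobHashes(data, "image/jpeg");
console.log({ sha256, phash });

// Similarity between two 16-char hex pHashes is the Hamming distance over
// their 64 bits (0 = identical, small values = likely near-duplicates).
function phashDistance(a: string, b: string): number {
  let diff = BigInt(`0x${a}`) ^ BigInt(`0x${b}`);
  let bits = 0;
  while (diff > 0n) {
    bits += Number(diff & 1n);
    diff >>= 1n;
  }
  return bits;
}
```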
+188
src/blobs/processor.ts
··· 1 + import { AtpAgent } from "@atproto/api"; 2 + import { Database } from "duckdb"; 3 + import { BlobsRepository } from "../database/blobs.repository.js"; 4 + import { computeBlobHashes } from "./hasher.js"; 5 + import { LocalBlobStorage } from "./storage/local.js"; 6 + import { S3BlobStorage } from "./storage/s3.js"; 7 + import { config } from "../config/index.js"; 8 + import { logger } from "../logger/index.js"; 9 + 10 + export interface BlobReference { 11 + cid: string; 12 + mimeType?: string; 13 + } 14 + 15 + export interface BlobStorage { 16 + store(cid: string, data: Buffer, mimeType?: string): Promise<string>; 17 + retrieve(cid: string): Promise<Buffer | null>; 18 + } 19 + 20 + export class BlobProcessor { 21 + private blobsRepo: BlobsRepository; 22 + private storage: BlobStorage | null = null; 23 + private agent: AtpAgent; 24 + 25 + constructor(db: Database, agent: AtpAgent) { 26 + this.blobsRepo = new BlobsRepository(db); 27 + this.agent = agent; 28 + 29 + if (config.blobs.hydrateBlobs) { 30 + if (config.blobs.storage.type === "s3") { 31 + this.storage = new S3BlobStorage( 32 + config.blobs.storage.s3Bucket!, 33 + config.blobs.storage.s3Region! 34 + ); 35 + } else { 36 + this.storage = new LocalBlobStorage( 37 + config.blobs.storage.localPath 38 + ); 39 + } 40 + } 41 + } 42 + 43 + extractBlobReferences(embedsJson: any): BlobReference[] { 44 + const refs: BlobReference[] = []; 45 + 46 + if (!embedsJson || !Array.isArray(embedsJson)) { 47 + return refs; 48 + } 49 + 50 + for (const embed of embedsJson) { 51 + if (embed.images && Array.isArray(embed.images)) { 52 + for (const img of embed.images) { 53 + if (img.image?.ref?.$link) { 54 + refs.push({ 55 + cid: img.image.ref.$link, 56 + mimeType: img.image.mimeType, 57 + }); 58 + } 59 + } 60 + } 61 + 62 + if (embed.media?.images && Array.isArray(embed.media.images)) { 63 + for (const img of embed.media.images) { 64 + if (img.image?.ref?.$link) { 65 + refs.push({ 66 + cid: img.image.ref.$link, 67 + mimeType: img.image.mimeType, 68 + }); 69 + } 70 + } 71 + } 72 + 73 + if (embed.video?.ref?.$link) { 74 + refs.push({ 75 + cid: embed.video.ref.$link, 76 + mimeType: embed.video.mimeType, 77 + }); 78 + } 79 + } 80 + 81 + return refs; 82 + } 83 + 84 + async processBlobs(postUri: string, embedsJson: any): Promise<void> { 85 + const blobRefs = this.extractBlobReferences(embedsJson); 86 + 87 + if (blobRefs.length === 0) { 88 + return; 89 + } 90 + 91 + for (const ref of blobRefs) { 92 + try { 93 + await this.processBlob(postUri, ref); 94 + } catch (error) { 95 + logger.error( 96 + { error, postUri, cid: ref.cid }, 97 + "Failed to process blob" 98 + ); 99 + } 100 + } 101 + } 102 + 103 + private parseBlobUri(uri: string): { did: string; type: 'post' | 'avatar' | 'banner' } { 104 + if (uri.startsWith("profile://")) { 105 + const match = uri.match(/^profile:\/\/([^/]+)\/(avatar|banner)$/); 106 + if (match) { 107 + return { did: match[1], type: match[2] as 'avatar' | 'banner' }; 108 + } 109 + } 110 + 111 + const [did] = uri.replace("at://", "").split("/"); 112 + return { did, type: 'post' }; 113 + } 114 + 115 + private async processBlob( 116 + postUri: string, 117 + ref: BlobReference 118 + ): Promise<void> { 119 + const existing = await this.blobsRepo.findByCid(ref.cid); 120 + if (existing) { 121 + await this.blobsRepo.insert({ 122 + post_uri: postUri, 123 + blob_cid: ref.cid, 124 + sha256: existing.sha256, 125 + phash: existing.phash, 126 + storage_path: existing.storage_path, 127 + mimetype: existing.mimetype, 128 + }); 129 + logger.debug( 130 + 
{ postUri, cid: ref.cid }, 131 + "Blob already processed, reusing hashes" 132 + ); 133 + return; 134 + } 135 + 136 + const { did, type } = this.parseBlobUri(postUri); 137 + const pds = `https://${config.bsky.pds}`; 138 + const blobUrl = `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${ref.cid}`; 139 + 140 + try { 141 + const response = await fetch(blobUrl); 142 + 143 + if (!response.ok) { 144 + logger.warn( 145 + { postUri, cid: ref.cid, status: response.status, did }, 146 + "Failed to fetch blob" 147 + ); 148 + return; 149 + } 150 + 151 + const blobData = Buffer.from(await response.arrayBuffer()); 152 + 153 + let storagePath: string | undefined; 154 + if (this.storage && config.blobs.hydrateBlobs) { 155 + storagePath = await this.storage.store( 156 + ref.cid, 157 + blobData, 158 + ref.mimeType 159 + ); 160 + } 161 + 162 + const hashes = await computeBlobHashes( 163 + blobData, 164 + ref.mimeType 165 + ); 166 + 167 + await this.blobsRepo.insert({ 168 + post_uri: postUri, 169 + blob_cid: ref.cid, 170 + sha256: hashes.sha256, 171 + phash: hashes.phash, 172 + storage_path: storagePath, 173 + mimetype: ref.mimeType, 174 + }); 175 + 176 + logger.info( 177 + { postUri, cid: ref.cid, sha256: hashes.sha256, type }, 178 + "Blob processed successfully" 179 + ); 180 + } catch (error) { 181 + logger.error( 182 + { error, postUri, cid: ref.cid }, 183 + "Failed to download or hash blob" 184 + ); 185 + throw error; 186 + } 187 + } 188 + }
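
For reference, a sketch of the embed JSON shape that `extractBlobReferences` walks; the CID value is made up and the `processor` instance is assumed to be constructed elsewhere:

```typescript
// An app.bsky.embed.images-style entry as stored with the post record;
// here refs carry raw { $link } values rather than CID objects.
const embeds = [
  {
    images: [
      {
        image: {
          ref: { $link: "bafkreiexampleexampleexampleexampleexample" }, // illustrative CID
          mimeType: "image/jpeg",
        },
      },
    ],
  },
];

// processor.extractBlobReferences(embeds)
//   → [{ cid: "bafkreiexampleexampleexampleexampleexample", mimeType: "image/jpeg" }]
```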
+82
src/blobs/storage/local.ts
··· 1 + import * as fs from "fs/promises"; 2 + import * as path from "path"; 3 + import { BlobStorage } from "../processor.js"; 4 + import { logger } from "../../logger/index.js"; 5 + 6 + export class LocalBlobStorage implements BlobStorage { 7 + constructor(private basePath: string) {} 8 + 9 + async store( 10 + cid: string, 11 + data: Buffer, 12 + mimeType?: string 13 + ): Promise<string> { 14 + try { 15 + const extension = this.getExtensionFromMime(mimeType); 16 + const filename = `${cid}${extension}`; 17 + 18 + const dir = path.join( 19 + this.basePath, 20 + cid.substring(0, 2), 21 + cid.substring(2, 4) 22 + ); 23 + 24 + await fs.mkdir(dir, { recursive: true }); 25 + 26 + const fullPath = path.join(dir, filename); 27 + await fs.writeFile(fullPath, data); 28 + 29 + logger.debug({ cid, path: fullPath }, "Blob stored locally"); 30 + 31 + return fullPath; 32 + } catch (error) { 33 + logger.error({ error, cid }, "Failed to store blob locally"); 34 + throw error; 35 + } 36 + } 37 + 38 + async retrieve(cid: string): Promise<Buffer | null> { 39 + try { 40 + const possibleExtensions = ["", ".jpg", ".jpeg", ".png", ".webp", ".mp4"]; 41 + 42 + for (const ext of possibleExtensions) { 43 + const filename = `${cid}${ext}`; 44 + const filePath = path.join( 45 + this.basePath, 46 + cid.substring(0, 2), 47 + cid.substring(2, 4), 48 + filename 49 + ); 50 + 51 + try { 52 + const data = await fs.readFile(filePath); 53 + return data; 54 + } catch { 55 + continue; 56 + } 57 + } 58 + 59 + logger.warn({ cid }, "Blob not found in local storage"); 60 + return null; 61 + } catch (error) { 62 + logger.error({ error, cid }, "Failed to retrieve blob from local storage"); 63 + throw error; 64 + } 65 + } 66 + 67 + private getExtensionFromMime(mimeType?: string): string { 68 + if (!mimeType) return ""; 69 + 70 + const mimeMap: Record<string, string> = { 71 + "image/jpeg": ".jpg", 72 + "image/jpg": ".jpg", 73 + "image/png": ".png", 74 + "image/webp": ".webp", 75 + "image/gif": ".gif", 76 + "video/mp4": ".mp4", 77 + "video/webm": ".webm", 78 + }; 79 + 80 + return mimeMap[mimeType.toLowerCase()] || ""; 81 + } 82 + }
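
The local backend shards files by CID prefix to keep any one directory small. A usage sketch (CID, base path, and payload are illustrative):

```typescript
import { LocalBlobStorage } from "./src/blobs/storage/local.js";

const storage = new LocalBlobStorage("./data/blobs");

// Layout: <base>/<cid[0:2]>/<cid[2:4]>/<cid><ext>, so a JPEG blob lands at
// ./data/blobs/ba/fk/<cid>.jpg
const storedPath = await storage.store(
  "bafkreigg3s6plegjncmxubeufbohj3qasbm4r23q2x7zlivdhccfqfypve",
  Buffer.from("not a real image"), // placeholder bytes
  "image/jpeg"
);
console.log(storedPath);
```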
+71
src/blobs/storage/s3.ts
··· 1 + import { 2 + S3Client, 3 + PutObjectCommand, 4 + GetObjectCommand, 5 + } from "@aws-sdk/client-s3"; 6 + import { BlobStorage } from "../processor.js"; 7 + import { logger } from "../../logger/index.js"; 8 + 9 + export class S3BlobStorage implements BlobStorage { 10 + private client: S3Client; 11 + private bucket: string; 12 + 13 + constructor(bucket: string, region: string) { 14 + this.bucket = bucket; 15 + this.client = new S3Client({ region }); 16 + } 17 + 18 + async store( 19 + cid: string, 20 + data: Buffer, 21 + mimeType?: string 22 + ): Promise<string> { 23 + try { 24 + const key = `blobs/${cid.substring(0, 2)}/${cid.substring(2, 4)}/${cid}`; 25 + 26 + await this.client.send( 27 + new PutObjectCommand({ 28 + Bucket: this.bucket, 29 + Key: key, 30 + Body: data, 31 + ContentType: mimeType, 32 + }) 33 + ); 34 + 35 + logger.debug({ cid, key }, "Blob stored in S3"); 36 + 37 + return `s3://${this.bucket}/${key}`; 38 + } catch (error) { 39 + logger.error({ error, cid }, "Failed to store blob in S3"); 40 + throw error; 41 + } 42 + } 43 + 44 + async retrieve(cid: string): Promise<Buffer | null> { 45 + try { 46 + const key = `blobs/${cid.substring(0, 2)}/${cid.substring(2, 4)}/${cid}`; 47 + 48 + const response = await this.client.send( 49 + new GetObjectCommand({ 50 + Bucket: this.bucket, 51 + Key: key, 52 + }) 53 + ); 54 + 55 + if (!response.Body) { 56 + logger.warn({ cid }, "Blob not found in S3"); 57 + return null; 58 + } 59 + 60 + const chunks: Uint8Array[] = []; 61 + for await (const chunk of response.Body as any) { 62 + chunks.push(chunk); 63 + } 64 + 65 + return Buffer.concat(chunks); 66 + } catch (error) { 67 + logger.error({ error, cid }, "Failed to retrieve blob from S3"); 68 + return null; 69 + } 70 + } 71 + }
+24 -29
src/config/index.ts
··· 9 9 password: z.string().min(1, "BSKY_PASSWORD is required"), 10 10 pds: z.string().default("bsky.social"), 11 11 }), 12 + plc: z.object({ 13 + endpoint: z.string().url().default("https://plc.wtf"), 14 + }), 12 15 labeler: z.object({ 13 16 wssUrl: z.string().url("WSS_URL must be a valid URL"), 14 17 }), 15 18 blobs: z.object({ 16 - hydrate: z.boolean().default(false), 17 - storageType: z.enum(["local", "s3"]).default("local"), 18 - storagePath: z.string().default("./data/blobs"), 19 + hydrateBlobs: z.boolean().default(false), 20 + storage: z.object({ 21 + type: z.enum(["local", "s3"]).default("local"), 22 + localPath: z.string().default("./data/blobs"), 23 + s3Bucket: z.string().optional(), 24 + s3Region: z.string().optional(), 25 + }), 19 26 }), 20 - s3: z 21 - .object({ 22 - bucket: z.string().optional(), 23 - region: z.string().optional(), 24 - accessKeyId: z.string().optional(), 25 - secretAccessKey: z.string().optional(), 26 - }) 27 - .optional(), 28 27 database: z.object({ 29 28 path: z.string().default("./data/skywatch.duckdb"), 30 29 }), ··· 47 46 password: process.env.BSKY_PASSWORD, 48 47 pds: process.env.PDS, 49 48 }, 49 + plc: { 50 + endpoint: process.env.PLC_ENDPOINT, 51 + }, 50 52 labeler: { 51 53 wssUrl: process.env.WSS_URL, 52 54 }, 53 55 blobs: { 54 - hydrate: process.env.HYDRATE_BLOBS === "true", 55 - storageType: process.env.BLOB_STORAGE_TYPE, 56 - storagePath: process.env.BLOB_STORAGE_PATH, 56 + hydrateBlobs: process.env.HYDRATE_BLOBS === "true", 57 + storage: { 58 + type: process.env.BLOB_STORAGE_TYPE, 59 + localPath: process.env.BLOB_STORAGE_PATH, 60 + s3Bucket: process.env.S3_BUCKET, 61 + s3Region: process.env.S3_REGION, 62 + }, 57 63 }, 58 - s3: 59 - process.env.BLOB_STORAGE_TYPE === "s3" 60 - ? { 61 - bucket: process.env.S3_BUCKET, 62 - region: process.env.S3_REGION, 63 - accessKeyId: process.env.AWS_ACCESS_KEY_ID, 64 - secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY, 65 - } 66 - : undefined, 67 64 database: { 68 65 path: process.env.DB_PATH, 69 66 }, ··· 85 82 process.exit(1); 86 83 } 87 84 88 - if (result.data.blobs.storageType === "s3") { 85 + if (result.data.blobs.storage.type === "s3") { 89 86 if ( 90 - !result.data.s3?.bucket || 91 - !result.data.s3?.region || 92 - !result.data.s3?.accessKeyId || 93 - !result.data.s3?.secretAccessKey 87 + !result.data.blobs.storage.s3Bucket || 88 + !result.data.blobs.storage.s3Region 94 89 ) { 95 90 console.error( 96 - "S3 configuration is incomplete. Required: S3_BUCKET, S3_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY" 91 + "S3 configuration is incomplete. Required: S3_BUCKET, S3_REGION" 97 92 ); 98 93 process.exit(1); 99 94 }
+17
src/database/blobs.repository.ts
··· 103 103 ); 104 104 }); 105 105 } 106 + 107 + async findByCid(cid: string): Promise<Blob | null> { 108 + return new Promise((resolve, reject) => { 109 + this.db.all( 110 + `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`, 111 + cid, 112 + (err, rows: Blob[]) => { 113 + if (err) { 114 + logger.error({ err, cid }, "Failed to find blob by CID"); 115 + reject(err); 116 + return; 117 + } 118 + resolve(rows?.[0] || null); 119 + } 120 + ); 121 + }); 122 + } 106 123 }
+123
src/database/profile-blobs.repository.ts
··· 1 + import { Database } from "duckdb"; 2 + import { logger } from "../logger/index.js"; 3 + 4 + export interface ProfileBlob { 5 + did: string; 6 + blob_type: "avatar" | "banner"; 7 + blob_cid: string; 8 + sha256: string; 9 + phash?: string; 10 + storage_path?: string; 11 + mimetype?: string; 12 + captured_at?: Date; 13 + } 14 + 15 + export class ProfileBlobsRepository { 16 + constructor(private db: Database) {} 17 + 18 + async insert(blob: ProfileBlob): Promise<void> { 19 + return new Promise((resolve, reject) => { 20 + this.db.prepare( 21 + ` 22 + INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype, captured_at) 23 + VALUES ($1, $2, $3, $4, $5, $6, $7, COALESCE($8, CURRENT_TIMESTAMP)) 24 + `, 25 + (err, stmt) => { 26 + if (err) { 27 + logger.error({ err }, "Failed to prepare profile blob insert statement"); 28 + reject(err); 29 + return; 30 + } 31 + 32 + stmt.run( 33 + blob.did, 34 + blob.blob_type, 35 + blob.blob_cid, 36 + blob.sha256, 37 + blob.phash || null, 38 + blob.storage_path || null, 39 + blob.mimetype || null, 40 + blob.captured_at || null, 41 + (err) => { 42 + if (err) { 43 + logger.error({ err, blob }, "Failed to insert profile blob"); 44 + reject(err); 45 + return; 46 + } 47 + resolve(); 48 + } 49 + ); 50 + } 51 + ); 52 + }); 53 + } 54 + 55 + async findByDid(did: string): Promise<ProfileBlob[]> { 56 + return new Promise((resolve, reject) => { 57 + this.db.all( 58 + `SELECT * FROM profile_blobs WHERE did = $1 ORDER BY captured_at DESC`, 59 + did, 60 + (err, rows: ProfileBlob[]) => { 61 + if (err) { 62 + logger.error({ err, did }, "Failed to find profile blobs by DID"); 63 + reject(err); 64 + return; 65 + } 66 + resolve(rows || []); 67 + } 68 + ); 69 + }); 70 + } 71 + 72 + async findLatestByDidAndType(did: string, blobType: "avatar" | "banner"): Promise<ProfileBlob | null> { 73 + return new Promise((resolve, reject) => { 74 + this.db.all( 75 + `SELECT * FROM profile_blobs WHERE did = $1 AND blob_type = $2 ORDER BY captured_at DESC LIMIT 1`, 76 + did, 77 + blobType, 78 + (err, rows: ProfileBlob[]) => { 79 + if (err) { 80 + logger.error({ err, did, blobType }, "Failed to find latest profile blob"); 81 + reject(err); 82 + return; 83 + } 84 + resolve(rows?.[0] || null); 85 + } 86 + ); 87 + }); 88 + } 89 + 90 + async findBySha256(sha256: string): Promise<ProfileBlob | null> { 91 + return new Promise((resolve, reject) => { 92 + this.db.all( 93 + `SELECT * FROM profile_blobs WHERE sha256 = $1 LIMIT 1`, 94 + sha256, 95 + (err, rows: ProfileBlob[]) => { 96 + if (err) { 97 + logger.error({ err, sha256 }, "Failed to find profile blob by SHA256"); 98 + reject(err); 99 + return; 100 + } 101 + resolve(rows?.[0] || null); 102 + } 103 + ); 104 + }); 105 + } 106 + 107 + async findByPhash(phash: string): Promise<ProfileBlob[]> { 108 + return new Promise((resolve, reject) => { 109 + this.db.all( 110 + `SELECT * FROM profile_blobs WHERE phash = $1`, 111 + phash, 112 + (err, rows: ProfileBlob[]) => { 113 + if (err) { 114 + logger.error({ err, phash }, "Failed to find profile blobs by pHash"); 115 + reject(err); 116 + return; 117 + } 118 + resolve(rows || []); 119 + } 120 + ); 121 + }); 122 + } 123 + }
+9 -3
src/database/profiles.repository.ts
··· 6 6 handle?: string; 7 7 display_name?: string; 8 8 description?: string; 9 + avatar_cid?: string; 10 + banner_cid?: string; 9 11 } 10 12 11 13 export class ProfilesRepository { ··· 15 17 return new Promise((resolve, reject) => { 16 18 this.db.prepare( 17 19 ` 18 - INSERT INTO profiles (did, handle, display_name, description) 19 - VALUES ($1, $2, $3, $4) 20 + INSERT INTO profiles (did, handle, display_name, description, avatar_cid, banner_cid) 21 + VALUES ($1, $2, $3, $4, $5, $6) 20 22 ON CONFLICT (did) DO UPDATE SET 21 23 handle = EXCLUDED.handle, 22 24 display_name = EXCLUDED.display_name, 23 - description = EXCLUDED.description 25 + description = EXCLUDED.description, 26 + avatar_cid = EXCLUDED.avatar_cid, 27 + banner_cid = EXCLUDED.banner_cid 24 28 `, 25 29 (err, stmt) => { 26 30 if (err) { ··· 34 38 profile.handle || null, 35 39 profile.display_name || null, 36 40 profile.description || null, 41 + profile.avatar_cid || null, 42 + profile.banner_cid || null, 37 43 (err) => { 38 44 if (err) { 39 45 logger.error({ err, profile }, "Failed to insert profile");
+74 -3
src/database/schema.ts
··· 34 34 did TEXT PRIMARY KEY, 35 35 handle TEXT, 36 36 display_name TEXT, 37 - description TEXT 37 + description TEXT, 38 + avatar_cid TEXT, 39 + banner_cid TEXT 38 40 ); 39 41 40 42 -- Blobs table: stores information about image blobs found in posts ··· 49 51 FOREIGN KEY (post_uri) REFERENCES posts(uri) 50 52 ); 51 53 54 + -- Profile blobs table: stores avatar and banner blobs for profiles 55 + CREATE TABLE IF NOT EXISTS profile_blobs ( 56 + did TEXT NOT NULL, 57 + blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')), 58 + blob_cid TEXT NOT NULL, 59 + sha256 TEXT NOT NULL, 60 + phash TEXT, 61 + storage_path TEXT, 62 + mimetype TEXT, 63 + captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 64 + PRIMARY KEY (did, blob_type, captured_at), 65 + FOREIGN KEY (did) REFERENCES profiles(did) 66 + ); 67 + 52 68 -- Indexes for performance 53 69 CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri); 54 70 CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val); 55 71 CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts); 56 72 CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did); 73 + CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid); 57 74 CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256); 58 75 CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash); 76 + CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256); 77 + CREATE INDEX IF NOT EXISTS idx_profile_blobs_phash ON profile_blobs(phash); 59 78 `; 60 79 80 + async function migrateProfilesTable(): Promise<void> { 81 + const db = getDatabase(); 82 + 83 + return new Promise((resolve, reject) => { 84 + db.all( 85 + "SELECT column_name FROM information_schema.columns WHERE table_name = 'profiles'", 86 + (err, rows: any[]) => { 87 + if (err) { 88 + logger.error({ err }, "Failed to check profiles table columns"); 89 + reject(err); 90 + return; 91 + } 92 + 93 + const columnNames = rows.map((row) => row.column_name); 94 + const hasAvatarCid = columnNames.includes("avatar_cid"); 95 + const hasBannerCid = columnNames.includes("banner_cid"); 96 + 97 + if (!hasAvatarCid || !hasBannerCid) { 98 + logger.info("Migrating profiles table to add avatar_cid and banner_cid columns"); 99 + 100 + const migrations: string[] = []; 101 + if (!hasAvatarCid) { 102 + migrations.push("ALTER TABLE profiles ADD COLUMN avatar_cid TEXT"); 103 + } 104 + if (!hasBannerCid) { 105 + migrations.push("ALTER TABLE profiles ADD COLUMN banner_cid TEXT"); 106 + } 107 + 108 + db.exec(migrations.join("; "), (err) => { 109 + if (err) { 110 + logger.error({ err }, "Failed to migrate profiles table"); 111 + reject(err); 112 + return; 113 + } 114 + logger.info("Profiles table migration completed"); 115 + resolve(); 116 + }); 117 + } else { 118 + logger.debug("Profiles table already has avatar_cid and banner_cid columns"); 119 + resolve(); 120 + } 121 + } 122 + ); 123 + }); 124 + } 125 + 61 126 export async function initializeSchema(): Promise<void> { 62 127 const db = getDatabase(); 63 128 64 129 return new Promise((resolve, reject) => { 65 - db.exec(SCHEMA_SQL, (err) => { 130 + db.exec(SCHEMA_SQL, async (err) => { 66 131 if (err) { 67 132 logger.error({ err }, "Failed to initialize schema"); 68 133 reject(err); 69 134 return; 70 135 } 71 136 logger.info("Database schema initialized"); 72 - resolve(); 137 + 138 + try { 139 + await migrateProfilesTable(); 140 + resolve(); 141 + } catch (migrationErr) { 142 + reject(migrationErr); 143 + } 73 144 }); 74 145 }); 75 146 }
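
Because `captured_at` is part of the `profile_blobs` primary key, avatar history falls out of a plain query. A sketch against the duckdb Node API used throughout the repo (the DID is illustrative):

```typescript
import { Database } from "duckdb";

const db = new Database("./data/skywatch.duckdb");

// Every avatar a given account has used, newest first.
db.all(
  `SELECT blob_cid, sha256, captured_at
     FROM profile_blobs
    WHERE did = $1 AND blob_type = 'avatar'
    ORDER BY captured_at DESC`,
  "did:plc:testuser", // illustrative DID
  (err, rows) => {
    if (err) throw err;
    console.table(rows);
  }
);
```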
+25 -17
src/firehose/decoder.ts
··· 1 - import { decode as decodeCBOR } from "@atcute/cbor"; 1 + import { decodeFirst } from "@atcute/cbor"; 2 2 import { logger } from "../logger/index.js"; 3 3 4 4 export interface LabelEvent { ··· 14 14 } 15 15 16 16 export interface FirehoseMessage { 17 - op: number; 17 + op?: number; 18 18 t?: string; 19 + seq?: number; 20 + labels?: LabelEvent[]; 19 21 [key: string]: any; 20 22 } 21 23 22 24 export function decodeFirehoseMessage(data: Buffer): FirehoseMessage | null { 23 25 try { 24 - const decoded = decodeCBOR(data); 25 - return decoded as FirehoseMessage; 26 - } catch (error) { 27 - logger.error({ error }, "Failed to decode CBOR message"); 28 - return null; 29 - } 30 - } 26 + const buffer = new Uint8Array(data); 27 + const [header, remainder] = decodeFirst(buffer); 28 + const [body] = decodeFirst(remainder); 31 29 32 - export function extractLabelFromMessage(message: FirehoseMessage): LabelEvent | null { 33 - if (!message || message.op !== 1) { 30 + return body as FirehoseMessage; 31 + } catch (err) { 32 + logger.error( 33 + { 34 + err: err instanceof Error ? err.message : String(err), 35 + errorStack: err instanceof Error ? err.stack : undefined, 36 + dataLength: data.length, 37 + dataPreview: data.slice(0, 50).toString("hex") 38 + }, 39 + "Failed to decode CBOR message" 40 + ); 34 41 return null; 35 42 } 43 + } 36 44 37 - if (message.t !== "#labels") { 38 - return null; 45 + export function extractLabelsFromMessage(message: FirehoseMessage): LabelEvent[] { 46 + if (!message) { 47 + return []; 39 48 } 40 49 41 - const labels = message.labels; 42 - if (!Array.isArray(labels) || labels.length === 0) { 43 - return null; 50 + if (message.labels && Array.isArray(message.labels)) { 51 + return message.labels; 44 52 } 45 53 46 - return labels[0] as LabelEvent; 54 + return []; 47 55 } 48 56 49 57 export function validateLabel(label: LabelEvent): boolean {
+8 -10
src/firehose/subscriber.ts
··· 4 4 import { logger } from "../logger/index.js"; 5 5 import { 6 6 decodeFirehoseMessage, 7 - extractLabelFromMessage, 7 + extractLabelsFromMessage, 8 8 validateLabel, 9 9 LabelEvent, 10 10 } from "./decoder.js"; ··· 86 86 return; 87 87 } 88 88 89 - const label = extractLabelFromMessage(message); 90 - if (!label) return; 91 - 92 - if (!validateLabel(label)) return; 93 - 94 - if (!this.filter.shouldCapture(label)) return; 95 - 96 - this.emit("label", label); 97 - 98 89 if (message.seq) { 99 90 await this.saveCursor(message.seq); 91 + } 92 + 93 + const labels = extractLabelsFromMessage(message); 94 + for (const label of labels) { 95 + if (!validateLabel(label)) continue; 96 + if (!this.filter.shouldCapture(label)) continue; 97 + this.emit("label", label); 100 98 } 101 99 } catch (error) { 102 100 logger.error({ error }, "Error processing message");
-165
src/firehose.ts
··· 1 - import { decode, decodeFirst } from "@atcute/cbor"; 2 - import { readFileSync, writeFileSync } from "fs"; 3 - import { WSS_URL } from "./config.js"; 4 - import { logger } from "./logger.js"; 5 - import { LabelEvent } from "./types.js"; 6 - 7 - let ws: WebSocket | null = null; 8 - let reconnectTimeout: NodeJS.Timeout | null = null; 9 - let reconnectAttempts = 0; 10 - let cursor: string = ""; 11 - const MAX_RECONNECT_DELAY = 60000; 12 - const INITIAL_RECONNECT_DELAY = 1000; 13 - const CURSOR_FILE = "./cursor.txt"; 14 - 15 - function getReconnectDelay(): number { 16 - const delay = Math.min( 17 - INITIAL_RECONNECT_DELAY * Math.pow(2, reconnectAttempts), 18 - MAX_RECONNECT_DELAY, 19 - ); 20 - reconnectAttempts++; 21 - return delay; 22 - } 23 - 24 - async function handleLabelEvent(event: LabelEvent): Promise<void> { 25 - // Placeholder for hydration logic 26 - logger.info({ event }, "Received label event"); 27 - } 28 - 29 - function saveCursor(seq: string): void { 30 - try { 31 - cursor = seq; 32 - writeFileSync(CURSOR_FILE, seq, "utf8"); 33 - logger.debug({ cursor: seq }, "Saved cursor"); 34 - } catch (err) { 35 - logger.warn({ err }, "Failed to save cursor"); 36 - } 37 - } 38 - 39 - function loadCursor(): string { 40 - try { 41 - const saved = readFileSync(CURSOR_FILE, "utf8").trim(); 42 - logger.info({ cursor: saved }, "Loaded cursor from file"); 43 - return saved; 44 - } catch (err) { 45 - logger.info("No cursor file found, starting from live"); 46 - return ""; 47 - } 48 - } 49 - 50 - function parseMessage(data: any): void { 51 - try { 52 - let buffer: Uint8Array; 53 - 54 - if (data instanceof ArrayBuffer) { 55 - buffer = new Uint8Array(data); 56 - } else if (data instanceof Uint8Array) { 57 - buffer = data; 58 - } else if (typeof data === "string") { 59 - try { 60 - const parsed = JSON.parse(data); 61 - if (parsed.seq) { 62 - saveCursor(parsed.seq.toString()); 63 - } 64 - processLabels(parsed); 65 - return; 66 - } catch { 67 - logger.warn("Received non-JSON string message"); 68 - return; 69 - } 70 - } else { 71 - processLabels(data); 72 - return; 73 - } 74 - 75 - const [header, remainder] = decodeFirst(buffer); 76 - const [body] = decodeFirst(remainder); 77 - 78 - if (body && typeof body === "object" && "seq" in body) { 79 - saveCursor(body.seq.toString()); 80 - } 81 - 82 - processLabels(body); 83 - } catch (err) { 84 - logger.error({ err }, "Error parsing message"); 85 - } 86 - } 87 - 88 - function processLabels(parsed: any): void { 89 - if (parsed.labels && Array.isArray(parsed.labels)) { 90 - for (const label of parsed.labels) { 91 - handleLabelEvent(label as LabelEvent); 92 - } 93 - } else if (parsed.label) { 94 - handleLabelEvent(parsed.label as LabelEvent); 95 - } else { 96 - logger.debug({ parsed }, "Message does not contain label data"); 97 - } 98 - } 99 - 100 - function connect(): void { 101 - if ( 102 - ws && 103 - (ws.readyState === WebSocket.CONNECTING || ws.readyState === WebSocket.OPEN) 104 - ) { 105 - logger.debug("WebSocket already connected or connecting"); 106 - return; 107 - } 108 - 109 - const url = cursor ? 
`${WSS_URL}?cursor=${cursor}` : WSS_URL; 110 - logger.info({ url, cursor }, "Connecting to firehose"); 111 - 112 - ws = new WebSocket(url); 113 - 114 - ws.addEventListener("open", () => { 115 - logger.info("Firehose connection established"); 116 - reconnectAttempts = 0; 117 - }); 118 - 119 - ws.addEventListener("message", (event) => { 120 - parseMessage(event.data); 121 - }); 122 - 123 - ws.addEventListener("error", (event) => { 124 - logger.error({ event }, "Firehose WebSocket error"); 125 - }); 126 - 127 - ws.addEventListener("close", (event) => { 128 - logger.warn( 129 - { code: event.code, reason: event.reason }, 130 - "Firehose connection closed", 131 - ); 132 - scheduleReconnect(); 133 - }); 134 - } 135 - 136 - function scheduleReconnect(): void { 137 - if (reconnectTimeout) { 138 - clearTimeout(reconnectTimeout); 139 - } 140 - 141 - const delay = getReconnectDelay(); 142 - logger.info({ delay, attempt: reconnectAttempts }, "Scheduling reconnect"); 143 - 144 - reconnectTimeout = setTimeout(() => { 145 - connect(); 146 - }, delay); 147 - } 148 - 149 - export function startFirehose(): void { 150 - cursor = loadCursor(); 151 - connect(); 152 - } 153 - 154 - export function stopFirehose(): void { 155 - if (reconnectTimeout) { 156 - clearTimeout(reconnectTimeout); 157 - reconnectTimeout = null; 158 - } 159 - 160 - if (ws) { 161 - logger.info("Closing firehose connection"); 162 - ws.close(); 163 - ws = null; 164 - } 165 - }
+49 -6
src/hydration/posts.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { PostsRepository } from "../database/posts.repository.js"; 4 + import { BlobProcessor } from "../blobs/processor.js"; 5 + import { pRateLimit } from "p-ratelimit"; 6 + import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js"; 4 7 import { logger } from "../logger/index.js"; 5 8 import { config } from "../config/index.js"; 6 9 7 10 export class PostHydrationService { 8 11 private agent: AtpAgent; 9 12 private postsRepo: PostsRepository; 13 + private blobProcessor: BlobProcessor; 14 + private limit: ReturnType<typeof pRateLimit>; 10 15 11 16 constructor(db: Database) { 12 17 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 13 18 this.postsRepo = new PostsRepository(db); 19 + this.blobProcessor = new BlobProcessor(db, this.agent); 20 + this.limit = pRateLimit({ 21 + interval: 300000, 22 + rate: 3000, 23 + concurrency: 48, 24 + maxDelay: 60000, 25 + }); 14 26 } 15 27 16 28 async initialize(): Promise<void> { ··· 42 54 43 55 const [did, collection, rkey] = uriParts; 44 56 45 - const response = await this.agent.com.atproto.repo.getRecord({ 46 - repo: did, 47 - collection, 48 - rkey, 49 - }); 57 + const response = await this.limit(() => 58 + withRetry( 59 + async () => { 60 + return await this.agent.com.atproto.repo.getRecord({ 61 + repo: did, 62 + collection, 63 + rkey, 64 + }); 65 + }, 66 + { 67 + maxAttempts: 3, 68 + initialDelay: 1000, 69 + maxDelay: 10000, 70 + backoffMultiplier: 2, 71 + retryableErrors: [ 72 + isRateLimitError, 73 + isNetworkError, 74 + isServerError, 75 + ], 76 + } 77 + ) 78 + ); 50 79 51 80 if (!response.success || !response.data.value) { 52 81 logger.warn({ uri }, "Failed to fetch post record"); ··· 57 86 58 87 const isReply = !!record.reply; 59 88 89 + const embeds = record.embed ? [record.embed] : null; 90 + 60 91 await this.postsRepo.insert({ 61 92 uri, 62 93 did, 63 94 text: record.text || "", 64 95 facets: record.facets || null, 65 - embeds: record.embed || null, 96 + embeds, 66 97 langs: record.langs || null, 67 98 tags: record.tags || null, 68 99 created_at: record.createdAt, ··· 70 101 }); 71 102 72 103 logger.info({ uri }, "Post hydrated successfully"); 104 + 105 + if (embeds) { 106 + try { 107 + await this.blobProcessor.processBlobs(uri, embeds); 108 + } catch (error) { 109 + logger.warn({ error, uri }, "Failed to process blobs for post"); 110 + } 111 + } 73 112 } catch (error) { 113 + if (isRecordNotFoundError(error)) { 114 + logger.warn({ uri }, "Post record not found, skipping"); 115 + return; 116 + } 74 117 logger.error({ error, uri }, "Failed to hydrate post"); 75 118 throw error; 76 119 }
+191 -9
src/hydration/profiles.service.ts
··· 1 1 import { AtpAgent } from "@atproto/api"; 2 2 import { Database } from "duckdb"; 3 3 import { ProfilesRepository } from "../database/profiles.repository.js"; 4 + import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js"; 5 + import { computeBlobHashes } from "../blobs/hasher.js"; 6 + import { LocalBlobStorage } from "../blobs/storage/local.js"; 7 + import { S3BlobStorage } from "../blobs/storage/s3.js"; 8 + import { BlobStorage } from "../blobs/processor.js"; 9 + import { pRateLimit } from "p-ratelimit"; 10 + import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js"; 4 11 import { logger } from "../logger/index.js"; 5 12 import { config } from "../config/index.js"; 6 13 7 14 export class ProfileHydrationService { 8 15 private agent: AtpAgent; 9 16 private profilesRepo: ProfilesRepository; 17 + private profileBlobsRepo: ProfileBlobsRepository; 18 + private storage: BlobStorage | null = null; 19 + private limit: ReturnType<typeof pRateLimit>; 10 20 11 21 constructor(db: Database) { 12 22 this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` }); 13 23 this.profilesRepo = new ProfilesRepository(db); 24 + this.profileBlobsRepo = new ProfileBlobsRepository(db); 25 + 26 + if (config.blobs.hydrateBlobs) { 27 + if (config.blobs.storage.type === "s3") { 28 + this.storage = new S3BlobStorage( 29 + config.blobs.storage.s3Bucket!, 30 + config.blobs.storage.s3Region! 31 + ); 32 + } else { 33 + this.storage = new LocalBlobStorage( 34 + config.blobs.storage.localPath 35 + ); 36 + } 37 + } 38 + 39 + this.limit = pRateLimit({ 40 + interval: 300000, 41 + rate: 3000, 42 + concurrency: 48, 43 + maxDelay: 60000, 44 + }); 14 45 } 15 46 16 47 async initialize(): Promise<void> { ··· 29 60 async hydrateProfile(did: string): Promise<void> { 30 61 try { 31 62 const existingProfile = await this.profilesRepo.findByDid(did); 32 - if (existingProfile) { 33 - logger.debug({ did }, "Profile already hydrated, skipping"); 63 + const needsRehydration = existingProfile && (existingProfile.avatar_cid === null || existingProfile.banner_cid === null); 64 + 65 + if (existingProfile && !needsRehydration) { 66 + logger.debug({ did }, "Profile already fully hydrated, skipping"); 34 67 return; 35 68 } 36 69 37 - const profileResponse = await this.agent.com.atproto.repo.getRecord({ 38 - repo: did, 39 - collection: "app.bsky.actor.profile", 40 - rkey: "self", 41 - }); 70 + if (needsRehydration) { 71 + logger.debug({ did }, "Re-hydrating profile to fetch avatar/banner CIDs"); 72 + } 73 + 74 + const profileResponse = await this.limit(() => 75 + withRetry( 76 + async () => { 77 + return await this.agent.com.atproto.repo.getRecord({ 78 + repo: did, 79 + collection: "app.bsky.actor.profile", 80 + rkey: "self", 81 + }); 82 + }, 83 + { 84 + maxAttempts: 3, 85 + initialDelay: 1000, 86 + maxDelay: 10000, 87 + backoffMultiplier: 2, 88 + retryableErrors: [ 89 + isRateLimitError, 90 + isNetworkError, 91 + isServerError, 92 + ], 93 + } 94 + ) 95 + ); 42 96 43 97 let displayName: string | undefined; 44 98 let description: string | undefined; 99 + let avatarCid: string | undefined; 100 + let bannerCid: string | undefined; 45 101 46 102 if (profileResponse.success && profileResponse.data.value) { 47 103 const record = profileResponse.data.value as any; 48 104 displayName = record.displayName; 49 105 description = record.description; 106 + 107 + if (record.avatar?.ref) { 108 + avatarCid = record.avatar.ref.toString(); 109 + } else { 110 + avatarCid = 
""; 111 + } 112 + 113 + if (record.banner?.ref) { 114 + bannerCid = record.banner.ref.toString(); 115 + } else { 116 + bannerCid = ""; 117 + } 118 + 119 + logger.debug({ did, avatarCid, bannerCid, hasAvatar: !!record.avatar, hasBanner: !!record.banner }, "Extracted CIDs from profile record"); 50 120 } 51 121 52 - const profileLookup = await this.agent.getProfile({ actor: did }); 122 + const profileLookup = await this.limit(() => 123 + withRetry( 124 + async () => { 125 + return await this.agent.getProfile({ actor: did }); 126 + }, 127 + { 128 + maxAttempts: 3, 129 + initialDelay: 1000, 130 + maxDelay: 10000, 131 + backoffMultiplier: 2, 132 + retryableErrors: [ 133 + isRateLimitError, 134 + isNetworkError, 135 + isServerError, 136 + ], 137 + } 138 + ) 139 + ); 53 140 54 141 let handle: string | undefined; 55 142 if (profileLookup.success) { ··· 61 148 handle, 62 149 display_name: displayName, 63 150 description, 151 + avatar_cid: avatarCid, 152 + banner_cid: bannerCid, 64 153 }); 65 154 66 - logger.info({ did, handle }, "Profile hydrated successfully"); 155 + if (avatarCid && avatarCid !== "") { 156 + try { 157 + await this.processProfileBlob(did, avatarCid, "avatar"); 158 + } catch (error) { 159 + logger.warn({ error, did, avatarCid }, "Failed to process avatar blob"); 160 + } 161 + } 162 + 163 + if (bannerCid && bannerCid !== "") { 164 + try { 165 + await this.processProfileBlob(did, bannerCid, "banner"); 166 + } catch (error) { 167 + logger.warn({ error, did, bannerCid }, "Failed to process banner blob"); 168 + } 169 + } 170 + 171 + logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully"); 67 172 } catch (error) { 173 + if (isRecordNotFoundError(error)) { 174 + logger.warn({ did }, "Profile record not found, skipping"); 175 + return; 176 + } 68 177 logger.error({ error, did }, "Failed to hydrate profile"); 69 178 throw error; 70 179 } 180 + } 181 + 182 + private async resolvePds(did: string): Promise<string | null> { 183 + try { 184 + const didDocResponse = await fetch(`${config.plc.endpoint}/${did}`); 185 + if (!didDocResponse.ok) { 186 + logger.warn({ did, status: didDocResponse.status }, "Failed to fetch DID document"); 187 + return null; 188 + } 189 + 190 + const didDoc = await didDocResponse.json(); 191 + const pdsService = didDoc.service?.find((s: any) => 192 + s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer" 193 + ); 194 + 195 + if (!pdsService?.serviceEndpoint) { 196 + logger.warn({ did }, "No PDS endpoint found in DID document"); 197 + return null; 198 + } 199 + 200 + return pdsService.serviceEndpoint; 201 + } catch (error) { 202 + logger.error({ error, did }, "Failed to resolve PDS from DID"); 203 + return null; 204 + } 205 + } 206 + 207 + private async processProfileBlob( 208 + did: string, 209 + cid: string, 210 + type: "avatar" | "banner" 211 + ): Promise<void> { 212 + const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type); 213 + 214 + if (latestBlob && latestBlob.blob_cid === cid) { 215 + logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping"); 216 + return; 217 + } 218 + 219 + const pdsEndpoint = await this.resolvePds(did); 220 + if (!pdsEndpoint) { 221 + logger.warn({ did, cid, type }, "Cannot fetch blob without PDS endpoint"); 222 + return; 223 + } 224 + 225 + const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`; 226 + const blobResponse = await fetch(blobUrl); 227 + 228 + if (!blobResponse.ok) { 229 + logger.warn({ did, cid, type, pdsEndpoint, 
status: blobResponse.status }, "Failed to fetch blob from PDS"); 230 + return; 231 + } 232 + 233 + const blobData = Buffer.from(await blobResponse.arrayBuffer()); 234 + 235 + let storagePath: string | undefined; 236 + if (this.storage && config.blobs.hydrateBlobs) { 237 + storagePath = await this.storage.store(cid, blobData, "image/jpeg"); 238 + } 239 + 240 + const hashes = await computeBlobHashes(blobData, "image/jpeg"); 241 + 242 + await this.profileBlobsRepo.insert({ 243 + did, 244 + blob_type: type, 245 + blob_cid: cid, 246 + sha256: hashes.sha256, 247 + phash: hashes.phash, 248 + storage_path: storagePath, 249 + mimetype: "image/jpeg", 250 + }); 251 + 252 + logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully"); 71 253 } 72 254 }
+93
src/utils/rate-limit.ts
··· 1 + import { logger } from "../logger/index.js"; 2 + 3 + export interface RateLimiterConfig { 4 + maxTokens: number; 5 + refillRate: number; 6 + refillInterval: number; 7 + } 8 + 9 + export class RateLimiter { 10 + private tokens: number; 11 + private lastRefill: number; 12 + private config: RateLimiterConfig; 13 + 14 + constructor(config: RateLimiterConfig) { 15 + this.config = config; 16 + this.tokens = config.maxTokens; 17 + this.lastRefill = Date.now(); 18 + } 19 + 20 + private refill(): void { 21 + const now = Date.now(); 22 + const elapsed = now - this.lastRefill; 23 + const intervals = Math.floor(elapsed / this.config.refillInterval); 24 + 25 + if (intervals > 0) { 26 + this.tokens = Math.min( 27 + this.config.maxTokens, 28 + this.tokens + intervals * this.config.refillRate 29 + ); 30 + this.lastRefill = now; 31 + } 32 + } 33 + 34 + async acquire(tokens: number = 1): Promise<void> { 35 + while (true) { 36 + this.refill(); 37 + 38 + if (this.tokens >= tokens) { 39 + this.tokens -= tokens; 40 + return; 41 + } 42 + 43 + const waitTime = this.config.refillInterval; 44 + logger.debug( 45 + { tokens, available: this.tokens, waitTime }, 46 + "Rate limit reached, waiting" 47 + ); 48 + 49 + await new Promise((resolve) => setTimeout(resolve, waitTime)); 50 + } 51 + } 52 + 53 + getAvailableTokens(): number { 54 + this.refill(); 55 + return this.tokens; 56 + } 57 + 58 + reset(): void { 59 + this.tokens = this.config.maxTokens; 60 + this.lastRefill = Date.now(); 61 + } 62 + } 63 + 64 + export class MultiRateLimiter { 65 + private limiters: Map<string, RateLimiter> = new Map(); 66 + 67 + constructor( 68 + private defaultConfig: RateLimiterConfig 69 + ) {} 70 + 71 + setLimiter(key: string, config: RateLimiterConfig): void { 72 + this.limiters.set(key, new RateLimiter(config)); 73 + } 74 + 75 + async acquire(key: string, tokens: number = 1): Promise<void> { 76 + let limiter = this.limiters.get(key); 77 + if (!limiter) { 78 + limiter = new RateLimiter(this.defaultConfig); 79 + this.limiters.set(key, limiter); 80 + } 81 + await limiter.acquire(tokens); 82 + } 83 + 84 + reset(key?: string): void { 85 + if (key) { 86 + this.limiters.get(key)?.reset(); 87 + } else { 88 + for (const limiter of this.limiters.values()) { 89 + limiter.reset(); 90 + } 91 + } 92 + } 93 + }
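
A quick sketch of the token bucket in use; the numbers are illustrative, not the service defaults:

```typescript
import { RateLimiter } from "./src/utils/rate-limit.js";

// Burst of up to 10 calls, refilling 1 token per second thereafter.
const limiter = new RateLimiter({
  maxTokens: 10,
  refillRate: 1,
  refillInterval: 1000,
});

async function guardedFetch(url: string): Promise<Response> {
  await limiter.acquire(); // blocks until a token is free
  return fetch(url);
}
```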
+102
src/utils/retry.ts
··· 1 + import { logger } from "../logger/index.js"; 2 + 3 + export interface RetryConfig { 4 + maxAttempts: number; 5 + initialDelay: number; 6 + maxDelay: number; 7 + backoffMultiplier: number; 8 + retryableErrors?: ((error: any) => boolean)[]; 9 + } 10 + 11 + export class RetryError extends Error { 12 + constructor( 13 + message: string, 14 + public attempts: number, 15 + public lastError: Error 16 + ) { 17 + super(message); 18 + this.name = "RetryError"; 19 + } 20 + } 21 + 22 + export async function withRetry<T>( 23 + fn: () => Promise<T>, 24 + config: Partial<RetryConfig> = {} 25 + ): Promise<T> { 26 + const { 27 + maxAttempts = 3, 28 + initialDelay = 1000, 29 + maxDelay = 30000, 30 + backoffMultiplier = 2, 31 + retryableErrors = [(error) => true], 32 + } = config; 33 + 34 + let lastError: Error; 35 + let delay = initialDelay; 36 + 37 + for (let attempt = 1; attempt <= maxAttempts; attempt++) { 38 + try { 39 + return await fn(); 40 + } catch (error) { 41 + lastError = error instanceof Error ? error : new Error(String(error)); 42 + 43 + const isRetryable = retryableErrors.some((check) => check(lastError)); 44 + 45 + if (!isRetryable || attempt >= maxAttempts) { 46 + throw new RetryError( 47 + `Operation failed after ${attempt} attempts`, 48 + attempt, 49 + lastError 50 + ); 51 + } 52 + 53 + logger.warn( 54 + { 55 + attempt, 56 + maxAttempts, 57 + delay, 58 + error: lastError.message, 59 + }, 60 + "Retrying after error" 61 + ); 62 + 63 + await new Promise((resolve) => setTimeout(resolve, delay)); 64 + delay = Math.min(delay * backoffMultiplier, maxDelay); 65 + } 66 + } 67 + 68 + throw new RetryError( 69 + `Operation failed after ${maxAttempts} attempts`, 70 + maxAttempts, 71 + lastError! 72 + ); 73 + } 74 + 75 + export function isRateLimitError(error: any): boolean { 76 + return ( 77 + error?.status === 429 || 78 + error?.message?.toLowerCase().includes("rate limit") || 79 + error?.message?.toLowerCase().includes("too many requests") 80 + ); 81 + } 82 + 83 + export function isNetworkError(error: any): boolean { 84 + return ( 85 + error?.code === "ECONNRESET" || 86 + error?.code === "ENOTFOUND" || 87 + error?.code === "ETIMEDOUT" || 88 + error?.message?.toLowerCase().includes("network") || 89 + error?.message?.toLowerCase().includes("timeout") 90 + ); 91 + } 92 + 93 + export function isServerError(error: any): boolean { 94 + return error?.status >= 500 && error?.status < 600; 95 + } 96 + 97 + export function isRecordNotFoundError(error: any): boolean { 98 + return ( 99 + error?.error === "RecordNotFound" || 100 + error?.message?.includes("RecordNotFound") 101 + ); 102 + }
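
And a sketch of `withRetry` composed with the error predicates, mirroring how the hydration services configure it (the endpoint is illustrative). Non-retryable errors, such as 4xx responses other than 429, fail on the first attempt:

```typescript
import {
  withRetry,
  isNetworkError,
  isServerError,
  RetryError,
} from "./src/utils/retry.js";

try {
  const res = await withRetry(
    () => fetch("https://bsky.social/xrpc/com.atproto.server.describeServer"),
    {
      maxAttempts: 3,
      initialDelay: 1000,   // 1s, then 2s, capped at maxDelay
      maxDelay: 10_000,
      backoffMultiplier: 2,
      retryableErrors: [isNetworkError, isServerError],
    }
  );
  console.log(res.status);
} catch (err) {
  if (err instanceof RetryError) {
    console.error(`Gave up after ${err.attempts} attempts: ${err.lastError.message}`);
  }
}
```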
+27 -1
tests/integration/database.test.ts
···
          did TEXT PRIMARY KEY,
          handle TEXT,
          display_name TEXT,
-         description TEXT
+         description TEXT,
+         avatar_cid TEXT,
+         banner_cid TEXT
        );

        CREATE TABLE IF NOT EXISTS blobs (
···
      expect(found).not.toBeNull();
      expect(found?.did).toBe("did:plc:testuser");
    });
+
+   test("should insert and retrieve profile with avatar and banner", async () => {
+     const profile = {
+       did: "did:plc:testuser2",
+       handle: "testuser2.bsky.social",
+       display_name: "Test User 2",
+       description: "A test user with avatar",
+       avatar_cid: "bafyavatartest",
+       banner_cid: "bafybannertest",
+     };
+
+     await profilesRepo.insert(profile);
+     const found = await profilesRepo.findByDid(profile.did);
+
+     expect(found).not.toBeNull();
+     expect(found?.avatar_cid).toBe("bafyavatartest");
+     expect(found?.banner_cid).toBe("bafybannertest");
+   });
  });

  describe("BlobsRepository", () => {
···
    test("should find blobs by pHash", async () => {
      const found = await blobsRepo.findByPhash("deadbeef");
      expect(found.length).toBeGreaterThan(0);
+   });
+
+   test("should find blob by CID", async () => {
+     const found = await blobsRepo.findByCid("bafytest123");
+     expect(found).not.toBeNull();
+     expect(found?.sha256).toBe("abc123def456");
    });
  });
});
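Taken together, these tests pin down the repository lookup surface. A hedged sketch of that surface; only the method names and return shapes appear in the tests, while the constructor and db wiring are assumptions:

```ts
// Hypothetical wiring; the actual db handle and constructor are not shown here.
declare const db: any;
declare class BlobsRepository {
  constructor(db: any);
  findByCid(cid: string): Promise<{ sha256: string } | null>;
  findByPhash(phash: string): Promise<unknown[]>;
}

const blobsRepo = new BlobsRepository(db);
const blob = await blobsRepo.findByCid("bafytest123");     // one blob or null
const nearDupes = await blobsRepo.findByPhash("deadbeef"); // array of candidates
```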
+25 -17
tests/unit/decoder.test.ts
···
  import { describe, test, expect } from "bun:test";
  import {
-   extractLabelFromMessage,
+   extractLabelsFromMessage,
    validateLabel,
    LabelEvent,
  } from "../../src/firehose/decoder.js";

  describe("Firehose Decoder", () => {
-   describe("extractLabelFromMessage", () => {
-     test("should extract label from valid message", () => {
+   describe("extractLabelsFromMessage", () => {
+     test("should extract labels from valid message", () => {
        const message = {
          op: 1,
          t: "#labels",
···
          ],
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).not.toBeNull();
-       expect(label?.val).toBe("spam");
-       expect(label?.src).toBe("did:plc:labeler");
+       expect(labels).toHaveLength(1);
+       expect(labels[0].val).toBe("spam");
+       expect(labels[0].src).toBe("did:plc:labeler");
      });

-     test("should return null for non-label messages", () => {
+     test("should return empty array for non-label messages", () => {
        const message = {
          op: 1,
          t: "#info",
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).toBeNull();
+       expect(labels).toHaveLength(0);
      });

-     test("should return null for messages with wrong op", () => {
+     test("should extract all labels from message with multiple labels", () => {
        const message = {
-         op: 0,
+         op: 1,
          t: "#labels",
          labels: [
            {
···
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
+           {
+             src: "did:plc:labeler",
+             uri: "at://did:plc:user/app.bsky.feed.post/456",
+             val: "csam",
+             cts: "2025-01-15T12:01:00Z",
+           },
          ],
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).toBeNull();
+       expect(labels).toHaveLength(2);
+       expect(labels[0].val).toBe("spam");
+       expect(labels[1].val).toBe("csam");
      });

-     test("should return null for messages with empty labels array", () => {
+     test("should return empty array for messages with empty labels array", () => {
        const message = {
          op: 1,
          t: "#labels",
          labels: [],
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).toBeNull();
+       expect(labels).toHaveLength(0);
      });
    });
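The rename from `extractLabelFromMessage` to `extractLabelsFromMessage` reflects that a single `#labels` frame can carry several labels. A minimal consumer sketch using the message shape from these tests; the import path is an assumption:

```ts
import { extractLabelsFromMessage } from "./src/firehose/decoder.js"; // path assumed

const frame = {
  op: 1,
  t: "#labels",
  labels: [
    {
      src: "did:plc:labeler",
      uri: "at://did:plc:user/app.bsky.feed.post/123",
      val: "spam",
      cts: "2025-01-15T12:00:00Z",
    },
  ],
};

// Returns an array (possibly empty) rather than a single label or null.
for (const label of extractLabelsFromMessage(frame)) {
  console.log(label.val, label.uri);
}
```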
+220
tests/unit/subscriber.test.ts
···
+ import { describe, test, expect, beforeEach, afterEach, mock } from "bun:test";
+ import { FirehoseSubscriber } from "../../src/firehose/subscriber.js";
+ import { EventEmitter } from "events";
+ import WebSocket from "ws";
+
+ // Mock WebSocket class: an EventEmitter the tests can drive by hand.
+ class MockWebSocket extends EventEmitter {
+   constructor(_url: string) {
+     super();
+   }
+   close() {}
+ }
+
+ // Mock the entire 'ws' module
+ mock.module("ws", () => ({
+   default: MockWebSocket,
+ }));
+
+ describe("FirehoseSubscriber", () => {
+   let subscriber: FirehoseSubscriber;
+   let mockWsInstance: MockWebSocket;
+
+   beforeEach(() => {
+     subscriber = new FirehoseSubscriber();
+     // Mock the connect method to control the WebSocket instance
+     (subscriber as any).connect = () => {
+       const url = new URL("ws://localhost:1234");
+       if ((subscriber as any).cursor !== null) {
+         url.searchParams.set("cursor", (subscriber as any).cursor.toString());
+       }
+       const ws = new WebSocket(url.toString());
+       (subscriber as any).ws = ws;
+       mockWsInstance = ws as any;
+
+       ws.on("open", () => {
+         subscriber.emit("connected");
+       });
+       ws.on("message", async (data: Buffer) => {
+         try {
+           const message = JSON.parse(data.toString());
+           if (message.seq) {
+             await (subscriber as any).saveCursor(message.seq);
+           }
+           if (message.t === "#labels") {
+             for (const label of message.labels) {
+               subscriber.emit("label", label);
+             }
+           }
+         } catch (error) {
+           subscriber.emit("error", error);
+         }
+       });
+       ws.on("close", () => {
+         (subscriber as any).ws = null;
+         subscriber.emit("disconnected");
+         if ((subscriber as any).shouldReconnect) {
+           (subscriber as any).scheduleReconnect();
+         }
+       });
+       ws.on("error", (error) => {
+         subscriber.emit("error", error);
+       });
+     };
+   });
+
+   afterEach(() => {
+     subscriber.stop();
+   });
+
+   test("should attempt to connect on start", async (done) => {
+     subscriber.on("connected", () => {
+       done();
+     });
+     await subscriber.start();
+     mockWsInstance.emit("open");
+   });
+
+   test("should emit label event on label for post", async (done) => {
+     subscriber.on("label", (label) => {
+       expect(label.uri).toBe("at://did:plc:user/app.bsky.feed.post/123");
+       done();
+     });
+     await subscriber.start();
+     mockWsInstance.emit(
+       "message",
+       Buffer.from(
+         JSON.stringify({
+           op: 1,
+           t: "#labels",
+           labels: [
+             {
+               src: "did:plc:labeler",
+               uri: "at://did:plc:user/app.bsky.feed.post/123",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+           ],
+         })
+       )
+     );
+   });
+
+   test("should emit label event on label for profile", async (done) => {
+     subscriber.on("label", (label) => {
+       expect(label.uri).toBe("did:plc:user");
+       done();
+     });
+     await subscriber.start();
+     mockWsInstance.emit(
+       "message",
+       Buffer.from(
+         JSON.stringify({
+           op: 1,
+           t: "#labels",
+           labels: [
+             {
+               src: "did:plc:labeler",
+               uri: "did:plc:user",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+           ],
+         })
+       )
+     );
+   });
+
+   test("should handle multiple labels in one message", async () => {
+     let labelCount = 0;
+     subscriber.on("label", () => {
+       labelCount++;
+     });
+     await subscriber.start();
+     mockWsInstance.emit(
+       "message",
+       Buffer.from(
+         JSON.stringify({
+           op: 1,
+           t: "#labels",
+           labels: [
+             {
+               src: "did:plc:labeler",
+               uri: "at://did:plc:user/app.bsky.feed.post/123",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+             {
+               src: "did:plc:labeler",
+               uri: "did:plc:user",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+           ],
+         })
+       )
+     );
+     expect(labelCount).toBe(2);
+   });
+
+   test("should attempt to reconnect on close", (done) => {
+     let connectAttempts = 0;
+     (subscriber as any).connect = () => {
+       connectAttempts++;
+       if (connectAttempts > 1) {
+         done();
+       }
+       const url = new URL("ws://localhost:1234");
+       const ws = new WebSocket(url.toString());
+       (subscriber as any).ws = ws;
+       mockWsInstance = ws as any;
+       ws.on("close", () => {
+         (subscriber as any).ws = null;
+         subscriber.emit("disconnected");
+         if ((subscriber as any).shouldReconnect) {
+           (subscriber as any).scheduleReconnect();
+         }
+       });
+     };
+
+     subscriber.start();
+     mockWsInstance.emit("close");
+   });
+
+   test("should stop reconnecting after stop() is called", async (done) => {
+     let connectAttempts = 0;
+     (subscriber as any).connect = () => {
+       connectAttempts++;
+       const url = new URL("ws://localhost:1234");
+       const ws = new WebSocket(url.toString());
+       (subscriber as any).ws = ws;
+       mockWsInstance = ws as any;
+       ws.on("close", () => {
+         (subscriber as any).ws = null;
+         subscriber.emit("disconnected");
+         if ((subscriber as any).shouldReconnect) {
+           (subscriber as any).scheduleReconnect();
+         }
+       });
+     };
+
+     await subscriber.start();
+     subscriber.stop();
+     mockWsInstance.emit("close");
+
+     setTimeout(() => {
+       expect(connectAttempts).toBe(1);
+       done();
+     }, 2000);
+   });
+
+   test("should increase backoff delay on multiple reconnects", () => {
+     subscriber.start();
+     (subscriber as any).reconnectAttempts = 0;
+     (subscriber as any).scheduleReconnect();
+     const initialBackoff = (subscriber as any).reconnectAttempts;
+     (subscriber as any).scheduleReconnect();
+     const secondBackoff = (subscriber as any).reconnectAttempts;
+     // Uses the attempt counter as a proxy: each scheduleReconnect() call
+     // should bump it, which in turn lengthens the computed delay.
+     expect(secondBackoff).toBeGreaterThan(initialBackoff);
+   });
+ });
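The suite above fixes the subscriber's public surface: start/stop plus connected, disconnected, error, and label events. A consumer sketch under those assumptions; constructor options, if any, are not shown in this diff:

```ts
import { FirehoseSubscriber } from "./src/firehose/subscriber.js"; // path assumed

const subscriber = new FirehoseSubscriber();

subscriber.on("connected", () => console.log("firehose connected"));
subscriber.on("disconnected", () => console.log("disconnected; reconnect is scheduled internally"));
subscriber.on("error", (err) => console.error("firehose error:", err));
subscriber.on("label", (label) => {
  // Per the tests, label.uri is an at:// URI for posts or a bare DID for profiles.
  console.log(label.val, label.uri);
});

await subscriber.start();
// ... on shutdown:
subscriber.stop();
```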