+4
-1
.claude/settings.local.json
+3
.env.example
···
6
6
PDS=bsky.social
7
7
WSS_URL=wss://your-labeler-service.com/xrpc/com.atproto.label.subscribeLabels
8
8
9
+
# PLC Directory (for DID resolution)
10
+
PLC_ENDPOINT=https://plc.wtf
11
+
9
12
# Blob & Image Handling
10
13
HYDRATE_BLOBS=false # Set to true to download images/videos
11
14
BLOB_STORAGE_TYPE=local # 'local' or 's3'
+21
LICENSE
···
1
+
MIT License
2
+
3
+
Copyright (c) 2025 scarnecchia
4
+
5
+
Permission is hereby granted, free of charge, to any person obtaining a copy
6
+
of this software and associated documentation files (the "Software"), to deal
7
+
in the Software without restriction, including without limitation the rights
8
+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
+
copies of the Software, and to permit persons to whom the Software is
10
+
furnished to do so, subject to the following conditions:
11
+
12
+
The above copyright notice and this permission notice shall be included in all
13
+
copies or substantial portions of the Software.
14
+
15
+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
+
SOFTWARE.
+32
-13
README.md
···
6
6
7
7
- **Real-time Label Capture**: Subscribe to any Bluesky labeler's firehose via WebSocket
8
8
- **Automatic Content Hydration**: Fetch full post records and user profiles for labeled content
9
+
- **Blob Processing**: SHA-256 and perceptual hashing for images/videos with optional download
9
10
- **Intelligent Filtering**: Optionally filter labels by type to capture only what you need
11
+
- **Rate Limiting**: Respects Bluesky API limits (3000 req/5min) with p-ratelimit
12
+
- **Retry Logic**: Automatic retry with exponential backoff for transient failures
10
13
- **Cursor Persistence**: Resume from where you left off after restarts
11
-
- **Automatic Reconnection**: Exponential backoff reconnection (1s-30s) for stability
14
+
- **Automatic Reconnection**: Exponential backoff reconnection (1s-60s) for stability
12
15
- **DuckDB Storage**: Embedded analytics database optimized for ML pipelines
13
16
- **Docker Ready**: Containerized deployment with volume persistence
14
17
- **Type-Safe**: Full TypeScript implementation with Zod validation
···
18
21
```
19
22
Firehose → Label Event → Filter → Store Label → Hydration Queue
20
23
                                       ↓               ↓
21
-
                                     DuckDB ← [Post/Profile Fetch]
24
+
                                     DuckDB ← [Post/Profile Fetch] → Blob Processing
25
+
                                                                            ↓
26
+
                                                                      Hash + Store
22
27
```
23
28
24
29
### Components
25
30
26
31
- **Firehose Subscriber**: WebSocket client with DAG-CBOR decoding (frame-decode sketch below)
27
32
- **Label Filter**: Configurable allow-list for label types
28
-
- **Hydration Services**: Automatic post and profile data fetching
33
+
- **Hydration Services**: Automatic post and profile data fetching with rate limiting
34
+
- **Blob Processor**: SHA-256 and perceptual hash computation with optional download
29
35
- **Hydration Queue**: Async queue with deduplication
36
+
- **Rate Limiter**: p-ratelimit enforcing 3000 requests per 5 minutes
37
+
- **Retry Logic**: Exponential backoff for transient failures
30
38
- **Repository Layer**: Clean database abstraction for all entities
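
Each firehose frame is two DAG-CBOR values back to back — a header, then a body — which is why the decoder calls `decodeFirst` twice. A minimal sketch of that step, mirroring `src/firehose/decoder.ts` from this change (`decodeFrame` is an illustrative helper name):

```typescript
import { decodeFirst } from "@atcute/cbor";

// A firehose frame is <header><body>: two concatenated DAG-CBOR values.
// decodeFirst returns [value, remainingBytes], so two calls split the frame.
export function decodeFrame(data: Uint8Array): { header: unknown; body: unknown } {
  const [header, remainder] = decodeFirst(data); // e.g. { op: 1, t: "#labels" }
  const [body] = decodeFirst(remainder);         // e.g. { seq, labels: [...] }
  return { header, body };
}
```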
31
39
32
40
## Quick Start
···
60
68
WSS_URL=wss://your-labeler.com/xrpc/com.atproto.label.subscribeLabels
61
69
62
70
# Optional: Filter specific labels
63
-
CAPTURE_LABELS=spam,hate-speech,csam
71
+
CAPTURE_LABELS=spam,hate-speech
64
72
65
73
# Logging
66
74
LOG_LEVEL=info
···
104
112
- `CAPTURE_LABELS`: Comma-separated list of label values to capture
105
113
- `DB_PATH`: Path to DuckDB database file (default: `./data/skywatch.duckdb`)
106
114
- `LOG_LEVEL`: Logging level (default: `info`)
107
-
- `HYDRATE_BLOBS`: Enable blob download (default: `false`) - Phase 4
108
-
- `BLOB_STORAGE_TYPE`: Storage backend for blobs (`local` or `s3`) - Phase 4
109
-
- `BLOB_STORAGE_PATH`: Local path for blob storage - Phase 4
115
+
- `HYDRATE_BLOBS`: Enable blob download (default: `false`)
116
+
- `BLOB_STORAGE_TYPE`: Storage backend for blobs (`local` or `s3`)
117
+
- `BLOB_STORAGE_PATH`: Local path for blob storage (default: `./data/blobs`)
110
118
111
-
### S3 Configuration (Phase 4, Optional)
119
+
### S3 Configuration (Optional)
112
120
113
121
- `S3_BUCKET`: S3 bucket name
114
122
- `S3_REGION`: AWS region
···
150
158
- `display_name`: Display name
151
159
- `description`: Bio/description
152
160
153
-
### Blobs Table (Phase 4)
161
+
### Blobs Table
154
162
Image and video blob metadata.
155
163
156
164
- `post_uri`: Associated post URI
···
165
173
Filter labels by providing a comma-separated list in `CAPTURE_LABELS`:
166
174
167
175
```env
168
-
CAPTURE_LABELS=spam,hate-speech,csam,scam
176
+
CAPTURE_LABELS=spam,hate-speech,scam
169
177
```
170
178
171
179
If not set, all labels are captured.
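
As a sketch of what that check amounts to (the class shape below is illustrative — the subscriber in this change only depends on a `shouldCapture(label)` method):

```typescript
// Illustrative allow-list filter; an unset CAPTURE_LABELS captures everything.
export class LabelFilter {
  private allowed: Set<string> | null;

  constructor(captureLabels?: string) {
    this.allowed = captureLabels
      ? new Set(captureLabels.split(",").map((v) => v.trim()))
      : null; // null => no filtering
  }

  shouldCapture(label: { val: string }): boolean {
    return this.allowed === null || this.allowed.has(label.val);
  }
}
```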
···
188
196
- `Received label`: Label captured and stored
189
197
- `Post hydrated successfully`: Post data fetched
190
198
- `Profile hydrated successfully`: Profile data fetched
199
+
- `Blob processed`: Blob hashed and optionally stored
200
+
201
+
## Rate Limiting
202
+
203
+
The service uses p-ratelimit to respect Bluesky's API limits (configuration sketch below):
204
+
- **Limit**: 3000 requests per 5 minutes per IP address
205
+
- **Concurrency**: Up to 48 concurrent requests
206
+
- **Backoff**: Automatic delays when approaching limits
207
+
- **Retry Logic**: Exponential backoff for rate limit errors (1s-10s)
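
The limiter below is configured exactly as in the hydration services added by this change:

```typescript
import { pRateLimit } from "p-ratelimit";

// As constructed in posts.service.ts and profiles.service.ts:
const limit = pRateLimit({
  interval: 300000, // 5-minute window, in milliseconds
  rate: 3000,       // max calls per window (Bluesky's per-IP limit)
  concurrency: 48,  // max simultaneous in-flight calls
  maxDelay: 60000,  // reject a queued call rather than wait longer than this
});

// Every API call is funneled through it, e.g.:
// await limit(() => agent.com.atproto.repo.getRecord({ repo, collection, rkey }));
```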
191
208
192
209
## Development
193
210
···
196
213
```
197
214
skywatch-tail/
198
215
├── src/
216
+
│   ├── blobs/        # Blob processing and storage
199
217
│   ├── config/       # Environment validation
200
218
│   ├── database/     # Schema and repositories
201
219
│   ├── firehose/     # WebSocket subscriber
202
220
│   ├── hydration/    # Content hydration services
203
221
│   ├── logger/       # Pino logger setup
222
+
│   ├── utils/        # Retry logic and helpers
204
223
│   └── index.ts      # Main entry point
205
224
├── tests/
206
225
│   ├── integration/  # Database integration tests
···
244
263
- [x] Phase 1: Core infrastructure (Docker, config, database, logging)
245
264
- [x] Phase 2: Firehose connection and label capture
246
265
- [x] Phase 3: Content hydration (posts and profiles)
247
-
- [ ] Phase 4: Blob processing (image/video hashing and storage)
248
-
- [ ] Phase 5: Rate limiting and optimization
266
+
- [x] Phase 4: Blob processing (image/video hashing and storage)
267
+
- [x] Phase 5: Rate limiting and optimization
249
268
- [ ] Phase 6: Comprehensive testing
250
269
- [ ] Phase 7: Documentation
251
270
252
271
## Safety Features
253
272
254
273
### Blob Hydration
255
-
By default, `HYDRATE_BLOBS` is `false`. This prevents accidental download of potentially harmful content (CSAM, graphic violence, etc.) while still capturing cryptographic and perceptual hashes for ML training.
274
+
By default, `HYDRATE_BLOBS` is `false`. This prevents accidental download of potentially harmful and/or unlawful content (CSAM, graphic violence, etc.) while still capturing cryptographic and perceptual hashes.
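
In hash-only mode the blob bytes are fetched into memory just long enough to be hashed, then discarded — only the digests reach DuckDB. A simplified sketch of that path (`hashWithoutStoring` is an illustrative name; the logic condenses `src/blobs/processor.ts`):

```typescript
import { computeBlobHashes } from "./src/blobs/hasher.js";

// With HYDRATE_BLOBS=false, storage.store() is never called; the buffer
// exists only transiently so SHA-256 and pHash can be computed.
async function hashWithoutStoring(blobUrl: string, mimeType?: string) {
  const res = await fetch(blobUrl);
  if (!res.ok) throw new Error(`getBlob failed: ${res.status}`);
  const data = Buffer.from(await res.arrayBuffer());
  const { sha256, phash } = await computeBlobHashes(data, mimeType);
  return { sha256, phash }; // these are persisted; the bytes are not
}
```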
256
275
257
276
Only enable blob download if:
258
277
1. You understand the legal and safety implications
+164
docs/profile-blob-hydration.md
···
1
+
# Profile Blob Hydration - Implementation Notes
2
+
3
+
## Overview
4
+
5
+
This document captures key learnings from implementing avatar and banner blob hydration for Bluesky profiles.
6
+
7
+
## Key Discoveries
8
+
9
+
### 1. CID Deserialization in @atproto/api
10
+
11
+
The `@atproto/api` library deserializes blob references from their JSON `$link` representation into CID class objects.
12
+
13
+
**Raw JSON from API:**
14
+
```json
15
+
{
16
+
"avatar": {
17
+
"$type": "blob",
18
+
"ref": {
19
+
"$link": "bafkreigg3s6plegjncmxubeufbohj3qasbm4r23q2x7zlivdhccfqfypve"
20
+
},
21
+
"mimeType": "image/jpeg",
22
+
"size": 101770
23
+
}
24
+
}
25
+
```
26
+
27
+
**What you get in TypeScript:**
28
+
```typescript
29
+
record.avatar.ref // CID object with { code, version, hash, ... }
30
+
```
31
+
32
+
**Solution:**
33
+
```typescript
34
+
const cid = record.avatar.ref.toString(); // "bafkrei..."
35
+
```
36
+
37
+
### 2. PDS Endpoint Resolution
38
+
39
+
Users can be on different Personal Data Servers (PDS), not just `bsky.social`. Blobs must be fetched from the user's actual PDS.
40
+
41
+
**Process:**
42
+
1. Query PLC directory for DID document: `https://plc.wtf/${did}`
43
+
2. Find service with `id: "#atproto_pds"` and `type: "AtprotoPersonalDataServer"`
44
+
3. Extract `serviceEndpoint` URL
45
+
4. Use that endpoint for `com.atproto.sync.getBlob`
46
+
47
+
**Example:**
48
+
```typescript
49
+
const didDoc = await fetch(`https://plc.wtf/${did}`).then(r => r.json());
50
+
const pdsService = didDoc.service?.find(s =>
51
+
s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
52
+
);
53
+
const pdsEndpoint = pdsService.serviceEndpoint; // e.g., "https://waxcap.us-west.host.bsky.network"
54
+
```
55
+
56
+
### 3. Correct Blob Fetching
57
+
58
+
**Don't use CDN paths** - they don't work reliably for all blobs and require authentication context.
59
+
60
+
**Use the AT Protocol API:**
61
+
```typescript
62
+
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
63
+
const response = await fetch(blobUrl);
64
+
const blobData = Buffer.from(await response.arrayBuffer());
65
+
```
66
+
67
+
### 4. Database Schema Design
68
+
69
+
**Separate tables for different blob types:**
70
+
71
+
- `blobs` table: Post images with FK to `posts(uri)`
72
+
- `profile_blobs` table: Avatars/banners with FK to `profiles(did)`
73
+
74
+
This allows proper relational queries and analysis.
75
+
76
+
**Profile blobs schema:**
77
+
```sql
78
+
CREATE TABLE profile_blobs (
79
+
did TEXT NOT NULL,
80
+
blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
81
+
blob_cid TEXT NOT NULL,
82
+
sha256 TEXT NOT NULL,
83
+
phash TEXT,
84
+
storage_path TEXT,
85
+
mimetype TEXT,
86
+
captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87
+
PRIMARY KEY (did, blob_type, captured_at),
88
+
FOREIGN KEY (did) REFERENCES profiles(did)
89
+
);
90
+
```
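
With the two tables related by `did`, cross-table questions become ordinary joins. An illustrative DuckDB query from Node (table and column names are the ones defined in this change):

```typescript
import { Database } from "duckdb";

// Illustrative: latest avatar hashes per profile, joined to the handle.
const db = new Database("./data/skywatch.duckdb");
db.all(
  `SELECT p.did, p.handle, pb.sha256, pb.phash
     FROM profiles p
     JOIN profile_blobs pb
       ON pb.did = p.did AND pb.blob_type = 'avatar'
    WHERE pb.captured_at = (
      SELECT max(captured_at) FROM profile_blobs
       WHERE did = p.did AND blob_type = 'avatar'
    )`,
  (err, rows) => {
    if (err) throw err;
    console.table(rows);
  }
);
```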
91
+
92
+
### 5. Change Tracking
93
+
94
+
Including `captured_at` in the primary key allows tracking when users change their avatars/banners.
95
+
96
+
**Query latest state:**
97
+
```sql
98
+
SELECT * FROM profile_blobs
99
+
WHERE did = ? AND blob_type = ?
100
+
ORDER BY captured_at DESC
101
+
LIMIT 1
102
+
```
103
+
104
+
**Only insert if changed:**
105
+
```typescript
106
+
const latest = await findLatestByDidAndType(did, type);
107
+
if (latest && latest.blob_cid === cid) {
108
+
return; // No change, skip
109
+
}
110
+
// Insert new row with current timestamp
111
+
```
112
+
113
+
### 6. Sentinel Values for Missing Data
114
+
115
+
Use empty string (`""`) to distinguish "we checked, user has no avatar" from NULL "we haven't checked yet".
116
+
117
+
```typescript
118
+
if (record.avatar?.ref) {
119
+
avatarCid = record.avatar.ref.toString();
120
+
} else {
121
+
avatarCid = ""; // Explicitly checked, not present
122
+
}
123
+
```
124
+
125
+
This prevents infinite re-hydration loops for profiles without avatars.
126
+
127
+
### 7. Profile Re-hydration Logic
128
+
129
+
```typescript
130
+
const existingProfile = await findByDid(did);
131
+
const needsRehydration = existingProfile &&
132
+
(existingProfile.avatar_cid === null || existingProfile.banner_cid === null);
133
+
134
+
if (existingProfile && !needsRehydration) {
135
+
return; // Skip
136
+
}
137
+
```
138
+
139
+
## Configuration
140
+
141
+
- `PLC_ENDPOINT`: DID resolution endpoint (default: `https://plc.wtf`)
142
+
- Can be changed to `https://plc.directory` or custom instance
143
+
- plc.wtf is faster but unofficial
144
+
145
+
## Common Errors
146
+
147
+
### "RepoNotFound"
148
+
- **Cause:** Querying wrong PDS endpoint
149
+
- **Solution:** Resolve correct PDS from DID document
150
+
151
+
### Foreign Key Constraint Violation
152
+
- **Cause:** Trying to insert profile blobs into `blobs` table
153
+
- **Solution:** Use separate `profile_blobs` table
154
+
155
+
### Missing CIDs Despite API Returning Them
156
+
- **Cause:** Trying to access `ref.$link` when ref is a CID object
157
+
- **Solution:** Call `.toString()` on the CID object
158
+
159
+
## Related Files
160
+
161
+
- `src/hydration/profiles.service.ts` - Main hydration logic
162
+
- `src/database/profile-blobs.repository.ts` - Profile blob persistence
163
+
- `src/database/schema.ts` - Table definitions
164
+
- `src/config/index.ts` - PLC endpoint configuration
-17
src/agent.ts
···
1
-
import { setGlobalDispatcher, Agent as Agent } from "undici";
2
-
setGlobalDispatcher(new Agent({ connect: { timeout: 20_000 } }));
3
-
import { BSKY_HANDLE, BSKY_PASSWORD, PDS } from "./config.js";
4
-
import { AtpAgent } from "@atproto/api";
5
-
6
-
export const agent = new AtpAgent({
7
-
service: `https://${PDS}`,
8
-
});
9
-
export const login = () =>
10
-
agent.login({
11
-
identifier: BSKY_HANDLE,
12
-
password: BSKY_PASSWORD,
13
-
});
14
-
15
-
export const isLoggedIn = login()
16
-
.then(() => true)
17
-
.catch(() => false);
+68
src/blobs/hasher.ts
···
1
+
import crypto from "crypto";
2
+
import sharp from "sharp";
3
+
import { logger } from "../logger/index.js";
4
+
5
+
export async function computeSha256(buffer: Buffer): Promise<string> {
6
+
return crypto.createHash("sha256").update(buffer).digest("hex");
7
+
}
8
+
9
+
export async function computePerceptualHash(buffer: Buffer): Promise<string> {
10
+
try {
11
+
const image = sharp(buffer);
12
+
const metadata = await image.metadata();
13
+
14
+
if (!metadata.width || !metadata.height) {
15
+
throw new Error("Invalid image metadata");
16
+
}
17
+
18
+
const resized = await image
19
+
.resize(8, 8, { fit: "fill" })
20
+
.grayscale()
21
+
.raw()
22
+
.toBuffer();
23
+
24
+
const pixels = new Uint8Array(resized);
25
+
const avg =
26
+
pixels.reduce((sum, val) => sum + val, 0) / pixels.length;
27
+
28
+
let hash = "";
29
+
for (let i = 0; i < pixels.length; i++) {
30
+
hash += pixels[i] > avg ? "1" : "0";
31
+
}
32
+
33
+
return BigInt("0b" + hash).toString(16).padStart(16, "0");
34
+
} catch (error) {
35
+
logger.error({ error }, "Failed to compute perceptual hash");
36
+
throw error;
37
+
}
38
+
}
39
+
40
+
export interface BlobHashes {
41
+
sha256: string;
42
+
phash?: string;
43
+
}
44
+
45
+
export async function computeBlobHashes(
46
+
buffer: Buffer,
47
+
mimetype?: string
48
+
): Promise<BlobHashes> {
49
+
const sha256 = await computeSha256(buffer);
50
+
51
+
if (
52
+
mimetype?.startsWith("image/") &&
53
+
!mimetype.includes("svg")
54
+
) {
55
+
try {
56
+
const phash = await computePerceptualHash(buffer);
57
+
return { sha256, phash };
58
+
} catch (error) {
59
+
logger.warn(
60
+
{ error, mimetype },
61
+
"Failed to compute pHash, returning SHA256 only"
62
+
);
63
+
return { sha256 };
64
+
}
65
+
}
66
+
67
+
return { sha256 };
68
+
}
+188
src/blobs/processor.ts
···
1
+
import { AtpAgent } from "@atproto/api";
2
+
import { Database } from "duckdb";
3
+
import { BlobsRepository } from "../database/blobs.repository.js";
4
+
import { computeBlobHashes } from "./hasher.js";
5
+
import { LocalBlobStorage } from "./storage/local.js";
6
+
import { S3BlobStorage } from "./storage/s3.js";
7
+
import { config } from "../config/index.js";
8
+
import { logger } from "../logger/index.js";
9
+
10
+
export interface BlobReference {
11
+
cid: string;
12
+
mimeType?: string;
13
+
}
14
+
15
+
export interface BlobStorage {
16
+
store(cid: string, data: Buffer, mimeType?: string): Promise<string>;
17
+
retrieve(cid: string): Promise<Buffer | null>;
18
+
}
19
+
20
+
export class BlobProcessor {
21
+
private blobsRepo: BlobsRepository;
22
+
private storage: BlobStorage | null = null;
23
+
private agent: AtpAgent;
24
+
25
+
constructor(db: Database, agent: AtpAgent) {
26
+
this.blobsRepo = new BlobsRepository(db);
27
+
this.agent = agent;
28
+
29
+
if (config.blobs.hydrateBlobs) {
30
+
if (config.blobs.storage.type === "s3") {
31
+
this.storage = new S3BlobStorage(
32
+
config.blobs.storage.s3Bucket!,
33
+
config.blobs.storage.s3Region!
34
+
);
35
+
} else {
36
+
this.storage = new LocalBlobStorage(
37
+
config.blobs.storage.localPath
38
+
);
39
+
}
40
+
}
41
+
}
42
+
43
+
extractBlobReferences(embedsJson: any): BlobReference[] {
44
+
const refs: BlobReference[] = [];
45
+
46
+
if (!embedsJson || !Array.isArray(embedsJson)) {
47
+
return refs;
48
+
}
49
+
50
+
for (const embed of embedsJson) {
51
+
if (embed.images && Array.isArray(embed.images)) {
52
+
for (const img of embed.images) {
53
+
if (img.image?.ref?.$link) {
54
+
refs.push({
55
+
cid: img.image.ref.$link,
56
+
mimeType: img.image.mimeType,
57
+
});
58
+
}
59
+
}
60
+
}
61
+
62
+
if (embed.media?.images && Array.isArray(embed.media.images)) {
63
+
for (const img of embed.media.images) {
64
+
if (img.image?.ref?.$link) {
65
+
refs.push({
66
+
cid: img.image.ref.$link,
67
+
mimeType: img.image.mimeType,
68
+
});
69
+
}
70
+
}
71
+
}
72
+
73
+
if (embed.video?.ref?.$link) {
74
+
refs.push({
75
+
cid: embed.video.ref.$link,
76
+
mimeType: embed.video.mimeType,
77
+
});
78
+
}
79
+
}
80
+
81
+
return refs;
82
+
}
83
+
84
+
async processBlobs(postUri: string, embedsJson: any): Promise<void> {
85
+
const blobRefs = this.extractBlobReferences(embedsJson);
86
+
87
+
if (blobRefs.length === 0) {
88
+
return;
89
+
}
90
+
91
+
for (const ref of blobRefs) {
92
+
try {
93
+
await this.processBlob(postUri, ref);
94
+
} catch (error) {
95
+
logger.error(
96
+
{ error, postUri, cid: ref.cid },
97
+
"Failed to process blob"
98
+
);
99
+
}
100
+
}
101
+
}
102
+
103
+
private parseBlobUri(uri: string): { did: string; type: 'post' | 'avatar' | 'banner' } {
104
+
if (uri.startsWith("profile://")) {
105
+
const match = uri.match(/^profile:\/\/([^/]+)\/(avatar|banner)$/);
106
+
if (match) {
107
+
return { did: match[1], type: match[2] as 'avatar' | 'banner' };
108
+
}
109
+
}
110
+
111
+
const [did] = uri.replace("at://", "").split("/"); // at://<did>/<collection>/<rkey>: first segment is the DID
112
+
return { did, type: 'post' };
113
+
}
114
+
115
+
private async processBlob(
116
+
postUri: string,
117
+
ref: BlobReference
118
+
): Promise<void> {
119
+
const existing = await this.blobsRepo.findByCid(ref.cid);
120
+
if (existing) {
121
+
await this.blobsRepo.insert({
122
+
post_uri: postUri,
123
+
blob_cid: ref.cid,
124
+
sha256: existing.sha256,
125
+
phash: existing.phash,
126
+
storage_path: existing.storage_path,
127
+
mimetype: existing.mimetype,
128
+
});
129
+
logger.debug(
130
+
{ postUri, cid: ref.cid },
131
+
"Blob already processed, reusing hashes"
132
+
);
133
+
return;
134
+
}
135
+
136
+
const { did, type } = this.parseBlobUri(postUri);
137
+
const pds = `https://${config.bsky.pds}`;
138
+
const blobUrl = `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${ref.cid}`;
139
+
140
+
try {
141
+
const response = await fetch(blobUrl);
142
+
143
+
if (!response.ok) {
144
+
logger.warn(
145
+
{ postUri, cid: ref.cid, status: response.status, did },
146
+
"Failed to fetch blob"
147
+
);
148
+
return;
149
+
}
150
+
151
+
const blobData = Buffer.from(await response.arrayBuffer());
152
+
153
+
let storagePath: string | undefined;
154
+
if (this.storage && config.blobs.hydrateBlobs) {
155
+
storagePath = await this.storage.store(
156
+
ref.cid,
157
+
blobData,
158
+
ref.mimeType
159
+
);
160
+
}
161
+
162
+
const hashes = await computeBlobHashes(
163
+
blobData,
164
+
ref.mimeType
165
+
);
166
+
167
+
await this.blobsRepo.insert({
168
+
post_uri: postUri,
169
+
blob_cid: ref.cid,
170
+
sha256: hashes.sha256,
171
+
phash: hashes.phash,
172
+
storage_path: storagePath,
173
+
mimetype: ref.mimeType,
174
+
});
175
+
176
+
logger.info(
177
+
{ postUri, cid: ref.cid, sha256: hashes.sha256, type },
178
+
"Blob processed successfully"
179
+
);
180
+
} catch (error) {
181
+
logger.error(
182
+
{ error, postUri, cid: ref.cid },
183
+
"Failed to download or hash blob"
184
+
);
185
+
throw error;
186
+
}
187
+
}
188
+
}
+82
src/blobs/storage/local.ts
···
1
+
import * as fs from "fs/promises";
2
+
import * as path from "path";
3
+
import { BlobStorage } from "../processor.js";
4
+
import { logger } from "../../logger/index.js";
5
+
6
+
export class LocalBlobStorage implements BlobStorage {
7
+
constructor(private basePath: string) {}
8
+
9
+
async store(
10
+
cid: string,
11
+
data: Buffer,
12
+
mimeType?: string
13
+
): Promise<string> {
14
+
try {
15
+
const extension = this.getExtensionFromMime(mimeType);
16
+
const filename = `${cid}${extension}`;
17
+
18
+
const dir = path.join(
19
+
this.basePath,
20
+
cid.substring(0, 2),
21
+
cid.substring(2, 4)
22
+
);
23
+
24
+
await fs.mkdir(dir, { recursive: true });
25
+
26
+
const fullPath = path.join(dir, filename);
27
+
await fs.writeFile(fullPath, data);
28
+
29
+
logger.debug({ cid, path: fullPath }, "Blob stored locally");
30
+
31
+
return fullPath;
32
+
} catch (error) {
33
+
logger.error({ error, cid }, "Failed to store blob locally");
34
+
throw error;
35
+
}
36
+
}
37
+
38
+
async retrieve(cid: string): Promise<Buffer | null> {
39
+
try {
40
+
const possibleExtensions = ["", ".jpg", ".jpeg", ".png", ".webp", ".mp4"];
41
+
42
+
for (const ext of possibleExtensions) {
43
+
const filename = `${cid}${ext}`;
44
+
const filePath = path.join(
45
+
this.basePath,
46
+
cid.substring(0, 2),
47
+
cid.substring(2, 4),
48
+
filename
49
+
);
50
+
51
+
try {
52
+
const data = await fs.readFile(filePath);
53
+
return data;
54
+
} catch {
55
+
continue;
56
+
}
57
+
}
58
+
59
+
logger.warn({ cid }, "Blob not found in local storage");
60
+
return null;
61
+
} catch (error) {
62
+
logger.error({ error, cid }, "Failed to retrieve blob from local storage");
63
+
throw error;
64
+
}
65
+
}
66
+
67
+
private getExtensionFromMime(mimeType?: string): string {
68
+
if (!mimeType) return "";
69
+
70
+
const mimeMap: Record<string, string> = {
71
+
"image/jpeg": ".jpg",
72
+
"image/jpg": ".jpg",
73
+
"image/png": ".png",
74
+
"image/webp": ".webp",
75
+
"image/gif": ".gif",
76
+
"video/mp4": ".mp4",
77
+
"video/webm": ".webm",
78
+
};
79
+
80
+
return mimeMap[mimeType.toLowerCase()] || "";
81
+
}
82
+
}
+71
src/blobs/storage/s3.ts
···
1
+
import {
2
+
S3Client,
3
+
PutObjectCommand,
4
+
GetObjectCommand,
5
+
} from "@aws-sdk/client-s3";
6
+
import { BlobStorage } from "../processor.js";
7
+
import { logger } from "../../logger/index.js";
8
+
9
+
export class S3BlobStorage implements BlobStorage {
10
+
private client: S3Client;
11
+
private bucket: string;
12
+
13
+
constructor(bucket: string, region: string) {
14
+
this.bucket = bucket;
15
+
this.client = new S3Client({ region });
16
+
}
17
+
18
+
async store(
19
+
cid: string,
20
+
data: Buffer,
21
+
mimeType?: string
22
+
): Promise<string> {
23
+
try {
24
+
const key = `blobs/${cid.substring(0, 2)}/${cid.substring(2, 4)}/${cid}`;
25
+
26
+
await this.client.send(
27
+
new PutObjectCommand({
28
+
Bucket: this.bucket,
29
+
Key: key,
30
+
Body: data,
31
+
ContentType: mimeType,
32
+
})
33
+
);
34
+
35
+
logger.debug({ cid, key }, "Blob stored in S3");
36
+
37
+
return `s3://${this.bucket}/${key}`;
38
+
} catch (error) {
39
+
logger.error({ error, cid }, "Failed to store blob in S3");
40
+
throw error;
41
+
}
42
+
}
43
+
44
+
async retrieve(cid: string): Promise<Buffer | null> {
45
+
try {
46
+
const key = `blobs/${cid.substring(0, 2)}/${cid.substring(2, 4)}/${cid}`;
47
+
48
+
const response = await this.client.send(
49
+
new GetObjectCommand({
50
+
Bucket: this.bucket,
51
+
Key: key,
52
+
})
53
+
);
54
+
55
+
if (!response.Body) {
56
+
logger.warn({ cid }, "Blob not found in S3");
57
+
return null;
58
+
}
59
+
60
+
const chunks: Uint8Array[] = [];
61
+
for await (const chunk of response.Body as any) {
62
+
chunks.push(chunk);
63
+
}
64
+
65
+
return Buffer.concat(chunks);
66
+
} catch (error) {
67
+
logger.error({ error, cid }, "Failed to retrieve blob from S3");
68
+
return null;
69
+
}
70
+
}
71
+
}
+24
-29
src/config/index.ts
···
9
9
password: z.string().min(1, "BSKY_PASSWORD is required"),
10
10
pds: z.string().default("bsky.social"),
11
11
}),
12
+
plc: z.object({
13
+
endpoint: z.string().url().default("https://plc.wtf"),
14
+
}),
12
15
labeler: z.object({
13
16
wssUrl: z.string().url("WSS_URL must be a valid URL"),
14
17
}),
15
18
blobs: z.object({
16
-
hydrate: z.boolean().default(false),
17
-
storageType: z.enum(["local", "s3"]).default("local"),
18
-
storagePath: z.string().default("./data/blobs"),
19
+
hydrateBlobs: z.boolean().default(false),
20
+
storage: z.object({
21
+
type: z.enum(["local", "s3"]).default("local"),
22
+
localPath: z.string().default("./data/blobs"),
23
+
s3Bucket: z.string().optional(),
24
+
s3Region: z.string().optional(),
25
+
}),
19
26
}),
20
-
s3: z
21
-
.object({
22
-
bucket: z.string().optional(),
23
-
region: z.string().optional(),
24
-
accessKeyId: z.string().optional(),
25
-
secretAccessKey: z.string().optional(),
26
-
})
27
-
.optional(),
28
27
database: z.object({
29
28
path: z.string().default("./data/skywatch.duckdb"),
30
29
}),
···
47
46
password: process.env.BSKY_PASSWORD,
48
47
pds: process.env.PDS,
49
48
},
49
+
plc: {
50
+
endpoint: process.env.PLC_ENDPOINT,
51
+
},
50
52
labeler: {
51
53
wssUrl: process.env.WSS_URL,
52
54
},
53
55
blobs: {
54
-
hydrate: process.env.HYDRATE_BLOBS === "true",
55
-
storageType: process.env.BLOB_STORAGE_TYPE,
56
-
storagePath: process.env.BLOB_STORAGE_PATH,
56
+
hydrateBlobs: process.env.HYDRATE_BLOBS === "true",
57
+
storage: {
58
+
type: process.env.BLOB_STORAGE_TYPE,
59
+
localPath: process.env.BLOB_STORAGE_PATH,
60
+
s3Bucket: process.env.S3_BUCKET,
61
+
s3Region: process.env.S3_REGION,
62
+
},
57
63
},
58
-
s3:
59
-
process.env.BLOB_STORAGE_TYPE === "s3"
60
-
? {
61
-
bucket: process.env.S3_BUCKET,
62
-
region: process.env.S3_REGION,
63
-
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
64
-
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
65
-
}
66
-
: undefined,
67
64
database: {
68
65
path: process.env.DB_PATH,
69
66
},
···
85
82
process.exit(1);
86
83
}
87
84
88
-
if (result.data.blobs.storageType === "s3") {
85
+
if (result.data.blobs.storage.type === "s3") {
89
86
if (
90
-
!result.data.s3?.bucket ||
91
-
!result.data.s3?.region ||
92
-
!result.data.s3?.accessKeyId ||
93
-
!result.data.s3?.secretAccessKey
87
+
!result.data.blobs.storage.s3Bucket ||
88
+
!result.data.blobs.storage.s3Region
94
89
) {
95
90
console.error(
96
-
"S3 configuration is incomplete. Required: S3_BUCKET, S3_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY"
91
+
"S3 configuration is incomplete. Required: S3_BUCKET, S3_REGION"
97
92
);
98
93
process.exit(1);
99
94
}
+17
src/database/blobs.repository.ts
···
103
103
);
104
104
});
105
105
}
106
+
107
+
async findByCid(cid: string): Promise<Blob | null> {
108
+
return new Promise((resolve, reject) => {
109
+
this.db.all(
110
+
`SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`,
111
+
cid,
112
+
(err, rows: Blob[]) => {
113
+
if (err) {
114
+
logger.error({ err, cid }, "Failed to find blob by CID");
115
+
reject(err);
116
+
return;
117
+
}
118
+
resolve(rows?.[0] || null);
119
+
}
120
+
);
121
+
});
122
+
}
106
123
}
+123
src/database/profile-blobs.repository.ts
···
1
+
import { Database } from "duckdb";
2
+
import { logger } from "../logger/index.js";
3
+
4
+
export interface ProfileBlob {
5
+
did: string;
6
+
blob_type: "avatar" | "banner";
7
+
blob_cid: string;
8
+
sha256: string;
9
+
phash?: string;
10
+
storage_path?: string;
11
+
mimetype?: string;
12
+
captured_at?: Date;
13
+
}
14
+
15
+
export class ProfileBlobsRepository {
16
+
constructor(private db: Database) {}
17
+
18
+
async insert(blob: ProfileBlob): Promise<void> {
19
+
return new Promise((resolve, reject) => {
20
+
this.db.prepare(
21
+
`
22
+
INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype, captured_at)
23
+
VALUES ($1, $2, $3, $4, $5, $6, $7, COALESCE($8, CURRENT_TIMESTAMP))
24
+
`,
25
+
(err, stmt) => {
26
+
if (err) {
27
+
logger.error({ err }, "Failed to prepare profile blob insert statement");
28
+
reject(err);
29
+
return;
30
+
}
31
+
32
+
stmt.run(
33
+
blob.did,
34
+
blob.blob_type,
35
+
blob.blob_cid,
36
+
blob.sha256,
37
+
blob.phash || null,
38
+
blob.storage_path || null,
39
+
blob.mimetype || null,
40
+
blob.captured_at || null,
41
+
(err) => {
42
+
if (err) {
43
+
logger.error({ err, blob }, "Failed to insert profile blob");
44
+
reject(err);
45
+
return;
46
+
}
47
+
resolve();
48
+
}
49
+
);
50
+
}
51
+
);
52
+
});
53
+
}
54
+
55
+
async findByDid(did: string): Promise<ProfileBlob[]> {
56
+
return new Promise((resolve, reject) => {
57
+
this.db.all(
58
+
`SELECT * FROM profile_blobs WHERE did = $1 ORDER BY captured_at DESC`,
59
+
did,
60
+
(err, rows: ProfileBlob[]) => {
61
+
if (err) {
62
+
logger.error({ err, did }, "Failed to find profile blobs by DID");
63
+
reject(err);
64
+
return;
65
+
}
66
+
resolve(rows || []);
67
+
}
68
+
);
69
+
});
70
+
}
71
+
72
+
async findLatestByDidAndType(did: string, blobType: "avatar" | "banner"): Promise<ProfileBlob | null> {
73
+
return new Promise((resolve, reject) => {
74
+
this.db.all(
75
+
`SELECT * FROM profile_blobs WHERE did = $1 AND blob_type = $2 ORDER BY captured_at DESC LIMIT 1`,
76
+
did,
77
+
blobType,
78
+
(err, rows: ProfileBlob[]) => {
79
+
if (err) {
80
+
logger.error({ err, did, blobType }, "Failed to find latest profile blob");
81
+
reject(err);
82
+
return;
83
+
}
84
+
resolve(rows?.[0] || null);
85
+
}
86
+
);
87
+
});
88
+
}
89
+
90
+
async findBySha256(sha256: string): Promise<ProfileBlob | null> {
91
+
return new Promise((resolve, reject) => {
92
+
this.db.all(
93
+
`SELECT * FROM profile_blobs WHERE sha256 = $1 LIMIT 1`,
94
+
sha256,
95
+
(err, rows: ProfileBlob[]) => {
96
+
if (err) {
97
+
logger.error({ err, sha256 }, "Failed to find profile blob by SHA256");
98
+
reject(err);
99
+
return;
100
+
}
101
+
resolve(rows?.[0] || null);
102
+
}
103
+
);
104
+
});
105
+
}
106
+
107
+
async findByPhash(phash: string): Promise<ProfileBlob[]> {
108
+
return new Promise((resolve, reject) => {
109
+
this.db.all(
110
+
`SELECT * FROM profile_blobs WHERE phash = $1`,
111
+
phash,
112
+
(err, rows: ProfileBlob[]) => {
113
+
if (err) {
114
+
logger.error({ err, phash }, "Failed to find profile blobs by pHash");
115
+
reject(err);
116
+
return;
117
+
}
118
+
resolve(rows || []);
119
+
}
120
+
);
121
+
});
122
+
}
123
+
}
+9
-3
src/database/profiles.repository.ts
···
6
6
handle?: string;
7
7
display_name?: string;
8
8
description?: string;
9
+
avatar_cid?: string;
10
+
banner_cid?: string;
9
11
}
10
12
11
13
export class ProfilesRepository {
···
15
17
return new Promise((resolve, reject) => {
16
18
this.db.prepare(
17
19
`
18
-
INSERT INTO profiles (did, handle, display_name, description)
19
-
VALUES ($1, $2, $3, $4)
20
+
INSERT INTO profiles (did, handle, display_name, description, avatar_cid, banner_cid)
21
+
VALUES ($1, $2, $3, $4, $5, $6)
20
22
ON CONFLICT (did) DO UPDATE SET
21
23
handle = EXCLUDED.handle,
22
24
display_name = EXCLUDED.display_name,
23
-
description = EXCLUDED.description
25
+
description = EXCLUDED.description,
26
+
avatar_cid = EXCLUDED.avatar_cid,
27
+
banner_cid = EXCLUDED.banner_cid
24
28
`,
25
29
(err, stmt) => {
26
30
if (err) {
···
34
38
profile.handle || null,
35
39
profile.display_name || null,
36
40
profile.description || null,
41
+
profile.avatar_cid || null,
42
+
profile.banner_cid || null,
37
43
(err) => {
38
44
if (err) {
39
45
logger.error({ err, profile }, "Failed to insert profile");
+74
-3
src/database/schema.ts
···
34
34
did TEXT PRIMARY KEY,
35
35
handle TEXT,
36
36
display_name TEXT,
37
-
description TEXT
37
+
description TEXT,
38
+
avatar_cid TEXT,
39
+
banner_cid TEXT
38
40
);
39
41
40
42
-- Blobs table: stores information about image blobs found in posts
···
49
51
FOREIGN KEY (post_uri) REFERENCES posts(uri)
50
52
);
51
53
54
+
-- Profile blobs table: stores avatar and banner blobs for profiles
55
+
CREATE TABLE IF NOT EXISTS profile_blobs (
56
+
did TEXT NOT NULL,
57
+
blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
58
+
blob_cid TEXT NOT NULL,
59
+
sha256 TEXT NOT NULL,
60
+
phash TEXT,
61
+
storage_path TEXT,
62
+
mimetype TEXT,
63
+
captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
64
+
PRIMARY KEY (did, blob_type, captured_at),
65
+
FOREIGN KEY (did) REFERENCES profiles(did)
66
+
);
67
+
52
68
-- Indexes for performance
53
69
CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri);
54
70
CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
55
71
CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts);
56
72
CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did);
73
+
CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid);
57
74
CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256);
58
75
CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash);
76
+
CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256);
77
+
CREATE INDEX IF NOT EXISTS idx_profile_blobs_phash ON profile_blobs(phash);
59
78
`;
60
79
80
+
async function migrateProfilesTable(): Promise<void> {
81
+
const db = getDatabase();
82
+
83
+
return new Promise((resolve, reject) => {
84
+
db.all(
85
+
"SELECT column_name FROM information_schema.columns WHERE table_name = 'profiles'",
86
+
(err, rows: any[]) => {
87
+
if (err) {
88
+
logger.error({ err }, "Failed to check profiles table columns");
89
+
reject(err);
90
+
return;
91
+
}
92
+
93
+
const columnNames = rows.map((row) => row.column_name);
94
+
const hasAvatarCid = columnNames.includes("avatar_cid");
95
+
const hasBannerCid = columnNames.includes("banner_cid");
96
+
97
+
if (!hasAvatarCid || !hasBannerCid) {
98
+
logger.info("Migrating profiles table to add avatar_cid and banner_cid columns");
99
+
100
+
const migrations: string[] = [];
101
+
if (!hasAvatarCid) {
102
+
migrations.push("ALTER TABLE profiles ADD COLUMN avatar_cid TEXT");
103
+
}
104
+
if (!hasBannerCid) {
105
+
migrations.push("ALTER TABLE profiles ADD COLUMN banner_cid TEXT");
106
+
}
107
+
108
+
db.exec(migrations.join("; "), (err) => {
109
+
if (err) {
110
+
logger.error({ err }, "Failed to migrate profiles table");
111
+
reject(err);
112
+
return;
113
+
}
114
+
logger.info("Profiles table migration completed");
115
+
resolve();
116
+
});
117
+
} else {
118
+
logger.debug("Profiles table already has avatar_cid and banner_cid columns");
119
+
resolve();
120
+
}
121
+
}
122
+
);
123
+
});
124
+
}
125
+
61
126
export async function initializeSchema(): Promise<void> {
62
127
const db = getDatabase();
63
128
64
129
return new Promise((resolve, reject) => {
65
-
db.exec(SCHEMA_SQL, (err) => {
130
+
db.exec(SCHEMA_SQL, async (err) => {
66
131
if (err) {
67
132
logger.error({ err }, "Failed to initialize schema");
68
133
reject(err);
69
134
return;
70
135
}
71
136
logger.info("Database schema initialized");
72
-
resolve();
137
+
138
+
try {
139
+
await migrateProfilesTable();
140
+
resolve();
141
+
} catch (migrationErr) {
142
+
reject(migrationErr);
143
+
}
73
144
});
74
145
});
75
146
}
+25
-17
src/firehose/decoder.ts
···
1
-
import { decode as decodeCBOR } from "@atcute/cbor";
1
+
import { decodeFirst } from "@atcute/cbor";
2
2
import { logger } from "../logger/index.js";
3
3
4
4
export interface LabelEvent {
···
14
14
}
15
15
16
16
export interface FirehoseMessage {
17
-
op: number;
17
+
op?: number;
18
18
t?: string;
19
+
seq?: number;
20
+
labels?: LabelEvent[];
19
21
[key: string]: any;
20
22
}
21
23
22
24
export function decodeFirehoseMessage(data: Buffer): FirehoseMessage | null {
23
25
try {
24
-
const decoded = decodeCBOR(data);
25
-
return decoded as FirehoseMessage;
26
-
} catch (error) {
27
-
logger.error({ error }, "Failed to decode CBOR message");
28
-
return null;
29
-
}
30
-
}
26
+
const buffer = new Uint8Array(data);
27
+
const [header, remainder] = decodeFirst(buffer);
28
+
const [body] = decodeFirst(remainder);
31
29
32
-
export function extractLabelFromMessage(message: FirehoseMessage): LabelEvent | null {
33
-
if (!message || message.op !== 1) {
30
+
return body as FirehoseMessage;
31
+
} catch (err) {
32
+
logger.error(
33
+
{
34
+
err: err instanceof Error ? err.message : String(err),
35
+
errorStack: err instanceof Error ? err.stack : undefined,
36
+
dataLength: data.length,
37
+
dataPreview: data.slice(0, 50).toString("hex")
38
+
},
39
+
"Failed to decode CBOR message"
40
+
);
34
41
return null;
35
42
}
43
+
}
36
44
37
-
if (message.t !== "#labels") {
38
-
return null;
45
+
export function extractLabelsFromMessage(message: FirehoseMessage): LabelEvent[] {
46
+
if (!message) {
47
+
return [];
39
48
}
40
49
41
-
const labels = message.labels;
42
-
if (!Array.isArray(labels) || labels.length === 0) {
43
-
return null;
50
+
if (message.labels && Array.isArray(message.labels)) {
51
+
return message.labels;
44
52
}
45
53
46
-
return labels[0] as LabelEvent;
54
+
return [];
47
55
}
48
56
49
57
export function validateLabel(label: LabelEvent): boolean {
+8
-10
src/firehose/subscriber.ts
···
4
4
import { logger } from "../logger/index.js";
5
5
import {
6
6
decodeFirehoseMessage,
7
-
extractLabelFromMessage,
7
+
extractLabelsFromMessage,
8
8
validateLabel,
9
9
LabelEvent,
10
10
} from "./decoder.js";
···
86
86
return;
87
87
}
88
88
89
-
const label = extractLabelFromMessage(message);
90
-
if (!label) return;
91
-
92
-
if (!validateLabel(label)) return;
93
-
94
-
if (!this.filter.shouldCapture(label)) return;
95
-
96
-
this.emit("label", label);
97
-
98
89
if (message.seq) {
99
90
await this.saveCursor(message.seq);
91
+
}
92
+
93
+
const labels = extractLabelsFromMessage(message);
94
+
for (const label of labels) {
95
+
if (!validateLabel(label)) continue;
96
+
if (!this.filter.shouldCapture(label)) continue;
97
+
this.emit("label", label);
100
98
}
101
99
} catch (error) {
102
100
logger.error({ error }, "Error processing message");
-165
src/firehose.ts
···
1
-
import { decode, decodeFirst } from "@atcute/cbor";
2
-
import { readFileSync, writeFileSync } from "fs";
3
-
import { WSS_URL } from "./config.js";
4
-
import { logger } from "./logger.js";
5
-
import { LabelEvent } from "./types.js";
6
-
7
-
let ws: WebSocket | null = null;
8
-
let reconnectTimeout: NodeJS.Timeout | null = null;
9
-
let reconnectAttempts = 0;
10
-
let cursor: string = "";
11
-
const MAX_RECONNECT_DELAY = 60000;
12
-
const INITIAL_RECONNECT_DELAY = 1000;
13
-
const CURSOR_FILE = "./cursor.txt";
14
-
15
-
function getReconnectDelay(): number {
16
-
const delay = Math.min(
17
-
INITIAL_RECONNECT_DELAY * Math.pow(2, reconnectAttempts),
18
-
MAX_RECONNECT_DELAY,
19
-
);
20
-
reconnectAttempts++;
21
-
return delay;
22
-
}
23
-
24
-
async function handleLabelEvent(event: LabelEvent): Promise<void> {
25
-
// Placeholder for hydration logic
26
-
logger.info({ event }, "Received label event");
27
-
}
28
-
29
-
function saveCursor(seq: string): void {
30
-
try {
31
-
cursor = seq;
32
-
writeFileSync(CURSOR_FILE, seq, "utf8");
33
-
logger.debug({ cursor: seq }, "Saved cursor");
34
-
} catch (err) {
35
-
logger.warn({ err }, "Failed to save cursor");
36
-
}
37
-
}
38
-
39
-
function loadCursor(): string {
40
-
try {
41
-
const saved = readFileSync(CURSOR_FILE, "utf8").trim();
42
-
logger.info({ cursor: saved }, "Loaded cursor from file");
43
-
return saved;
44
-
} catch (err) {
45
-
logger.info("No cursor file found, starting from live");
46
-
return "";
47
-
}
48
-
}
49
-
50
-
function parseMessage(data: any): void {
51
-
try {
52
-
let buffer: Uint8Array;
53
-
54
-
if (data instanceof ArrayBuffer) {
55
-
buffer = new Uint8Array(data);
56
-
} else if (data instanceof Uint8Array) {
57
-
buffer = data;
58
-
} else if (typeof data === "string") {
59
-
try {
60
-
const parsed = JSON.parse(data);
61
-
if (parsed.seq) {
62
-
saveCursor(parsed.seq.toString());
63
-
}
64
-
processLabels(parsed);
65
-
return;
66
-
} catch {
67
-
logger.warn("Received non-JSON string message");
68
-
return;
69
-
}
70
-
} else {
71
-
processLabels(data);
72
-
return;
73
-
}
74
-
75
-
const [header, remainder] = decodeFirst(buffer);
76
-
const [body] = decodeFirst(remainder);
77
-
78
-
if (body && typeof body === "object" && "seq" in body) {
79
-
saveCursor(body.seq.toString());
80
-
}
81
-
82
-
processLabels(body);
83
-
} catch (err) {
84
-
logger.error({ err }, "Error parsing message");
85
-
}
86
-
}
87
-
88
-
function processLabels(parsed: any): void {
89
-
if (parsed.labels && Array.isArray(parsed.labels)) {
90
-
for (const label of parsed.labels) {
91
-
handleLabelEvent(label as LabelEvent);
92
-
}
93
-
} else if (parsed.label) {
94
-
handleLabelEvent(parsed.label as LabelEvent);
95
-
} else {
96
-
logger.debug({ parsed }, "Message does not contain label data");
97
-
}
98
-
}
99
-
100
-
function connect(): void {
101
-
if (
102
-
ws &&
103
-
(ws.readyState === WebSocket.CONNECTING || ws.readyState === WebSocket.OPEN)
104
-
) {
105
-
logger.debug("WebSocket already connected or connecting");
106
-
return;
107
-
}
108
-
109
-
const url = cursor ? `${WSS_URL}?cursor=${cursor}` : WSS_URL;
110
-
logger.info({ url, cursor }, "Connecting to firehose");
111
-
112
-
ws = new WebSocket(url);
113
-
114
-
ws.addEventListener("open", () => {
115
-
logger.info("Firehose connection established");
116
-
reconnectAttempts = 0;
117
-
});
118
-
119
-
ws.addEventListener("message", (event) => {
120
-
parseMessage(event.data);
121
-
});
122
-
123
-
ws.addEventListener("error", (event) => {
124
-
logger.error({ event }, "Firehose WebSocket error");
125
-
});
126
-
127
-
ws.addEventListener("close", (event) => {
128
-
logger.warn(
129
-
{ code: event.code, reason: event.reason },
130
-
"Firehose connection closed",
131
-
);
132
-
scheduleReconnect();
133
-
});
134
-
}
135
-
136
-
function scheduleReconnect(): void {
137
-
if (reconnectTimeout) {
138
-
clearTimeout(reconnectTimeout);
139
-
}
140
-
141
-
const delay = getReconnectDelay();
142
-
logger.info({ delay, attempt: reconnectAttempts }, "Scheduling reconnect");
143
-
144
-
reconnectTimeout = setTimeout(() => {
145
-
connect();
146
-
}, delay);
147
-
}
148
-
149
-
export function startFirehose(): void {
150
-
cursor = loadCursor();
151
-
connect();
152
-
}
153
-
154
-
export function stopFirehose(): void {
155
-
if (reconnectTimeout) {
156
-
clearTimeout(reconnectTimeout);
157
-
reconnectTimeout = null;
158
-
}
159
-
160
-
if (ws) {
161
-
logger.info("Closing firehose connection");
162
-
ws.close();
163
-
ws = null;
164
-
}
165
-
}
+49
-6
src/hydration/posts.service.ts
···
1
1
import { AtpAgent } from "@atproto/api";
2
2
import { Database } from "duckdb";
3
3
import { PostsRepository } from "../database/posts.repository.js";
4
+
import { BlobProcessor } from "../blobs/processor.js";
5
+
import { pRateLimit } from "p-ratelimit";
6
+
import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
4
7
import { logger } from "../logger/index.js";
5
8
import { config } from "../config/index.js";
6
9
7
10
export class PostHydrationService {
8
11
private agent: AtpAgent;
9
12
private postsRepo: PostsRepository;
13
+
private blobProcessor: BlobProcessor;
14
+
private limit: ReturnType<typeof pRateLimit>;
10
15
11
16
constructor(db: Database) {
12
17
this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
13
18
this.postsRepo = new PostsRepository(db);
19
+
this.blobProcessor = new BlobProcessor(db, this.agent);
20
+
this.limit = pRateLimit({
21
+
interval: 300000,
22
+
rate: 3000,
23
+
concurrency: 48,
24
+
maxDelay: 60000,
25
+
});
14
26
}
15
27
16
28
async initialize(): Promise<void> {
···
42
54
43
55
const [did, collection, rkey] = uriParts;
44
56
45
-
const response = await this.agent.com.atproto.repo.getRecord({
46
-
repo: did,
47
-
collection,
48
-
rkey,
49
-
});
57
+
const response = await this.limit(() =>
58
+
withRetry(
59
+
async () => {
60
+
return await this.agent.com.atproto.repo.getRecord({
61
+
repo: did,
62
+
collection,
63
+
rkey,
64
+
});
65
+
},
66
+
{
67
+
maxAttempts: 3,
68
+
initialDelay: 1000,
69
+
maxDelay: 10000,
70
+
backoffMultiplier: 2,
71
+
retryableErrors: [
72
+
isRateLimitError,
73
+
isNetworkError,
74
+
isServerError,
75
+
],
76
+
}
77
+
)
78
+
);
50
79
51
80
if (!response.success || !response.data.value) {
52
81
logger.warn({ uri }, "Failed to fetch post record");
···
57
86
58
87
const isReply = !!record.reply;
59
88
89
+
const embeds = record.embed ? [record.embed] : null;
90
+
60
91
await this.postsRepo.insert({
61
92
uri,
62
93
did,
63
94
text: record.text || "",
64
95
facets: record.facets || null,
65
-
embeds: record.embed || null,
96
+
embeds,
66
97
langs: record.langs || null,
67
98
tags: record.tags || null,
68
99
created_at: record.createdAt,
···
70
101
});
71
102
72
103
logger.info({ uri }, "Post hydrated successfully");
104
+
105
+
if (embeds) {
106
+
try {
107
+
await this.blobProcessor.processBlobs(uri, embeds);
108
+
} catch (error) {
109
+
logger.warn({ error, uri }, "Failed to process blobs for post");
110
+
}
111
+
}
73
112
} catch (error) {
113
+
if (isRecordNotFoundError(error)) {
114
+
logger.warn({ uri }, "Post record not found, skipping");
115
+
return;
116
+
}
74
117
logger.error({ error, uri }, "Failed to hydrate post");
75
118
throw error;
76
119
}
+191
-9
src/hydration/profiles.service.ts
···
1
1
import { AtpAgent } from "@atproto/api";
2
2
import { Database } from "duckdb";
3
3
import { ProfilesRepository } from "../database/profiles.repository.js";
4
+
import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js";
5
+
import { computeBlobHashes } from "../blobs/hasher.js";
6
+
import { LocalBlobStorage } from "../blobs/storage/local.js";
7
+
import { S3BlobStorage } from "../blobs/storage/s3.js";
8
+
import { BlobStorage } from "../blobs/processor.js";
9
+
import { pRateLimit } from "p-ratelimit";
10
+
import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
4
11
import { logger } from "../logger/index.js";
5
12
import { config } from "../config/index.js";
6
13
7
14
export class ProfileHydrationService {
8
15
private agent: AtpAgent;
9
16
private profilesRepo: ProfilesRepository;
17
+
private profileBlobsRepo: ProfileBlobsRepository;
18
+
private storage: BlobStorage | null = null;
19
+
private limit: ReturnType<typeof pRateLimit>;
10
20
11
21
constructor(db: Database) {
12
22
this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
13
23
this.profilesRepo = new ProfilesRepository(db);
24
+
this.profileBlobsRepo = new ProfileBlobsRepository(db);
25
+
26
+
if (config.blobs.hydrateBlobs) {
27
+
if (config.blobs.storage.type === "s3") {
28
+
this.storage = new S3BlobStorage(
29
+
config.blobs.storage.s3Bucket!,
30
+
config.blobs.storage.s3Region!
31
+
);
32
+
} else {
33
+
this.storage = new LocalBlobStorage(
34
+
config.blobs.storage.localPath
35
+
);
36
+
}
37
+
}
38
+
39
+
this.limit = pRateLimit({
40
+
interval: 300000,
41
+
rate: 3000,
42
+
concurrency: 48,
43
+
maxDelay: 60000,
44
+
});
14
45
}
15
46
16
47
async initialize(): Promise<void> {
···
29
60
async hydrateProfile(did: string): Promise<void> {
30
61
try {
31
62
const existingProfile = await this.profilesRepo.findByDid(did);
32
-
if (existingProfile) {
33
-
logger.debug({ did }, "Profile already hydrated, skipping");
63
+
const needsRehydration = existingProfile && (existingProfile.avatar_cid === null || existingProfile.banner_cid === null);
64
+
65
+
if (existingProfile && !needsRehydration) {
66
+
logger.debug({ did }, "Profile already fully hydrated, skipping");
34
67
return;
35
68
}
36
69
37
-
const profileResponse = await this.agent.com.atproto.repo.getRecord({
38
-
repo: did,
39
-
collection: "app.bsky.actor.profile",
40
-
rkey: "self",
41
-
});
70
+
if (needsRehydration) {
71
+
logger.debug({ did }, "Re-hydrating profile to fetch avatar/banner CIDs");
72
+
}
73
+
74
+
const profileResponse = await this.limit(() =>
75
+
withRetry(
76
+
async () => {
77
+
return await this.agent.com.atproto.repo.getRecord({
78
+
repo: did,
79
+
collection: "app.bsky.actor.profile",
80
+
rkey: "self",
81
+
});
82
+
},
83
+
{
84
+
maxAttempts: 3,
85
+
initialDelay: 1000,
86
+
maxDelay: 10000,
87
+
backoffMultiplier: 2,
88
+
retryableErrors: [
89
+
isRateLimitError,
90
+
isNetworkError,
91
+
isServerError,
92
+
],
93
+
}
94
+
)
95
+
);
42
96
43
97
let displayName: string | undefined;
44
98
let description: string | undefined;
99
+
let avatarCid: string | undefined;
100
+
let bannerCid: string | undefined;
45
101
46
102
if (profileResponse.success && profileResponse.data.value) {
47
103
const record = profileResponse.data.value as any;
48
104
displayName = record.displayName;
49
105
description = record.description;
106
+
107
+
if (record.avatar?.ref) {
108
+
avatarCid = record.avatar.ref.toString();
109
+
} else {
110
+
avatarCid = "";
111
+
}
112
+
113
+
if (record.banner?.ref) {
114
+
bannerCid = record.banner.ref.toString();
115
+
} else {
116
+
bannerCid = "";
117
+
}
118
+
119
+
logger.debug({ did, avatarCid, bannerCid, hasAvatar: !!record.avatar, hasBanner: !!record.banner }, "Extracted CIDs from profile record");
50
120
}
51
121
52
-
const profileLookup = await this.agent.getProfile({ actor: did });
122
+
const profileLookup = await this.limit(() =>
123
+
withRetry(
124
+
async () => {
125
+
return await this.agent.getProfile({ actor: did });
126
+
},
127
+
{
128
+
maxAttempts: 3,
129
+
initialDelay: 1000,
130
+
maxDelay: 10000,
131
+
backoffMultiplier: 2,
132
+
retryableErrors: [
133
+
isRateLimitError,
134
+
isNetworkError,
135
+
isServerError,
136
+
],
137
+
}
138
+
)
139
+
);
53
140
54
141
let handle: string | undefined;
55
142
if (profileLookup.success) {
···
61
148
handle,
62
149
display_name: displayName,
63
150
description,
151
+
avatar_cid: avatarCid,
152
+
banner_cid: bannerCid,
64
153
});
65
154
66
-
logger.info({ did, handle }, "Profile hydrated successfully");
155
+
if (avatarCid && avatarCid !== "") {
156
+
try {
157
+
await this.processProfileBlob(did, avatarCid, "avatar");
158
+
} catch (error) {
159
+
logger.warn({ error, did, avatarCid }, "Failed to process avatar blob");
160
+
}
161
+
}
162
+
163
+
if (bannerCid && bannerCid !== "") {
164
+
try {
165
+
await this.processProfileBlob(did, bannerCid, "banner");
166
+
} catch (error) {
167
+
logger.warn({ error, did, bannerCid }, "Failed to process banner blob");
168
+
}
169
+
}
170
+
171
+
logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully");
67
172
} catch (error) {
173
+
if (isRecordNotFoundError(error)) {
174
+
logger.warn({ did }, "Profile record not found, skipping");
175
+
return;
176
+
}
68
177
logger.error({ error, did }, "Failed to hydrate profile");
69
178
throw error;
70
179
}
180
+
}
181
+
182
+
private async resolvePds(did: string): Promise<string | null> {
183
+
try {
184
+
const didDocResponse = await fetch(`${config.plc.endpoint}/${did}`);
185
+
if (!didDocResponse.ok) {
186
+
logger.warn({ did, status: didDocResponse.status }, "Failed to fetch DID document");
187
+
return null;
188
+
}
189
+
190
+
const didDoc = await didDocResponse.json();
191
+
const pdsService = didDoc.service?.find((s: any) =>
192
+
s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
193
+
);
194
+
195
+
if (!pdsService?.serviceEndpoint) {
196
+
logger.warn({ did }, "No PDS endpoint found in DID document");
197
+
return null;
198
+
}
199
+
200
+
return pdsService.serviceEndpoint;
201
+
} catch (error) {
202
+
logger.error({ error, did }, "Failed to resolve PDS from DID");
203
+
return null;
204
+
}
205
+
}
206
+
207
+
private async processProfileBlob(
208
+
did: string,
209
+
cid: string,
210
+
type: "avatar" | "banner"
211
+
): Promise<void> {
212
+
const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type);
213
+
214
+
if (latestBlob && latestBlob.blob_cid === cid) {
215
+
logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping");
216
+
return;
217
+
}
218
+
219
+
const pdsEndpoint = await this.resolvePds(did);
220
+
if (!pdsEndpoint) {
221
+
logger.warn({ did, cid, type }, "Cannot fetch blob without PDS endpoint");
222
+
return;
223
+
}
224
+
225
+
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
226
+
const blobResponse = await fetch(blobUrl);
227
+
228
+
if (!blobResponse.ok) {
229
+
logger.warn({ did, cid, type, pdsEndpoint, status: blobResponse.status }, "Failed to fetch blob from PDS");
230
+
return;
231
+
}
232
+
233
+
const blobData = Buffer.from(await blobResponse.arrayBuffer());
234
+
235
+
let storagePath: string | undefined;
236
+
if (this.storage && config.blobs.hydrateBlobs) {
237
+
storagePath = await this.storage.store(cid, blobData, "image/jpeg");
238
+
}
239
+
240
+
const hashes = await computeBlobHashes(blobData, "image/jpeg");
241
+
242
+
await this.profileBlobsRepo.insert({
243
+
did,
244
+
blob_type: type,
245
+
blob_cid: cid,
246
+
sha256: hashes.sha256,
247
+
phash: hashes.phash,
248
+
storage_path: storagePath,
249
+
mimetype: "image/jpeg",
250
+
});
251
+
252
+
logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully");
71
253
}
72
254
}
+93
src/utils/rate-limit.ts
···
1
+
import { logger } from "../logger/index.js";
2
+
3
+
export interface RateLimiterConfig {
4
+
maxTokens: number;
5
+
refillRate: number;
6
+
refillInterval: number;
7
+
}
8
+
9
+
export class RateLimiter {
10
+
private tokens: number;
11
+
private lastRefill: number;
12
+
private config: RateLimiterConfig;
13
+
14
+
constructor(config: RateLimiterConfig) {
15
+
this.config = config;
16
+
this.tokens = config.maxTokens;
17
+
this.lastRefill = Date.now();
18
+
}
19
+
20
+
private refill(): void {
21
+
const now = Date.now();
22
+
const elapsed = now - this.lastRefill;
23
+
const intervals = Math.floor(elapsed / this.config.refillInterval);
24
+
25
+
if (intervals > 0) {
26
+
this.tokens = Math.min(
27
+
this.config.maxTokens,
28
+
this.tokens + intervals * this.config.refillRate
29
+
);
30
+
this.lastRefill = now;
31
+
}
32
+
}
33
+
34
+
async acquire(tokens: number = 1): Promise<void> {
35
+
while (true) {
36
+
this.refill();
37
+
38
+
if (this.tokens >= tokens) {
39
+
this.tokens -= tokens;
40
+
return;
41
+
}
42
+
43
+
const waitTime = this.config.refillInterval;
44
+
logger.debug(
45
+
{ tokens, available: this.tokens, waitTime },
46
+
"Rate limit reached, waiting"
47
+
);
48
+
49
+
await new Promise((resolve) => setTimeout(resolve, waitTime));
50
+
}
51
+
}
52
+
53
+
getAvailableTokens(): number {
54
+
this.refill();
55
+
return this.tokens;
56
+
}
57
+
58
+
reset(): void {
59
+
this.tokens = this.config.maxTokens;
60
+
this.lastRefill = Date.now();
61
+
}
62
+
}
63
+
64
+
export class MultiRateLimiter {
65
+
private limiters: Map<string, RateLimiter> = new Map();
66
+
67
+
constructor(
68
+
private defaultConfig: RateLimiterConfig
69
+
) {}
70
+
71
+
setLimiter(key: string, config: RateLimiterConfig): void {
72
+
this.limiters.set(key, new RateLimiter(config));
73
+
}
74
+
75
+
async acquire(key: string, tokens: number = 1): Promise<void> {
76
+
let limiter = this.limiters.get(key);
77
+
if (!limiter) {
78
+
limiter = new RateLimiter(this.defaultConfig);
79
+
this.limiters.set(key, limiter);
80
+
}
81
+
await limiter.acquire(tokens);
82
+
}
83
+
84
+
reset(key?: string): void {
85
+
if (key) {
86
+
this.limiters.get(key)?.reset();
87
+
} else {
88
+
for (const limiter of this.limiters.values()) {
89
+
limiter.reset();
90
+
}
91
+
}
92
+
}
93
+
}
+102
src/utils/retry.ts
···
1
+
import { logger } from "../logger/index.js";
2
+
3
+
export interface RetryConfig {
4
+
maxAttempts: number;
5
+
initialDelay: number;
6
+
maxDelay: number;
7
+
backoffMultiplier: number;
8
+
retryableErrors?: ((error: any) => boolean)[];
9
+
}
10
+
11
+
export class RetryError extends Error {
12
+
constructor(
13
+
message: string,
14
+
public attempts: number,
15
+
public lastError: Error
16
+
) {
17
+
super(message);
18
+
this.name = "RetryError";
19
+
}
20
+
}
21
+
22
+
export async function withRetry<T>(
23
+
fn: () => Promise<T>,
24
+
config: Partial<RetryConfig> = {}
25
+
): Promise<T> {
26
+
const {
27
+
maxAttempts = 3,
28
+
initialDelay = 1000,
29
+
maxDelay = 30000,
30
+
backoffMultiplier = 2,
31
+
retryableErrors = [(error) => true],
32
+
} = config;
33
+
34
+
let lastError: Error;
35
+
let delay = initialDelay;
36
+
37
+
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
38
+
try {
39
+
return await fn();
40
+
} catch (error) {
41
+
lastError = error instanceof Error ? error : new Error(String(error));
42
+
43
+
const isRetryable = retryableErrors.some((check) => check(lastError));
44
+
45
+
if (!isRetryable || attempt >= maxAttempts) {
46
+
throw new RetryError(
47
+
`Operation failed after ${attempt} attempts`,
48
+
attempt,
49
+
lastError
50
+
);
51
+
}
52
+
53
+
logger.warn(
54
+
{
55
+
attempt,
56
+
maxAttempts,
57
+
delay,
58
+
error: lastError.message,
59
+
},
60
+
"Retrying after error"
61
+
);
62
+
63
+
await new Promise((resolve) => setTimeout(resolve, delay));
64
+
delay = Math.min(delay * backoffMultiplier, maxDelay);
65
+
}
66
+
}
67
+
68
+
throw new RetryError(
69
+
`Operation failed after ${maxAttempts} attempts`,
70
+
maxAttempts,
71
+
lastError!
72
+
);
73
+
}
74
+
75
+
export function isRateLimitError(error: any): boolean {
76
+
return (
77
+
error?.status === 429 ||
78
+
error?.message?.toLowerCase().includes("rate limit") ||
79
+
error?.message?.toLowerCase().includes("too many requests")
80
+
);
81
+
}
82
+
83
+
export function isNetworkError(error: any): boolean {
84
+
return (
85
+
error?.code === "ECONNRESET" ||
86
+
error?.code === "ENOTFOUND" ||
87
+
error?.code === "ETIMEDOUT" ||
88
+
error?.message?.toLowerCase().includes("network") ||
89
+
error?.message?.toLowerCase().includes("timeout")
90
+
);
91
+
}
92
+
93
+
export function isServerError(error: any): boolean {
94
+
return error?.status >= 500 && error?.status < 600;
95
+
}
96
+
97
+
export function isRecordNotFoundError(error: any): boolean {
98
+
return (
99
+
error?.error === "RecordNotFound" ||
100
+
error?.message?.includes("RecordNotFound")
101
+
);
102
+
}
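
A sketch of how `withRetry` and the predicates are meant to combine (`fetchPostRecord` is a hypothetical stand-in for one of the PR's hydration calls):

```typescript
import {
  withRetry,
  isRateLimitError,
  isNetworkError,
  isServerError,
} from "./retry.js";

// Hypothetical stand-in for a hydration call; not part of this PR.
declare function fetchPostRecord(uri: string): Promise<unknown>;

// Transient failures (429s, network blips, 5xx) are retried with exponential
// backoff; anything else, e.g. RecordNotFound, is wrapped in a RetryError and
// thrown on the first attempt.
const record = await withRetry(
  () => fetchPostRecord("at://did:plc:user/app.bsky.feed.post/123"),
  {
    maxAttempts: 5,
    retryableErrors: [isRateLimitError, isNetworkError, isServerError],
  }
);
```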
+27
-1
tests/integration/database.test.ts
···
    did TEXT PRIMARY KEY,
    handle TEXT,
    display_name TEXT,
-   description TEXT
+   description TEXT,
+   avatar_cid TEXT,
+   banner_cid TEXT
  );

  CREATE TABLE IF NOT EXISTS blobs (
···
      expect(found).not.toBeNull();
      expect(found?.did).toBe("did:plc:testuser");
    });
+
+   test("should insert and retrieve profile with avatar and banner", async () => {
+     const profile = {
+       did: "did:plc:testuser2",
+       handle: "testuser2.bsky.social",
+       display_name: "Test User 2",
+       description: "A test user with avatar",
+       avatar_cid: "bafyavatartest",
+       banner_cid: "bafybannertest",
+     };
+
+     await profilesRepo.insert(profile);
+     const found = await profilesRepo.findByDid(profile.did);
+
+     expect(found).not.toBeNull();
+     expect(found?.avatar_cid).toBe("bafyavatartest");
+     expect(found?.banner_cid).toBe("bafybannertest");
+   });
  });

  describe("BlobsRepository", () => {
···
    test("should find blobs by pHash", async () => {
      const found = await blobsRepo.findByPhash("deadbeef");
      expect(found.length).toBeGreaterThan(0);
+   });
+
+   test("should find blob by CID", async () => {
+     const found = await blobsRepo.findByCid("bafytest123");
+     expect(found).not.toBeNull();
+     expect(found?.sha256).toBe("abc123def456");
    });
  });
});
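
One practical implication of the schema change above: databases created before this PR will be missing the two new columns, since `CREATE TABLE IF NOT EXISTS` never alters an existing table. A hedged migration sketch (assuming a duckdb-async-style connection; the file name and API here are illustrative, and whether the PR ships its own migration isn't visible in this diff):

```typescript
import { Database } from "duckdb-async";

// Illustrative only: backfill the new profile columns on an existing database.
// Plain ADD COLUMN errors if the column already exists, so guard accordingly.
const db = await Database.create("labels.db");
await db.run("ALTER TABLE profiles ADD COLUMN avatar_cid TEXT");
await db.run("ALTER TABLE profiles ADD COLUMN banner_cid TEXT");
```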
+25
-17
tests/unit/decoder.test.ts
···
  import { describe, test, expect } from "bun:test";
  import {
-   extractLabelFromMessage,
+   extractLabelsFromMessage,
    validateLabel,
    LabelEvent,
  } from "../../src/firehose/decoder.js";

  describe("Firehose Decoder", () => {
-   describe("extractLabelFromMessage", () => {
-     test("should extract label from valid message", () => {
+   describe("extractLabelsFromMessage", () => {
+     test("should extract labels from valid message", () => {
        const message = {
          op: 1,
          t: "#labels",
···
          ],
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).not.toBeNull();
-       expect(label?.val).toBe("spam");
-       expect(label?.src).toBe("did:plc:labeler");
+       expect(labels).toHaveLength(1);
+       expect(labels[0].val).toBe("spam");
+       expect(labels[0].src).toBe("did:plc:labeler");
      });

-     test("should return null for non-label messages", () => {
+     test("should return empty array for non-label messages", () => {
        const message = {
          op: 1,
          t: "#info",
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).toBeNull();
+       expect(labels).toHaveLength(0);
      });

-     test("should return null for messages with wrong op", () => {
+     test("should extract all labels from message with multiple labels", () => {
        const message = {
-         op: 0,
+         op: 1,
          t: "#labels",
          labels: [
            {
···
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
+           {
+             src: "did:plc:labeler",
+             uri: "at://did:plc:user/app.bsky.feed.post/456",
+             val: "csam",
+             cts: "2025-01-15T12:01:00Z",
+           },
          ],
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).toBeNull();
+       expect(labels).toHaveLength(2);
+       expect(labels[0].val).toBe("spam");
+       expect(labels[1].val).toBe("csam");
      });

-     test("should return null for messages with empty labels array", () => {
+     test("should return empty array for messages with empty labels array", () => {
        const message = {
          op: 1,
          t: "#labels",
          labels: [],
        };

-       const label = extractLabelFromMessage(message);
+       const labels = extractLabelsFromMessage(message);

-       expect(label).toBeNull();
+       expect(labels).toHaveLength(0);
      });
    });

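For reference, the contract these tests pin down can be approximated as below; the real `extractLabelsFromMessage` in src/firehose/decoder.ts also runs each label through validation (see the `validateLabel` import above), so this is a behavioral summary, not the implementation:

```typescript
import type { LabelEvent } from "../../src/firehose/decoder.js";

// Behavioral approximation of the tested contract: every label from a
// well-formed #labels message, an empty array for anything else.
function extractLabelsSketch(message: any): LabelEvent[] {
  if (message?.op !== 1 || message?.t !== "#labels") return [];
  return Array.isArray(message.labels) ? message.labels : [];
}
```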
+220
tests/unit/subscriber.test.ts
···
+ import { describe, test, expect, beforeEach, afterEach, mock } from "bun:test";
+ import { FirehoseSubscriber } from "../../src/firehose/subscriber.js";
+ import { EventEmitter } from "events";
+ import WebSocket from "ws";
+
+ // Mock WebSocket class
+ class MockWebSocket extends EventEmitter {
+   constructor(url: string) {
+     super();
+   }
+   close() {}
+ }
+
+ // Mock the entire 'ws' module
+ mock.module("ws", () => ({
+   default: MockWebSocket,
+ }));
+
+ describe("FirehoseSubscriber", () => {
+   let subscriber: FirehoseSubscriber;
+   let mockWsInstance: MockWebSocket;
+
+   beforeEach(() => {
+     subscriber = new FirehoseSubscriber();
+     // Mock the connect method to control the WebSocket instance
+     (subscriber as any).connect = () => {
+       const url = new URL("ws://localhost:1234");
+       if ((subscriber as any).cursor !== null) {
+         url.searchParams.set("cursor", (subscriber as any).cursor.toString());
+       }
+       const ws = new WebSocket(url.toString());
+       (subscriber as any).ws = ws;
+       mockWsInstance = ws as any;
+
+       ws.on("open", () => {
+         subscriber.emit("connected");
+       });
+       ws.on("message", async (data: Buffer) => {
+         try {
+           const message = JSON.parse(data.toString());
+           if (message.seq) {
+             await (subscriber as any).saveCursor(message.seq);
+           }
+           if (message.t === "#labels") {
+             for (const label of message.labels) {
+               subscriber.emit("label", label);
+             }
+           }
+         } catch (error) {
+           subscriber.emit("error", error);
+         }
+       });
+       ws.on("close", () => {
+         (subscriber as any).ws = null;
+         subscriber.emit("disconnected");
+         if ((subscriber as any).shouldReconnect) {
+           (subscriber as any).scheduleReconnect();
+         }
+       });
+       ws.on("error", (error) => {
+         subscriber.emit("error", error);
+       });
+     };
+   });
+
+   afterEach(() => {
+     subscriber.stop();
+   });
+
+   test("should attempt to connect on start", async (done) => {
+     subscriber.on("connected", () => {
+       done();
+     });
+     await subscriber.start();
+     mockWsInstance.emit("open");
+   });
+
+   test("should emit label event on label for post", async (done) => {
+     subscriber.on("label", (label) => {
+       expect(label.uri).toBe("at://did:plc:user/app.bsky.feed.post/123");
+       done();
+     });
+     await subscriber.start();
+     mockWsInstance.emit(
+       "message",
+       Buffer.from(
+         JSON.stringify({
+           op: 1,
+           t: "#labels",
+           labels: [
+             {
+               src: "did:plc:labeler",
+               uri: "at://did:plc:user/app.bsky.feed.post/123",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+           ],
+         })
+       )
+     );
+   });
+
+   test("should emit label event on label for profile", async (done) => {
+     subscriber.on("label", (label) => {
+       expect(label.uri).toBe("did:plc:user");
+       done();
+     });
+     await subscriber.start();
+     mockWsInstance.emit(
+       "message",
+       Buffer.from(
+         JSON.stringify({
+           op: 1,
+           t: "#labels",
+           labels: [
+             {
+               src: "did:plc:labeler",
+               uri: "did:plc:user",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+           ],
+         })
+       )
+     );
+   });
+
+   test("should handle multiple labels in one message", async () => {
+     let labelCount = 0;
+     subscriber.on("label", () => {
+       labelCount++;
+     });
+     await subscriber.start();
+     mockWsInstance.emit(
+       "message",
+       Buffer.from(
+         JSON.stringify({
+           op: 1,
+           t: "#labels",
+           labels: [
+             {
+               src: "did:plc:labeler",
+               uri: "at://did:plc:user/app.bsky.feed.post/123",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+             {
+               src: "did:plc:labeler",
+               uri: "did:plc:user",
+               val: "spam",
+               cts: "2025-01-15T12:00:00Z",
+             },
+           ],
+         })
+       )
+     );
+     expect(labelCount).toBe(2);
+   });
+
+   test("should attempt to reconnect on close", (done) => {
+     let connectAttempts = 0;
+     (subscriber as any).connect = () => {
+       connectAttempts++;
+       if (connectAttempts > 1) {
+         done();
+       }
+       const url = new URL("ws://localhost:1234");
+       const ws = new WebSocket(url.toString());
+       (subscriber as any).ws = ws;
+       mockWsInstance = ws as any;
+       ws.on("close", () => {
+         (subscriber as any).ws = null;
+         subscriber.emit("disconnected");
+         if ((subscriber as any).shouldReconnect) {
+           (subscriber as any).scheduleReconnect();
+         }
+       });
+     };
+
+     subscriber.start();
+     mockWsInstance.emit("close");
+   });
+
+   test("should stop reconnecting after stop() is called", async (done) => {
+     let connectAttempts = 0;
+     (subscriber as any).connect = () => {
+       connectAttempts++;
+       const url = new URL("ws://localhost:1234");
+       const ws = new WebSocket(url.toString());
+       (subscriber as any).ws = ws;
+       mockWsInstance = ws as any;
+       ws.on("close", () => {
+         (subscriber as any).ws = null;
+         subscriber.emit("disconnected");
+         if ((subscriber as any).shouldReconnect) {
+           (subscriber as any).scheduleReconnect();
+         }
+       });
+     };
+
+     await subscriber.start();
+     subscriber.stop();
+     mockWsInstance.emit("close");
+
+     setTimeout(() => {
+       expect(connectAttempts).toBe(1);
+       done();
+     }, 2000);
+   });
+
+   test("should increase backoff delay on multiple reconnects", () => {
+     subscriber.start();
+     (subscriber as any).reconnectAttempts = 0;
+     (subscriber as any).scheduleReconnect();
+     const initialBackoff = (subscriber as any).reconnectAttempts;
+     (subscriber as any).scheduleReconnect();
+     const secondBackoff = (subscriber as any).reconnectAttempts;
+     expect(secondBackoff).toBeGreaterThan(initialBackoff);
+   });
+ });
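
Taken together, these tests fix the subscriber's event contract (`connected`, `disconnected`, `label`, `error`). A hedged sketch of the consumer-side wiring implied by that contract (`hydrationQueue` is a hypothetical stand-in for the PR's hydration queue):

```typescript
import { FirehoseSubscriber } from "./src/firehose/subscriber.js";
import { logger } from "./src/logger/index.js";

// Hypothetical stand-in for the PR's hydration queue.
declare const hydrationQueue: { enqueue(label: unknown): void };

const subscriber = new FirehoseSubscriber();
subscriber.on("connected", () => logger.info("Firehose connected"));
subscriber.on("disconnected", () => logger.warn("Firehose disconnected"));
subscriber.on("label", (label) => hydrationQueue.enqueue(label));
subscriber.on("error", (err) => logger.error({ err }, "Firehose error"));
await subscriber.start();
```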