.claude/settings.local.json (+2, -1)
.env.example (+3)
···
  PDS=bsky.social
  WSS_URL=wss://your-labeler-service.com/xrpc/com.atproto.label.subscribeLabels

+ # PLC Directory (for DID resolution)
+ PLC_ENDPOINT=https://plc.wtf
+
  # Blob & Image Handling
  HYDRATE_BLOBS=false # Set to true to download images/videos
  BLOB_STORAGE_TYPE=local # 'local' or 's3'

docs/profile-blob-hydration.md (new file, +164)
···
# Profile Blob Hydration - Implementation Notes

## Overview

This document captures key learnings from implementing avatar and banner blob hydration for Bluesky profiles.

## Key Discoveries

### 1. CID Deserialization in @atproto/api

The `@atproto/api` library deserializes blob references from their JSON `$link` representation into CID class objects.

**Raw JSON from API:**
```json
{
  "avatar": {
    "$type": "blob",
    "ref": {
      "$link": "bafkreigg3s6plegjncmxubeufbohj3qasbm4r23q2x7zlivdhccfqfypve"
    },
    "mimeType": "image/jpeg",
    "size": 101770
  }
}
```

**What you get in TypeScript:**
```typescript
record.avatar.ref // CID object with { code, version, hash, ... }
```

**Solution:**
```typescript
const cid = record.avatar.ref.toString(); // "bafkrei..."
```

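When blob references may arrive either as raw JSON or as records already deserialized by `@atproto/api`, a small accessor that tolerates both shapes avoids this class of bug. The helper below is a sketch, not part of the codebase; it assumes the `multiformats` package (a dependency of `@atproto/api`) is available:

```typescript
import { CID } from "multiformats/cid";

// Hypothetical helper: return the CID string for either representation, or null.
function blobCidToString(ref: unknown): string | null {
  if (!ref) return null;
  const link = (ref as { $link?: unknown }).$link;
  if (typeof link === "string") return link; // raw JSON form
  const cid = CID.asCID(ref as any);         // deserialized CID instance
  return cid ? cid.toString() : null;
}

const avatarCid = blobCidToString(record.avatar?.ref); // "bafkrei..." or null
```
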
### 2. PDS Endpoint Resolution

Users can be on different Personal Data Servers (PDS), not just `bsky.social`. Blobs must be fetched from the user's actual PDS.

**Process:**
1. Query the PLC directory for the DID document: `https://plc.wtf/${did}`
2. Find the service entry with `id: "#atproto_pds"` and `type: "AtprotoPersonalDataServer"`
3. Extract its `serviceEndpoint` URL
4. Use that endpoint for `com.atproto.sync.getBlob`

**Example:**
```typescript
const didDoc = await fetch(`https://plc.wtf/${did}`).then(r => r.json());
const pdsService = didDoc.service?.find(s =>
  s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
);
const pdsEndpoint = pdsService.serviceEndpoint; // e.g., "https://waxcap.us-west.host.bsky.network"
```

### 3. Correct Blob Fetching

**Don't use CDN paths** - they don't work reliably for all blobs and require an authentication context.

**Use the AT Protocol API:**
```typescript
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
const response = await fetch(blobUrl);
const blobData = Buffer.from(await response.arrayBuffer());
```

### 4. Database Schema Design

**Separate tables for different blob types:**

- `blobs` table: post images, with an FK to `posts(uri)`
- `profile_blobs` table: avatars and banners, with an FK to `profiles(did)`

This allows proper relational queries and analysis.

**Profile blobs schema:**
```sql
CREATE TABLE profile_blobs (
  did TEXT NOT NULL,
  blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
  blob_cid TEXT NOT NULL,
  sha256 TEXT NOT NULL,
  phash TEXT,
  storage_path TEXT,
  mimetype TEXT,
  captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (did, blob_type, captured_at),
  FOREIGN KEY (did) REFERENCES profiles(did)
);
```

### 5. Change Tracking

Including `captured_at` in the primary key makes it possible to track when users change their avatars or banners.

**Query latest state:**
```sql
SELECT * FROM profile_blobs
WHERE did = ? AND blob_type = ?
ORDER BY captured_at DESC
LIMIT 1
```

**Only insert if changed:**
```typescript
const latest = await findLatestByDidAndType(did, type);
if (latest && latest.blob_cid === cid) {
  return; // No change, skip
}
// Insert new row with current timestamp
```

### 6. Sentinel Values for Missing Data

Use an empty string (`""`) to mean "we checked and the user has no avatar", as distinct from `NULL`, which means "we haven't checked yet".

```typescript
if (record.avatar?.ref) {
  avatarCid = record.avatar.ref.toString();
} else {
  avatarCid = ""; // Explicitly checked, not present
}
```

This prevents infinite re-hydration loops for profiles without avatars.

### 7. Profile Re-hydration Logic

```typescript
const existingProfile = await findByDid(did);
const needsRehydration = existingProfile &&
  (existingProfile.avatar_cid === null || existingProfile.banner_cid === null);

if (existingProfile && !needsRehydration) {
  return; // Skip
}
```

## Configuration

- `PLC_ENDPOINT`: DID resolution endpoint (default: `https://plc.wtf`)
  - Can be changed to `https://plc.directory` or a custom instance
  - plc.wtf is faster but unofficial

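For illustration, pointing the resolver at the official directory is only an environment change; the value is validated and defaulted by the zod schema in `src/config/index.ts`. A minimal sketch of how the configured endpoint ends up in the DID-document URL (mirroring what `resolvePds()` builds):

```typescript
import { config } from "../config/index.js";

// Sketch: PLC_ENDPOINT=https://plc.directory in .env overrides the default.
function didDocumentUrl(did: string): string {
  return `${config.plc.endpoint}/${did}`; // e.g. https://plc.directory/did:plc:abc123
}
```
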
## Common Errors

### "RepoNotFound"
- **Cause:** Querying the wrong PDS endpoint
- **Solution:** Resolve the correct PDS from the DID document

### Foreign Key Constraint Violation
- **Cause:** Trying to insert profile blobs into the `blobs` table
- **Solution:** Use the separate `profile_blobs` table

### Missing CIDs Despite API Returning Them
- **Cause:** Accessing `ref.$link` when `ref` is a CID object
- **Solution:** Call `.toString()` on the CID object

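When a blob fetch needs to distinguish a missing repo from a transient failure, a guard in the style of `isRecordNotFoundError` (added in `src/utils/retry.ts`) is one option. The helper below is a sketch, not part of the codebase:

```typescript
// Hypothetical: detect a RepoNotFound error returned by com.atproto.sync.getBlob.
function isRepoNotFoundError(error: any): boolean {
  return (
    error?.error === "RepoNotFound" ||
    (typeof error?.message === "string" && error.message.includes("RepoNotFound"))
  );
}
```
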
## Related Files

- `src/hydration/profiles.service.ts` - Main hydration logic
- `src/database/profile-blobs.repository.ts` - Profile blob persistence
- `src/database/schema.ts` - Table definitions
- `src/config/index.ts` - PLC endpoint configuration

src/blobs/processor.ts (+10, -2)
···
      postUri: string,
      ref: BlobReference
    ): Promise<void> {
-     const existing = await this.blobsRepo.findBySha256(ref.cid);
+     const existing = await this.blobsRepo.findByCid(ref.cid);
      if (existing) {
+       await this.blobsRepo.insert({
+         post_uri: postUri,
+         blob_cid: ref.cid,
+         sha256: existing.sha256,
+         phash: existing.phash,
+         storage_path: existing.storage_path,
+         mimetype: existing.mimetype,
+       });
        logger.debug(
          { postUri, cid: ref.cid },
-         "Blob already processed, skipping"
+         "Blob already processed, reusing hashes"
        );
        return;
      }

src/config/index.ts (+6)
···
      password: z.string().min(1, "BSKY_PASSWORD is required"),
      pds: z.string().default("bsky.social"),
    }),
+   plc: z.object({
+     endpoint: z.string().url().default("https://plc.wtf"),
+   }),
    labeler: z.object({
      wssUrl: z.string().url("WSS_URL must be a valid URL"),
    }),
···
      handle: process.env.BSKY_HANDLE,
      password: process.env.BSKY_PASSWORD,
      pds: process.env.PDS,
+   },
+   plc: {
+     endpoint: process.env.PLC_ENDPOINT,
    },
    labeler: {
      wssUrl: process.env.WSS_URL,

src/database/blobs.repository.ts (+17)
···
        );
      });
    }
+
+   async findByCid(cid: string): Promise<Blob | null> {
+     return new Promise((resolve, reject) => {
+       this.db.all(
+         `SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`,
+         cid,
+         (err, rows: Blob[]) => {
+           if (err) {
+             logger.error({ err, cid }, "Failed to find blob by CID");
+             reject(err);
+             return;
+           }
+           resolve(rows?.[0] || null);
+         }
+       );
+     });
+   }
  }

src/database/profile-blobs.repository.ts (new file, +123)
···
import { Database } from "duckdb";
import { logger } from "../logger/index.js";

export interface ProfileBlob {
  did: string;
  blob_type: "avatar" | "banner";
  blob_cid: string;
  sha256: string;
  phash?: string;
  storage_path?: string;
  mimetype?: string;
  captured_at?: Date;
}

export class ProfileBlobsRepository {
  constructor(private db: Database) {}

  async insert(blob: ProfileBlob): Promise<void> {
    return new Promise((resolve, reject) => {
      this.db.prepare(
        `
        INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype, captured_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, COALESCE($8, CURRENT_TIMESTAMP))
        `,
        (err, stmt) => {
          if (err) {
            logger.error({ err }, "Failed to prepare profile blob insert statement");
            reject(err);
            return;
          }

          stmt.run(
            blob.did,
            blob.blob_type,
            blob.blob_cid,
            blob.sha256,
            blob.phash || null,
            blob.storage_path || null,
            blob.mimetype || null,
            blob.captured_at || null,
            (err) => {
              if (err) {
                logger.error({ err, blob }, "Failed to insert profile blob");
                reject(err);
                return;
              }
              resolve();
            }
          );
        }
      );
    });
  }

  async findByDid(did: string): Promise<ProfileBlob[]> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE did = $1 ORDER BY captured_at DESC`,
        did,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, did }, "Failed to find profile blobs by DID");
            reject(err);
            return;
          }
          resolve(rows || []);
        }
      );
    });
  }

  async findLatestByDidAndType(did: string, blobType: "avatar" | "banner"): Promise<ProfileBlob | null> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE did = $1 AND blob_type = $2 ORDER BY captured_at DESC LIMIT 1`,
        did,
        blobType,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, did, blobType }, "Failed to find latest profile blob");
            reject(err);
            return;
          }
          resolve(rows?.[0] || null);
        }
      );
    });
  }

  async findBySha256(sha256: string): Promise<ProfileBlob | null> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE sha256 = $1 LIMIT 1`,
        sha256,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, sha256 }, "Failed to find profile blob by SHA256");
            reject(err);
            return;
          }
          resolve(rows?.[0] || null);
        }
      );
    });
  }

  async findByPhash(phash: string): Promise<ProfileBlob[]> {
    return new Promise((resolve, reject) => {
      this.db.all(
        `SELECT * FROM profile_blobs WHERE phash = $1`,
        phash,
        (err, rows: ProfileBlob[]) => {
          if (err) {
            logger.error({ err, phash }, "Failed to find profile blobs by pHash");
            reject(err);
            return;
          }
          resolve(rows || []);
        }
      );
    });
  }
}
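
A quick usage sketch (hypothetical call site, not part of this change) showing the kind of analysis the repository enables - listing accounts whose avatar or banner matches a known perceptual hash:

```typescript
import { Database } from "duckdb";
import { ProfileBlobsRepository } from "./profile-blobs.repository.js";

// Assumes an existing DuckDB handle; returns the distinct DIDs sharing the given pHash.
async function accountsSharingImage(db: Database, phash: string): Promise<string[]> {
  const repo = new ProfileBlobsRepository(db);
  const matches = await repo.findByPhash(phash);
  return [...new Set(matches.map((blob) => blob.did))];
}
```
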
src/database/schema.ts (+17)
···
    FOREIGN KEY (post_uri) REFERENCES posts(uri)
  );

+ -- Profile blobs table: stores avatar and banner blobs for profiles
+ CREATE TABLE IF NOT EXISTS profile_blobs (
+   did TEXT NOT NULL,
+   blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
+   blob_cid TEXT NOT NULL,
+   sha256 TEXT NOT NULL,
+   phash TEXT,
+   storage_path TEXT,
+   mimetype TEXT,
+   captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+   PRIMARY KEY (did, blob_type, captured_at),
+   FOREIGN KEY (did) REFERENCES profiles(did)
+ );
+
  -- Indexes for performance
  CREATE INDEX IF NOT EXISTS idx_labels_uri ON labels(uri);
  CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
  CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts);
  CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did);
+ CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid);
  CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256);
  CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash);
+ CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256);
+ CREATE INDEX IF NOT EXISTS idx_profile_blobs_phash ON profile_blobs(phash);
  `;

  async function migrateProfilesTable(): Promise<void> {

src/hydration/posts.service.ts (+5, -1)
···
  import { PostsRepository } from "../database/posts.repository.js";
  import { BlobProcessor } from "../blobs/processor.js";
  import { pRateLimit } from "p-ratelimit";
- import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js";
+ import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
  import { logger } from "../logger/index.js";
  import { config } from "../config/index.js";

···
        }
      }
    } catch (error) {
+     if (isRecordNotFoundError(error)) {
+       logger.warn({ uri }, "Post record not found, skipping");
+       return;
+     }
      logger.error({ error, uri }, "Failed to hydrate post");
      throw error;
    }

src/hydration/profiles.service.ts (+108, -35)
···
  import { AtpAgent } from "@atproto/api";
  import { Database } from "duckdb";
  import { ProfilesRepository } from "../database/profiles.repository.js";
- import { BlobProcessor } from "../blobs/processor.js";
+ import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js";
+ import { computeBlobHashes } from "../blobs/hasher.js";
+ import { LocalBlobStorage } from "../blobs/storage/local.js";
+ import { S3BlobStorage } from "../blobs/storage/s3.js";
+ import { BlobStorage } from "../blobs/processor.js";
  import { pRateLimit } from "p-ratelimit";
- import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js";
+ import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
  import { logger } from "../logger/index.js";
  import { config } from "../config/index.js";

  export class ProfileHydrationService {
    private agent: AtpAgent;
    private profilesRepo: ProfilesRepository;
-   private blobProcessor: BlobProcessor;
+   private profileBlobsRepo: ProfileBlobsRepository;
+   private storage: BlobStorage | null = null;
    private limit: ReturnType<typeof pRateLimit>;

    constructor(db: Database) {
      this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
      this.profilesRepo = new ProfilesRepository(db);
-     this.blobProcessor = new BlobProcessor(db, this.agent);
+     this.profileBlobsRepo = new ProfileBlobsRepository(db);
+
+     if (config.blobs.hydrateBlobs) {
+       if (config.blobs.storage.type === "s3") {
+         this.storage = new S3BlobStorage(
+           config.blobs.storage.s3Bucket!,
+           config.blobs.storage.s3Region!
+         );
+       } else {
+         this.storage = new LocalBlobStorage(
+           config.blobs.storage.localPath
+         );
+       }
+     }
+
      this.limit = pRateLimit({
        interval: 300000,
        rate: 3000,
···

      if (profileResponse.success && profileResponse.data.value) {
        const record = profileResponse.data.value as any;
-       logger.debug({ did, record }, "Profile record structure");
        displayName = record.displayName;
        description = record.description;

-       if (record.avatar?.ref?.$link) {
-         avatarCid = record.avatar.ref.$link;
+       if (record.avatar?.ref) {
+         avatarCid = record.avatar.ref.toString();
        } else {
          avatarCid = "";
        }

-       if (record.banner?.ref?.$link) {
-         bannerCid = record.banner.ref.$link;
+       if (record.banner?.ref) {
+         bannerCid = record.banner.ref.toString();
        } else {
          bannerCid = "";
        }
···

      if (avatarCid && avatarCid !== "") {
        try {
-         await this.blobProcessor.processBlobs(`profile://${did}/avatar`, [
-           {
-             images: [
-               {
-                 image: {
-                   ref: { $link: avatarCid },
-                   mimeType: "image/jpeg",
-                 },
-               },
-             ],
-           },
-         ]);
+         await this.processProfileBlob(did, avatarCid, "avatar");
        } catch (error) {
-         logger.warn({ error, did }, "Failed to process avatar blob");
+         logger.warn({ error, did, avatarCid }, "Failed to process avatar blob");
        }
      }

      if (bannerCid && bannerCid !== "") {
        try {
-         await this.blobProcessor.processBlobs(`profile://${did}/banner`, [
-           {
-             images: [
-               {
-                 image: {
-                   ref: { $link: bannerCid },
-                   mimeType: "image/jpeg",
-                 },
-               },
-             ],
-           },
-         ]);
+         await this.processProfileBlob(did, bannerCid, "banner");
        } catch (error) {
-         logger.warn({ error, did }, "Failed to process banner blob");
+         logger.warn({ error, did, bannerCid }, "Failed to process banner blob");
        }
      }

      logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully");
    } catch (error) {
+     if (isRecordNotFoundError(error)) {
+       logger.warn({ did }, "Profile record not found, skipping");
+       return;
+     }
      logger.error({ error, did }, "Failed to hydrate profile");
      throw error;
    }
+   }
+
+   private async resolvePds(did: string): Promise<string | null> {
+     try {
+       const didDocResponse = await fetch(`${config.plc.endpoint}/${did}`);
+       if (!didDocResponse.ok) {
+         logger.warn({ did, status: didDocResponse.status }, "Failed to fetch DID document");
+         return null;
+       }
+
+       const didDoc = await didDocResponse.json();
+       const pdsService = didDoc.service?.find((s: any) =>
+         s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
+       );
+
+       if (!pdsService?.serviceEndpoint) {
+         logger.warn({ did }, "No PDS endpoint found in DID document");
+         return null;
+       }
+
+       return pdsService.serviceEndpoint;
+     } catch (error) {
+       logger.error({ error, did }, "Failed to resolve PDS from DID");
+       return null;
+     }
+   }
+
+   private async processProfileBlob(
+     did: string,
+     cid: string,
+     type: "avatar" | "banner"
+   ): Promise<void> {
+     const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type);
+
+     if (latestBlob && latestBlob.blob_cid === cid) {
+       logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping");
+       return;
+     }
+
+     const pdsEndpoint = await this.resolvePds(did);
+     if (!pdsEndpoint) {
+       logger.warn({ did, cid, type }, "Cannot fetch blob without PDS endpoint");
+       return;
+     }
+
+     const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
+     const blobResponse = await fetch(blobUrl);
+
+     if (!blobResponse.ok) {
+       logger.warn({ did, cid, type, pdsEndpoint, status: blobResponse.status }, "Failed to fetch blob from PDS");
+       return;
+     }
+
+     const blobData = Buffer.from(await blobResponse.arrayBuffer());
+
+     let storagePath: string | undefined;
+     if (this.storage && config.blobs.hydrateBlobs) {
+       storagePath = await this.storage.store(cid, blobData, "image/jpeg");
+     }
+
+     const hashes = await computeBlobHashes(blobData, "image/jpeg");
+
+     await this.profileBlobsRepo.insert({
+       did,
+       blob_type: type,
+       blob_cid: cid,
+       sha256: hashes.sha256,
+       phash: hashes.phash,
+       storage_path: storagePath,
+       mimetype: "image/jpeg",
+     });
+
+     logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully");
    }
  }

src/utils/retry.ts (+7)
···
  export function isServerError(error: any): boolean {
    return error?.status >= 500 && error?.status < 600;
  }
+
+ export function isRecordNotFoundError(error: any): boolean {
+   return (
+     error?.error === "RecordNotFound" ||
+     error?.message?.includes("RecordNotFound")
+   );
+ }

tests/integration/database.test.ts (+6)
···
        const found = await blobsRepo.findByPhash("deadbeef");
        expect(found.length).toBeGreaterThan(0);
      });
+
+     test("should find blob by CID", async () => {
+       const found = await blobsRepo.findByCid("bafytest123");
+       expect(found).not.toBeNull();
+       expect(found?.sha256).toBe("abc123def456");
+     });
    });
  });

tests/unit/subscriber.test.ts (new file, +220)
···
import { describe, test, expect, beforeEach, afterEach, mock } from "bun:test";
import { FirehoseSubscriber } from "../../src/firehose/subscriber.js";
import { EventEmitter } from "events";
import WebSocket from "ws";

// Mock WebSocket class
class MockWebSocket extends EventEmitter {
  constructor(url: string) {
    super();
  }
  close() {}
}

// Mock the entire 'ws' module
mock.module("ws", () => ({
  default: MockWebSocket,
}));

describe("FirehoseSubscriber", () => {
  let subscriber: FirehoseSubscriber;
  let mockWsInstance: MockWebSocket;

  beforeEach(() => {
    subscriber = new FirehoseSubscriber();
    // Mock the connect method to control the WebSocket instance
    (subscriber as any).connect = () => {
      const url = new URL("ws://localhost:1234");
      if ((subscriber as any).cursor !== null) {
        url.searchParams.set("cursor", (subscriber as any).cursor.toString());
      }
      const ws = new WebSocket(url.toString());
      (subscriber as any).ws = ws;
      mockWsInstance = ws as any;

      ws.on("open", () => {
        subscriber.emit("connected");
      });
      ws.on("message", async (data: Buffer) => {
        try {
          const message = JSON.parse(data.toString());
          if (message.seq) {
            await (subscriber as any).saveCursor(message.seq);
          }
          if (message.t === "#labels") {
            for (const label of message.labels) {
              subscriber.emit("label", label);
            }
          }
        } catch (error) {
          subscriber.emit("error", error);
        }
      });
      ws.on("close", () => {
        (subscriber as any).ws = null;
        subscriber.emit("disconnected");
        if ((subscriber as any).shouldReconnect) {
          (subscriber as any).scheduleReconnect();
        }
      });
      ws.on("error", (error) => {
        subscriber.emit("error", error);
      });
    };
  });

  afterEach(() => {
    subscriber.stop();
  });

  test("should attempt to connect on start", async (done) => {
    subscriber.on("connected", () => {
      done();
    });
    await subscriber.start();
    mockWsInstance.emit("open");
  });

  test("should emit label event on label for post", async (done) => {
    subscriber.on("label", (label) => {
      expect(label.uri).toBe("at://did:plc:user/app.bsky.feed.post/123");
      done();
    });
    await subscriber.start();
    mockWsInstance.emit(
      "message",
      Buffer.from(
        JSON.stringify({
          op: 1,
          t: "#labels",
          labels: [
            {
              src: "did:plc:labeler",
              uri: "at://did:plc:user/app.bsky.feed.post/123",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
          ],
        })
      )
    );
  });

  test("should emit label event on label for profile", async (done) => {
    subscriber.on("label", (label) => {
      expect(label.uri).toBe("did:plc:user");
      done();
    });
    await subscriber.start();
    mockWsInstance.emit(
      "message",
      Buffer.from(
        JSON.stringify({
          op: 1,
          t: "#labels",
          labels: [
            {
              src: "did:plc:labeler",
              uri: "did:plc:user",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
          ],
        })
      )
    );
  });

  test("should handle multiple labels in one message", async () => {
    let labelCount = 0;
    subscriber.on("label", () => {
      labelCount++;
    });
    await subscriber.start();
    mockWsInstance.emit(
      "message",
      Buffer.from(
        JSON.stringify({
          op: 1,
          t: "#labels",
          labels: [
            {
              src: "did:plc:labeler",
              uri: "at://did:plc:user/app.bsky.feed.post/123",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
            {
              src: "did:plc:labeler",
              uri: "did:plc:user",
              val: "spam",
              cts: "2025-01-15T12:00:00Z",
            },
          ],
        })
      )
    );
    expect(labelCount).toBe(2);
  });

  test("should attempt to reconnect on close", (done) => {
    let connectAttempts = 0;
    (subscriber as any).connect = () => {
      connectAttempts++;
      if (connectAttempts > 1) {
        done();
      }
      const url = new URL("ws://localhost:1234");
      const ws = new WebSocket(url.toString());
      (subscriber as any).ws = ws;
      mockWsInstance = ws as any;
      ws.on("close", () => {
        (subscriber as any).ws = null;
        subscriber.emit("disconnected");
        if ((subscriber as any).shouldReconnect) {
          (subscriber as any).scheduleReconnect();
        }
      });
    };

    subscriber.start();
    mockWsInstance.emit("close");
  });

  test("should stop reconnecting after stop() is called", async (done) => {
    let connectAttempts = 0;
    (subscriber as any).connect = () => {
      connectAttempts++;
      const url = new URL("ws://localhost:1234");
      const ws = new WebSocket(url.toString());
      (subscriber as any).ws = ws;
      mockWsInstance = ws as any;
      ws.on("close", () => {
        (subscriber as any).ws = null;
        subscriber.emit("disconnected");
        if ((subscriber as any).shouldReconnect) {
          (subscriber as any).scheduleReconnect();
        }
      });
    };

    await subscriber.start();
    subscriber.stop();
    mockWsInstance.emit("close");

    setTimeout(() => {
      expect(connectAttempts).toBe(1);
      done();
    }, 2000);
  });

  test("should increase backoff delay on multiple reconnects", () => {
    subscriber.start();
    (subscriber as any).reconnectAttempts = 0;
    (subscriber as any).scheduleReconnect();
    const initialBackoff = (subscriber as any).reconnectAttempts;
    (subscriber as any).scheduleReconnect();
    const secondBackoff = (subscriber as any).reconnectAttempts;
    expect(secondBackoff).toBeGreaterThan(initialBackoff);
  });
});