+2
-1
.claude/settings.local.json
+164
docs/profile-blob-hydration.md
···
1
+
# Profile Blob Hydration - Implementation Notes
2
+
3
+
## Overview
4
+
5
+
This document captures key learnings from implementing avatar and banner blob hydration for Bluesky profiles.
6
+
7
+
## Key Discoveries
8
+
9
+
### 1. CID Deserialization in @atproto/api
10
+
11
+
The `@atproto/api` library deserializes blob references from their JSON `$link` representation into CID class objects.
12
+
13
+
**Raw JSON from API:**
14
+
```json
15
+
{
16
+
"avatar": {
17
+
"$type": "blob",
18
+
"ref": {
19
+
"$link": "bafkreigg3s6plegjncmxubeufbohj3qasbm4r23q2x7zlivdhccfqfypve"
20
+
},
21
+
"mimeType": "image/jpeg",
22
+
"size": 101770
23
+
}
24
+
}
25
+
```
26
+
27
+
**What you get in TypeScript:**
28
+
```typescript
29
+
record.avatar.ref // CID object with { code, version, hash, ... }
30
+
```
31
+
32
+
**Solution:**
33
+
```typescript
34
+
const cid = record.avatar.ref.toString(); // "bafkrei..."
35
+
```
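A minimal sketch of a helper that accepts either representation and returns the CID string. The name `blobRefToCidString` and the `BlobRefLike` type are hypothetical and not part of `@atproto/api`; the sketch assumes only that a deserialized ref exposes `toString()` (as used above) and that raw JSON carries a string under `$link`.

```typescript
// Hypothetical shapes for illustration; not types exported by @atproto/api.
type RawLink = { $link: string };
type CidLike = { toString(): string };
type BlobRefLike = RawLink | CidLike | null | undefined;

// Returns the CID string for either representation, or null when no ref is present.
function blobRefToCidString(ref: BlobRefLike): string | null {
  if (ref === null || ref === undefined) return null;
  // Raw JSON form: the CID is already a string under `$link`.
  const link = (ref as { $link?: unknown }).$link;
  if (typeof link === "string") return link;
  // Deserialized form: a CID object, converted with toString().
  return ref.toString();
}
```

Checking `$link` first keeps the helper usable on records that bypass the library's deserialization, for example rows replayed from stored JSON.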
36
+
37
+
### 2. PDS Endpoint Resolution
38
+
39
+
Users can be on different Personal Data Servers (PDS), not just `bsky.social`. Blobs must be fetched from the user's actual PDS.
40
+
41
+
**Process:**
42
+
1. Query PLC directory for DID document: `https://plc.wtf/${did}`
43
+
2. Find service with `id: "#atproto_pds"` and `type: "AtprotoPersonalDataServer"`
44
+
3. Extract `serviceEndpoint` URL
45
+
4. Use that endpoint for `com.atproto.sync.getBlob`
46
+
47
+
**Example:**
48
+
```typescript
49
+
const didDoc = await fetch(`https://plc.wtf/${did}`).then(r => r.json());
50
+
const pdsService = didDoc.service?.find(s =>
51
+
s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
52
+
);
53
+
const pdsEndpoint = pdsService.serviceEndpoint; // e.g., "https://waxcap.us-west.host.bsky.network"
54
+
```
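The example above throws a bare `TypeError` when the DID document lists no matching service. A hedged sketch of the same lookup with an explicit error follows; `extractPdsEndpoint` and the two interfaces are hypothetical names that model only the fields used here.

```typescript
// Minimal, hypothetical view of a DID document; real documents carry more fields.
interface DidService {
  id: string;
  type: string;
  serviceEndpoint: string;
}

interface DidDocument {
  id: string;
  service?: DidService[];
}

// Returns the PDS endpoint, or throws a descriptive error when the document
// has no service entry matching the id and type used in the steps above.
function extractPdsEndpoint(didDoc: DidDocument): string {
  const pdsService = didDoc.service?.find(
    (s) => s.id === "#atproto_pds" && s.type === "AtprotoPersonalDataServer"
  );
  if (!pdsService) {
    throw new Error(`No PDS service found in DID document for ${didDoc.id}`);
  }
  return pdsService.serviceEndpoint;
}
```

Keeping the lookup a pure function of the parsed document means it can be tested without touching the PLC directory.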
55
+
56
+
### 3. Correct Blob Fetching
57
+
58
+
**Don't use CDN paths** - they don't work reliably for all blobs and require authentication context.
59
+
60
+
**Use the AT Protocol API:**
61
+
```typescript
62
+
const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(cid)}`;
63
+
const response = await fetch(blobUrl);
64
+
const blobData = Buffer.from(await response.arrayBuffer());
65
+
```
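The fetch above can be sketched a little more defensively. The helper names `buildGetBlobUrl` and `fetchBlob`, and the `FetchLike` type, are hypothetical; the sketch assumes the PDS endpoint has no path prefix (the XRPC path is resolved from the origin) and that a global `fetch` is available when no implementation is injected.

```typescript
// Narrow, hypothetical view of fetch so a fake can be injected in tests.
type FetchLike = (url: string) => Promise<{
  ok: boolean;
  status: number;
  arrayBuffer(): Promise<ArrayBuffer>;
}>;

// Builds the getBlob URL with query parameters encoded by URLSearchParams.
function buildGetBlobUrl(pdsEndpoint: string, did: string, cid: string): string {
  const url = new URL("/xrpc/com.atproto.sync.getBlob", pdsEndpoint);
  url.searchParams.set("did", did);
  url.searchParams.set("cid", cid);
  return url.toString();
}

// Fetches the blob bytes and fails loudly on a non-2xx response, so an error
// body is never hashed or stored as if it were image data.
async function fetchBlob(
  pdsEndpoint: string,
  did: string,
  cid: string,
  fetchImpl: FetchLike = (url) => fetch(url)
): Promise<Uint8Array> {
  const response = await fetchImpl(buildGetBlobUrl(pdsEndpoint, did, cid));
  if (!response.ok) {
    throw new Error(`getBlob failed for ${did}/${cid}: HTTP ${response.status}`);
  }
  return new Uint8Array(await response.arrayBuffer());
}
```

In Node, the result can be wrapped with `Buffer.from(bytes)` if downstream code expects a `Buffer`, as the original snippet does.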
66
+
67
+
### 4. Database Schema Design
68
+
69
+
**Separate tables for different blob types:**
70
+
71
+
- `blobs` table: Post images with FK to `posts(uri)`
72
+
- `profile_blobs` table: Avatars/banners with FK to `profiles(did)`
73
+
74
+
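Each table can then enforce its own foreign key, so joins against `posts` and `profiles` stay valid and profile blobs no longer need a synthetic post URI.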
75
+
76
+
**Profile blobs schema:**
77
+
```sql
78
+
CREATE TABLE profile_blobs (
79
+
did TEXT NOT NULL,
80
+
blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
81
+
blob_cid TEXT NOT NULL,
82
+
sha256 TEXT NOT NULL,
83
+
phash TEXT,
84
+
storage_path TEXT,
85
+
mimetype TEXT,
86
+
captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87
+
PRIMARY KEY (did, blob_type, captured_at),
88
+
FOREIGN KEY (did) REFERENCES profiles(did)
89
+
);
90
+
```
91
+
92
+
### 5. Change Tracking
93
+
94
+
Including `captured_at` in the primary key lets the table hold one row per observed change, so it records when users change their avatars and banners instead of overwriting the previous value.
95
+
96
+
**Query latest state:**
97
+
```sql
98
+
SELECT * FROM profile_blobs
99
+
WHERE did = ? AND blob_type = ?
100
+
ORDER BY captured_at DESC
101
+
LIMIT 1
102
+
```
103
+
104
+
**Only insert if changed:**
105
+
```typescript
106
+
const latest = await findLatestByDidAndType(did, type);
107
+
if (latest && latest.blob_cid === cid) {
108
+
return; // No change, skip
109
+
}
110
+
// Insert new row with current timestamp
111
+
```
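To make the insert-if-changed rule concrete, here is a small in-memory sketch. `InMemoryProfileBlobs` and `BlobRow` are hypothetical stand-ins for the `profile_blobs` table and are not the repository class from this change.

```typescript
// Hypothetical in-memory stand-in for the profile_blobs table, for illustration only.
interface BlobRow {
  did: string;
  blob_type: "avatar" | "banner";
  blob_cid: string;
  captured_at: Date;
}

class InMemoryProfileBlobs {
  private rows: BlobRow[] = [];

  // Mirrors "ORDER BY captured_at DESC LIMIT 1" for one (did, blob_type) pair.
  findLatest(did: string, blobType: BlobRow["blob_type"]): BlobRow | null {
    const matches = this.rows
      .filter((r) => r.did === did && r.blob_type === blobType)
      .sort((a, b) => b.captured_at.getTime() - a.captured_at.getTime());
    return matches[0] ?? null;
  }

  // Appends a row only when the CID differs from the latest one.
  // Returns true if a row was inserted.
  recordIfChanged(
    did: string,
    blobType: BlobRow["blob_type"],
    cid: string,
    now: Date
  ): boolean {
    const latest = this.findLatest(did, blobType);
    if (latest && latest.blob_cid === cid) return false;
    this.rows.push({ did, blob_type: blobType, blob_cid: cid, captured_at: now });
    return true;
  }
}
```

Because only the latest row is compared, a user who switches from image A to B and back to A produces three rows, which preserves the full history of changes.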
112
+
113
+
### 6. Sentinel Values for Missing Data
114
+
115
+
Use an empty string (`""`) to record "we checked, and the user has no avatar", and leave the column `NULL` to mean "we haven't checked yet".
116
+
117
+
```typescript
118
+
if (record.avatar?.ref) {
119
+
avatarCid = record.avatar.ref.toString();
120
+
} else {
121
+
avatarCid = ""; // Explicitly checked, not present
122
+
}
123
+
```
124
+
125
+
This prevents infinite re-hydration loops for profiles without avatars.
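The three states this convention encodes can be spelled out in a short sketch. `cidState` and `CidState` are hypothetical names introduced for illustration only.

```typescript
type CidState = "unchecked" | "absent" | "present";

// Interprets a stored avatar_cid or banner_cid value under the sentinel convention.
function cidState(stored: string | null): CidState {
  if (stored === null) return "unchecked"; // NULL: never hydrated
  if (stored === "") return "absent"; // "": hydrated, nothing set
  return "present"; // anything else is treated as a CID string
}
```

Note that a truthiness check such as `if (!avatarCid)` treats `null` and `""` alike, so code relying on the sentinel should compare against `null` explicitly.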
126
+
127
+
### 7. Profile Re-hydration Logic
128
+
129
+
```typescript
130
+
const existingProfile = await findByDid(did);
131
+
const needsRehydration = existingProfile &&
132
+
(existingProfile.avatar_cid === null || existingProfile.banner_cid === null);
133
+
134
+
if (existingProfile && !needsRehydration) {
135
+
return; // Skip
136
+
}
137
+
```
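The same decision can be expressed as a pure function, which makes the cases easy to enumerate. This is a sketch under assumptions: `shouldHydrate` and `ProfileRow` are hypothetical names, and the lookup is assumed to return `null` for an unknown DID.

```typescript
// Hypothetical row shape; only the two CID columns matter here.
interface ProfileRow {
  avatar_cid: string | null;
  banner_cid: string | null;
}

// True when the profile is new, or when either CID column has never been
// checked (NULL). Empty-string sentinels count as already checked.
function shouldHydrate(existing: ProfileRow | null): boolean {
  if (existing === null) return true;
  return existing.avatar_cid === null || existing.banner_cid === null;
}
```

A profile with `avatar_cid = ""` and `banner_cid = ""` is therefore skipped, while one with either column still `NULL` is hydrated again.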
138
+
139
+
## Configuration
140
+
141
+
- `PLC_ENDPOINT`: DID resolution endpoint (default: `https://plc.wtf`)
142
+
- Can be changed to `https://plc.directory` or custom instance
143
+
- plc.wtf is faster but unofficial
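One way the default could be applied, as a sketch only: `resolvePlcEndpoint` is a hypothetical helper and does not reflect the actual contents of `src/config/index.ts`.

```typescript
const DEFAULT_PLC_ENDPOINT = "https://plc.wtf";

// Reads PLC_ENDPOINT from an env-like object, falling back to the default,
// and strips trailing slashes so `${endpoint}/${did}` never contains "//".
function resolvePlcEndpoint(env: Record<string, string | undefined>): string {
  const raw = env.PLC_ENDPOINT?.trim();
  const endpoint = raw ? raw : DEFAULT_PLC_ENDPOINT;
  return endpoint.replace(/\/+$/, "");
}
```

In a Node process this would typically be called as `resolvePlcEndpoint(process.env)`.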
144
+
145
+
## Common Errors
146
+
147
+
### "RepoNotFound"
148
+
- **Cause:** Querying wrong PDS endpoint
149
+
- **Solution:** Resolve correct PDS from DID document
150
+
151
+
### Foreign Key Constraint Violation
152
+
- **Cause:** Trying to insert profile blobs into `blobs` table
153
+
- **Solution:** Use separate `profile_blobs` table
154
+
155
+
### Missing CIDs Despite API Returning Them
156
+
- **Cause:** Trying to access `ref.$link` when ref is a CID object
157
+
- **Solution:** Call `.toString()` on the CID object
158
+
159
+
## Related Files
160
+
161
+
- `src/hydration/profiles.service.ts` - Main hydration logic
162
+
- `src/database/profile-blobs.repository.ts` - Profile blob persistence
163
+
- `src/database/schema.ts` - Table definitions
164
+
- `src/config/index.ts` - PLC endpoint configuration
+10
-2
src/blobs/processor.ts
···
116
116
postUri: string,
117
117
ref: BlobReference
118
118
): Promise<void> {
119
-
const existing = await this.blobsRepo.findBySha256(ref.cid);
119
+
const existing = await this.blobsRepo.findByCid(ref.cid);
120
120
if (existing) {
121
+
await this.blobsRepo.insert({
122
+
post_uri: postUri,
123
+
blob_cid: ref.cid,
124
+
sha256: existing.sha256,
125
+
phash: existing.phash,
126
+
storage_path: existing.storage_path,
127
+
mimetype: existing.mimetype,
128
+
});
121
129
logger.debug(
122
130
{ postUri, cid: ref.cid },
123
-
"Blob already processed, skipping"
131
+
"Blob already processed, reusing hashes"
124
132
);
125
133
return;
126
134
}
+17
src/database/blobs.repository.ts
···
103
103
);
104
104
});
105
105
}
106
+
107
+
async findByCid(cid: string): Promise<Blob | null> {
108
+
return new Promise((resolve, reject) => {
109
+
this.db.all(
110
+
`SELECT * FROM blobs WHERE blob_cid = $1 LIMIT 1`,
111
+
cid,
112
+
(err, rows: Blob[]) => {
113
+
if (err) {
114
+
logger.error({ err, cid }, "Failed to find blob by CID");
115
+
reject(err);
116
+
return;
117
+
}
118
+
resolve(rows?.[0] || null);
119
+
}
120
+
);
121
+
});
122
+
}
106
123
}
+123
src/database/profile-blobs.repository.ts
···
1
+
import { Database } from "duckdb";
2
+
import { logger } from "../logger/index.js";
3
+
4
+
export interface ProfileBlob {
5
+
did: string;
6
+
blob_type: "avatar" | "banner";
7
+
blob_cid: string;
8
+
sha256: string;
9
+
phash?: string;
10
+
storage_path?: string;
11
+
mimetype?: string;
12
+
captured_at?: Date;
13
+
}
14
+
15
+
export class ProfileBlobsRepository {
16
+
constructor(private db: Database) {}
17
+
18
+
async insert(blob: ProfileBlob): Promise<void> {
19
+
return new Promise((resolve, reject) => {
20
+
this.db.prepare(
21
+
`
22
+
INSERT INTO profile_blobs (did, blob_type, blob_cid, sha256, phash, storage_path, mimetype, captured_at)
23
+
VALUES ($1, $2, $3, $4, $5, $6, $7, COALESCE($8, CURRENT_TIMESTAMP))
24
+
`,
25
+
(err, stmt) => {
26
+
if (err) {
27
+
logger.error({ err }, "Failed to prepare profile blob insert statement");
28
+
reject(err);
29
+
return;
30
+
}
31
+
32
+
stmt.run(
33
+
blob.did,
34
+
blob.blob_type,
35
+
blob.blob_cid,
36
+
blob.sha256,
37
+
blob.phash || null,
38
+
blob.storage_path || null,
39
+
blob.mimetype || null,
40
+
blob.captured_at || null,
41
+
(err) => {
42
+
if (err) {
43
+
logger.error({ err, blob }, "Failed to insert profile blob");
44
+
reject(err);
45
+
return;
46
+
}
47
+
resolve();
48
+
}
49
+
);
50
+
}
51
+
);
52
+
});
53
+
}
54
+
55
+
async findByDid(did: string): Promise<ProfileBlob[]> {
56
+
return new Promise((resolve, reject) => {
57
+
this.db.all(
58
+
`SELECT * FROM profile_blobs WHERE did = $1 ORDER BY captured_at DESC`,
59
+
did,
60
+
(err, rows: ProfileBlob[]) => {
61
+
if (err) {
62
+
logger.error({ err, did }, "Failed to find profile blobs by DID");
63
+
reject(err);
64
+
return;
65
+
}
66
+
resolve(rows || []);
67
+
}
68
+
);
69
+
});
70
+
}
71
+
72
+
async findLatestByDidAndType(did: string, blobType: "avatar" | "banner"): Promise<ProfileBlob | null> {
73
+
return new Promise((resolve, reject) => {
74
+
this.db.all(
75
+
`SELECT * FROM profile_blobs WHERE did = $1 AND blob_type = $2 ORDER BY captured_at DESC LIMIT 1`,
76
+
did,
77
+
blobType,
78
+
(err, rows: ProfileBlob[]) => {
79
+
if (err) {
80
+
logger.error({ err, did, blobType }, "Failed to find latest profile blob");
81
+
reject(err);
82
+
return;
83
+
}
84
+
resolve(rows?.[0] || null);
85
+
}
86
+
);
87
+
});
88
+
}
89
+
90
+
async findBySha256(sha256: string): Promise<ProfileBlob | null> {
91
+
return new Promise((resolve, reject) => {
92
+
this.db.all(
93
+
`SELECT * FROM profile_blobs WHERE sha256 = $1 LIMIT 1`,
94
+
sha256,
95
+
(err, rows: ProfileBlob[]) => {
96
+
if (err) {
97
+
logger.error({ err, sha256 }, "Failed to find profile blob by SHA256");
98
+
reject(err);
99
+
return;
100
+
}
101
+
resolve(rows?.[0] || null);
102
+
}
103
+
);
104
+
});
105
+
}
106
+
107
+
async findByPhash(phash: string): Promise<ProfileBlob[]> {
108
+
return new Promise((resolve, reject) => {
109
+
this.db.all(
110
+
`SELECT * FROM profile_blobs WHERE phash = $1`,
111
+
phash,
112
+
(err, rows: ProfileBlob[]) => {
113
+
if (err) {
114
+
logger.error({ err, phash }, "Failed to find profile blobs by pHash");
115
+
reject(err);
116
+
return;
117
+
}
118
+
resolve(rows || []);
119
+
}
120
+
);
121
+
});
122
+
}
123
+
}
+20
-46
src/database/schema.ts
···
39
39
banner_cid TEXT
40
40
);
41
41
42
-
-- Blobs table: stores information about image blobs found in posts and profiles
42
+
-- Blobs table: stores information about image blobs found in posts
43
43
CREATE TABLE IF NOT EXISTS blobs (
44
44
post_uri TEXT NOT NULL,
45
45
blob_cid TEXT NOT NULL,
···
47
47
phash TEXT,
48
48
storage_path TEXT,
49
49
mimetype TEXT,
50
-
PRIMARY KEY (post_uri, blob_cid)
50
+
PRIMARY KEY (post_uri, blob_cid),
51
+
FOREIGN KEY (post_uri) REFERENCES posts(uri)
52
+
);
53
+
54
+
-- Profile blobs table: stores avatar and banner blobs for profiles
55
+
CREATE TABLE IF NOT EXISTS profile_blobs (
56
+
did TEXT NOT NULL,
57
+
blob_type TEXT NOT NULL CHECK (blob_type IN ('avatar', 'banner')),
58
+
blob_cid TEXT NOT NULL,
59
+
sha256 TEXT NOT NULL,
60
+
phash TEXT,
61
+
storage_path TEXT,
62
+
mimetype TEXT,
63
+
captured_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
64
+
PRIMARY KEY (did, blob_type, captured_at),
65
+
FOREIGN KEY (did) REFERENCES profiles(did)
51
66
);
52
67
53
68
-- Indexes for performance
···
55
70
CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
56
71
CREATE INDEX IF NOT EXISTS idx_labels_cts ON labels(cts);
57
72
CREATE INDEX IF NOT EXISTS idx_posts_did ON posts(did);
73
+
CREATE INDEX IF NOT EXISTS idx_blobs_cid ON blobs(blob_cid);
58
74
CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256);
59
75
CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash);
76
+
CREATE INDEX IF NOT EXISTS idx_profile_blobs_sha256 ON profile_blobs(sha256);
77
+
CREATE INDEX IF NOT EXISTS idx_profile_blobs_phash ON profile_blobs(phash);
60
78
`;
61
79
62
80
async function migrateProfilesTable(): Promise<void> {
···
105
123
});
106
124
}
107
125
108
-
async function migrateBlobsTableConstraint(): Promise<void> {
109
-
const db = getDatabase();
110
-
111
-
return new Promise((resolve, reject) => {
112
-
db.all(
113
-
`SELECT constraint_name FROM information_schema.table_constraints
114
-
WHERE table_name = 'blobs' AND constraint_type = 'FOREIGN KEY'`,
115
-
(err, rows: any[]) => {
116
-
if (err) {
117
-
logger.error({ err }, "Failed to check blobs table constraints");
118
-
reject(err);
119
-
return;
120
-
}
121
-
122
-
if (rows && rows.length > 0) {
123
-
logger.info("Migrating blobs table to remove foreign key constraint");
124
-
125
-
const migration = `
126
-
CREATE TABLE blobs_new AS SELECT * FROM blobs;
127
-
DROP TABLE blobs;
128
-
ALTER TABLE blobs_new RENAME TO blobs;
129
-
CREATE INDEX IF NOT EXISTS idx_blobs_sha256 ON blobs(sha256);
130
-
CREATE INDEX IF NOT EXISTS idx_blobs_phash ON blobs(phash);
131
-
`;
132
-
133
-
db.exec(migration, (err) => {
134
-
if (err) {
135
-
logger.error({ err }, "Failed to migrate blobs table");
136
-
reject(err);
137
-
return;
138
-
}
139
-
logger.info("Blobs table migration completed");
140
-
resolve();
141
-
});
142
-
} else {
143
-
logger.debug("Blobs table already has no foreign key constraint");
144
-
resolve();
145
-
}
146
-
}
147
-
);
148
-
});
149
-
}
150
-
151
126
export async function initializeSchema(): Promise<void> {
152
127
const db = getDatabase();
153
128
···
162
137
163
138
try {
164
139
await migrateProfilesTable();
165
-
await migrateBlobsTableConstraint();
166
140
resolve();
167
141
} catch (migrationErr) {
168
142
reject(migrationErr);
+5
-1
src/hydration/posts.service.ts
···
3
3
import { PostsRepository } from "../database/posts.repository.js";
4
4
import { BlobProcessor } from "../blobs/processor.js";
5
5
import { pRateLimit } from "p-ratelimit";
6
-
import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js";
6
+
import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
7
7
import { logger } from "../logger/index.js";
8
8
import { config } from "../config/index.js";
9
9
···
110
110
}
111
111
}
112
112
} catch (error) {
113
+
if (isRecordNotFoundError(error)) {
114
+
logger.warn({ uri }, "Post record not found, skipping");
115
+
return;
116
+
}
113
117
logger.error({ error, uri }, "Failed to hydrate post");
114
118
throw error;
115
119
}
+40
-11
src/hydration/profiles.service.ts
···
1
1
import { AtpAgent } from "@atproto/api";
2
2
import { Database } from "duckdb";
3
3
import { ProfilesRepository } from "../database/profiles.repository.js";
4
-
import { BlobsRepository } from "../database/blobs.repository.js";
4
+
import { ProfileBlobsRepository } from "../database/profile-blobs.repository.js";
5
5
import { computeBlobHashes } from "../blobs/hasher.js";
6
+
import { LocalBlobStorage } from "../blobs/storage/local.js";
7
+
import { S3BlobStorage } from "../blobs/storage/s3.js";
8
+
import { BlobStorage } from "../blobs/processor.js";
6
9
import { pRateLimit } from "p-ratelimit";
7
-
import { withRetry, isRateLimitError, isNetworkError, isServerError } from "../utils/retry.js";
10
+
import { withRetry, isRateLimitError, isNetworkError, isServerError, isRecordNotFoundError } from "../utils/retry.js";
8
11
import { logger } from "../logger/index.js";
9
12
import { config } from "../config/index.js";
10
13
11
14
export class ProfileHydrationService {
12
15
private agent: AtpAgent;
13
16
private profilesRepo: ProfilesRepository;
14
-
private blobsRepo: BlobsRepository;
17
+
private profileBlobsRepo: ProfileBlobsRepository;
18
+
private storage: BlobStorage | null = null;
15
19
private limit: ReturnType<typeof pRateLimit>;
16
20
17
21
constructor(db: Database) {
18
22
this.agent = new AtpAgent({ service: `https://${config.bsky.pds}` });
19
23
this.profilesRepo = new ProfilesRepository(db);
20
-
this.blobsRepo = new BlobsRepository(db);
24
+
this.profileBlobsRepo = new ProfileBlobsRepository(db);
25
+
26
+
if (config.blobs.hydrateBlobs) {
27
+
if (config.blobs.storage.type === "s3") {
28
+
this.storage = new S3BlobStorage(
29
+
config.blobs.storage.s3Bucket!,
30
+
config.blobs.storage.s3Region!
31
+
);
32
+
} else {
33
+
this.storage = new LocalBlobStorage(
34
+
config.blobs.storage.localPath
35
+
);
36
+
}
37
+
}
38
+
21
39
this.limit = pRateLimit({
22
40
interval: 300000,
23
41
rate: 3000,
···
152
170
153
171
logger.info({ did, handle, avatarCid, bannerCid }, "Profile hydrated successfully");
154
172
} catch (error) {
173
+
if (isRecordNotFoundError(error)) {
174
+
logger.warn({ did }, "Profile record not found, skipping");
175
+
return;
176
+
}
155
177
logger.error({ error, did }, "Failed to hydrate profile");
156
178
throw error;
157
179
}
···
187
209
cid: string,
188
210
type: "avatar" | "banner"
189
211
): Promise<void> {
190
-
const postUri = `profile://${did}/${type}`;
191
-
const existing = await this.blobsRepo.findByPostUri(postUri);
212
+
const latestBlob = await this.profileBlobsRepo.findLatestByDidAndType(did, type);
192
213
193
-
if (existing.length > 0 && existing.some(b => b.blob_cid === cid)) {
194
-
logger.debug({ did, cid, type }, "Blob already processed, skipping");
214
+
if (latestBlob && latestBlob.blob_cid === cid) {
215
+
logger.debug({ did, cid, type }, "Latest blob already has same CID, skipping");
195
216
return;
196
217
}
197
218
···
210
231
}
211
232
212
233
const blobData = Buffer.from(await blobResponse.arrayBuffer());
234
+
235
+
let storagePath: string | undefined;
236
+
if (this.storage && config.blobs.hydrateBlobs) {
237
+
storagePath = await this.storage.store(cid, blobData, "image/jpeg");
238
+
}
239
+
213
240
const hashes = await computeBlobHashes(blobData, "image/jpeg");
214
241
215
-
await this.blobsRepo.insert({
216
-
post_uri: postUri,
242
+
await this.profileBlobsRepo.insert({
243
+
did,
244
+
blob_type: type,
217
245
blob_cid: cid,
218
246
sha256: hashes.sha256,
219
247
phash: hashes.phash,
248
+
storage_path: storagePath,
220
249
mimetype: "image/jpeg",
221
250
});
222
251
223
-
logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint }, "Profile blob processed successfully");
252
+
logger.info({ did, cid, type, sha256: hashes.sha256, pdsEndpoint, storagePath }, "Profile blob processed successfully");
224
253
}
225
254
}
+7
src/utils/retry.ts
···
93
93
export function isServerError(error: any): boolean {
94
94
return error?.status >= 500 && error?.status < 600;
95
95
}
96
+
97
+
export function isRecordNotFoundError(error: any): boolean {
98
+
return (
99
+
error?.error === "RecordNotFound" ||
100
+
error?.message?.includes("RecordNotFound")
101
+
);
102
+
}
+6
tests/integration/database.test.ts
···
195
195
const found = await blobsRepo.findByPhash("deadbeef");
196
196
expect(found.length).toBeGreaterThan(0);
197
197
});
198
+
199
+
test("should find blob by CID", async () => {
200
+
const found = await blobsRepo.findByCid("bafytest123");
201
+
expect(found).not.toBeNull();
202
+
expect(found?.sha256).toBe("abc123def456");
203
+
});
198
204
});
199
205
});
+220
tests/unit/subscriber.test.ts
···
1
+
import { describe, test, expect, beforeEach, afterEach, mock } from "bun:test";
2
+
import { FirehoseSubscriber } from "../../src/firehose/subscriber.js";
3
+
import { EventEmitter } from "events";
4
+
import WebSocket from "ws";
5
+
6
+
// Mock WebSocket class
7
+
class MockWebSocket extends EventEmitter {
8
+
constructor(url: string) {
9
+
super();
10
+
}
11
+
close() {}
12
+
}
13
+
14
+
// Mock the entire 'ws' module
15
+
mock.module("ws", () => ({
16
+
default: MockWebSocket,
17
+
}));
18
+
19
+
describe("FirehoseSubscriber", () => {
20
+
let subscriber: FirehoseSubscriber;
21
+
let mockWsInstance: MockWebSocket;
22
+
23
+
beforeEach(() => {
24
+
subscriber = new FirehoseSubscriber();
25
+
// Mock the connect method to control the WebSocket instance
26
+
(subscriber as any).connect = () => {
27
+
const url = new URL("ws://localhost:1234");
28
+
if ((subscriber as any).cursor !== null) {
29
+
url.searchParams.set("cursor", (subscriber as any).cursor.toString());
30
+
}
31
+
const ws = new WebSocket(url.toString());
32
+
(subscriber as any).ws = ws;
33
+
mockWsInstance = ws as any;
34
+
35
+
ws.on("open", () => {
36
+
subscriber.emit("connected");
37
+
});
38
+
ws.on("message", async (data: Buffer) => {
39
+
try {
40
+
const message = JSON.parse(data.toString());
41
+
if (message.seq) {
42
+
await (subscriber as any).saveCursor(message.seq);
43
+
}
44
+
if (message.t === "#labels") {
45
+
for (const label of message.labels) {
46
+
subscriber.emit("label", label);
47
+
}
48
+
}
49
+
} catch (error) {
50
+
subscriber.emit("error", error);
51
+
}
52
+
});
53
+
ws.on("close", () => {
54
+
(subscriber as any).ws = null;
55
+
subscriber.emit("disconnected");
56
+
if ((subscriber as any).shouldReconnect) {
57
+
(subscriber as any).scheduleReconnect();
58
+
}
59
+
});
60
+
ws.on("error", (error) => {
61
+
subscriber.emit("error", error);
62
+
});
63
+
};
64
+
});
65
+
66
+
afterEach(() => {
67
+
subscriber.stop();
68
+
});
69
+
70
+
test("should attempt to connect on start", async (done) => {
71
+
subscriber.on("connected", () => {
72
+
done();
73
+
});
74
+
await subscriber.start();
75
+
mockWsInstance.emit("open");
76
+
});
77
+
78
+
test("should emit label event on label for post", async (done) => {
79
+
subscriber.on("label", (label) => {
80
+
expect(label.uri).toBe("at://did:plc:user/app.bsky.feed.post/123");
81
+
done();
82
+
});
83
+
await subscriber.start();
84
+
mockWsInstance.emit(
85
+
"message",
86
+
Buffer.from(
87
+
JSON.stringify({
88
+
op: 1,
89
+
t: "#labels",
90
+
labels: [
91
+
{
92
+
src: "did:plc:labeler",
93
+
uri: "at://did:plc:user/app.bsky.feed.post/123",
94
+
val: "spam",
95
+
cts: "2025-01-15T12:00:00Z",
96
+
},
97
+
],
98
+
})
99
+
)
100
+
);
101
+
});
102
+
103
+
test("should emit label event on label for profile", async (done) => {
104
+
subscriber.on("label", (label) => {
105
+
expect(label.uri).toBe("did:plc:user");
106
+
done();
107
+
});
108
+
await subscriber.start();
109
+
mockWsInstance.emit(
110
+
"message",
111
+
Buffer.from(
112
+
JSON.stringify({
113
+
op: 1,
114
+
t: "#labels",
115
+
labels: [
116
+
{
117
+
src: "did:plc:labeler",
118
+
uri: "did:plc:user",
119
+
val: "spam",
120
+
cts: "2025-01-15T12:00:00Z",
121
+
},
122
+
],
123
+
})
124
+
)
125
+
);
126
+
});
127
+
128
+
test("should handle multiple labels in one message", async () => {
129
+
let labelCount = 0;
130
+
subscriber.on("label", () => {
131
+
labelCount++;
132
+
});
133
+
await subscriber.start();
134
+
mockWsInstance.emit(
135
+
"message",
136
+
Buffer.from(
137
+
JSON.stringify({
138
+
op: 1,
139
+
t: "#labels",
140
+
labels: [
141
+
{
142
+
src: "did:plc:labeler",
143
+
uri: "at://did:plc:user/app.bsky.feed.post/123",
144
+
val: "spam",
145
+
cts: "2025-01-15T12:00:00Z",
146
+
},
147
+
{
148
+
src: "did:plc:labeler",
149
+
uri: "did:plc:user",
150
+
val: "spam",
151
+
cts: "2025-01-15T12:00:00Z",
152
+
},
153
+
],
154
+
})
155
+
)
156
+
);
157
+
expect(labelCount).toBe(2);
158
+
});
159
+
160
+
test("should attempt to reconnect on close", (done) => {
161
+
let connectAttempts = 0;
162
+
(subscriber as any).connect = () => {
163
+
connectAttempts++;
164
+
if (connectAttempts > 1) {
165
+
done();
166
+
}
167
+
const url = new URL("ws://localhost:1234");
168
+
const ws = new WebSocket(url.toString());
169
+
(subscriber as any).ws = ws;
170
+
mockWsInstance = ws as any;
171
+
ws.on("close", () => {
172
+
(subscriber as any).ws = null;
173
+
subscriber.emit("disconnected");
174
+
if ((subscriber as any).shouldReconnect) {
175
+
(subscriber as any).scheduleReconnect();
176
+
}
177
+
});
178
+
};
179
+
180
+
subscriber.start();
181
+
mockWsInstance.emit("close");
182
+
});
183
+
184
+
test("should stop reconnecting after stop() is called", async (done) => {
185
+
let connectAttempts = 0;
186
+
(subscriber as any).connect = () => {
187
+
connectAttempts++;
188
+
const url = new URL("ws://localhost:1234");
189
+
const ws = new WebSocket(url.toString());
190
+
(subscriber as any).ws = ws;
191
+
mockWsInstance = ws as any;
192
+
ws.on("close", () => {
193
+
(subscriber as any).ws = null;
194
+
subscriber.emit("disconnected");
195
+
if ((subscriber as any).shouldReconnect) {
196
+
(subscriber as any).scheduleReconnect();
197
+
}
198
+
});
199
+
};
200
+
201
+
await subscriber.start();
202
+
subscriber.stop();
203
+
mockWsInstance.emit("close");
204
+
205
+
setTimeout(() => {
206
+
expect(connectAttempts).toBe(1);
207
+
done();
208
+
}, 2000);
209
+
});
210
+
211
+
test("should increase backoff delay on multiple reconnects", () => {
212
+
subscriber.start();
213
+
(subscriber as any).reconnectAttempts = 0;
214
+
(subscriber as any).scheduleReconnect();
215
+
const initialBackoff = (subscriber as any).reconnectAttempts;
216
+
(subscriber as any).scheduleReconnect();
217
+
const secondBackoff = (subscriber as any).reconnectAttempts;
218
+
expect(secondBackoff).toBeGreaterThan(initialBackoff);
219
+
});
220
+
});