// atproto user agency toolkit for individuals and groups
1import { describe, it, expect, beforeEach, afterEach } from "vitest";
2import { mkdtempSync, rmSync } from "node:fs";
3import { tmpdir } from "node:os";
4import { join } from "node:path";
5import Database from "better-sqlite3";
6import { IpfsService } from "../ipfs.js";
7import { RepoManager } from "../repo-manager.js";
8import type { Config } from "../config.js";
9import { BlockMap, readCarWithRoot } from "@atproto/repo";
10import { CID } from "@atproto/lex-data";
11import {
12 create as createCid,
13 CODEC_RAW,
14 toString as cidToString,
15} from "@atcute/cid";
16
17import { SyncStorage } from "./sync-storage.js";
18import {
19 didToRkey,
20 rkeyToDid,
21 PEER_NSID,
22 MANIFEST_NSID,
23 DEFAULT_VERIFICATION_CONFIG,
24} from "./types.js";
25import { BlockVerifier, RemoteVerifier } from "./verification.js";
26import { RepoFetcher, extractPdsEndpoint } from "./repo-fetcher.js";
27import { PeerDiscovery } from "./peer-discovery.js";
28import { IpfsReadableBlockstore } from "./ipfs-readable-blockstore.js";
29import { ReplicatedRepoReader } from "./replicated-repo-reader.js";
30import { decode as cborDecode } from "../cbor-compat.js";
31import { Firehose } from "../firehose.js";
32import { createApp } from "../index.js";
33
34/** Create a CID string from raw bytes using SHA-256. */
35async function makeCidStr(bytes: Uint8Array): Promise<string> {
36 const cid = await createCid(CODEC_RAW, bytes);
37 return cidToString(cid);
38}
39
40function testConfig(dataDir: string, replicateDids: string[] = []): Config {
41 return {
42 DID: "did:plc:test123",
43 HANDLE: "test.example.com",
44 PDS_HOSTNAME: "test.example.com",
45 AUTH_TOKEN: "test-auth-token",
46 SIGNING_KEY:
47 "0000000000000000000000000000000000000000000000000000000000000001",
48 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
49 JWT_SECRET: "test-jwt-secret",
50 PASSWORD_HASH: "$2a$10$test",
51 DATA_DIR: dataDir,
52 PORT: 3000,
53 IPFS_ENABLED: true,
54 IPFS_NETWORKING: false,
55 REPLICATE_DIDS: replicateDids,
56 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
57 FIREHOSE_ENABLED: false,
58 RATE_LIMIT_ENABLED: false,
59 RATE_LIMIT_READ_PER_MIN: 300,
60 RATE_LIMIT_SYNC_PER_MIN: 30,
61 RATE_LIMIT_SESSION_PER_MIN: 10,
62 RATE_LIMIT_WRITE_PER_MIN: 200,
63 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
64 RATE_LIMIT_MAX_CONNECTIONS: 100,
65 RATE_LIMIT_FIREHOSE_PER_IP: 3,
66 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
67 };
68}
69
70// ============================================
71// didToRkey / rkeyToDid
72// ============================================
73
74describe("didToRkey", () => {
75 it("replaces colons with hyphens", () => {
76 expect(didToRkey("did:plc:abc123")).toBe("did-plc-abc123");
77 });
78
79 it("handles did:web", () => {
80 expect(didToRkey("did:web:example.com")).toBe("did-web-example.com");
81 });
82
83 it("roundtrips with rkeyToDid for simple DIDs", () => {
84 const did = "did:plc:abc123";
85 const rkey = didToRkey(did);
86 // rkeyToDid replaces ALL hyphens, so roundtrip only works for DIDs without hyphens
87 expect(rkeyToDid(rkey)).toBe(did);
88 });
89});
90
91// ============================================
92// SyncStorage
93// ============================================
94
95describe("SyncStorage", () => {
96 let tmpDir: string;
97 let db: InstanceType<typeof Database>;
98 let storage: SyncStorage;
99
100 beforeEach(() => {
101 tmpDir = mkdtempSync(join(tmpdir(), "sync-storage-test-"));
102 db = new Database(join(tmpDir, "test.db"));
103 storage = new SyncStorage(db);
104 storage.initSchema();
105 });
106
107 afterEach(() => {
108 db.close();
109 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
110 });
111
112 it("creates table and retrieves empty states", () => {
113 const states = storage.getAllStates();
114 expect(states).toEqual([]);
115 });
116
117 it("upserts and retrieves state", () => {
118 storage.upsertState({
119 did: "did:plc:test1",
120 pdsEndpoint: "https://pds.example.com",
121 });
122
123 const state = storage.getState("did:plc:test1");
124 expect(state).not.toBeNull();
125 expect(state!.did).toBe("did:plc:test1");
126 expect(state!.pdsEndpoint).toBe("https://pds.example.com");
127 expect(state!.status).toBe("pending");
128 expect(state!.peerId).toBeNull();
129 });
130
131 it("updates status", () => {
132 storage.upsertState({
133 did: "did:plc:test1",
134 pdsEndpoint: "https://pds.example.com",
135 });
136
137 storage.updateStatus("did:plc:test1", "syncing");
138 expect(storage.getState("did:plc:test1")!.status).toBe("syncing");
139 });
140
141 it("updates status with error message", () => {
142 storage.upsertState({
143 did: "did:plc:test1",
144 pdsEndpoint: "https://pds.example.com",
145 });
146
147 storage.updateStatus("did:plc:test1", "error", "Connection refused");
148 const state = storage.getState("did:plc:test1")!;
149 expect(state.status).toBe("error");
150 expect(state.errorMessage).toBe("Connection refused");
151 });
152
153 it("updates sync progress", () => {
154 storage.upsertState({
155 did: "did:plc:test1",
156 pdsEndpoint: "https://pds.example.com",
157 });
158
159 storage.updateSyncProgress("did:plc:test1", "rev123");
160 const state = storage.getState("did:plc:test1")!;
161 expect(state.lastSyncRev).toBe("rev123");
162 expect(state.lastSyncAt).not.toBeNull();
163 expect(state.status).toBe("synced");
164 });
165
166 it("updates and clears peer info", () => {
167 storage.upsertState({
168 did: "did:plc:test1",
169 pdsEndpoint: "https://pds.example.com",
170 });
171
172 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest");
173 let state = storage.getState("did:plc:test1")!;
174 expect(state.peerId).toBe("12D3KooWTest");
175 expect(state.peerInfoFetchedAt).not.toBeNull();
176
177 storage.clearPeerInfo("did:plc:test1");
178 state = storage.getState("did:plc:test1")!;
179 expect(state.peerId).toBeNull();
180 expect(state.peerInfoFetchedAt).toBeNull();
181 });
182
183 it("stores and retrieves peer multiaddrs", () => {
184 storage.upsertState({
185 did: "did:plc:test1",
186 pdsEndpoint: "https://pds.example.com",
187 });
188
189 const multiaddrs = [
190 "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest",
191 "/ip4/192.168.1.1/tcp/4001/p2p/12D3KooWTest",
192 ];
193 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", multiaddrs);
194
195 const state = storage.getState("did:plc:test1")!;
196 expect(state.peerId).toBe("12D3KooWTest");
197 expect(state.peerMultiaddrs).toEqual(multiaddrs);
198 });
199
200 it("clearPeerInfo also clears multiaddrs", () => {
201 storage.upsertState({
202 did: "did:plc:test1",
203 pdsEndpoint: "https://pds.example.com",
204 });
205
206 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", ["/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest"]);
207 storage.clearPeerInfo("did:plc:test1");
208
209 const state = storage.getState("did:plc:test1")!;
210 expect(state.peerId).toBeNull();
211 expect(state.peerMultiaddrs).toEqual([]);
212 });
213
214 it("getMultiaddrForPdsEndpoint returns multiaddr with /p2p/", () => {
215 storage.upsertState({
216 did: "did:plc:test1",
217 pdsEndpoint: "https://pds.example.com",
218 });
219
220 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [
221 "/ip4/127.0.0.1/tcp/4001",
222 "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest",
223 ]);
224
225 const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com");
226 expect(ma).toBe("/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest");
227 });
228
229 it("getMultiaddrForPdsEndpoint returns null for unknown endpoint", () => {
230 const ma = storage.getMultiaddrForPdsEndpoint("https://unknown.example.com");
231 expect(ma).toBeNull();
232 });
233
234 it("getMultiaddrForPdsEndpoint returns null when no multiaddrs stored", () => {
235 storage.upsertState({
236 did: "did:plc:test1",
237 pdsEndpoint: "https://pds.example.com",
238 });
239
240 const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com");
241 expect(ma).toBeNull();
242 });
243
244 it("getMultiaddrForPdsEndpoint falls back to first addr when none have /p2p/", () => {
245 storage.upsertState({
246 did: "did:plc:test1",
247 pdsEndpoint: "https://pds.example.com",
248 });
249
250 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [
251 "/ip4/127.0.0.1/tcp/4001",
252 ]);
253
254 const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com");
255 expect(ma).toBe("/ip4/127.0.0.1/tcp/4001");
256 });
257
258 it("getAllStates returns all entries sorted by DID", () => {
259 storage.upsertState({
260 did: "did:plc:bbb",
261 pdsEndpoint: "https://b.example.com",
262 });
263 storage.upsertState({
264 did: "did:plc:aaa",
265 pdsEndpoint: "https://a.example.com",
266 });
267
268 const states = storage.getAllStates();
269 expect(states).toHaveLength(2);
270 expect(states[0]!.did).toBe("did:plc:aaa");
271 expect(states[1]!.did).toBe("did:plc:bbb");
272 });
273});
274
275// ============================================
276// Record Publishing (peer identity + manifests)
277// ============================================
278
describe("Record publishing", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;

  beforeEach(async () => {
    // Fresh temp dir, SQLite DB, and offline (networking: false) IPFS
    // service per test; RepoManager is wired to the IPFS service for
    // both block storage and retrieval.
    tmpDir = mkdtempSync(join(tmpdir(), "repl-publish-test-"));
    const config = testConfig(tmpDir, ["did:plc:remote1", "did:plc:remote2"]);

    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();

    repoManager = new RepoManager(db, config);
    repoManager.init(undefined, ipfsService, ipfsService);
  });

  afterEach(async () => {
    if (ipfsService.isRunning()) {
      await ipfsService.stop();
    }
    db.close();
    // Best-effort temp dir cleanup; failures are non-fatal.
    try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
  });

  it("peer identity record not created when networking is off", async () => {
    // getPeerId() returns null when networking=false
    expect(ipfsService.getPeerId()).toBeNull();

    // Simulate peer identity publish: no record created because getPeerId() is null
    const peerId = ipfsService.getPeerId();
    if (peerId) {
      await repoManager.putRecord(PEER_NSID, "self", {
        $type: PEER_NSID,
        peerId,
        multiaddrs: [],
        createdAt: new Date().toISOString(),
      });
    }

    // No record should have been created
    const record = await repoManager.getRecord(PEER_NSID, "self");
    expect(record).toBeNull();
  });

  it("syncManifests creates manifest records for each configured DID", async () => {
    // Write one manifest record per replicated DID; the rkey is derived
    // from the DID via didToRkey.
    const dids = ["did:plc:remote1", "did:plc:remote2"];
    for (const did of dids) {
      const rkey = didToRkey(did);
      await repoManager.putRecord(MANIFEST_NSID, rkey, {
        $type: MANIFEST_NSID,
        subject: did,
        status: "active",
        lastSyncRev: null,
        lastSyncAt: null,
        createdAt: new Date().toISOString(),
      });
    }

    // Both should be readable
    for (const did of dids) {
      const rkey = didToRkey(did);
      const result = await repoManager.getRecord(MANIFEST_NSID, rkey);
      expect(result).not.toBeNull();
      expect((result!.record as Record<string, unknown>).subject).toBe(did);
      expect((result!.record as Record<string, unknown>).status).toBe("active");
    }
  });

  it("syncManifests is idempotent", async () => {
    const did = "did:plc:remote1";
    const rkey = didToRkey(did);
    const manifest = {
      $type: MANIFEST_NSID,
      subject: did,
      status: "active",
      lastSyncRev: null,
      lastSyncAt: null,
      createdAt: new Date().toISOString(),
    };

    // Write twice
    await repoManager.putRecord(MANIFEST_NSID, rkey, manifest);
    await repoManager.putRecord(MANIFEST_NSID, rkey, manifest);

    // Should still be readable, no error
    const result = await repoManager.getRecord(MANIFEST_NSID, rkey);
    expect(result).not.toBeNull();
  });
});
373
374// ============================================
375// RepoFetcher
376// ============================================
377
378describe("RepoFetcher", () => {
379 it("resolvePds with mock DidResolver returning test DID document", async () => {
380 const mockDidResolver = {
381 resolve: async (did: string) => {
382 if (did === "did:plc:test1") {
383 return {
384 id: "did:plc:test1",
385 service: [
386 {
387 id: "#atproto_pds",
388 type: "AtprotoPersonalDataServer",
389 serviceEndpoint: "https://pds.example.com",
390 },
391 ],
392 };
393 }
394 return null;
395 },
396 };
397
398 const fetcher = new RepoFetcher(mockDidResolver as any);
399 const pds = await fetcher.resolvePds("did:plc:test1");
400 expect(pds).toBe("https://pds.example.com");
401 });
402
403 it("returns null for unresolvable DID", async () => {
404 const mockDidResolver = {
405 resolve: async () => null,
406 };
407
408 const fetcher = new RepoFetcher(mockDidResolver as any);
409 const pds = await fetcher.resolvePds("did:plc:unknown");
410 expect(pds).toBeNull();
411 });
412});
413
414describe("extractPdsEndpoint", () => {
415 it("extracts service endpoint from DID document", () => {
416 const doc = {
417 id: "did:plc:test1",
418 service: [
419 {
420 id: "#atproto_pds",
421 type: "AtprotoPersonalDataServer",
422 serviceEndpoint: "https://pds.example.com",
423 },
424 ],
425 };
426 expect(extractPdsEndpoint(doc as any)).toBe("https://pds.example.com");
427 });
428
429 it("returns null when no service array", () => {
430 const doc = { id: "did:plc:test1" };
431 expect(extractPdsEndpoint(doc as any)).toBeNull();
432 });
433
434 it("returns null when no matching service", () => {
435 const doc = {
436 id: "did:plc:test1",
437 service: [
438 {
439 id: "#other",
440 type: "Other",
441 serviceEndpoint: "https://other.example.com",
442 },
443 ],
444 };
445 expect(extractPdsEndpoint(doc as any)).toBeNull();
446 });
447});
448
449// ============================================
450// PeerDiscovery
451// ============================================
452
453describe("PeerDiscovery", () => {
454 it("returns null for unresolvable DID", async () => {
455 const mockFetcher = {
456 resolvePds: async () => null,
457 fetchRecord: async () => null,
458 listRecords: async () => [],
459 };
460
461 const discovery = new PeerDiscovery(mockFetcher as any);
462 const result = await discovery.discoverPeer("did:plc:unknown");
463 expect(result).toBeNull();
464 });
465
466 it("returns pdsEndpoint with null peerId when no peer record", async () => {
467 const mockFetcher = {
468 resolvePds: async () => "https://pds.example.com",
469 fetchRecord: async () => null,
470 listRecords: async () => [],
471 };
472
473 const discovery = new PeerDiscovery(mockFetcher as any);
474 const result = await discovery.discoverPeer("did:plc:test1");
475 expect(result).not.toBeNull();
476 expect(result!.pdsEndpoint).toBe("https://pds.example.com");
477 expect(result!.peerId).toBeNull();
478 expect(result!.multiaddrs).toEqual([]);
479 });
480
481 it("returns peer info when record exists", async () => {
482 const mockFetcher = {
483 resolvePds: async () => "https://pds.example.com",
484 fetchRecord: async () => ({
485 $type: PEER_NSID,
486 peerId: "12D3KooWTest",
487 multiaddrs: ["/ip4/127.0.0.1/tcp/4001"],
488 createdAt: new Date().toISOString(),
489 }),
490 listRecords: async () => [],
491 };
492
493 const discovery = new PeerDiscovery(mockFetcher as any);
494 const result = await discovery.discoverPeer("did:plc:test1");
495 expect(result).not.toBeNull();
496 expect(result!.peerId).toBe("12D3KooWTest");
497 expect(result!.multiaddrs).toEqual(["/ip4/127.0.0.1/tcp/4001"]);
498 });
499});
500
501// ============================================
502// BlockVerifier
503// ============================================
504
505describe("BlockVerifier", () => {
506 let tmpDir: string;
507 let db: InstanceType<typeof Database>;
508 let ipfsService: IpfsService;
509
510 beforeEach(async () => {
511 tmpDir = mkdtempSync(join(tmpdir(), "verifier-test-"));
512 db = new Database(join(tmpDir, "test.db"));
513 ipfsService = new IpfsService({
514 db,
515 networking: false,
516 });
517 await ipfsService.start();
518 });
519
520 afterEach(async () => {
521 if (ipfsService.isRunning()) {
522 await ipfsService.stop();
523 }
524 db.close();
525 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
526 });
527
528 it("all blocks available reports 100%", async () => {
529 const verifier = new BlockVerifier(ipfsService);
530
531 const cids: string[] = [];
532 for (let i = 0; i < 3; i++) {
533 const bytes = new TextEncoder().encode(`block-${i}`);
534 const cidStr = await makeCidStr(bytes);
535 await ipfsService.putBlock(cidStr, bytes);
536 cids.push(cidStr);
537 }
538
539 const result = await verifier.verifyBlockAvailability(cids, 10);
540 expect(result.checked).toBe(3);
541 expect(result.available).toBe(3);
542 expect(result.missing).toEqual([]);
543 });
544
545 it("some blocks missing reports correctly", async () => {
546 const verifier = new BlockVerifier(ipfsService);
547
548 const bytes1 = new TextEncoder().encode("present");
549 const cid1 = await makeCidStr(bytes1);
550 await ipfsService.putBlock(cid1, bytes1);
551
552 const bytes2 = new TextEncoder().encode("missing");
553 const cid2 = await makeCidStr(bytes2);
554 // Don't store cid2
555
556 const result = await verifier.verifyBlockAvailability(
557 [cid1, cid2],
558 10,
559 );
560 expect(result.checked).toBe(2);
561 expect(result.available).toBe(1);
562 expect(result.missing).toContain(cid2);
563 });
564
565 it("sample size > array length checks all", async () => {
566 const verifier = new BlockVerifier(ipfsService);
567
568 const bytes = new TextEncoder().encode("single");
569 const cidStr = await makeCidStr(bytes);
570 await ipfsService.putBlock(cidStr, bytes);
571
572 const result = await verifier.verifyBlockAvailability([cidStr], 100);
573 expect(result.checked).toBe(1);
574 expect(result.available).toBe(1);
575 });
576
577 it("empty array returns zeros", async () => {
578 const verifier = new BlockVerifier(ipfsService);
579
580 const result = await verifier.verifyBlockAvailability([]);
581 expect(result.checked).toBe(0);
582 expect(result.available).toBe(0);
583 expect(result.missing).toEqual([]);
584 });
585});
586
587// ============================================
588// Integration: repo sync (two in-process repos)
589// ============================================
590
describe("Integration: repo sync via CAR roundtrip", () => {
  // Two fully in-process nodes: "source" authors records, "replica"
  // imports them via a CAR export/import cycle into its own IPFS store.
  let tmpDir: string;
  let sourceDb: InstanceType<typeof Database>;
  let replicaDb: InstanceType<typeof Database>;
  let sourceIpfs: IpfsService;
  let replicaIpfs: IpfsService;
  let sourceRepo: RepoManager;
  let replicaRepo: RepoManager;

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "repl-integration-test-"));

    // Source setup
    const sourceConfig = testConfig(join(tmpDir, "source"), []);
    sourceDb = new Database(join(tmpDir, "source.db"));
    sourceIpfs = new IpfsService({
      db: sourceDb,
      networking: false,
    });
    await sourceIpfs.start();
    sourceRepo = new RepoManager(sourceDb, sourceConfig);
    sourceRepo.init(undefined, sourceIpfs, sourceIpfs);

    // Replica setup (different DID for the local node, but will replicate source's data)
    const replicaConfig = testConfig(join(tmpDir, "replica"), [
      sourceConfig.DID!,
    ]);
    replicaConfig.DID = "did:plc:replica456";
    // Distinct signing key so the replica is a genuinely separate identity.
    replicaConfig.SIGNING_KEY =
      "0000000000000000000000000000000000000000000000000000000000000002";
    replicaDb = new Database(join(tmpDir, "replica.db"));
    replicaIpfs = new IpfsService({
      db: replicaDb,
      networking: false,
    });
    await replicaIpfs.start();
    replicaRepo = new RepoManager(replicaDb, replicaConfig);
    replicaRepo.init(undefined, replicaIpfs, replicaIpfs);
  });

  afterEach(async () => {
    if (sourceIpfs.isRunning()) await sourceIpfs.stop();
    if (replicaIpfs.isRunning()) await replicaIpfs.stop();
    sourceDb.close();
    replicaDb.close();
    // Best-effort temp dir cleanup; failures are non-fatal.
    try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
  });

  it("source records can be replicated via CAR export/import to IPFS", async () => {
    // 1. Create records in source
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Hello from source!",
      createdAt: new Date().toISOString(),
    });
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Second post",
      createdAt: new Date().toISOString(),
    });

    // 2. Export as CAR
    const carBytes = await sourceRepo.getRepoCar();
    expect(carBytes.length).toBeGreaterThan(0);

    // 3. Parse CAR on replica side
    const { root, blocks } = await readCarWithRoot(carBytes);
    expect(root).toBeDefined();

    // 4. Store blocks in replica's IPFS
    await replicaIpfs.putBlocks(blocks);

    // 5. Verify blocks are retrievable from replica's IPFS
    // NOTE(review): this reaches into BlockMap's private `map` field via a
    // cast — brittle if @atproto/repo changes its internals; confirm there
    // is no public iteration API before relying on this elsewhere.
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    expect(internalMap).toBeDefined();
    expect(internalMap.size).toBeGreaterThan(0);

    for (const cidStr of internalMap.keys()) {
      const has = await replicaIpfs.hasBlock(cidStr);
      expect(has).toBe(true);
    }

    // 6. Verify block availability
    const verifier = new BlockVerifier(replicaIpfs);
    const cidStrs = Array.from(internalMap.keys());
    const verification =
      await verifier.verifyBlockAvailability(cidStrs);
    expect(verification.available).toBe(verification.checked);
    expect(verification.missing).toEqual([]);
  });

  it("tracks block CIDs for verification", async () => {
    // 1. Create records in source
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "tracking test",
      createdAt: new Date().toISOString(),
    });

    // 2. Export as CAR, parse, store blocks
    const carBytes = await sourceRepo.getRepoCar();
    const { blocks } = await readCarWithRoot(carBytes);
    await replicaIpfs.putBlocks(blocks);

    // 3. Collect CID strings
    // NOTE(review): same private-field cast into BlockMap as above.
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    const cidStrs = Array.from(internalMap.keys());

    // 4. Track blocks in sync storage
    const syncStorage = new SyncStorage(replicaDb);
    syncStorage.initSchema();
    syncStorage.trackBlocks("did:plc:test123", cidStrs);

    // 5. Verify tracking
    expect(syncStorage.getBlockCount("did:plc:test123")).toBe(
      cidStrs.length,
    );
    const stored = syncStorage.getBlockCids("did:plc:test123");
    expect(stored.sort()).toEqual(cidStrs.sort());

    // 6. Tracking is idempotent
    syncStorage.trackBlocks("did:plc:test123", cidStrs);
    expect(syncStorage.getBlockCount("did:plc:test123")).toBe(
      cidStrs.length,
    );

    // 7. Clear blocks
    syncStorage.clearBlocks("did:plc:test123");
    expect(syncStorage.getBlockCount("did:plc:test123")).toBe(0);
  });

  it("manifest record updated with sync rev after replication", async () => {
    // Create a manifest record in replica
    const sourceDid = "did:plc:test123";
    const rkey = didToRkey(sourceDid);
    await replicaRepo.putRecord(MANIFEST_NSID, rkey, {
      $type: MANIFEST_NSID,
      subject: sourceDid,
      status: "active",
      lastSyncRev: null,
      lastSyncAt: null,
      createdAt: new Date().toISOString(),
    });

    // Simulate sync: create records in source, export, import to IPFS
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "test post",
      createdAt: new Date().toISOString(),
    });

    const carBytes = await sourceRepo.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await replicaIpfs.putBlocks(blocks);

    // Update manifest with sync rev (here: the CAR root CID as a string)
    const syncRev = root.toString();
    await replicaRepo.putRecord(MANIFEST_NSID, rkey, {
      $type: MANIFEST_NSID,
      subject: sourceDid,
      status: "active",
      lastSyncRev: syncRev,
      lastSyncAt: new Date().toISOString(),
      createdAt: new Date().toISOString(),
    });

    // Verify manifest was updated
    const result = await replicaRepo.getRecord(MANIFEST_NSID, rkey);
    expect(result).not.toBeNull();
    const record = result!.record as Record<string, unknown>;
    expect(record.lastSyncRev).toBe(syncRev);
    expect(record.lastSyncAt).not.toBeNull();
    expect(record.status).toBe("active");
  });
});
770
771// ============================================
772// SyncStorage: Block Tracking
773// ============================================
774
775describe("SyncStorage block tracking", () => {
776 let tmpDir: string;
777 let db: InstanceType<typeof Database>;
778 let storage: SyncStorage;
779
780 beforeEach(() => {
781 tmpDir = mkdtempSync(join(tmpdir(), "block-tracking-test-"));
782 db = new Database(join(tmpDir, "test.db"));
783 storage = new SyncStorage(db);
784 storage.initSchema();
785 });
786
787 afterEach(() => {
788 db.close();
789 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
790 });
791
792 it("tracks blocks for a DID", () => {
793 storage.trackBlocks("did:plc:a", ["cid1", "cid2", "cid3"]);
794 expect(storage.getBlockCount("did:plc:a")).toBe(3);
795 expect(storage.getBlockCids("did:plc:a").sort()).toEqual([
796 "cid1",
797 "cid2",
798 "cid3",
799 ]);
800 });
801
802 it("ignores duplicate CIDs", () => {
803 storage.trackBlocks("did:plc:a", ["cid1", "cid2"]);
804 storage.trackBlocks("did:plc:a", ["cid2", "cid3"]);
805 expect(storage.getBlockCount("did:plc:a")).toBe(3);
806 });
807
808 it("tracks blocks per-DID independently", () => {
809 storage.trackBlocks("did:plc:a", ["cid1", "cid2"]);
810 storage.trackBlocks("did:plc:b", ["cid2", "cid3"]);
811 expect(storage.getBlockCount("did:plc:a")).toBe(2);
812 expect(storage.getBlockCount("did:plc:b")).toBe(2);
813 });
814
815 it("clears blocks for a specific DID", () => {
816 storage.trackBlocks("did:plc:a", ["cid1"]);
817 storage.trackBlocks("did:plc:b", ["cid2"]);
818 storage.clearBlocks("did:plc:a");
819 expect(storage.getBlockCount("did:plc:a")).toBe(0);
820 expect(storage.getBlockCount("did:plc:b")).toBe(1);
821 });
822
823 it("returns 0 count for unknown DID", () => {
824 expect(storage.getBlockCount("did:plc:unknown")).toBe(0);
825 });
826
827 it("returns empty array for unknown DID", () => {
828 expect(storage.getBlockCids("did:plc:unknown")).toEqual([]);
829 });
830
831 it("handles empty CID array", () => {
832 storage.trackBlocks("did:plc:a", []);
833 expect(storage.getBlockCount("did:plc:a")).toBe(0);
834 });
835});
836
837// ============================================
838// RemoteVerifier
839// ============================================
840
841describe("RemoteVerifier", () => {
842 let tmpDir: string;
843 let db: InstanceType<typeof Database>;
844 let ipfsService: IpfsService;
845
846 beforeEach(async () => {
847 tmpDir = mkdtempSync(join(tmpdir(), "remote-verifier-test-"));
848 db = new Database(join(tmpDir, "test.db"));
849 ipfsService = new IpfsService({
850 db,
851 networking: false,
852 });
853 await ipfsService.start();
854 });
855
856 afterEach(async () => {
857 if (ipfsService.isRunning()) {
858 await ipfsService.stop();
859 }
860 db.close();
861 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
862 });
863
864 it("Layer 0: passes when local root matches remote getHead", async () => {
865 const bytes = new TextEncoder().encode("root-block");
866 const cidStr = await makeCidStr(bytes);
867 await ipfsService.putBlock(cidStr, bytes);
868
869 const mockFetch = async (url: string | URL | Request) => {
870 const urlStr = typeof url === "string" ? url : url.toString();
871 if (urlStr.includes("getHead")) {
872 return new Response(JSON.stringify({ root: cidStr }), {
873 status: 200,
874 headers: { "Content-Type": "application/json" },
875 });
876 }
877 return new Response(null, { status: 404 });
878 };
879
880 const verifier = new RemoteVerifier(
881 ipfsService,
882 { raslSampleSize: 10 },
883 mockFetch as unknown as typeof fetch,
884 );
885
886 const result = await verifier.verifyPeer(
887 "did:plc:test",
888 "https://pds.example.com",
889 cidStr,
890 [],
891 );
892
893 const layer0 = result.layers.find((l) => l.layer === 0);
894 expect(layer0).toBeDefined();
895 expect(layer0!.passed).toBe(true);
896 expect(layer0!.available).toBe(1);
897 });
898
899 it("Layer 0: fails when commit root returns 404", async () => {
900 const bytes = new TextEncoder().encode("missing-root");
901 const cidStr = await makeCidStr(bytes);
902
903 const mockFetch = async () => {
904 return new Response(null, { status: 404 });
905 };
906
907 const verifier = new RemoteVerifier(
908 ipfsService,
909 {},
910 mockFetch as unknown as typeof fetch,
911 );
912
913 const result = await verifier.verifyPeer(
914 "did:plc:test",
915 "https://pds.example.com",
916 cidStr,
917 [],
918 );
919
920 const layer0 = result.layers.find((l) => l.layer === 0);
921 expect(layer0!.passed).toBe(false);
922 expect(layer0!.missing).toContain(cidStr);
923 });
924
925 it("Layer 0: fails on network error", async () => {
926 const bytes = new TextEncoder().encode("error-root");
927 const cidStr = await makeCidStr(bytes);
928
929 const mockFetch = async () => {
930 throw new Error("Connection refused");
931 };
932
933 const verifier = new RemoteVerifier(
934 ipfsService,
935 {},
936 mockFetch as unknown as typeof fetch,
937 );
938
939 const result = await verifier.verifyPeer(
940 "did:plc:test",
941 "https://pds.example.com",
942 cidStr,
943 [],
944 );
945
946 const layer0 = result.layers.find((l) => l.layer === 0);
947 expect(layer0!.passed).toBe(false);
948 expect(layer0!.error).toBe("Connection refused");
949 });
950
951 it("Layer 1: passes when all sampled blocks match local", async () => {
952 const blocks: Array<{ cid: string; bytes: Uint8Array }> = [];
953 for (let i = 0; i < 5; i++) {
954 const bytes = new TextEncoder().encode(`rasl-block-${i}`);
955 const cidStr = await makeCidStr(bytes);
956 await ipfsService.putBlock(cidStr, bytes);
957 blocks.push({ cid: cidStr, bytes });
958 }
959
960 const mockFetch = async (url: string | URL | Request) => {
961 const urlStr = typeof url === "string" ? url : url.toString();
962 const block = blocks.find((b) => urlStr.includes(b.cid));
963 if (block) {
964 return new Response(block.bytes, { status: 200 });
965 }
966 return new Response(null, { status: 404 });
967 };
968
969 const verifier = new RemoteVerifier(
970 ipfsService,
971 { raslSampleSize: 100 },
972 mockFetch as unknown as typeof fetch,
973 );
974
975 const result = await verifier.verifyPeer(
976 "did:plc:test",
977 "https://pds.example.com",
978 null,
979 blocks.map((b) => b.cid),
980 );
981
982 const layer1 = result.layers.find((l) => l.layer === 1);
983 expect(layer1).toBeDefined();
984 expect(layer1!.passed).toBe(true);
985 expect(layer1!.checked).toBe(5);
986 expect(layer1!.available).toBe(5);
987 });
988
989 it("Layer 1: fails when some blocks missing locally", async () => {
990 const presentBytes = new TextEncoder().encode("present-block");
991 const presentCid = await makeCidStr(presentBytes);
992 await ipfsService.putBlock(presentCid, presentBytes);
993
994 const missingBytes = new TextEncoder().encode("missing-block");
995 const missingCid = await makeCidStr(missingBytes);
996 // Don't store missingCid — it's tracked but not in blockstore
997
998 const mockFetch = async () => new Response(null, { status: 404 });
999
1000 const verifier = new RemoteVerifier(
1001 ipfsService,
1002 { raslSampleSize: 100 },
1003 mockFetch as unknown as typeof fetch,
1004 );
1005
1006 const result = await verifier.verifyPeer(
1007 "did:plc:test",
1008 "https://pds.example.com",
1009 null,
1010 [presentCid, missingCid],
1011 );
1012
1013 const layer1 = result.layers.find((l) => l.layer === 1);
1014 expect(layer1!.passed).toBe(false);
1015 expect(layer1!.available).toBe(1);
1016 expect(layer1!.missing).toContain(missingCid);
1017 });
1018
1019 it("combined: overallPassed is true when all layers pass", async () => {
1020 const bytes = new TextEncoder().encode("all-pass");
1021 const cidStr = await makeCidStr(bytes);
1022 await ipfsService.putBlock(cidStr, bytes);
1023
1024 const mockFetch = async (url: string | URL | Request) => {
1025 const urlStr = typeof url === "string" ? url : url.toString();
1026 if (urlStr.includes("getHead")) {
1027 return new Response(JSON.stringify({ root: cidStr }), {
1028 status: 200,
1029 headers: { "Content-Type": "application/json" },
1030 });
1031 }
1032 return new Response(null, { status: 404 });
1033 };
1034
1035 const verifier = new RemoteVerifier(
1036 ipfsService,
1037 { raslSampleSize: 100 },
1038 mockFetch as unknown as typeof fetch,
1039 );
1040
1041 const result = await verifier.verifyPeer(
1042 "did:plc:test",
1043 "https://pds.example.com",
1044 cidStr,
1045 [cidStr],
1046 );
1047
1048 expect(result.overallPassed).toBe(true);
1049 expect(result.did).toBe("did:plc:test");
1050 expect(result.pdsEndpoint).toBe("https://pds.example.com");
1051 expect(result.layers.length).toBe(4); // 0, 1, 2 (stub), 3 (stub)
1052 });
1053
1054 it("combined: overallPassed is false when any layer fails", async () => {
1055 const bytes = new TextEncoder().encode("fail-test");
1056 const cidStr = await makeCidStr(bytes);
1057
1058 const mockFetch = async () => {
1059 return new Response(null, { status: 404 });
1060 };
1061
1062 const verifier = new RemoteVerifier(
1063 ipfsService,
1064 {},
1065 mockFetch as unknown as typeof fetch,
1066 );
1067
1068 const result = await verifier.verifyPeer(
1069 "did:plc:test",
1070 "https://pds.example.com",
1071 cidStr,
1072 [cidStr],
1073 );
1074
1075 expect(result.overallPassed).toBe(false);
1076 });
1077
1078 it("Layer 0: fails gracefully when rootCid is null", async () => {
1079 const mockFetch = async () => {
1080 return new Response(null, { status: 404 });
1081 };
1082
1083 const verifier = new RemoteVerifier(
1084 ipfsService,
1085 {},
1086 mockFetch as unknown as typeof fetch,
1087 );
1088
1089 const result = await verifier.verifyPeer(
1090 "did:plc:test",
1091 "https://pds.example.com",
1092 null,
1093 [],
1094 );
1095
1096 const layer0 = result.layers.find((l) => l.layer === 0);
1097 expect(layer0).toBeDefined();
1098 expect(layer0!.passed).toBe(false);
1099 expect(layer0!.error).toContain("no local root CID");
1100 });
1101
1102 it("skips Layer 1 when blockCids is empty", async () => {
1103 const mockFetch = async () => {
1104 return new Response(null, { status: 200 });
1105 };
1106
1107 const verifier = new RemoteVerifier(
1108 ipfsService,
1109 {},
1110 mockFetch as unknown as typeof fetch,
1111 );
1112
1113 const result = await verifier.verifyPeer(
1114 "did:plc:test",
1115 "https://pds.example.com",
1116 null,
1117 [],
1118 );
1119
1120 expect(result.layers.find((l) => l.layer === 1)).toBeUndefined();
1121 });
1122
1123 it("Layer 2 and 3 are stubs", async () => {
1124 const mockFetch = async () => {
1125 return new Response(null, { status: 200 });
1126 };
1127
1128 const verifier = new RemoteVerifier(
1129 ipfsService,
1130 {},
1131 mockFetch as unknown as typeof fetch,
1132 );
1133
1134 const result = await verifier.verifyPeer(
1135 "did:plc:test",
1136 "https://pds.example.com",
1137 null,
1138 [],
1139 );
1140
1141 const layer2 = result.layers.find((l) => l.layer === 2);
1142 expect(layer2).toBeDefined();
1143 expect(layer2!.checked).toBe(0);
1144 expect(layer2!.error).toContain("not implemented");
1145
1146 const layer3 = result.layers.find((l) => l.layer === 3);
1147 expect(layer3).toBeDefined();
1148 expect(layer3!.checked).toBe(0);
1149 expect(layer3!.error).toContain("not implemented");
1150 });
1151
1152 it("respects raslSampleSize config", async () => {
1153 const blocks: Array<{ cid: string; bytes: Uint8Array }> = [];
1154 for (let i = 0; i < 20; i++) {
1155 const bytes = new TextEncoder().encode(`sample-block-${i}`);
1156 const cidStr = await makeCidStr(bytes);
1157 await ipfsService.putBlock(cidStr, bytes);
1158 blocks.push({ cid: cidStr, bytes });
1159 }
1160
1161 const mockFetch = async (url: string | URL | Request) => {
1162 const urlStr = typeof url === "string" ? url : url.toString();
1163 const block = blocks.find((b) => urlStr.includes(b.cid));
1164 if (block) {
1165 return new Response(block.bytes, { status: 200 });
1166 }
1167 return new Response(null, { status: 404 });
1168 };
1169
1170 const verifier = new RemoteVerifier(
1171 ipfsService,
1172 { raslSampleSize: 5 },
1173 mockFetch as unknown as typeof fetch,
1174 );
1175
1176 const result = await verifier.verifyPeer(
1177 "did:plc:test",
1178 "https://pds.example.com",
1179 null,
1180 blocks.map((b) => b.cid),
1181 );
1182
1183 const layer1 = result.layers.find((l) => l.layer === 1);
1184 expect(layer1!.checked).toBe(5);
1185 });
1186});
1187
1188// ============================================
1189// IpfsReadableBlockstore
1190// ============================================
1191
1192describe("IpfsReadableBlockstore", () => {
1193 let tmpDir: string;
1194 let db: InstanceType<typeof Database>;
1195 let ipfsService: IpfsService;
1196 let readableBlockstore: IpfsReadableBlockstore;
1197
1198 beforeEach(async () => {
1199 tmpDir = mkdtempSync(join(tmpdir(), "ipfs-readable-bs-test-"));
1200 db = new Database(join(tmpDir, "test.db"));
1201 ipfsService = new IpfsService({
1202 db,
1203 networking: false,
1204 });
1205 await ipfsService.start();
1206 readableBlockstore = new IpfsReadableBlockstore(ipfsService);
1207 });
1208
1209 afterEach(async () => {
1210 if (ipfsService.isRunning()) await ipfsService.stop();
1211 db.close();
1212 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
1213 });
1214
1215 it("getBytes roundtrip", async () => {
1216 const bytes = new TextEncoder().encode("readable-test");
1217 const cidStr = await makeCidStr(bytes);
1218 await ipfsService.putBlock(cidStr, bytes);
1219
1220 const { CID: MfCID } = await import("multiformats");
1221 const cid = MfCID.parse(cidStr);
1222 const result = await readableBlockstore.getBytes(cid);
1223 expect(result).not.toBeNull();
1224 expect(Buffer.from(result!)).toEqual(Buffer.from(bytes));
1225 });
1226
1227 it("has returns true/false correctly", async () => {
1228 const bytes = new TextEncoder().encode("has-test");
1229 const cidStr = await makeCidStr(bytes);
1230 await ipfsService.putBlock(cidStr, bytes);
1231
1232 const { CID: MfCID } = await import("multiformats");
1233 const present = MfCID.parse(cidStr);
1234 expect(await readableBlockstore.has(present)).toBe(true);
1235
1236 const missingBytes = new TextEncoder().encode("missing-test");
1237 const missingCidStr = await makeCidStr(missingBytes);
1238 const missing = MfCID.parse(missingCidStr);
1239 expect(await readableBlockstore.has(missing)).toBe(false);
1240 });
1241
1242 it("getBlocks returns blocks + missing", async () => {
1243 const bytes1 = new TextEncoder().encode("block-a");
1244 const cidStr1 = await makeCidStr(bytes1);
1245 await ipfsService.putBlock(cidStr1, bytes1);
1246
1247 const bytes2 = new TextEncoder().encode("block-b-missing");
1248 const cidStr2 = await makeCidStr(bytes2);
1249
1250 const { CID: MfCID } = await import("multiformats");
1251 const cid1 = MfCID.parse(cidStr1);
1252 const cid2 = MfCID.parse(cidStr2);
1253
1254 const { blocks, missing } = await readableBlockstore.getBlocks([
1255 cid1,
1256 cid2,
1257 ]);
1258 expect(blocks.has(cid1)).toBe(true);
1259 expect(missing).toHaveLength(1);
1260 expect(missing[0]!.toString()).toBe(cidStr2);
1261 });
1262});
1263
1264// ============================================
1265// ReplicatedRepoReader
1266// ============================================
1267
// Exercises ReplicatedRepoReader: serving reads for a DID whose repo has
// been replicated into the local IPFS blockstore, with SyncStorage as the
// source of truth for which DIDs are replicated and their root CID/rev.
describe("ReplicatedRepoReader", () => {
  let tmpDir: string;
  let sourceDb: InstanceType<typeof Database>;
  let replicaDb: InstanceType<typeof Database>;
  let sourceIpfs: IpfsService;
  let replicaIpfs: IpfsService;
  let sourceRepo: RepoManager;
  let syncStorage: SyncStorage;
  let reader: ReplicatedRepoReader;
  // DID of the "remote" repo being replicated (matches testConfig's default DID).
  const sourceDid = "did:plc:test123";

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "replicated-reader-test-"));

    // Source setup
    const sourceConfig = testConfig(join(tmpDir, "source"), []);
    sourceDb = new Database(join(tmpDir, "source.db"));
    sourceIpfs = new IpfsService({
      db: sourceDb,
      networking: false,
    });
    await sourceIpfs.start();
    sourceRepo = new RepoManager(sourceDb, sourceConfig);
    sourceRepo.init(undefined, sourceIpfs, sourceIpfs);

    // Replica IPFS + sync storage
    replicaDb = new Database(join(tmpDir, "replica.db"));
    replicaIpfs = new IpfsService({
      db: replicaDb,
      networking: false,
    });
    await replicaIpfs.start();

    syncStorage = new SyncStorage(replicaDb);
    syncStorage.initSchema();

    // Unit under test: reads replicated repos out of the replica blockstore.
    reader = new ReplicatedRepoReader(replicaIpfs, syncStorage);
  });

  afterEach(async () => {
    // Stop services before closing their backing databases.
    if (sourceIpfs.isRunning()) await sourceIpfs.stop();
    if (replicaIpfs.isRunning()) await replicaIpfs.stop();
    sourceDb.close();
    replicaDb.close();
    try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
  });

  /** Helper: create records in source, export CAR, store blocks in replica IPFS, record root_cid. */
  async function replicateSource(): Promise<{ rootCid: string; rev: string }> {
    const carBytes = await sourceRepo.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await replicaIpfs.putBlocks(blocks);

    const rootCidStr = root.toString();

    // Extract rev from commit block
    // NOTE(review): reaches into BlockMap's private `map` field (assumed to
    // be keyed by CID string) — brittle if @atproto/repo changes internals.
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    // Fall back to the root CID string if the commit rev can't be decoded.
    let rev = rootCidStr;
    const commitBytes = internalMap?.get(rootCidStr);
    if (commitBytes) {
      const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
      if (typeof commitObj.rev === "string") {
        rev = commitObj.rev;
      }
    }

    // Record in sync storage
    syncStorage.upsertState({
      did: sourceDid,
      pdsEndpoint: "https://pds.example.com",
    });
    syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr);

    return { rootCid: rootCidStr, rev };
  }

  it("returns null for unknown DID", async () => {
    const result = await reader.getRecord(
      "did:plc:unknown",
      "app.bsky.feed.post",
      "abc",
    );
    expect(result).toBeNull();
    expect(reader.isReplicatedDid("did:plc:unknown")).toBe(false);
  });

  it("getRecord returns record with correct CID and value", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Hello replicated!",
      createdAt: "2025-01-01T00:00:00.000Z",
    });
    await replicateSource();

    // List records from source to get the rkey
    const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", {
      limit: 10,
    });
    const firstRecord = sourceRecords.records[0]!;
    // rkey is the last path segment of the at:// URI.
    const rkey = firstRecord.uri.split("/").pop()!;

    const result = await reader.getRecord(
      sourceDid,
      "app.bsky.feed.post",
      rkey,
    );
    expect(result).not.toBeNull();
    expect(result!.cid).toBeDefined();
    expect(typeof result!.cid).toBe("string");
    const value = result!.value as Record<string, unknown>;
    expect(value.text).toBe("Hello replicated!");
  });

  it("listRecords returns records with cursor support", async () => {
    // Create multiple records
    for (let i = 0; i < 5; i++) {
      await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
        $type: "app.bsky.feed.post",
        text: `Post ${i}`,
        createdAt: new Date().toISOString(),
      });
    }
    await replicateSource();

    // First page
    const page1 = await reader.listRecords(sourceDid, "app.bsky.feed.post", {
      limit: 3,
    });
    expect(page1.records).toHaveLength(3);
    expect(page1.cursor).toBeDefined();

    // Second page
    const page2 = await reader.listRecords(sourceDid, "app.bsky.feed.post", {
      limit: 3,
      cursor: page1.cursor,
    });
    expect(page2.records).toHaveLength(2);

    // All records have correct URIs
    for (const rec of [...page1.records, ...page2.records]) {
      expect(rec.uri).toMatch(
        new RegExp(`^at://${sourceDid}/app\\.bsky\\.feed\\.post/`),
      );
      expect(rec.cid).toBeDefined();
      expect(rec.value).toBeDefined();
    }
  });

  it("describeRepo returns collections list", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "post",
      createdAt: new Date().toISOString(),
    });
    await sourceRepo.putRecord("app.bsky.actor.profile", "self", {
      $type: "app.bsky.actor.profile",
      displayName: "Test",
    });
    await replicateSource();

    const result = await reader.describeRepo(sourceDid);
    expect(result).not.toBeNull();
    expect(result!.did).toBe(sourceDid);
    expect(result!.collections).toContain("app.bsky.feed.post");
    expect(result!.collections).toContain("app.bsky.actor.profile");
  });

  it("getRepoStatus returns rev and root CID", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "status test",
      createdAt: new Date().toISOString(),
    });
    const { rootCid, rev } = await replicateSource();

    // Status must reflect exactly what replicateSource recorded.
    const status = reader.getRepoStatus(sourceDid);
    expect(status).not.toBeNull();
    expect(status!.did).toBe(sourceDid);
    expect(status!.rootCid).toBe(rootCid);
    expect(status!.rev).toBe(rev);
    expect(status!.active).toBe(true);
  });

  it("cache invalidation works", async () => {
    // First sync
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "First post",
      createdAt: new Date().toISOString(),
    });
    await replicateSource();

    const result1 = await reader.listRecords(
      sourceDid,
      "app.bsky.feed.post",
      { limit: 100 },
    );
    expect(result1.records).toHaveLength(1);

    // Add another record and re-sync
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Second post",
      createdAt: new Date().toISOString(),
    });
    // Without this invalidation the reader could keep serving the old root.
    reader.invalidateCache(sourceDid);
    await replicateSource();

    const result2 = await reader.listRecords(
      sourceDid,
      "app.bsky.feed.post",
      { limit: 100 },
    );
    expect(result2.records).toHaveLength(2);
  });
});
1486
1487// ============================================
1488// XRPC integration (replicated repos)
1489// ============================================
1490
// End-to-end: the replica's HTTP app serves com.atproto.repo.* reads for a
// foreign (replicated) DID via ReplicatedRepoReader, while its own DID is
// still served through the normal RepoManager path.
describe("XRPC integration: replicated repos", () => {
  let tmpDir: string;
  let sourceDb: InstanceType<typeof Database>;
  let replicaDb: InstanceType<typeof Database>;
  let sourceIpfs: IpfsService;
  let replicaIpfs: IpfsService;
  let sourceRepo: RepoManager;
  let replicaRepo: RepoManager;
  let syncStorage: SyncStorage;
  let reader: ReplicatedRepoReader;
  let app: ReturnType<typeof createApp>;
  const sourceDid = "did:plc:test123";
  const replicaDid = "did:plc:replica456";

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "xrpc-replicated-test-"));

    // Source setup
    const sourceConfig = testConfig(join(tmpDir, "source"), []);
    sourceDb = new Database(join(tmpDir, "source.db"));
    sourceIpfs = new IpfsService({
      db: sourceDb,
      networking: false,
    });
    await sourceIpfs.start();
    sourceRepo = new RepoManager(sourceDb, sourceConfig);
    sourceRepo.init(undefined, sourceIpfs, sourceIpfs);

    // Replica setup
    // The replica gets its own DID and signing key, and lists sourceDid as
    // a repo it replicates.
    const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]);
    replicaConfig.DID = replicaDid;
    replicaConfig.SIGNING_KEY =
      "0000000000000000000000000000000000000000000000000000000000000002";
    replicaDb = new Database(join(tmpDir, "replica.db"));
    replicaIpfs = new IpfsService({
      db: replicaDb,
      networking: false,
    });
    await replicaIpfs.start();
    replicaRepo = new RepoManager(replicaDb, replicaConfig);
    replicaRepo.init(undefined, replicaIpfs, replicaIpfs);

    syncStorage = new SyncStorage(replicaDb);
    syncStorage.initSchema();
    reader = new ReplicatedRepoReader(replicaIpfs, syncStorage);

    // Build the HTTP app with the reader wired in so repo.* routes can
    // fall back to replicated repos for non-local DIDs.
    const firehose = new Firehose(replicaRepo);
    app = createApp(
      replicaConfig,
      firehose,
      replicaIpfs,
      replicaIpfs,
      undefined,
      undefined,
      reader,
      replicaRepo,
    );
  });

  afterEach(async () => {
    // Stop services before closing their backing databases.
    if (sourceIpfs.isRunning()) await sourceIpfs.stop();
    if (replicaIpfs.isRunning()) await replicaIpfs.stop();
    sourceDb.close();
    replicaDb.close();
    try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
  });

  // Same replication helper pattern as the ReplicatedRepoReader suite, but
  // also invalidates the reader cache so re-syncs are picked up.
  async function replicateSource(): Promise<void> {
    const carBytes = await sourceRepo.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await replicaIpfs.putBlocks(blocks);

    const rootCidStr = root.toString();
    // NOTE(review): pokes BlockMap's private `map` field (assumed keyed by
    // CID string) to read the commit block.
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    // Fall back to the root CID string if the commit rev can't be decoded.
    let rev = rootCidStr;
    const commitBytes = internalMap?.get(rootCidStr);
    if (commitBytes) {
      const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
      if (typeof commitObj.rev === "string") {
        rev = commitObj.rev;
      }
    }

    syncStorage.upsertState({
      did: sourceDid,
      pdsEndpoint: "https://pds.example.com",
    });
    syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr);
    reader.invalidateCache(sourceDid);
  }

  it("GET getRecord for replicated DID → 200", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "XRPC replicated test",
      createdAt: "2025-01-01T00:00:00.000Z",
    });
    await replicateSource();

    // Discover the rkey from the source repo's own listing.
    const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", {
      limit: 10,
    });
    const rkey = sourceRecords.records[0]!.uri.split("/").pop()!;

    const res = await app.request(
      `/xrpc/com.atproto.repo.getRecord?repo=${sourceDid}&collection=app.bsky.feed.post&rkey=${rkey}`,
      undefined,
      {},
    );
    expect(res.status).toBe(200);

    const json = (await res.json()) as {
      uri: string;
      cid: string;
      value: Record<string, unknown>;
    };
    expect(json.uri).toContain(sourceDid);
    expect(json.value.text).toBe("XRPC replicated test");
  });

  it("GET listRecords for replicated DID → 200", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "list test 1",
      createdAt: new Date().toISOString(),
    });
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "list test 2",
      createdAt: new Date().toISOString(),
    });
    await replicateSource();

    const res = await app.request(
      `/xrpc/com.atproto.repo.listRecords?repo=${sourceDid}&collection=app.bsky.feed.post`,
      undefined,
      {},
    );
    expect(res.status).toBe(200);

    const json = (await res.json()) as {
      records: Array<{ uri: string; value: Record<string, unknown> }>;
    };
    expect(json.records.length).toBe(2);
  });

  it("GET describeRepo for replicated DID → 200", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "describe test",
      createdAt: new Date().toISOString(),
    });
    await replicateSource();

    const res = await app.request(
      `/xrpc/com.atproto.repo.describeRepo?repo=${sourceDid}`,
      undefined,
      {},
    );
    expect(res.status).toBe(200);

    const json = (await res.json()) as {
      did: string;
      collections: string[];
    };
    expect(json.did).toBe(sourceDid);
    expect(json.collections).toContain("app.bsky.feed.post");
  });

  it("non-replicated foreign DID → 404", async () => {
    const res = await app.request(
      `/xrpc/com.atproto.repo.getRecord?repo=did:plc:nonexistent&collection=app.bsky.feed.post&rkey=abc`,
      undefined,
      {},
    );
    expect(res.status).toBe(404);
  });

  it("local DID still works via existing path", async () => {
    // Record created directly in the replica's own repo, not replicated.
    await replicaRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "local post",
      createdAt: new Date().toISOString(),
    });

    const records = await replicaRepo.listRecords("app.bsky.feed.post", {
      limit: 10,
    });
    const rkey = records.records[0]!.uri.split("/").pop()!;

    const res = await app.request(
      `/xrpc/com.atproto.repo.getRecord?repo=${replicaDid}&collection=app.bsky.feed.post&rkey=${rkey}`,
      undefined,
      {},
    );
    expect(res.status).toBe(200);

    const json = (await res.json()) as {
      uri: string;
      value: Record<string, unknown>;
    };
    expect(json.value.text).toBe("local post");
  });
});
1697
1698// ============================================
1699// root_cid + rev extraction
1700// ============================================
1701
// Verifies that syncing records both the repo root CID and the commit rev
// (a TID) into replication state, and that the two are distinct values.
describe("root_cid + rev extraction", () => {
  let tmpDir: string;
  let sourceDb: InstanceType<typeof Database>;
  let sourceIpfs: IpfsService;
  let sourceRepo: RepoManager;
  let replicaDb: InstanceType<typeof Database>;
  let replicaIpfs: IpfsService;
  let syncStorage: SyncStorage;

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "rev-extraction-test-"));

    const sourceConfig = testConfig(join(tmpDir, "source"), []);
    sourceDb = new Database(join(tmpDir, "source.db"));
    sourceIpfs = new IpfsService({
      db: sourceDb,
      networking: false,
    });
    await sourceIpfs.start();
    sourceRepo = new RepoManager(sourceDb, sourceConfig);
    sourceRepo.init(undefined, sourceIpfs, sourceIpfs);

    replicaDb = new Database(join(tmpDir, "replica.db"));
    replicaIpfs = new IpfsService({
      db: replicaDb,
      networking: false,
    });
    await replicaIpfs.start();

    syncStorage = new SyncStorage(replicaDb);
    syncStorage.initSchema();
  });

  afterEach(async () => {
    // Stop services before closing their backing databases.
    if (sourceIpfs.isRunning()) await sourceIpfs.stop();
    if (replicaIpfs.isRunning()) await replicaIpfs.stop();
    sourceDb.close();
    replicaDb.close();
    try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
  });

  it("after sync, replication_state has both root_cid and last_sync_rev (actual TID)", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "rev test",
      createdAt: new Date().toISOString(),
    });

    const carBytes = await sourceRepo.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await replicaIpfs.putBlocks(blocks);

    const rootCidStr = root.toString();
    // NOTE(review): pokes BlockMap's private `map` field (assumed keyed by
    // CID string) to read the commit block.
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    // Fall back to the root CID string if the commit rev can't be decoded.
    let rev = rootCidStr;
    const commitBytes = internalMap?.get(rootCidStr);
    if (commitBytes) {
      const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
      if (typeof commitObj.rev === "string") {
        rev = commitObj.rev;
      }
    }

    const did = "did:plc:test123";
    syncStorage.upsertState({
      did,
      pdsEndpoint: "https://pds.example.com",
    });
    syncStorage.updateSyncProgress(did, rev, rootCidStr);

    const state = syncStorage.getState(did);
    expect(state).not.toBeNull();
    expect(state!.rootCid).toBe(rootCidStr);
    expect(state!.lastSyncRev).toBe(rev);
    // Rev should be a TID (not a CID)
    expect(state!.lastSyncRev).not.toBe(rootCidStr);
  });

  it("root_cid is a valid CID string and last_sync_rev is a TID", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "cid validation test",
      createdAt: new Date().toISOString(),
    });

    const carBytes = await sourceRepo.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await replicaIpfs.putBlocks(blocks);

    const rootCidStr = root.toString();
    // Same commit-rev extraction as above.
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    let rev = rootCidStr;
    const commitBytes = internalMap?.get(rootCidStr);
    if (commitBytes) {
      const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
      if (typeof commitObj.rev === "string") {
        rev = commitObj.rev;
      }
    }

    // root_cid should be parseable as a CID
    const { CID: MfCID } = await import("multiformats");
    expect(() => MfCID.parse(rootCidStr)).not.toThrow();

    // rev should be a TID (base32-sortable, 13 chars)
    expect(rev).toMatch(/^[a-z2-7]{13}$/);
  });
});
1813});
1814
1815// ============================================
1816// SyncStorage: Peer Endpoint Tracking
1817// ============================================
1818
1819describe("SyncStorage peer endpoint tracking", () => {
1820 let tmpDir: string;
1821 let db: InstanceType<typeof Database>;
1822 let storage: SyncStorage;
1823
1824 beforeEach(() => {
1825 tmpDir = mkdtempSync(join(tmpdir(), "peer-endpoint-test-"));
1826 db = new Database(join(tmpDir, "test.db"));
1827 storage = new SyncStorage(db);
1828 storage.initSchema();
1829 });
1830
1831 afterEach(() => {
1832 db.close();
1833 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
1834 });
1835
1836 it("upsertPeerEndpoint + getPeerEndpoints returns entries", () => {
1837 storage.upsertPeerEndpoint(
1838 "did:plc:target1",
1839 "did:plc:peer1",
1840 "https://peer1.example.com",
1841 "rev123",
1842 );
1843 const peers = storage.getPeerEndpoints("did:plc:target1");
1844 expect(peers).toHaveLength(1);
1845 expect(peers[0]!.peerDid).toBe("did:plc:peer1");
1846 expect(peers[0]!.pdsEndpoint).toBe("https://peer1.example.com");
1847 expect(peers[0]!.lastSyncRev).toBe("rev123");
1848 });
1849
1850 it("duplicate peer_did+target_did upsert updates existing", () => {
1851 storage.upsertPeerEndpoint(
1852 "did:plc:target1",
1853 "did:plc:peer1",
1854 "https://old.example.com",
1855 "rev1",
1856 );
1857 storage.upsertPeerEndpoint(
1858 "did:plc:target1",
1859 "did:plc:peer1",
1860 "https://new.example.com",
1861 "rev2",
1862 );
1863 const peers = storage.getPeerEndpoints("did:plc:target1");
1864 expect(peers).toHaveLength(1);
1865 expect(peers[0]!.pdsEndpoint).toBe("https://new.example.com");
1866 expect(peers[0]!.lastSyncRev).toBe("rev2");
1867 });
1868
1869 it("getPeerEndpoints returns empty for unknown DID", () => {
1870 const peers = storage.getPeerEndpoints("did:plc:unknown");
1871 expect(peers).toEqual([]);
1872 });
1873
1874 it("clearPeerEndpoints removes all entries for a DID", () => {
1875 storage.upsertPeerEndpoint(
1876 "did:plc:target1",
1877 "did:plc:peer1",
1878 "https://peer1.example.com",
1879 null,
1880 );
1881 storage.upsertPeerEndpoint(
1882 "did:plc:target1",
1883 "did:plc:peer2",
1884 "https://peer2.example.com",
1885 null,
1886 );
1887 storage.upsertPeerEndpoint(
1888 "did:plc:target2",
1889 "did:plc:peer1",
1890 "https://peer1.example.com",
1891 null,
1892 );
1893
1894 storage.clearPeerEndpoints("did:plc:target1");
1895 expect(storage.getPeerEndpoints("did:plc:target1")).toEqual([]);
1896 // Other target DID's entries should remain
1897 expect(storage.getPeerEndpoints("did:plc:target2")).toHaveLength(1);
1898 });
1899});
1900
1901// ============================================
1902// XRPC: getRepo / getBlocks for replicated DIDs
1903// ============================================
1904
1905describe("XRPC: getRepo + getBlocks for replicated DIDs", () => {
1906 let tmpDir: string;
1907 let sourceDb: InstanceType<typeof Database>;
1908 let replicaDb: InstanceType<typeof Database>;
1909 let sourceIpfs: IpfsService;
1910 let replicaIpfs: IpfsService;
1911 let sourceRepo: RepoManager;
1912 let replicaRepo: RepoManager;
1913 let syncStorage: SyncStorage;
1914 let app: ReturnType<typeof createApp>;
1915 const sourceDid = "did:plc:test123";
1916 const replicaDid = "did:plc:replica456";
1917 let trackedCids: string[] = [];
1918
  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "xrpc-getrepo-test-"));

    // Source setup
    const sourceConfig = testConfig(join(tmpDir, "source"), []);
    sourceDb = new Database(join(tmpDir, "source.db"));
    sourceIpfs = new IpfsService({
      db: sourceDb,
      networking: false,
    });
    await sourceIpfs.start();
    sourceRepo = new RepoManager(sourceDb, sourceConfig);
    sourceRepo.init(undefined, sourceIpfs, sourceIpfs);

    // Replica setup
    // Distinct DID and signing key; replica config lists sourceDid as a
    // repo it replicates.
    const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]);
    replicaConfig.DID = replicaDid;
    replicaConfig.SIGNING_KEY =
      "0000000000000000000000000000000000000000000000000000000000000002";
    replicaDb = new Database(join(tmpDir, "replica.db"));
    replicaIpfs = new IpfsService({
      db: replicaDb,
      networking: false,
    });
    await replicaIpfs.start();
    replicaRepo = new RepoManager(replicaDb, replicaConfig);
    replicaRepo.init(undefined, replicaIpfs, replicaIpfs);

    syncStorage = new SyncStorage(replicaDb);
    syncStorage.initSchema();

    // Create a mock replicationManager with getSyncStorage
    // (only the method the sync.* routes appear to need).
    const mockReplicationManager = {
      getSyncStorage: () => syncStorage,
    } as unknown as import("./replication-manager.js").ReplicationManager;

    const firehose = new Firehose(replicaRepo);
    app = createApp(
      replicaConfig,
      firehose,
      replicaIpfs, // blockStore
      replicaIpfs, // networkService
      undefined, // blobStore
      mockReplicationManager,
      undefined, // replicatedRepoReader
      replicaRepo,
    );
  });
1967
  afterEach(async () => {
    // Stop services before closing their backing databases; temp-dir
    // removal failures are ignored since they don't affect test results.
    if (sourceIpfs.isRunning()) await sourceIpfs.stop();
    if (replicaIpfs.isRunning()) await replicaIpfs.stop();
    sourceDb.close();
    replicaDb.close();
    try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
  });
1975
1976 async function replicateSource(): Promise<void> {
1977 const carBytes = await sourceRepo.getRepoCar();
1978 const { root, blocks } = await readCarWithRoot(carBytes);
1979 await replicaIpfs.putBlocks(blocks);
1980
1981 const rootCidStr = root.toString();
1982 const internalMap = (
1983 blocks as unknown as { map: Map<string, Uint8Array> }
1984 ).map;
1985 let rev = rootCidStr;
1986 const commitBytes = internalMap?.get(rootCidStr);
1987 if (commitBytes) {
1988 const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
1989 if (typeof commitObj.rev === "string") {
1990 rev = commitObj.rev;
1991 }
1992 }
1993
1994 // Track block CIDs
1995 trackedCids = Array.from(internalMap.keys());
1996
1997 syncStorage.upsertState({
1998 did: sourceDid,
1999 pdsEndpoint: "https://pds.example.com",
2000 });
2001 syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr);
2002 syncStorage.trackBlocks(sourceDid, trackedCids);
2003 }
2004
2005 it("getRepo serves replicated DID repo as valid CAR", async () => {
2006 await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
2007 $type: "app.bsky.feed.post",
2008 text: "replicated getRepo test",
2009 createdAt: new Date().toISOString(),
2010 });
2011 await replicateSource();
2012
2013 const res = await app.request(
2014 `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`,
2015 undefined,
2016 {},
2017 );
2018 expect(res.status).toBe(200);
2019 expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car");
2020
2021 // Parse the CAR and verify it has a valid root + blocks
2022 const carBytes = new Uint8Array(await res.arrayBuffer());
2023 const { root, blocks } = await readCarWithRoot(carBytes);
2024 expect(root).toBeDefined();
2025 const internalMap = (
2026 blocks as unknown as { map: Map<string, Uint8Array> }
2027 ).map;
2028 expect(internalMap.size).toBeGreaterThan(0);
2029 });
2030
2031 it("getRepo returns 404 for non-replicated DID", async () => {
2032 const res = await app.request(
2033 `/xrpc/com.atproto.sync.getRepo?did=did:plc:nonexistent`,
2034 undefined,
2035 {},
2036 );
2037 expect(res.status).toBe(404);
2038 });
2039
2040 it("getRepo returns 404 for replicated DID with no rootCid yet", async () => {
2041 // Create state without rootCid
2042 syncStorage.upsertState({
2043 did: sourceDid,
2044 pdsEndpoint: "https://pds.example.com",
2045 });
2046
2047 const res = await app.request(
2048 `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`,
2049 undefined,
2050 {},
2051 );
2052 expect(res.status).toBe(404);
2053 });
2054
2055 it("getBlocks serves requested blocks for replicated DID", async () => {
2056 await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
2057 $type: "app.bsky.feed.post",
2058 text: "replicated getBlocks test",
2059 createdAt: new Date().toISOString(),
2060 });
2061 await replicateSource();
2062
2063 // Request a subset of tracked CIDs
2064 const requestCids = trackedCids.slice(0, 2);
2065 const cidsQuery = requestCids.map((c) => `cids=${c}`).join("&");
2066 const res = await app.request(
2067 `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&${cidsQuery}`,
2068 undefined,
2069 {},
2070 );
2071 expect(res.status).toBe(200);
2072 expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car");
2073
2074 const carBytes = new Uint8Array(await res.arrayBuffer());
2075 const { blocks } = await readCarWithRoot(carBytes);
2076 const internalMap = (
2077 blocks as unknown as { map: Map<string, Uint8Array> }
2078 ).map;
2079 // Should contain the requested blocks
2080 for (const cid of requestCids) {
2081 expect(internalMap.has(cid)).toBe(true);
2082 }
2083 });
2084
2085 it("getBlocks returns 404 for non-tracked DID", async () => {
2086 const res = await app.request(
2087 `/xrpc/com.atproto.sync.getBlocks?did=did:plc:nonexistent&cids=bafytest`,
2088 undefined,
2089 {},
2090 );
2091 expect(res.status).toBe(404);
2092 });
2093
2094 it("getBlocks only serves blocks tracked for the requested DID", async () => {
2095 await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
2096 $type: "app.bsky.feed.post",
2097 text: "security test",
2098 createdAt: new Date().toISOString(),
2099 });
2100 await replicateSource();
2101
2102 // Store a block that is NOT tracked for sourceDid
2103 const untrackedBytes = new TextEncoder().encode("untracked-block");
2104 const untrackedCid = await makeCidStr(untrackedBytes);
2105 await replicaIpfs.putBlock(untrackedCid, untrackedBytes);
2106
2107 // Request the untracked CID for the tracked DID
2108 const res = await app.request(
2109 `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&cids=${untrackedCid}`,
2110 undefined,
2111 {},
2112 );
2113 expect(res.status).toBe(200);
2114 // The CAR should be returned but the untracked block should not be in it
2115 const carBytes = new Uint8Array(await res.arrayBuffer());
2116 const { blocks } = await readCarWithRoot(carBytes);
2117 const internalMap = (
2118 blocks as unknown as { map: Map<string, Uint8Array> }
2119 ).map;
2120 expect(internalMap.has(untrackedCid)).toBe(false);
2121 });
2122});
2123
2124// ============================================
2125// Peer fallback sync
2126// ============================================
2127
// Exercises ReplicationManager.syncDid's peer-fallback path: when the
// DID's source PDS cannot be fetched from, sync should fall back to any
// peer endpoints registered in sync storage. The tests patch
// RepoFetcher.fetchRepo and inject the mock fetcher/storage into the
// manager's private fields, so statement order here is load-bearing.
describe("Peer fallback in syncDid", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;
  let syncStorage: SyncStorage;
  // DID of this (local) node vs. the DID being replicated from elsewhere.
  const localDid = "did:plc:local";
  const remoteDid = "did:plc:remote1";

  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "peer-fallback-test-"));
    const config = testConfig(join(tmpDir, "data"), [remoteDid]);
    config.DID = localDid;

    db = new Database(join(tmpDir, "test.db"));
    // networking: false keeps the IPFS service fully local for the test.
    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();

    repoManager = new RepoManager(db, config);
    repoManager.init(undefined, ipfsService, ipfsService);

    syncStorage = new SyncStorage(db);
    syncStorage.initSchema();
  });

  afterEach(async () => {
    if (ipfsService.isRunning()) await ipfsService.stop();
    db.close();
    // Best-effort temp-dir cleanup; failures are intentionally ignored.
    try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
  });

  it("source PDS fails → fetches from peer endpoint successfully", async () => {
    // Create a real CAR from the local repo to serve as peer response
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "peer fallback test",
      createdAt: new Date().toISOString(),
    });
    const carBytes = await repoManager.getRepoCar();

    // Set up sync state
    syncStorage.upsertState({
      did: remoteDid,
      pdsEndpoint: "https://source-pds.example.com",
    });

    // Register a peer endpoint
    syncStorage.upsertPeerEndpoint(
      remoteDid,
      "did:plc:peer1",
      "https://peer1.example.com",
      "rev1",
    );

    // Mock RepoFetcher: source fails, peer succeeds
    // DID resolver always points the DID at the (failing) source PDS.
    const mockDidResolver = {
      resolve: async (did: string) => ({
        id: did,
        service: [
          {
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: "https://source-pds.example.com",
          },
        ],
      }),
    };
    const { RepoFetcher: RF } = await import("./repo-fetcher.js");
    const fetcher = new RF(mockDidResolver as any);
    // Keep the real implementation for any endpoint we don't stub below.
    const originalFetchRepo = fetcher.fetchRepo.bind(fetcher);

    let callCount = 0;
    // Patch fetchRepo: the source endpoint throws, the peer endpoint
    // returns the CAR built above, anything else falls through.
    fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => {
      callCount++;
      if (endpoint === "https://source-pds.example.com") {
        throw new Error("Source PDS unreachable");
      }
      if (endpoint === "https://peer1.example.com") {
        return carBytes;
      }
      return originalFetchRepo(endpoint, did, since);
    };

    // Access private method via prototype
    const { ReplicationManager: RM } = await import("./replication-manager.js");
    const manager = new RM(
      db,
      testConfig(join(tmpDir, "data"), [remoteDid]),
      repoManager,
      ipfsService,
      ipfsService,
      mockDidResolver as any,
    );

    // Replace the internal repoFetcher with our mock
    (manager as any).repoFetcher = fetcher;
    (manager as any).syncStorage = syncStorage;

    // Should succeed via peer fallback
    await manager.syncDid(remoteDid);

    // Source PDS was tried (at least once), and peer was used
    expect(callCount).toBeGreaterThanOrEqual(2);

    // Verify sync state was updated
    const state = syncStorage.getState(remoteDid);
    expect(state).not.toBeNull();
    expect(state!.status).toBe("synced");
  });

  it("source PDS fails + all peers fail → throws original error", async () => {
    syncStorage.upsertState({
      did: remoteDid,
      pdsEndpoint: "https://source-pds.example.com",
    });

    // Register a peer that will also fail
    syncStorage.upsertPeerEndpoint(
      remoteDid,
      "did:plc:peer1",
      "https://peer1.example.com",
      null,
    );

    const mockDidResolver = {
      resolve: async (did: string) => ({
        id: did,
        service: [
          {
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: "https://source-pds.example.com",
          },
        ],
      }),
    };

    const { RepoFetcher: RF } = await import("./repo-fetcher.js");
    const fetcher = new RF(mockDidResolver as any);
    // Every endpoint — source and peers alike — fails with the same error.
    fetcher.fetchRepo = async () => {
      throw new Error("Connection refused");
    };

    const { ReplicationManager: RM } = await import("./replication-manager.js");
    const manager = new RM(
      db,
      testConfig(join(tmpDir, "data"), [remoteDid]),
      repoManager,
      ipfsService,
      ipfsService,
      mockDidResolver as any,
    );
    (manager as any).repoFetcher = fetcher;
    (manager as any).syncStorage = syncStorage;

    // Should throw the original error
    await expect(manager.syncDid(remoteDid)).rejects.toThrow("Connection refused");
  });

  it("source PDS fails without since → tries peers without since", async () => {
    // Create a real CAR
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "no-since test",
      createdAt: new Date().toISOString(),
    });
    const carBytes = await repoManager.getRepoCar();

    // Set up sync state WITHOUT a previous rev (no since)
    syncStorage.upsertState({
      did: remoteDid,
      pdsEndpoint: "https://source-pds.example.com",
    });

    syncStorage.upsertPeerEndpoint(
      remoteDid,
      "did:plc:peer1",
      "https://peer1.example.com",
      null,
    );

    const mockDidResolver = {
      resolve: async (did: string) => ({
        id: did,
        service: [
          {
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: "https://source-pds.example.com",
          },
        ],
      }),
    };

    const { RepoFetcher: RF } = await import("./repo-fetcher.js");
    const fetcher = new RF(mockDidResolver as any);

    // Sentinel distinguishes "never called" from "called with undefined".
    let peerSincePassed: string | undefined = "NOT_CALLED";
    fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => {
      if (endpoint === "https://source-pds.example.com") {
        throw new Error("Source PDS down");
      }
      if (endpoint === "https://peer1.example.com") {
        peerSincePassed = since;
        return carBytes;
      }
      throw new Error("Unknown endpoint");
    };

    const { ReplicationManager: RM } = await import("./replication-manager.js");
    const manager = new RM(
      db,
      testConfig(join(tmpDir, "data"), [remoteDid]),
      repoManager,
      ipfsService,
      ipfsService,
      mockDidResolver as any,
    );
    (manager as any).repoFetcher = fetcher;
    (manager as any).syncStorage = syncStorage;

    await manager.syncDid(remoteDid);

    // Peer should have been called without `since` (undefined)
    expect(peerSincePassed).toBeUndefined();
  });
});
2357});