atproto user agency toolkit for individuals and groups
at main 2357 lines 69 kB view raw
1import { describe, it, expect, beforeEach, afterEach } from "vitest"; 2import { mkdtempSync, rmSync } from "node:fs"; 3import { tmpdir } from "node:os"; 4import { join } from "node:path"; 5import Database from "better-sqlite3"; 6import { IpfsService } from "../ipfs.js"; 7import { RepoManager } from "../repo-manager.js"; 8import type { Config } from "../config.js"; 9import { BlockMap, readCarWithRoot } from "@atproto/repo"; 10import { CID } from "@atproto/lex-data"; 11import { 12 create as createCid, 13 CODEC_RAW, 14 toString as cidToString, 15} from "@atcute/cid"; 16 17import { SyncStorage } from "./sync-storage.js"; 18import { 19 didToRkey, 20 rkeyToDid, 21 PEER_NSID, 22 MANIFEST_NSID, 23 DEFAULT_VERIFICATION_CONFIG, 24} from "./types.js"; 25import { BlockVerifier, RemoteVerifier } from "./verification.js"; 26import { RepoFetcher, extractPdsEndpoint } from "./repo-fetcher.js"; 27import { PeerDiscovery } from "./peer-discovery.js"; 28import { IpfsReadableBlockstore } from "./ipfs-readable-blockstore.js"; 29import { ReplicatedRepoReader } from "./replicated-repo-reader.js"; 30import { decode as cborDecode } from "../cbor-compat.js"; 31import { Firehose } from "../firehose.js"; 32import { createApp } from "../index.js"; 33 34/** Create a CID string from raw bytes using SHA-256. */ 35async function makeCidStr(bytes: Uint8Array): Promise<string> { 36 const cid = await createCid(CODEC_RAW, bytes); 37 return cidToString(cid); 38} 39 40function testConfig(dataDir: string, replicateDids: string[] = []): Config { 41 return { 42 DID: "did:plc:test123", 43 HANDLE: "test.example.com", 44 PDS_HOSTNAME: "test.example.com", 45 AUTH_TOKEN: "test-auth-token", 46 SIGNING_KEY: 47 "0000000000000000000000000000000000000000000000000000000000000001", 48 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 49 JWT_SECRET: "test-jwt-secret", 50 PASSWORD_HASH: "$2a$10$test", 51 DATA_DIR: dataDir, 52 PORT: 3000, 53 IPFS_ENABLED: true, 54 IPFS_NETWORKING: false, 55 REPLICATE_DIDS: replicateDids, 56 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 57 FIREHOSE_ENABLED: false, 58 RATE_LIMIT_ENABLED: false, 59 RATE_LIMIT_READ_PER_MIN: 300, 60 RATE_LIMIT_SYNC_PER_MIN: 30, 61 RATE_LIMIT_SESSION_PER_MIN: 10, 62 RATE_LIMIT_WRITE_PER_MIN: 200, 63 RATE_LIMIT_CHALLENGE_PER_MIN: 20, 64 RATE_LIMIT_MAX_CONNECTIONS: 100, 65 RATE_LIMIT_FIREHOSE_PER_IP: 3, 66 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", 67 }; 68} 69 70// ============================================ 71// didToRkey / rkeyToDid 72// ============================================ 73 74describe("didToRkey", () => { 75 it("replaces colons with hyphens", () => { 76 expect(didToRkey("did:plc:abc123")).toBe("did-plc-abc123"); 77 }); 78 79 it("handles did:web", () => { 80 expect(didToRkey("did:web:example.com")).toBe("did-web-example.com"); 81 }); 82 83 it("roundtrips with rkeyToDid for simple DIDs", () => { 84 const did = "did:plc:abc123"; 85 const rkey = didToRkey(did); 86 // rkeyToDid replaces ALL hyphens, so roundtrip only works for DIDs without hyphens 87 expect(rkeyToDid(rkey)).toBe(did); 88 }); 89}); 90 91// ============================================ 92// SyncStorage 93// ============================================ 94 95describe("SyncStorage", () => { 96 let tmpDir: string; 97 let db: InstanceType<typeof Database>; 98 let storage: SyncStorage; 99 100 beforeEach(() => { 101 tmpDir = mkdtempSync(join(tmpdir(), "sync-storage-test-")); 102 db = new Database(join(tmpDir, "test.db")); 103 storage = new SyncStorage(db); 104 storage.initSchema(); 105 }); 106 107 afterEach(() => { 108 db.close(); 109 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 110 }); 111 112 it("creates table and retrieves empty states", () => { 113 const states = storage.getAllStates(); 114 expect(states).toEqual([]); 115 }); 116 117 it("upserts and retrieves state", () => { 118 storage.upsertState({ 119 did: "did:plc:test1", 120 pdsEndpoint: "https://pds.example.com", 121 }); 122 123 const state = storage.getState("did:plc:test1"); 124 expect(state).not.toBeNull(); 125 expect(state!.did).toBe("did:plc:test1"); 126 expect(state!.pdsEndpoint).toBe("https://pds.example.com"); 127 expect(state!.status).toBe("pending"); 128 expect(state!.peerId).toBeNull(); 129 }); 130 131 it("updates status", () => { 132 storage.upsertState({ 133 did: "did:plc:test1", 134 pdsEndpoint: "https://pds.example.com", 135 }); 136 137 storage.updateStatus("did:plc:test1", "syncing"); 138 expect(storage.getState("did:plc:test1")!.status).toBe("syncing"); 139 }); 140 141 it("updates status with error message", () => { 142 storage.upsertState({ 143 did: "did:plc:test1", 144 pdsEndpoint: "https://pds.example.com", 145 }); 146 147 storage.updateStatus("did:plc:test1", "error", "Connection refused"); 148 const state = storage.getState("did:plc:test1")!; 149 expect(state.status).toBe("error"); 150 expect(state.errorMessage).toBe("Connection refused"); 151 }); 152 153 it("updates sync progress", () => { 154 storage.upsertState({ 155 did: "did:plc:test1", 156 pdsEndpoint: "https://pds.example.com", 157 }); 158 159 storage.updateSyncProgress("did:plc:test1", "rev123"); 160 const state = storage.getState("did:plc:test1")!; 161 expect(state.lastSyncRev).toBe("rev123"); 162 expect(state.lastSyncAt).not.toBeNull(); 163 expect(state.status).toBe("synced"); 164 }); 165 166 it("updates and clears peer info", () => { 167 storage.upsertState({ 168 did: "did:plc:test1", 169 pdsEndpoint: "https://pds.example.com", 170 }); 171 172 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest"); 173 let state = storage.getState("did:plc:test1")!; 174 expect(state.peerId).toBe("12D3KooWTest"); 175 expect(state.peerInfoFetchedAt).not.toBeNull(); 176 177 storage.clearPeerInfo("did:plc:test1"); 178 state = storage.getState("did:plc:test1")!; 179 expect(state.peerId).toBeNull(); 180 expect(state.peerInfoFetchedAt).toBeNull(); 181 }); 182 183 it("stores and retrieves peer multiaddrs", () => { 184 storage.upsertState({ 185 did: "did:plc:test1", 186 pdsEndpoint: "https://pds.example.com", 187 }); 188 189 const multiaddrs = [ 190 "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest", 191 "/ip4/192.168.1.1/tcp/4001/p2p/12D3KooWTest", 192 ]; 193 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", multiaddrs); 194 195 const state = storage.getState("did:plc:test1")!; 196 expect(state.peerId).toBe("12D3KooWTest"); 197 expect(state.peerMultiaddrs).toEqual(multiaddrs); 198 }); 199 200 it("clearPeerInfo also clears multiaddrs", () => { 201 storage.upsertState({ 202 did: "did:plc:test1", 203 pdsEndpoint: "https://pds.example.com", 204 }); 205 206 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", ["/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest"]); 207 storage.clearPeerInfo("did:plc:test1"); 208 209 const state = storage.getState("did:plc:test1")!; 210 expect(state.peerId).toBeNull(); 211 expect(state.peerMultiaddrs).toEqual([]); 212 }); 213 214 it("getMultiaddrForPdsEndpoint returns multiaddr with /p2p/", () => { 215 storage.upsertState({ 216 did: "did:plc:test1", 217 pdsEndpoint: "https://pds.example.com", 218 }); 219 220 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [ 221 "/ip4/127.0.0.1/tcp/4001", 222 "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest", 223 ]); 224 225 const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); 226 expect(ma).toBe("/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest"); 227 }); 228 229 it("getMultiaddrForPdsEndpoint returns null for unknown endpoint", () => { 230 const ma = storage.getMultiaddrForPdsEndpoint("https://unknown.example.com"); 231 expect(ma).toBeNull(); 232 }); 233 234 it("getMultiaddrForPdsEndpoint returns null when no multiaddrs stored", () => { 235 storage.upsertState({ 236 did: "did:plc:test1", 237 pdsEndpoint: "https://pds.example.com", 238 }); 239 240 const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); 241 expect(ma).toBeNull(); 242 }); 243 244 it("getMultiaddrForPdsEndpoint falls back to first addr when none have /p2p/", () => { 245 storage.upsertState({ 246 did: "did:plc:test1", 247 pdsEndpoint: "https://pds.example.com", 248 }); 249 250 storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [ 251 "/ip4/127.0.0.1/tcp/4001", 252 ]); 253 254 const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); 255 expect(ma).toBe("/ip4/127.0.0.1/tcp/4001"); 256 }); 257 258 it("getAllStates returns all entries sorted by DID", () => { 259 storage.upsertState({ 260 did: "did:plc:bbb", 261 pdsEndpoint: "https://b.example.com", 262 }); 263 storage.upsertState({ 264 did: "did:plc:aaa", 265 pdsEndpoint: "https://a.example.com", 266 }); 267 268 const states = storage.getAllStates(); 269 expect(states).toHaveLength(2); 270 expect(states[0]!.did).toBe("did:plc:aaa"); 271 expect(states[1]!.did).toBe("did:plc:bbb"); 272 }); 273}); 274 275// ============================================ 276// Record Publishing (peer identity + manifests) 277// ============================================ 278 279describe("Record publishing", () => { 280 let tmpDir: string; 281 let db: InstanceType<typeof Database>; 282 let ipfsService: IpfsService; 283 let repoManager: RepoManager; 284 285 beforeEach(async () => { 286 tmpDir = mkdtempSync(join(tmpdir(), "repl-publish-test-")); 287 const config = testConfig(tmpDir, ["did:plc:remote1", "did:plc:remote2"]); 288 289 db = new Database(join(tmpDir, "test.db")); 290 ipfsService = new IpfsService({ 291 db, 292 networking: false, 293 }); 294 await ipfsService.start(); 295 296 repoManager = new RepoManager(db, config); 297 repoManager.init(undefined, ipfsService, ipfsService); 298 }); 299 300 afterEach(async () => { 301 if (ipfsService.isRunning()) { 302 await ipfsService.stop(); 303 } 304 db.close(); 305 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 306 }); 307 308 it("peer identity record not created when networking is off", async () => { 309 // getPeerId() returns null when networking=false 310 expect(ipfsService.getPeerId()).toBeNull(); 311 312 // Simulate peer identity publish: no record created because getPeerId() is null 313 const peerId = ipfsService.getPeerId(); 314 if (peerId) { 315 await repoManager.putRecord(PEER_NSID, "self", { 316 $type: PEER_NSID, 317 peerId, 318 multiaddrs: [], 319 createdAt: new Date().toISOString(), 320 }); 321 } 322 323 // No record should have been created 324 const record = await repoManager.getRecord(PEER_NSID, "self"); 325 expect(record).toBeNull(); 326 }); 327 328 it("syncManifests creates manifest records for each configured DID", async () => { 329 const dids = ["did:plc:remote1", "did:plc:remote2"]; 330 for (const did of dids) { 331 const rkey = didToRkey(did); 332 await repoManager.putRecord(MANIFEST_NSID, rkey, { 333 $type: MANIFEST_NSID, 334 subject: did, 335 status: "active", 336 lastSyncRev: null, 337 lastSyncAt: null, 338 createdAt: new Date().toISOString(), 339 }); 340 } 341 342 // Both should be readable 343 for (const did of dids) { 344 const rkey = didToRkey(did); 345 const result = await repoManager.getRecord(MANIFEST_NSID, rkey); 346 expect(result).not.toBeNull(); 347 expect((result!.record as Record<string, unknown>).subject).toBe(did); 348 expect((result!.record as Record<string, unknown>).status).toBe("active"); 349 } 350 }); 351 352 it("syncManifests is idempotent", async () => { 353 const did = "did:plc:remote1"; 354 const rkey = didToRkey(did); 355 const manifest = { 356 $type: MANIFEST_NSID, 357 subject: did, 358 status: "active", 359 lastSyncRev: null, 360 lastSyncAt: null, 361 createdAt: new Date().toISOString(), 362 }; 363 364 // Write twice 365 await repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 366 await repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 367 368 // Should still be readable, no error 369 const result = await repoManager.getRecord(MANIFEST_NSID, rkey); 370 expect(result).not.toBeNull(); 371 }); 372}); 373 374// ============================================ 375// RepoFetcher 376// ============================================ 377 378describe("RepoFetcher", () => { 379 it("resolvePds with mock DidResolver returning test DID document", async () => { 380 const mockDidResolver = { 381 resolve: async (did: string) => { 382 if (did === "did:plc:test1") { 383 return { 384 id: "did:plc:test1", 385 service: [ 386 { 387 id: "#atproto_pds", 388 type: "AtprotoPersonalDataServer", 389 serviceEndpoint: "https://pds.example.com", 390 }, 391 ], 392 }; 393 } 394 return null; 395 }, 396 }; 397 398 const fetcher = new RepoFetcher(mockDidResolver as any); 399 const pds = await fetcher.resolvePds("did:plc:test1"); 400 expect(pds).toBe("https://pds.example.com"); 401 }); 402 403 it("returns null for unresolvable DID", async () => { 404 const mockDidResolver = { 405 resolve: async () => null, 406 }; 407 408 const fetcher = new RepoFetcher(mockDidResolver as any); 409 const pds = await fetcher.resolvePds("did:plc:unknown"); 410 expect(pds).toBeNull(); 411 }); 412}); 413 414describe("extractPdsEndpoint", () => { 415 it("extracts service endpoint from DID document", () => { 416 const doc = { 417 id: "did:plc:test1", 418 service: [ 419 { 420 id: "#atproto_pds", 421 type: "AtprotoPersonalDataServer", 422 serviceEndpoint: "https://pds.example.com", 423 }, 424 ], 425 }; 426 expect(extractPdsEndpoint(doc as any)).toBe("https://pds.example.com"); 427 }); 428 429 it("returns null when no service array", () => { 430 const doc = { id: "did:plc:test1" }; 431 expect(extractPdsEndpoint(doc as any)).toBeNull(); 432 }); 433 434 it("returns null when no matching service", () => { 435 const doc = { 436 id: "did:plc:test1", 437 service: [ 438 { 439 id: "#other", 440 type: "Other", 441 serviceEndpoint: "https://other.example.com", 442 }, 443 ], 444 }; 445 expect(extractPdsEndpoint(doc as any)).toBeNull(); 446 }); 447}); 448 449// ============================================ 450// PeerDiscovery 451// ============================================ 452 453describe("PeerDiscovery", () => { 454 it("returns null for unresolvable DID", async () => { 455 const mockFetcher = { 456 resolvePds: async () => null, 457 fetchRecord: async () => null, 458 listRecords: async () => [], 459 }; 460 461 const discovery = new PeerDiscovery(mockFetcher as any); 462 const result = await discovery.discoverPeer("did:plc:unknown"); 463 expect(result).toBeNull(); 464 }); 465 466 it("returns pdsEndpoint with null peerId when no peer record", async () => { 467 const mockFetcher = { 468 resolvePds: async () => "https://pds.example.com", 469 fetchRecord: async () => null, 470 listRecords: async () => [], 471 }; 472 473 const discovery = new PeerDiscovery(mockFetcher as any); 474 const result = await discovery.discoverPeer("did:plc:test1"); 475 expect(result).not.toBeNull(); 476 expect(result!.pdsEndpoint).toBe("https://pds.example.com"); 477 expect(result!.peerId).toBeNull(); 478 expect(result!.multiaddrs).toEqual([]); 479 }); 480 481 it("returns peer info when record exists", async () => { 482 const mockFetcher = { 483 resolvePds: async () => "https://pds.example.com", 484 fetchRecord: async () => ({ 485 $type: PEER_NSID, 486 peerId: "12D3KooWTest", 487 multiaddrs: ["/ip4/127.0.0.1/tcp/4001"], 488 createdAt: new Date().toISOString(), 489 }), 490 listRecords: async () => [], 491 }; 492 493 const discovery = new PeerDiscovery(mockFetcher as any); 494 const result = await discovery.discoverPeer("did:plc:test1"); 495 expect(result).not.toBeNull(); 496 expect(result!.peerId).toBe("12D3KooWTest"); 497 expect(result!.multiaddrs).toEqual(["/ip4/127.0.0.1/tcp/4001"]); 498 }); 499}); 500 501// ============================================ 502// BlockVerifier 503// ============================================ 504 505describe("BlockVerifier", () => { 506 let tmpDir: string; 507 let db: InstanceType<typeof Database>; 508 let ipfsService: IpfsService; 509 510 beforeEach(async () => { 511 tmpDir = mkdtempSync(join(tmpdir(), "verifier-test-")); 512 db = new Database(join(tmpDir, "test.db")); 513 ipfsService = new IpfsService({ 514 db, 515 networking: false, 516 }); 517 await ipfsService.start(); 518 }); 519 520 afterEach(async () => { 521 if (ipfsService.isRunning()) { 522 await ipfsService.stop(); 523 } 524 db.close(); 525 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 526 }); 527 528 it("all blocks available reports 100%", async () => { 529 const verifier = new BlockVerifier(ipfsService); 530 531 const cids: string[] = []; 532 for (let i = 0; i < 3; i++) { 533 const bytes = new TextEncoder().encode(`block-${i}`); 534 const cidStr = await makeCidStr(bytes); 535 await ipfsService.putBlock(cidStr, bytes); 536 cids.push(cidStr); 537 } 538 539 const result = await verifier.verifyBlockAvailability(cids, 10); 540 expect(result.checked).toBe(3); 541 expect(result.available).toBe(3); 542 expect(result.missing).toEqual([]); 543 }); 544 545 it("some blocks missing reports correctly", async () => { 546 const verifier = new BlockVerifier(ipfsService); 547 548 const bytes1 = new TextEncoder().encode("present"); 549 const cid1 = await makeCidStr(bytes1); 550 await ipfsService.putBlock(cid1, bytes1); 551 552 const bytes2 = new TextEncoder().encode("missing"); 553 const cid2 = await makeCidStr(bytes2); 554 // Don't store cid2 555 556 const result = await verifier.verifyBlockAvailability( 557 [cid1, cid2], 558 10, 559 ); 560 expect(result.checked).toBe(2); 561 expect(result.available).toBe(1); 562 expect(result.missing).toContain(cid2); 563 }); 564 565 it("sample size > array length checks all", async () => { 566 const verifier = new BlockVerifier(ipfsService); 567 568 const bytes = new TextEncoder().encode("single"); 569 const cidStr = await makeCidStr(bytes); 570 await ipfsService.putBlock(cidStr, bytes); 571 572 const result = await verifier.verifyBlockAvailability([cidStr], 100); 573 expect(result.checked).toBe(1); 574 expect(result.available).toBe(1); 575 }); 576 577 it("empty array returns zeros", async () => { 578 const verifier = new BlockVerifier(ipfsService); 579 580 const result = await verifier.verifyBlockAvailability([]); 581 expect(result.checked).toBe(0); 582 expect(result.available).toBe(0); 583 expect(result.missing).toEqual([]); 584 }); 585}); 586 587// ============================================ 588// Integration: repo sync (two in-process repos) 589// ============================================ 590 591describe("Integration: repo sync via CAR roundtrip", () => { 592 let tmpDir: string; 593 let sourceDb: InstanceType<typeof Database>; 594 let replicaDb: InstanceType<typeof Database>; 595 let sourceIpfs: IpfsService; 596 let replicaIpfs: IpfsService; 597 let sourceRepo: RepoManager; 598 let replicaRepo: RepoManager; 599 600 beforeEach(async () => { 601 tmpDir = mkdtempSync(join(tmpdir(), "repl-integration-test-")); 602 603 // Source setup 604 const sourceConfig = testConfig(join(tmpDir, "source"), []); 605 sourceDb = new Database(join(tmpDir, "source.db")); 606 sourceIpfs = new IpfsService({ 607 db: sourceDb, 608 networking: false, 609 }); 610 await sourceIpfs.start(); 611 sourceRepo = new RepoManager(sourceDb, sourceConfig); 612 sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 613 614 // Replica setup (different DID for the local node, but will replicate source's data) 615 const replicaConfig = testConfig(join(tmpDir, "replica"), [ 616 sourceConfig.DID!, 617 ]); 618 replicaConfig.DID = "did:plc:replica456"; 619 replicaConfig.SIGNING_KEY = 620 "0000000000000000000000000000000000000000000000000000000000000002"; 621 replicaDb = new Database(join(tmpDir, "replica.db")); 622 replicaIpfs = new IpfsService({ 623 db: replicaDb, 624 networking: false, 625 }); 626 await replicaIpfs.start(); 627 replicaRepo = new RepoManager(replicaDb, replicaConfig); 628 replicaRepo.init(undefined, replicaIpfs, replicaIpfs); 629 }); 630 631 afterEach(async () => { 632 if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 633 if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 634 sourceDb.close(); 635 replicaDb.close(); 636 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 637 }); 638 639 it("source records can be replicated via CAR export/import to IPFS", async () => { 640 // 1. Create records in source 641 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 642 $type: "app.bsky.feed.post", 643 text: "Hello from source!", 644 createdAt: new Date().toISOString(), 645 }); 646 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 647 $type: "app.bsky.feed.post", 648 text: "Second post", 649 createdAt: new Date().toISOString(), 650 }); 651 652 // 2. Export as CAR 653 const carBytes = await sourceRepo.getRepoCar(); 654 expect(carBytes.length).toBeGreaterThan(0); 655 656 // 3. Parse CAR on replica side 657 const { root, blocks } = await readCarWithRoot(carBytes); 658 expect(root).toBeDefined(); 659 660 // 4. Store blocks in replica's IPFS 661 await replicaIpfs.putBlocks(blocks); 662 663 // 5. Verify blocks are retrievable from replica's IPFS 664 const internalMap = ( 665 blocks as unknown as { map: Map<string, Uint8Array> } 666 ).map; 667 expect(internalMap).toBeDefined(); 668 expect(internalMap.size).toBeGreaterThan(0); 669 670 for (const cidStr of internalMap.keys()) { 671 const has = await replicaIpfs.hasBlock(cidStr); 672 expect(has).toBe(true); 673 } 674 675 // 6. Verify block availability 676 const verifier = new BlockVerifier(replicaIpfs); 677 const cidStrs = Array.from(internalMap.keys()); 678 const verification = 679 await verifier.verifyBlockAvailability(cidStrs); 680 expect(verification.available).toBe(verification.checked); 681 expect(verification.missing).toEqual([]); 682 }); 683 684 it("tracks block CIDs for verification", async () => { 685 // 1. Create records in source 686 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 687 $type: "app.bsky.feed.post", 688 text: "tracking test", 689 createdAt: new Date().toISOString(), 690 }); 691 692 // 2. Export as CAR, parse, store blocks 693 const carBytes = await sourceRepo.getRepoCar(); 694 const { blocks } = await readCarWithRoot(carBytes); 695 await replicaIpfs.putBlocks(blocks); 696 697 // 3. Collect CID strings 698 const internalMap = ( 699 blocks as unknown as { map: Map<string, Uint8Array> } 700 ).map; 701 const cidStrs = Array.from(internalMap.keys()); 702 703 // 4. Track blocks in sync storage 704 const syncStorage = new SyncStorage(replicaDb); 705 syncStorage.initSchema(); 706 syncStorage.trackBlocks("did:plc:test123", cidStrs); 707 708 // 5. Verify tracking 709 expect(syncStorage.getBlockCount("did:plc:test123")).toBe( 710 cidStrs.length, 711 ); 712 const stored = syncStorage.getBlockCids("did:plc:test123"); 713 expect(stored.sort()).toEqual(cidStrs.sort()); 714 715 // 6. Tracking is idempotent 716 syncStorage.trackBlocks("did:plc:test123", cidStrs); 717 expect(syncStorage.getBlockCount("did:plc:test123")).toBe( 718 cidStrs.length, 719 ); 720 721 // 7. Clear blocks 722 syncStorage.clearBlocks("did:plc:test123"); 723 expect(syncStorage.getBlockCount("did:plc:test123")).toBe(0); 724 }); 725 726 it("manifest record updated with sync rev after replication", async () => { 727 // Create a manifest record in replica 728 const sourceDid = "did:plc:test123"; 729 const rkey = didToRkey(sourceDid); 730 await replicaRepo.putRecord(MANIFEST_NSID, rkey, { 731 $type: MANIFEST_NSID, 732 subject: sourceDid, 733 status: "active", 734 lastSyncRev: null, 735 lastSyncAt: null, 736 createdAt: new Date().toISOString(), 737 }); 738 739 // Simulate sync: create records in source, export, import to IPFS 740 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 741 $type: "app.bsky.feed.post", 742 text: "test post", 743 createdAt: new Date().toISOString(), 744 }); 745 746 const carBytes = await sourceRepo.getRepoCar(); 747 const { root, blocks } = await readCarWithRoot(carBytes); 748 await replicaIpfs.putBlocks(blocks); 749 750 // Update manifest with sync rev 751 const syncRev = root.toString(); 752 await replicaRepo.putRecord(MANIFEST_NSID, rkey, { 753 $type: MANIFEST_NSID, 754 subject: sourceDid, 755 status: "active", 756 lastSyncRev: syncRev, 757 lastSyncAt: new Date().toISOString(), 758 createdAt: new Date().toISOString(), 759 }); 760 761 // Verify manifest was updated 762 const result = await replicaRepo.getRecord(MANIFEST_NSID, rkey); 763 expect(result).not.toBeNull(); 764 const record = result!.record as Record<string, unknown>; 765 expect(record.lastSyncRev).toBe(syncRev); 766 expect(record.lastSyncAt).not.toBeNull(); 767 expect(record.status).toBe("active"); 768 }); 769}); 770 771// ============================================ 772// SyncStorage: Block Tracking 773// ============================================ 774 775describe("SyncStorage block tracking", () => { 776 let tmpDir: string; 777 let db: InstanceType<typeof Database>; 778 let storage: SyncStorage; 779 780 beforeEach(() => { 781 tmpDir = mkdtempSync(join(tmpdir(), "block-tracking-test-")); 782 db = new Database(join(tmpDir, "test.db")); 783 storage = new SyncStorage(db); 784 storage.initSchema(); 785 }); 786 787 afterEach(() => { 788 db.close(); 789 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 790 }); 791 792 it("tracks blocks for a DID", () => { 793 storage.trackBlocks("did:plc:a", ["cid1", "cid2", "cid3"]); 794 expect(storage.getBlockCount("did:plc:a")).toBe(3); 795 expect(storage.getBlockCids("did:plc:a").sort()).toEqual([ 796 "cid1", 797 "cid2", 798 "cid3", 799 ]); 800 }); 801 802 it("ignores duplicate CIDs", () => { 803 storage.trackBlocks("did:plc:a", ["cid1", "cid2"]); 804 storage.trackBlocks("did:plc:a", ["cid2", "cid3"]); 805 expect(storage.getBlockCount("did:plc:a")).toBe(3); 806 }); 807 808 it("tracks blocks per-DID independently", () => { 809 storage.trackBlocks("did:plc:a", ["cid1", "cid2"]); 810 storage.trackBlocks("did:plc:b", ["cid2", "cid3"]); 811 expect(storage.getBlockCount("did:plc:a")).toBe(2); 812 expect(storage.getBlockCount("did:plc:b")).toBe(2); 813 }); 814 815 it("clears blocks for a specific DID", () => { 816 storage.trackBlocks("did:plc:a", ["cid1"]); 817 storage.trackBlocks("did:plc:b", ["cid2"]); 818 storage.clearBlocks("did:plc:a"); 819 expect(storage.getBlockCount("did:plc:a")).toBe(0); 820 expect(storage.getBlockCount("did:plc:b")).toBe(1); 821 }); 822 823 it("returns 0 count for unknown DID", () => { 824 expect(storage.getBlockCount("did:plc:unknown")).toBe(0); 825 }); 826 827 it("returns empty array for unknown DID", () => { 828 expect(storage.getBlockCids("did:plc:unknown")).toEqual([]); 829 }); 830 831 it("handles empty CID array", () => { 832 storage.trackBlocks("did:plc:a", []); 833 expect(storage.getBlockCount("did:plc:a")).toBe(0); 834 }); 835}); 836 837// ============================================ 838// RemoteVerifier 839// ============================================ 840 841describe("RemoteVerifier", () => { 842 let tmpDir: string; 843 let db: InstanceType<typeof Database>; 844 let ipfsService: IpfsService; 845 846 beforeEach(async () => { 847 tmpDir = mkdtempSync(join(tmpdir(), "remote-verifier-test-")); 848 db = new Database(join(tmpDir, "test.db")); 849 ipfsService = new IpfsService({ 850 db, 851 networking: false, 852 }); 853 await ipfsService.start(); 854 }); 855 856 afterEach(async () => { 857 if (ipfsService.isRunning()) { 858 await ipfsService.stop(); 859 } 860 db.close(); 861 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 862 }); 863 864 it("Layer 0: passes when local root matches remote getHead", async () => { 865 const bytes = new TextEncoder().encode("root-block"); 866 const cidStr = await makeCidStr(bytes); 867 await ipfsService.putBlock(cidStr, bytes); 868 869 const mockFetch = async (url: string | URL | Request) => { 870 const urlStr = typeof url === "string" ? url : url.toString(); 871 if (urlStr.includes("getHead")) { 872 return new Response(JSON.stringify({ root: cidStr }), { 873 status: 200, 874 headers: { "Content-Type": "application/json" }, 875 }); 876 } 877 return new Response(null, { status: 404 }); 878 }; 879 880 const verifier = new RemoteVerifier( 881 ipfsService, 882 { raslSampleSize: 10 }, 883 mockFetch as unknown as typeof fetch, 884 ); 885 886 const result = await verifier.verifyPeer( 887 "did:plc:test", 888 "https://pds.example.com", 889 cidStr, 890 [], 891 ); 892 893 const layer0 = result.layers.find((l) => l.layer === 0); 894 expect(layer0).toBeDefined(); 895 expect(layer0!.passed).toBe(true); 896 expect(layer0!.available).toBe(1); 897 }); 898 899 it("Layer 0: fails when commit root returns 404", async () => { 900 const bytes = new TextEncoder().encode("missing-root"); 901 const cidStr = await makeCidStr(bytes); 902 903 const mockFetch = async () => { 904 return new Response(null, { status: 404 }); 905 }; 906 907 const verifier = new RemoteVerifier( 908 ipfsService, 909 {}, 910 mockFetch as unknown as typeof fetch, 911 ); 912 913 const result = await verifier.verifyPeer( 914 "did:plc:test", 915 "https://pds.example.com", 916 cidStr, 917 [], 918 ); 919 920 const layer0 = result.layers.find((l) => l.layer === 0); 921 expect(layer0!.passed).toBe(false); 922 expect(layer0!.missing).toContain(cidStr); 923 }); 924 925 it("Layer 0: fails on network error", async () => { 926 const bytes = new TextEncoder().encode("error-root"); 927 const cidStr = await makeCidStr(bytes); 928 929 const mockFetch = async () => { 930 throw new Error("Connection refused"); 931 }; 932 933 const verifier = new RemoteVerifier( 934 ipfsService, 935 {}, 936 mockFetch as unknown as typeof fetch, 937 ); 938 939 const result = await verifier.verifyPeer( 940 "did:plc:test", 941 "https://pds.example.com", 942 cidStr, 943 [], 944 ); 945 946 const layer0 = result.layers.find((l) => l.layer === 0); 947 expect(layer0!.passed).toBe(false); 948 expect(layer0!.error).toBe("Connection refused"); 949 }); 950 951 it("Layer 1: passes when all sampled blocks match local", async () => { 952 const blocks: Array<{ cid: string; bytes: Uint8Array }> = []; 953 for (let i = 0; i < 5; i++) { 954 const bytes = new TextEncoder().encode(`rasl-block-${i}`); 955 const cidStr = await makeCidStr(bytes); 956 await ipfsService.putBlock(cidStr, bytes); 957 blocks.push({ cid: cidStr, bytes }); 958 } 959 960 const mockFetch = async (url: string | URL | Request) => { 961 const urlStr = typeof url === "string" ? url : url.toString(); 962 const block = blocks.find((b) => urlStr.includes(b.cid)); 963 if (block) { 964 return new Response(block.bytes, { status: 200 }); 965 } 966 return new Response(null, { status: 404 }); 967 }; 968 969 const verifier = new RemoteVerifier( 970 ipfsService, 971 { raslSampleSize: 100 }, 972 mockFetch as unknown as typeof fetch, 973 ); 974 975 const result = await verifier.verifyPeer( 976 "did:plc:test", 977 "https://pds.example.com", 978 null, 979 blocks.map((b) => b.cid), 980 ); 981 982 const layer1 = result.layers.find((l) => l.layer === 1); 983 expect(layer1).toBeDefined(); 984 expect(layer1!.passed).toBe(true); 985 expect(layer1!.checked).toBe(5); 986 expect(layer1!.available).toBe(5); 987 }); 988 989 it("Layer 1: fails when some blocks missing locally", async () => { 990 const presentBytes = new TextEncoder().encode("present-block"); 991 const presentCid = await makeCidStr(presentBytes); 992 await ipfsService.putBlock(presentCid, presentBytes); 993 994 const missingBytes = new TextEncoder().encode("missing-block"); 995 const missingCid = await makeCidStr(missingBytes); 996 // Don't store missingCid — it's tracked but not in blockstore 997 998 const mockFetch = async () => new Response(null, { status: 404 }); 999 1000 const verifier = new RemoteVerifier( 1001 ipfsService, 1002 { raslSampleSize: 100 }, 1003 mockFetch as unknown as typeof fetch, 1004 ); 1005 1006 const result = await verifier.verifyPeer( 1007 "did:plc:test", 1008 "https://pds.example.com", 1009 null, 1010 [presentCid, missingCid], 1011 ); 1012 1013 const layer1 = result.layers.find((l) => l.layer === 1); 1014 expect(layer1!.passed).toBe(false); 1015 expect(layer1!.available).toBe(1); 1016 expect(layer1!.missing).toContain(missingCid); 1017 }); 1018 1019 it("combined: overallPassed is true when all layers pass", async () => { 1020 const bytes = new TextEncoder().encode("all-pass"); 1021 const cidStr = await makeCidStr(bytes); 1022 await ipfsService.putBlock(cidStr, bytes); 1023 1024 const mockFetch = async (url: string | URL | Request) => { 1025 const urlStr = typeof url === "string" ? url : url.toString(); 1026 if (urlStr.includes("getHead")) { 1027 return new Response(JSON.stringify({ root: cidStr }), { 1028 status: 200, 1029 headers: { "Content-Type": "application/json" }, 1030 }); 1031 } 1032 return new Response(null, { status: 404 }); 1033 }; 1034 1035 const verifier = new RemoteVerifier( 1036 ipfsService, 1037 { raslSampleSize: 100 }, 1038 mockFetch as unknown as typeof fetch, 1039 ); 1040 1041 const result = await verifier.verifyPeer( 1042 "did:plc:test", 1043 "https://pds.example.com", 1044 cidStr, 1045 [cidStr], 1046 ); 1047 1048 expect(result.overallPassed).toBe(true); 1049 expect(result.did).toBe("did:plc:test"); 1050 expect(result.pdsEndpoint).toBe("https://pds.example.com"); 1051 expect(result.layers.length).toBe(4); // 0, 1, 2 (stub), 3 (stub) 1052 }); 1053 1054 it("combined: overallPassed is false when any layer fails", async () => { 1055 const bytes = new TextEncoder().encode("fail-test"); 1056 const cidStr = await makeCidStr(bytes); 1057 1058 const mockFetch = async () => { 1059 return new Response(null, { status: 404 }); 1060 }; 1061 1062 const verifier = new RemoteVerifier( 1063 ipfsService, 1064 {}, 1065 mockFetch as unknown as typeof fetch, 1066 ); 1067 1068 const result = await verifier.verifyPeer( 1069 "did:plc:test", 1070 "https://pds.example.com", 1071 cidStr, 1072 [cidStr], 1073 ); 1074 1075 expect(result.overallPassed).toBe(false); 1076 }); 1077 1078 it("Layer 0: fails gracefully when rootCid is null", async () => { 1079 const mockFetch = async () => { 1080 return new Response(null, { status: 404 }); 1081 }; 1082 1083 const verifier = new RemoteVerifier( 1084 ipfsService, 1085 {}, 1086 mockFetch as unknown as typeof fetch, 1087 ); 1088 1089 const result = await verifier.verifyPeer( 1090 "did:plc:test", 1091 "https://pds.example.com", 1092 null, 1093 [], 1094 ); 1095 1096 const layer0 = result.layers.find((l) => l.layer === 0); 1097 expect(layer0).toBeDefined(); 1098 expect(layer0!.passed).toBe(false); 1099 expect(layer0!.error).toContain("no local root CID"); 1100 }); 1101 1102 it("skips Layer 1 when blockCids is empty", async () => { 1103 const mockFetch = async () => { 1104 return new Response(null, { status: 200 }); 1105 }; 1106 1107 const verifier = new RemoteVerifier( 1108 ipfsService, 1109 {}, 1110 mockFetch as unknown as typeof fetch, 1111 ); 1112 1113 const result = await verifier.verifyPeer( 1114 "did:plc:test", 1115 "https://pds.example.com", 1116 null, 1117 [], 1118 ); 1119 1120 expect(result.layers.find((l) => l.layer === 1)).toBeUndefined(); 1121 }); 1122 1123 it("Layer 2 and 3 are stubs", async () => { 1124 const mockFetch = async () => { 1125 return new Response(null, { status: 200 }); 1126 }; 1127 1128 const verifier = new RemoteVerifier( 1129 ipfsService, 1130 {}, 1131 mockFetch as unknown as typeof fetch, 1132 ); 1133 1134 const result = await verifier.verifyPeer( 1135 "did:plc:test", 1136 "https://pds.example.com", 1137 null, 1138 [], 1139 ); 1140 1141 const layer2 = result.layers.find((l) => l.layer === 2); 1142 expect(layer2).toBeDefined(); 1143 expect(layer2!.checked).toBe(0); 1144 expect(layer2!.error).toContain("not implemented"); 1145 1146 const layer3 = result.layers.find((l) => l.layer === 3); 1147 expect(layer3).toBeDefined(); 1148 expect(layer3!.checked).toBe(0); 1149 expect(layer3!.error).toContain("not implemented"); 1150 }); 1151 1152 it("respects raslSampleSize config", async () => { 1153 const blocks: Array<{ cid: string; bytes: Uint8Array }> = []; 1154 for (let i = 0; i < 20; i++) { 1155 const bytes = new TextEncoder().encode(`sample-block-${i}`); 1156 const cidStr = await makeCidStr(bytes); 1157 await ipfsService.putBlock(cidStr, bytes); 1158 blocks.push({ cid: cidStr, bytes }); 1159 } 1160 1161 const mockFetch = async (url: string | URL | Request) => { 1162 const urlStr = typeof url === "string" ? url : url.toString(); 1163 const block = blocks.find((b) => urlStr.includes(b.cid)); 1164 if (block) { 1165 return new Response(block.bytes, { status: 200 }); 1166 } 1167 return new Response(null, { status: 404 }); 1168 }; 1169 1170 const verifier = new RemoteVerifier( 1171 ipfsService, 1172 { raslSampleSize: 5 }, 1173 mockFetch as unknown as typeof fetch, 1174 ); 1175 1176 const result = await verifier.verifyPeer( 1177 "did:plc:test", 1178 "https://pds.example.com", 1179 null, 1180 blocks.map((b) => b.cid), 1181 ); 1182 1183 const layer1 = result.layers.find((l) => l.layer === 1); 1184 expect(layer1!.checked).toBe(5); 1185 }); 1186}); 1187 1188// ============================================ 1189// IpfsReadableBlockstore 1190// ============================================ 1191 1192describe("IpfsReadableBlockstore", () => { 1193 let tmpDir: string; 1194 let db: InstanceType<typeof Database>; 1195 let ipfsService: IpfsService; 1196 let readableBlockstore: IpfsReadableBlockstore; 1197 1198 beforeEach(async () => { 1199 tmpDir = mkdtempSync(join(tmpdir(), "ipfs-readable-bs-test-")); 1200 db = new Database(join(tmpDir, "test.db")); 1201 ipfsService = new IpfsService({ 1202 db, 1203 networking: false, 1204 }); 1205 await ipfsService.start(); 1206 readableBlockstore = new IpfsReadableBlockstore(ipfsService); 1207 }); 1208 1209 afterEach(async () => { 1210 if (ipfsService.isRunning()) await ipfsService.stop(); 1211 db.close(); 1212 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1213 }); 1214 1215 it("getBytes roundtrip", async () => { 1216 const bytes = new TextEncoder().encode("readable-test"); 1217 const cidStr = await makeCidStr(bytes); 1218 await ipfsService.putBlock(cidStr, bytes); 1219 1220 const { CID: MfCID } = await import("multiformats"); 1221 const cid = MfCID.parse(cidStr); 1222 const result = await readableBlockstore.getBytes(cid); 1223 expect(result).not.toBeNull(); 1224 expect(Buffer.from(result!)).toEqual(Buffer.from(bytes)); 1225 }); 1226 1227 it("has returns true/false correctly", async () => { 1228 const bytes = new TextEncoder().encode("has-test"); 1229 const cidStr = await makeCidStr(bytes); 1230 await ipfsService.putBlock(cidStr, bytes); 1231 1232 const { CID: MfCID } = await import("multiformats"); 1233 const present = MfCID.parse(cidStr); 1234 expect(await readableBlockstore.has(present)).toBe(true); 1235 1236 const missingBytes = new TextEncoder().encode("missing-test"); 1237 const missingCidStr = await makeCidStr(missingBytes); 1238 const missing = MfCID.parse(missingCidStr); 1239 expect(await readableBlockstore.has(missing)).toBe(false); 1240 }); 1241 1242 it("getBlocks returns blocks + missing", async () => { 1243 const bytes1 = new TextEncoder().encode("block-a"); 1244 const cidStr1 = await makeCidStr(bytes1); 1245 await ipfsService.putBlock(cidStr1, bytes1); 1246 1247 const bytes2 = new TextEncoder().encode("block-b-missing"); 1248 const cidStr2 = await makeCidStr(bytes2); 1249 1250 const { CID: MfCID } = await import("multiformats"); 1251 const cid1 = MfCID.parse(cidStr1); 1252 const cid2 = MfCID.parse(cidStr2); 1253 1254 const { blocks, missing } = await readableBlockstore.getBlocks([ 1255 cid1, 1256 cid2, 1257 ]); 1258 expect(blocks.has(cid1)).toBe(true); 1259 expect(missing).toHaveLength(1); 1260 expect(missing[0]!.toString()).toBe(cidStr2); 1261 }); 1262}); 1263 1264// ============================================ 1265// ReplicatedRepoReader 1266// ============================================ 1267 1268describe("ReplicatedRepoReader", () => { 1269 let tmpDir: string; 1270 let sourceDb: InstanceType<typeof Database>; 1271 let replicaDb: InstanceType<typeof Database>; 1272 let sourceIpfs: IpfsService; 1273 let replicaIpfs: IpfsService; 1274 let sourceRepo: RepoManager; 1275 let syncStorage: SyncStorage; 1276 let reader: ReplicatedRepoReader; 1277 const sourceDid = "did:plc:test123"; 1278 1279 beforeEach(async () => { 1280 tmpDir = mkdtempSync(join(tmpdir(), "replicated-reader-test-")); 1281 1282 // Source setup 1283 const sourceConfig = testConfig(join(tmpDir, "source"), []); 1284 sourceDb = new Database(join(tmpDir, "source.db")); 1285 sourceIpfs = new IpfsService({ 1286 db: sourceDb, 1287 networking: false, 1288 }); 1289 await sourceIpfs.start(); 1290 sourceRepo = new RepoManager(sourceDb, sourceConfig); 1291 sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1292 1293 // Replica IPFS + sync storage 1294 replicaDb = new Database(join(tmpDir, "replica.db")); 1295 replicaIpfs = new IpfsService({ 1296 db: replicaDb, 1297 networking: false, 1298 }); 1299 await replicaIpfs.start(); 1300 1301 syncStorage = new SyncStorage(replicaDb); 1302 syncStorage.initSchema(); 1303 1304 reader = new ReplicatedRepoReader(replicaIpfs, syncStorage); 1305 }); 1306 1307 afterEach(async () => { 1308 if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1309 if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1310 sourceDb.close(); 1311 replicaDb.close(); 1312 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1313 }); 1314 1315 /** Helper: create records in source, export CAR, store blocks in replica IPFS, record root_cid. */ 1316 async function replicateSource(): Promise<{ rootCid: string; rev: string }> { 1317 const carBytes = await sourceRepo.getRepoCar(); 1318 const { root, blocks } = await readCarWithRoot(carBytes); 1319 await replicaIpfs.putBlocks(blocks); 1320 1321 const rootCidStr = root.toString(); 1322 1323 // Extract rev from commit block 1324 const internalMap = ( 1325 blocks as unknown as { map: Map<string, Uint8Array> } 1326 ).map; 1327 let rev = rootCidStr; 1328 const commitBytes = internalMap?.get(rootCidStr); 1329 if (commitBytes) { 1330 const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1331 if (typeof commitObj.rev === "string") { 1332 rev = commitObj.rev; 1333 } 1334 } 1335 1336 // Record in sync storage 1337 syncStorage.upsertState({ 1338 did: sourceDid, 1339 pdsEndpoint: "https://pds.example.com", 1340 }); 1341 syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); 1342 1343 return { rootCid: rootCidStr, rev }; 1344 } 1345 1346 it("returns null for unknown DID", async () => { 1347 const result = await reader.getRecord( 1348 "did:plc:unknown", 1349 "app.bsky.feed.post", 1350 "abc", 1351 ); 1352 expect(result).toBeNull(); 1353 expect(reader.isReplicatedDid("did:plc:unknown")).toBe(false); 1354 }); 1355 1356 it("getRecord returns record with correct CID and value", async () => { 1357 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1358 $type: "app.bsky.feed.post", 1359 text: "Hello replicated!", 1360 createdAt: "2025-01-01T00:00:00.000Z", 1361 }); 1362 await replicateSource(); 1363 1364 // List records from source to get the rkey 1365 const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", { 1366 limit: 10, 1367 }); 1368 const firstRecord = sourceRecords.records[0]!; 1369 const rkey = firstRecord.uri.split("/").pop()!; 1370 1371 const result = await reader.getRecord( 1372 sourceDid, 1373 "app.bsky.feed.post", 1374 rkey, 1375 ); 1376 expect(result).not.toBeNull(); 1377 expect(result!.cid).toBeDefined(); 1378 expect(typeof result!.cid).toBe("string"); 1379 const value = result!.value as Record<string, unknown>; 1380 expect(value.text).toBe("Hello replicated!"); 1381 }); 1382 1383 it("listRecords returns records with cursor support", async () => { 1384 // Create multiple records 1385 for (let i = 0; i < 5; i++) { 1386 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1387 $type: "app.bsky.feed.post", 1388 text: `Post ${i}`, 1389 createdAt: new Date().toISOString(), 1390 }); 1391 } 1392 await replicateSource(); 1393 1394 // First page 1395 const page1 = await reader.listRecords(sourceDid, "app.bsky.feed.post", { 1396 limit: 3, 1397 }); 1398 expect(page1.records).toHaveLength(3); 1399 expect(page1.cursor).toBeDefined(); 1400 1401 // Second page 1402 const page2 = await reader.listRecords(sourceDid, "app.bsky.feed.post", { 1403 limit: 3, 1404 cursor: page1.cursor, 1405 }); 1406 expect(page2.records).toHaveLength(2); 1407 1408 // All records have correct URIs 1409 for (const rec of [...page1.records, ...page2.records]) { 1410 expect(rec.uri).toMatch( 1411 new RegExp(`^at://${sourceDid}/app\\.bsky\\.feed\\.post/`), 1412 ); 1413 expect(rec.cid).toBeDefined(); 1414 expect(rec.value).toBeDefined(); 1415 } 1416 }); 1417 1418 it("describeRepo returns collections list", async () => { 1419 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1420 $type: "app.bsky.feed.post", 1421 text: "post", 1422 createdAt: new Date().toISOString(), 1423 }); 1424 await sourceRepo.putRecord("app.bsky.actor.profile", "self", { 1425 $type: "app.bsky.actor.profile", 1426 displayName: "Test", 1427 }); 1428 await replicateSource(); 1429 1430 const result = await reader.describeRepo(sourceDid); 1431 expect(result).not.toBeNull(); 1432 expect(result!.did).toBe(sourceDid); 1433 expect(result!.collections).toContain("app.bsky.feed.post"); 1434 expect(result!.collections).toContain("app.bsky.actor.profile"); 1435 }); 1436 1437 it("getRepoStatus returns rev and root CID", async () => { 1438 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1439 $type: "app.bsky.feed.post", 1440 text: "status test", 1441 createdAt: new Date().toISOString(), 1442 }); 1443 const { rootCid, rev } = await replicateSource(); 1444 1445 const status = reader.getRepoStatus(sourceDid); 1446 expect(status).not.toBeNull(); 1447 expect(status!.did).toBe(sourceDid); 1448 expect(status!.rootCid).toBe(rootCid); 1449 expect(status!.rev).toBe(rev); 1450 expect(status!.active).toBe(true); 1451 }); 1452 1453 it("cache invalidation works", async () => { 1454 // First sync 1455 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1456 $type: "app.bsky.feed.post", 1457 text: "First post", 1458 createdAt: new Date().toISOString(), 1459 }); 1460 await replicateSource(); 1461 1462 const result1 = await reader.listRecords( 1463 sourceDid, 1464 "app.bsky.feed.post", 1465 { limit: 100 }, 1466 ); 1467 expect(result1.records).toHaveLength(1); 1468 1469 // Add another record and re-sync 1470 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1471 $type: "app.bsky.feed.post", 1472 text: "Second post", 1473 createdAt: new Date().toISOString(), 1474 }); 1475 reader.invalidateCache(sourceDid); 1476 await replicateSource(); 1477 1478 const result2 = await reader.listRecords( 1479 sourceDid, 1480 "app.bsky.feed.post", 1481 { limit: 100 }, 1482 ); 1483 expect(result2.records).toHaveLength(2); 1484 }); 1485}); 1486 1487// ============================================ 1488// XRPC integration (replicated repos) 1489// ============================================ 1490 1491describe("XRPC integration: replicated repos", () => { 1492 let tmpDir: string; 1493 let sourceDb: InstanceType<typeof Database>; 1494 let replicaDb: InstanceType<typeof Database>; 1495 let sourceIpfs: IpfsService; 1496 let replicaIpfs: IpfsService; 1497 let sourceRepo: RepoManager; 1498 let replicaRepo: RepoManager; 1499 let syncStorage: SyncStorage; 1500 let reader: ReplicatedRepoReader; 1501 let app: ReturnType<typeof createApp>; 1502 const sourceDid = "did:plc:test123"; 1503 const replicaDid = "did:plc:replica456"; 1504 1505 beforeEach(async () => { 1506 tmpDir = mkdtempSync(join(tmpdir(), "xrpc-replicated-test-")); 1507 1508 // Source setup 1509 const sourceConfig = testConfig(join(tmpDir, "source"), []); 1510 sourceDb = new Database(join(tmpDir, "source.db")); 1511 sourceIpfs = new IpfsService({ 1512 db: sourceDb, 1513 networking: false, 1514 }); 1515 await sourceIpfs.start(); 1516 sourceRepo = new RepoManager(sourceDb, sourceConfig); 1517 sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1518 1519 // Replica setup 1520 const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]); 1521 replicaConfig.DID = replicaDid; 1522 replicaConfig.SIGNING_KEY = 1523 "0000000000000000000000000000000000000000000000000000000000000002"; 1524 replicaDb = new Database(join(tmpDir, "replica.db")); 1525 replicaIpfs = new IpfsService({ 1526 db: replicaDb, 1527 networking: false, 1528 }); 1529 await replicaIpfs.start(); 1530 replicaRepo = new RepoManager(replicaDb, replicaConfig); 1531 replicaRepo.init(undefined, replicaIpfs, replicaIpfs); 1532 1533 syncStorage = new SyncStorage(replicaDb); 1534 syncStorage.initSchema(); 1535 reader = new ReplicatedRepoReader(replicaIpfs, syncStorage); 1536 1537 const firehose = new Firehose(replicaRepo); 1538 app = createApp( 1539 replicaConfig, 1540 firehose, 1541 replicaIpfs, 1542 replicaIpfs, 1543 undefined, 1544 undefined, 1545 reader, 1546 replicaRepo, 1547 ); 1548 }); 1549 1550 afterEach(async () => { 1551 if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1552 if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1553 sourceDb.close(); 1554 replicaDb.close(); 1555 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1556 }); 1557 1558 async function replicateSource(): Promise<void> { 1559 const carBytes = await sourceRepo.getRepoCar(); 1560 const { root, blocks } = await readCarWithRoot(carBytes); 1561 await replicaIpfs.putBlocks(blocks); 1562 1563 const rootCidStr = root.toString(); 1564 const internalMap = ( 1565 blocks as unknown as { map: Map<string, Uint8Array> } 1566 ).map; 1567 let rev = rootCidStr; 1568 const commitBytes = internalMap?.get(rootCidStr); 1569 if (commitBytes) { 1570 const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1571 if (typeof commitObj.rev === "string") { 1572 rev = commitObj.rev; 1573 } 1574 } 1575 1576 syncStorage.upsertState({ 1577 did: sourceDid, 1578 pdsEndpoint: "https://pds.example.com", 1579 }); 1580 syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); 1581 reader.invalidateCache(sourceDid); 1582 } 1583 1584 it("GET getRecord for replicated DID → 200", async () => { 1585 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1586 $type: "app.bsky.feed.post", 1587 text: "XRPC replicated test", 1588 createdAt: "2025-01-01T00:00:00.000Z", 1589 }); 1590 await replicateSource(); 1591 1592 const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", { 1593 limit: 10, 1594 }); 1595 const rkey = sourceRecords.records[0]!.uri.split("/").pop()!; 1596 1597 const res = await app.request( 1598 `/xrpc/com.atproto.repo.getRecord?repo=${sourceDid}&collection=app.bsky.feed.post&rkey=${rkey}`, 1599 undefined, 1600 {}, 1601 ); 1602 expect(res.status).toBe(200); 1603 1604 const json = (await res.json()) as { 1605 uri: string; 1606 cid: string; 1607 value: Record<string, unknown>; 1608 }; 1609 expect(json.uri).toContain(sourceDid); 1610 expect(json.value.text).toBe("XRPC replicated test"); 1611 }); 1612 1613 it("GET listRecords for replicated DID → 200", async () => { 1614 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1615 $type: "app.bsky.feed.post", 1616 text: "list test 1", 1617 createdAt: new Date().toISOString(), 1618 }); 1619 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1620 $type: "app.bsky.feed.post", 1621 text: "list test 2", 1622 createdAt: new Date().toISOString(), 1623 }); 1624 await replicateSource(); 1625 1626 const res = await app.request( 1627 `/xrpc/com.atproto.repo.listRecords?repo=${sourceDid}&collection=app.bsky.feed.post`, 1628 undefined, 1629 {}, 1630 ); 1631 expect(res.status).toBe(200); 1632 1633 const json = (await res.json()) as { 1634 records: Array<{ uri: string; value: Record<string, unknown> }>; 1635 }; 1636 expect(json.records.length).toBe(2); 1637 }); 1638 1639 it("GET describeRepo for replicated DID → 200", async () => { 1640 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1641 $type: "app.bsky.feed.post", 1642 text: "describe test", 1643 createdAt: new Date().toISOString(), 1644 }); 1645 await replicateSource(); 1646 1647 const res = await app.request( 1648 `/xrpc/com.atproto.repo.describeRepo?repo=${sourceDid}`, 1649 undefined, 1650 {}, 1651 ); 1652 expect(res.status).toBe(200); 1653 1654 const json = (await res.json()) as { 1655 did: string; 1656 collections: string[]; 1657 }; 1658 expect(json.did).toBe(sourceDid); 1659 expect(json.collections).toContain("app.bsky.feed.post"); 1660 }); 1661 1662 it("non-replicated foreign DID → 404", async () => { 1663 const res = await app.request( 1664 `/xrpc/com.atproto.repo.getRecord?repo=did:plc:nonexistent&collection=app.bsky.feed.post&rkey=abc`, 1665 undefined, 1666 {}, 1667 ); 1668 expect(res.status).toBe(404); 1669 }); 1670 1671 it("local DID still works via existing path", async () => { 1672 await replicaRepo.createRecord("app.bsky.feed.post", undefined, { 1673 $type: "app.bsky.feed.post", 1674 text: "local post", 1675 createdAt: new Date().toISOString(), 1676 }); 1677 1678 const records = await replicaRepo.listRecords("app.bsky.feed.post", { 1679 limit: 10, 1680 }); 1681 const rkey = records.records[0]!.uri.split("/").pop()!; 1682 1683 const res = await app.request( 1684 `/xrpc/com.atproto.repo.getRecord?repo=${replicaDid}&collection=app.bsky.feed.post&rkey=${rkey}`, 1685 undefined, 1686 {}, 1687 ); 1688 expect(res.status).toBe(200); 1689 1690 const json = (await res.json()) as { 1691 uri: string; 1692 value: Record<string, unknown>; 1693 }; 1694 expect(json.value.text).toBe("local post"); 1695 }); 1696}); 1697 1698// ============================================ 1699// root_cid + rev extraction 1700// ============================================ 1701 1702describe("root_cid + rev extraction", () => { 1703 let tmpDir: string; 1704 let sourceDb: InstanceType<typeof Database>; 1705 let sourceIpfs: IpfsService; 1706 let sourceRepo: RepoManager; 1707 let replicaDb: InstanceType<typeof Database>; 1708 let replicaIpfs: IpfsService; 1709 let syncStorage: SyncStorage; 1710 1711 beforeEach(async () => { 1712 tmpDir = mkdtempSync(join(tmpdir(), "rev-extraction-test-")); 1713 1714 const sourceConfig = testConfig(join(tmpDir, "source"), []); 1715 sourceDb = new Database(join(tmpDir, "source.db")); 1716 sourceIpfs = new IpfsService({ 1717 db: sourceDb, 1718 networking: false, 1719 }); 1720 await sourceIpfs.start(); 1721 sourceRepo = new RepoManager(sourceDb, sourceConfig); 1722 sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1723 1724 replicaDb = new Database(join(tmpDir, "replica.db")); 1725 replicaIpfs = new IpfsService({ 1726 db: replicaDb, 1727 networking: false, 1728 }); 1729 await replicaIpfs.start(); 1730 1731 syncStorage = new SyncStorage(replicaDb); 1732 syncStorage.initSchema(); 1733 }); 1734 1735 afterEach(async () => { 1736 if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1737 if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1738 sourceDb.close(); 1739 replicaDb.close(); 1740 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1741 }); 1742 1743 it("after sync, replication_state has both root_cid and last_sync_rev (actual TID)", async () => { 1744 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1745 $type: "app.bsky.feed.post", 1746 text: "rev test", 1747 createdAt: new Date().toISOString(), 1748 }); 1749 1750 const carBytes = await sourceRepo.getRepoCar(); 1751 const { root, blocks } = await readCarWithRoot(carBytes); 1752 await replicaIpfs.putBlocks(blocks); 1753 1754 const rootCidStr = root.toString(); 1755 const internalMap = ( 1756 blocks as unknown as { map: Map<string, Uint8Array> } 1757 ).map; 1758 let rev = rootCidStr; 1759 const commitBytes = internalMap?.get(rootCidStr); 1760 if (commitBytes) { 1761 const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1762 if (typeof commitObj.rev === "string") { 1763 rev = commitObj.rev; 1764 } 1765 } 1766 1767 const did = "did:plc:test123"; 1768 syncStorage.upsertState({ 1769 did, 1770 pdsEndpoint: "https://pds.example.com", 1771 }); 1772 syncStorage.updateSyncProgress(did, rev, rootCidStr); 1773 1774 const state = syncStorage.getState(did); 1775 expect(state).not.toBeNull(); 1776 expect(state!.rootCid).toBe(rootCidStr); 1777 expect(state!.lastSyncRev).toBe(rev); 1778 // Rev should be a TID (not a CID) 1779 expect(state!.lastSyncRev).not.toBe(rootCidStr); 1780 }); 1781 1782 it("root_cid is a valid CID string and last_sync_rev is a TID", async () => { 1783 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1784 $type: "app.bsky.feed.post", 1785 text: "cid validation test", 1786 createdAt: new Date().toISOString(), 1787 }); 1788 1789 const carBytes = await sourceRepo.getRepoCar(); 1790 const { root, blocks } = await readCarWithRoot(carBytes); 1791 await replicaIpfs.putBlocks(blocks); 1792 1793 const rootCidStr = root.toString(); 1794 const internalMap = ( 1795 blocks as unknown as { map: Map<string, Uint8Array> } 1796 ).map; 1797 let rev = rootCidStr; 1798 const commitBytes = internalMap?.get(rootCidStr); 1799 if (commitBytes) { 1800 const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1801 if (typeof commitObj.rev === "string") { 1802 rev = commitObj.rev; 1803 } 1804 } 1805 1806 // root_cid should be parseable as a CID 1807 const { CID: MfCID } = await import("multiformats"); 1808 expect(() => MfCID.parse(rootCidStr)).not.toThrow(); 1809 1810 // rev should be a TID (base32-sortable, 13 chars) 1811 expect(rev).toMatch(/^[a-z2-7]{13}$/); 1812 }); 1813}); 1814 1815// ============================================ 1816// SyncStorage: Peer Endpoint Tracking 1817// ============================================ 1818 1819describe("SyncStorage peer endpoint tracking", () => { 1820 let tmpDir: string; 1821 let db: InstanceType<typeof Database>; 1822 let storage: SyncStorage; 1823 1824 beforeEach(() => { 1825 tmpDir = mkdtempSync(join(tmpdir(), "peer-endpoint-test-")); 1826 db = new Database(join(tmpDir, "test.db")); 1827 storage = new SyncStorage(db); 1828 storage.initSchema(); 1829 }); 1830 1831 afterEach(() => { 1832 db.close(); 1833 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1834 }); 1835 1836 it("upsertPeerEndpoint + getPeerEndpoints returns entries", () => { 1837 storage.upsertPeerEndpoint( 1838 "did:plc:target1", 1839 "did:plc:peer1", 1840 "https://peer1.example.com", 1841 "rev123", 1842 ); 1843 const peers = storage.getPeerEndpoints("did:plc:target1"); 1844 expect(peers).toHaveLength(1); 1845 expect(peers[0]!.peerDid).toBe("did:plc:peer1"); 1846 expect(peers[0]!.pdsEndpoint).toBe("https://peer1.example.com"); 1847 expect(peers[0]!.lastSyncRev).toBe("rev123"); 1848 }); 1849 1850 it("duplicate peer_did+target_did upsert updates existing", () => { 1851 storage.upsertPeerEndpoint( 1852 "did:plc:target1", 1853 "did:plc:peer1", 1854 "https://old.example.com", 1855 "rev1", 1856 ); 1857 storage.upsertPeerEndpoint( 1858 "did:plc:target1", 1859 "did:plc:peer1", 1860 "https://new.example.com", 1861 "rev2", 1862 ); 1863 const peers = storage.getPeerEndpoints("did:plc:target1"); 1864 expect(peers).toHaveLength(1); 1865 expect(peers[0]!.pdsEndpoint).toBe("https://new.example.com"); 1866 expect(peers[0]!.lastSyncRev).toBe("rev2"); 1867 }); 1868 1869 it("getPeerEndpoints returns empty for unknown DID", () => { 1870 const peers = storage.getPeerEndpoints("did:plc:unknown"); 1871 expect(peers).toEqual([]); 1872 }); 1873 1874 it("clearPeerEndpoints removes all entries for a DID", () => { 1875 storage.upsertPeerEndpoint( 1876 "did:plc:target1", 1877 "did:plc:peer1", 1878 "https://peer1.example.com", 1879 null, 1880 ); 1881 storage.upsertPeerEndpoint( 1882 "did:plc:target1", 1883 "did:plc:peer2", 1884 "https://peer2.example.com", 1885 null, 1886 ); 1887 storage.upsertPeerEndpoint( 1888 "did:plc:target2", 1889 "did:plc:peer1", 1890 "https://peer1.example.com", 1891 null, 1892 ); 1893 1894 storage.clearPeerEndpoints("did:plc:target1"); 1895 expect(storage.getPeerEndpoints("did:plc:target1")).toEqual([]); 1896 // Other target DID's entries should remain 1897 expect(storage.getPeerEndpoints("did:plc:target2")).toHaveLength(1); 1898 }); 1899}); 1900 1901// ============================================ 1902// XRPC: getRepo / getBlocks for replicated DIDs 1903// ============================================ 1904 1905describe("XRPC: getRepo + getBlocks for replicated DIDs", () => { 1906 let tmpDir: string; 1907 let sourceDb: InstanceType<typeof Database>; 1908 let replicaDb: InstanceType<typeof Database>; 1909 let sourceIpfs: IpfsService; 1910 let replicaIpfs: IpfsService; 1911 let sourceRepo: RepoManager; 1912 let replicaRepo: RepoManager; 1913 let syncStorage: SyncStorage; 1914 let app: ReturnType<typeof createApp>; 1915 const sourceDid = "did:plc:test123"; 1916 const replicaDid = "did:plc:replica456"; 1917 let trackedCids: string[] = []; 1918 1919 beforeEach(async () => { 1920 tmpDir = mkdtempSync(join(tmpdir(), "xrpc-getrepo-test-")); 1921 1922 // Source setup 1923 const sourceConfig = testConfig(join(tmpDir, "source"), []); 1924 sourceDb = new Database(join(tmpDir, "source.db")); 1925 sourceIpfs = new IpfsService({ 1926 db: sourceDb, 1927 networking: false, 1928 }); 1929 await sourceIpfs.start(); 1930 sourceRepo = new RepoManager(sourceDb, sourceConfig); 1931 sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1932 1933 // Replica setup 1934 const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]); 1935 replicaConfig.DID = replicaDid; 1936 replicaConfig.SIGNING_KEY = 1937 "0000000000000000000000000000000000000000000000000000000000000002"; 1938 replicaDb = new Database(join(tmpDir, "replica.db")); 1939 replicaIpfs = new IpfsService({ 1940 db: replicaDb, 1941 networking: false, 1942 }); 1943 await replicaIpfs.start(); 1944 replicaRepo = new RepoManager(replicaDb, replicaConfig); 1945 replicaRepo.init(undefined, replicaIpfs, replicaIpfs); 1946 1947 syncStorage = new SyncStorage(replicaDb); 1948 syncStorage.initSchema(); 1949 1950 // Create a mock replicationManager with getSyncStorage 1951 const mockReplicationManager = { 1952 getSyncStorage: () => syncStorage, 1953 } as unknown as import("./replication-manager.js").ReplicationManager; 1954 1955 const firehose = new Firehose(replicaRepo); 1956 app = createApp( 1957 replicaConfig, 1958 firehose, 1959 replicaIpfs, // blockStore 1960 replicaIpfs, // networkService 1961 undefined, // blobStore 1962 mockReplicationManager, 1963 undefined, // replicatedRepoReader 1964 replicaRepo, 1965 ); 1966 }); 1967 1968 afterEach(async () => { 1969 if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1970 if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1971 sourceDb.close(); 1972 replicaDb.close(); 1973 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 1974 }); 1975 1976 async function replicateSource(): Promise<void> { 1977 const carBytes = await sourceRepo.getRepoCar(); 1978 const { root, blocks } = await readCarWithRoot(carBytes); 1979 await replicaIpfs.putBlocks(blocks); 1980 1981 const rootCidStr = root.toString(); 1982 const internalMap = ( 1983 blocks as unknown as { map: Map<string, Uint8Array> } 1984 ).map; 1985 let rev = rootCidStr; 1986 const commitBytes = internalMap?.get(rootCidStr); 1987 if (commitBytes) { 1988 const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1989 if (typeof commitObj.rev === "string") { 1990 rev = commitObj.rev; 1991 } 1992 } 1993 1994 // Track block CIDs 1995 trackedCids = Array.from(internalMap.keys()); 1996 1997 syncStorage.upsertState({ 1998 did: sourceDid, 1999 pdsEndpoint: "https://pds.example.com", 2000 }); 2001 syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); 2002 syncStorage.trackBlocks(sourceDid, trackedCids); 2003 } 2004 2005 it("getRepo serves replicated DID repo as valid CAR", async () => { 2006 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 2007 $type: "app.bsky.feed.post", 2008 text: "replicated getRepo test", 2009 createdAt: new Date().toISOString(), 2010 }); 2011 await replicateSource(); 2012 2013 const res = await app.request( 2014 `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`, 2015 undefined, 2016 {}, 2017 ); 2018 expect(res.status).toBe(200); 2019 expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car"); 2020 2021 // Parse the CAR and verify it has a valid root + blocks 2022 const carBytes = new Uint8Array(await res.arrayBuffer()); 2023 const { root, blocks } = await readCarWithRoot(carBytes); 2024 expect(root).toBeDefined(); 2025 const internalMap = ( 2026 blocks as unknown as { map: Map<string, Uint8Array> } 2027 ).map; 2028 expect(internalMap.size).toBeGreaterThan(0); 2029 }); 2030 2031 it("getRepo returns 404 for non-replicated DID", async () => { 2032 const res = await app.request( 2033 `/xrpc/com.atproto.sync.getRepo?did=did:plc:nonexistent`, 2034 undefined, 2035 {}, 2036 ); 2037 expect(res.status).toBe(404); 2038 }); 2039 2040 it("getRepo returns 404 for replicated DID with no rootCid yet", async () => { 2041 // Create state without rootCid 2042 syncStorage.upsertState({ 2043 did: sourceDid, 2044 pdsEndpoint: "https://pds.example.com", 2045 }); 2046 2047 const res = await app.request( 2048 `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`, 2049 undefined, 2050 {}, 2051 ); 2052 expect(res.status).toBe(404); 2053 }); 2054 2055 it("getBlocks serves requested blocks for replicated DID", async () => { 2056 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 2057 $type: "app.bsky.feed.post", 2058 text: "replicated getBlocks test", 2059 createdAt: new Date().toISOString(), 2060 }); 2061 await replicateSource(); 2062 2063 // Request a subset of tracked CIDs 2064 const requestCids = trackedCids.slice(0, 2); 2065 const cidsQuery = requestCids.map((c) => `cids=${c}`).join("&"); 2066 const res = await app.request( 2067 `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&${cidsQuery}`, 2068 undefined, 2069 {}, 2070 ); 2071 expect(res.status).toBe(200); 2072 expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car"); 2073 2074 const carBytes = new Uint8Array(await res.arrayBuffer()); 2075 const { blocks } = await readCarWithRoot(carBytes); 2076 const internalMap = ( 2077 blocks as unknown as { map: Map<string, Uint8Array> } 2078 ).map; 2079 // Should contain the requested blocks 2080 for (const cid of requestCids) { 2081 expect(internalMap.has(cid)).toBe(true); 2082 } 2083 }); 2084 2085 it("getBlocks returns 404 for non-tracked DID", async () => { 2086 const res = await app.request( 2087 `/xrpc/com.atproto.sync.getBlocks?did=did:plc:nonexistent&cids=bafytest`, 2088 undefined, 2089 {}, 2090 ); 2091 expect(res.status).toBe(404); 2092 }); 2093 2094 it("getBlocks only serves blocks tracked for the requested DID", async () => { 2095 await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 2096 $type: "app.bsky.feed.post", 2097 text: "security test", 2098 createdAt: new Date().toISOString(), 2099 }); 2100 await replicateSource(); 2101 2102 // Store a block that is NOT tracked for sourceDid 2103 const untrackedBytes = new TextEncoder().encode("untracked-block"); 2104 const untrackedCid = await makeCidStr(untrackedBytes); 2105 await replicaIpfs.putBlock(untrackedCid, untrackedBytes); 2106 2107 // Request the untracked CID for the tracked DID 2108 const res = await app.request( 2109 `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&cids=${untrackedCid}`, 2110 undefined, 2111 {}, 2112 ); 2113 expect(res.status).toBe(200); 2114 // The CAR should be returned but the untracked block should not be in it 2115 const carBytes = new Uint8Array(await res.arrayBuffer()); 2116 const { blocks } = await readCarWithRoot(carBytes); 2117 const internalMap = ( 2118 blocks as unknown as { map: Map<string, Uint8Array> } 2119 ).map; 2120 expect(internalMap.has(untrackedCid)).toBe(false); 2121 }); 2122}); 2123 2124// ============================================ 2125// Peer fallback sync 2126// ============================================ 2127 2128describe("Peer fallback in syncDid", () => { 2129 let tmpDir: string; 2130 let db: InstanceType<typeof Database>; 2131 let ipfsService: IpfsService; 2132 let repoManager: RepoManager; 2133 let syncStorage: SyncStorage; 2134 const localDid = "did:plc:local"; 2135 const remoteDid = "did:plc:remote1"; 2136 2137 beforeEach(async () => { 2138 tmpDir = mkdtempSync(join(tmpdir(), "peer-fallback-test-")); 2139 const config = testConfig(join(tmpDir, "data"), [remoteDid]); 2140 config.DID = localDid; 2141 2142 db = new Database(join(tmpDir, "test.db")); 2143 ipfsService = new IpfsService({ 2144 db, 2145 networking: false, 2146 }); 2147 await ipfsService.start(); 2148 2149 repoManager = new RepoManager(db, config); 2150 repoManager.init(undefined, ipfsService, ipfsService); 2151 2152 syncStorage = new SyncStorage(db); 2153 syncStorage.initSchema(); 2154 }); 2155 2156 afterEach(async () => { 2157 if (ipfsService.isRunning()) await ipfsService.stop(); 2158 db.close(); 2159 try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} 2160 }); 2161 2162 it("source PDS fails → fetches from peer endpoint successfully", async () => { 2163 // Create a real CAR from the local repo to serve as peer response 2164 await repoManager.createRecord("app.bsky.feed.post", undefined, { 2165 $type: "app.bsky.feed.post", 2166 text: "peer fallback test", 2167 createdAt: new Date().toISOString(), 2168 }); 2169 const carBytes = await repoManager.getRepoCar(); 2170 2171 // Set up sync state 2172 syncStorage.upsertState({ 2173 did: remoteDid, 2174 pdsEndpoint: "https://source-pds.example.com", 2175 }); 2176 2177 // Register a peer endpoint 2178 syncStorage.upsertPeerEndpoint( 2179 remoteDid, 2180 "did:plc:peer1", 2181 "https://peer1.example.com", 2182 "rev1", 2183 ); 2184 2185 // Mock RepoFetcher: source fails, peer succeeds 2186 const mockDidResolver = { 2187 resolve: async (did: string) => ({ 2188 id: did, 2189 service: [ 2190 { 2191 id: "#atproto_pds", 2192 type: "AtprotoPersonalDataServer", 2193 serviceEndpoint: "https://source-pds.example.com", 2194 }, 2195 ], 2196 }), 2197 }; 2198 const { RepoFetcher: RF } = await import("./repo-fetcher.js"); 2199 const fetcher = new RF(mockDidResolver as any); 2200 const originalFetchRepo = fetcher.fetchRepo.bind(fetcher); 2201 2202 let callCount = 0; 2203 fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => { 2204 callCount++; 2205 if (endpoint === "https://source-pds.example.com") { 2206 throw new Error("Source PDS unreachable"); 2207 } 2208 if (endpoint === "https://peer1.example.com") { 2209 return carBytes; 2210 } 2211 return originalFetchRepo(endpoint, did, since); 2212 }; 2213 2214 // Access private method via prototype 2215 const { ReplicationManager: RM } = await import("./replication-manager.js"); 2216 const manager = new RM( 2217 db, 2218 testConfig(join(tmpDir, "data"), [remoteDid]), 2219 repoManager, 2220 ipfsService, 2221 ipfsService, 2222 mockDidResolver as any, 2223 ); 2224 2225 // Replace the internal repoFetcher with our mock 2226 (manager as any).repoFetcher = fetcher; 2227 (manager as any).syncStorage = syncStorage; 2228 2229 // Should succeed via peer fallback 2230 await manager.syncDid(remoteDid); 2231 2232 // Source PDS was tried (at least once), and peer was used 2233 expect(callCount).toBeGreaterThanOrEqual(2); 2234 2235 // Verify sync state was updated 2236 const state = syncStorage.getState(remoteDid); 2237 expect(state).not.toBeNull(); 2238 expect(state!.status).toBe("synced"); 2239 }); 2240 2241 it("source PDS fails + all peers fail → throws original error", async () => { 2242 syncStorage.upsertState({ 2243 did: remoteDid, 2244 pdsEndpoint: "https://source-pds.example.com", 2245 }); 2246 2247 // Register a peer that will also fail 2248 syncStorage.upsertPeerEndpoint( 2249 remoteDid, 2250 "did:plc:peer1", 2251 "https://peer1.example.com", 2252 null, 2253 ); 2254 2255 const mockDidResolver = { 2256 resolve: async (did: string) => ({ 2257 id: did, 2258 service: [ 2259 { 2260 id: "#atproto_pds", 2261 type: "AtprotoPersonalDataServer", 2262 serviceEndpoint: "https://source-pds.example.com", 2263 }, 2264 ], 2265 }), 2266 }; 2267 2268 const { RepoFetcher: RF } = await import("./repo-fetcher.js"); 2269 const fetcher = new RF(mockDidResolver as any); 2270 fetcher.fetchRepo = async () => { 2271 throw new Error("Connection refused"); 2272 }; 2273 2274 const { ReplicationManager: RM } = await import("./replication-manager.js"); 2275 const manager = new RM( 2276 db, 2277 testConfig(join(tmpDir, "data"), [remoteDid]), 2278 repoManager, 2279 ipfsService, 2280 ipfsService, 2281 mockDidResolver as any, 2282 ); 2283 (manager as any).repoFetcher = fetcher; 2284 (manager as any).syncStorage = syncStorage; 2285 2286 // Should throw the original error 2287 await expect(manager.syncDid(remoteDid)).rejects.toThrow("Connection refused"); 2288 }); 2289 2290 it("source PDS fails without since → tries peers without since", async () => { 2291 // Create a real CAR 2292 await repoManager.createRecord("app.bsky.feed.post", undefined, { 2293 $type: "app.bsky.feed.post", 2294 text: "no-since test", 2295 createdAt: new Date().toISOString(), 2296 }); 2297 const carBytes = await repoManager.getRepoCar(); 2298 2299 // Set up sync state WITHOUT a previous rev (no since) 2300 syncStorage.upsertState({ 2301 did: remoteDid, 2302 pdsEndpoint: "https://source-pds.example.com", 2303 }); 2304 2305 syncStorage.upsertPeerEndpoint( 2306 remoteDid, 2307 "did:plc:peer1", 2308 "https://peer1.example.com", 2309 null, 2310 ); 2311 2312 const mockDidResolver = { 2313 resolve: async (did: string) => ({ 2314 id: did, 2315 service: [ 2316 { 2317 id: "#atproto_pds", 2318 type: "AtprotoPersonalDataServer", 2319 serviceEndpoint: "https://source-pds.example.com", 2320 }, 2321 ], 2322 }), 2323 }; 2324 2325 const { RepoFetcher: RF } = await import("./repo-fetcher.js"); 2326 const fetcher = new RF(mockDidResolver as any); 2327 2328 let peerSincePassed: string | undefined = "NOT_CALLED"; 2329 fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => { 2330 if (endpoint === "https://source-pds.example.com") { 2331 throw new Error("Source PDS down"); 2332 } 2333 if (endpoint === "https://peer1.example.com") { 2334 peerSincePassed = since; 2335 return carBytes; 2336 } 2337 throw new Error("Unknown endpoint"); 2338 }; 2339 2340 const { ReplicationManager: RM } = await import("./replication-manager.js"); 2341 const manager = new RM( 2342 db, 2343 testConfig(join(tmpDir, "data"), [remoteDid]), 2344 repoManager, 2345 ipfsService, 2346 ipfsService, 2347 mockDidResolver as any, 2348 ); 2349 (manager as any).repoFetcher = fetcher; 2350 (manager as any).syncStorage = syncStorage; 2351 2352 await manager.syncDid(remoteDid); 2353 2354 // Peer should have been called without `since` (undefined) 2355 expect(peerSincePassed).toBeUndefined(); 2356 }); 2357});