import { describe, it, expect, beforeEach, afterEach } from "vitest"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import Database from "better-sqlite3"; import { IpfsService } from "../ipfs.js"; import { RepoManager } from "../repo-manager.js"; import type { Config } from "../config.js"; import { BlockMap, readCarWithRoot } from "@atproto/repo"; import { CID } from "@atproto/lex-data"; import { create as createCid, CODEC_RAW, toString as cidToString, } from "@atcute/cid"; import { SyncStorage } from "./sync-storage.js"; import { didToRkey, rkeyToDid, PEER_NSID, MANIFEST_NSID, DEFAULT_VERIFICATION_CONFIG, } from "./types.js"; import { BlockVerifier, RemoteVerifier } from "./verification.js"; import { RepoFetcher, extractPdsEndpoint } from "./repo-fetcher.js"; import { PeerDiscovery } from "./peer-discovery.js"; import { IpfsReadableBlockstore } from "./ipfs-readable-blockstore.js"; import { ReplicatedRepoReader } from "./replicated-repo-reader.js"; import { decode as cborDecode } from "../cbor-compat.js"; import { Firehose } from "../firehose.js"; import { createApp } from "../index.js"; /** Create a CID string from raw bytes using SHA-256. 
*/ async function makeCidStr(bytes: Uint8Array): Promise { const cid = await createCid(CODEC_RAW, bytes); return cidToString(cid); } function testConfig(dataDir: string, replicateDids: string[] = []): Config { return { DID: "did:plc:test123", HANDLE: "test.example.com", PDS_HOSTNAME: "test.example.com", AUTH_TOKEN: "test-auth-token", SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001", SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", JWT_SECRET: "test-jwt-secret", PASSWORD_HASH: "$2a$10$test", DATA_DIR: dataDir, PORT: 3000, IPFS_ENABLED: true, IPFS_NETWORKING: false, REPLICATE_DIDS: replicateDids, FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", FIREHOSE_ENABLED: false, RATE_LIMIT_ENABLED: false, RATE_LIMIT_READ_PER_MIN: 300, RATE_LIMIT_SYNC_PER_MIN: 30, RATE_LIMIT_SESSION_PER_MIN: 10, RATE_LIMIT_WRITE_PER_MIN: 200, RATE_LIMIT_CHALLENGE_PER_MIN: 20, RATE_LIMIT_MAX_CONNECTIONS: 100, RATE_LIMIT_FIREHOSE_PER_IP: 3, OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", }; } // ============================================ // didToRkey / rkeyToDid // ============================================ describe("didToRkey", () => { it("replaces colons with hyphens", () => { expect(didToRkey("did:plc:abc123")).toBe("did-plc-abc123"); }); it("handles did:web", () => { expect(didToRkey("did:web:example.com")).toBe("did-web-example.com"); }); it("roundtrips with rkeyToDid for simple DIDs", () => { const did = "did:plc:abc123"; const rkey = didToRkey(did); // rkeyToDid replaces ALL hyphens, so roundtrip only works for DIDs without hyphens expect(rkeyToDid(rkey)).toBe(did); }); }); // ============================================ // SyncStorage // ============================================ describe("SyncStorage", () => { let tmpDir: string; let db: InstanceType; let storage: SyncStorage; beforeEach(() => { tmpDir = mkdtempSync(join(tmpdir(), "sync-storage-test-")); db = new Database(join(tmpDir, 
"test.db")); storage = new SyncStorage(db); storage.initSchema(); }); afterEach(() => { db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("creates table and retrieves empty states", () => { const states = storage.getAllStates(); expect(states).toEqual([]); }); it("upserts and retrieves state", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); const state = storage.getState("did:plc:test1"); expect(state).not.toBeNull(); expect(state!.did).toBe("did:plc:test1"); expect(state!.pdsEndpoint).toBe("https://pds.example.com"); expect(state!.status).toBe("pending"); expect(state!.peerId).toBeNull(); }); it("updates status", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); storage.updateStatus("did:plc:test1", "syncing"); expect(storage.getState("did:plc:test1")!.status).toBe("syncing"); }); it("updates status with error message", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); storage.updateStatus("did:plc:test1", "error", "Connection refused"); const state = storage.getState("did:plc:test1")!; expect(state.status).toBe("error"); expect(state.errorMessage).toBe("Connection refused"); }); it("updates sync progress", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); storage.updateSyncProgress("did:plc:test1", "rev123"); const state = storage.getState("did:plc:test1")!; expect(state.lastSyncRev).toBe("rev123"); expect(state.lastSyncAt).not.toBeNull(); expect(state.status).toBe("synced"); }); it("updates and clears peer info", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); storage.updatePeerInfo("did:plc:test1", "12D3KooWTest"); let state = storage.getState("did:plc:test1")!; expect(state.peerId).toBe("12D3KooWTest"); expect(state.peerInfoFetchedAt).not.toBeNull(); storage.clearPeerInfo("did:plc:test1"); state 
= storage.getState("did:plc:test1")!; expect(state.peerId).toBeNull(); expect(state.peerInfoFetchedAt).toBeNull(); }); it("stores and retrieves peer multiaddrs", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); const multiaddrs = [ "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest", "/ip4/192.168.1.1/tcp/4001/p2p/12D3KooWTest", ]; storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", multiaddrs); const state = storage.getState("did:plc:test1")!; expect(state.peerId).toBe("12D3KooWTest"); expect(state.peerMultiaddrs).toEqual(multiaddrs); }); it("clearPeerInfo also clears multiaddrs", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", ["/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest"]); storage.clearPeerInfo("did:plc:test1"); const state = storage.getState("did:plc:test1")!; expect(state.peerId).toBeNull(); expect(state.peerMultiaddrs).toEqual([]); }); it("getMultiaddrForPdsEndpoint returns multiaddr with /p2p/", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [ "/ip4/127.0.0.1/tcp/4001", "/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest", ]); const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); expect(ma).toBe("/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWTest"); }); it("getMultiaddrForPdsEndpoint returns null for unknown endpoint", () => { const ma = storage.getMultiaddrForPdsEndpoint("https://unknown.example.com"); expect(ma).toBeNull(); }); it("getMultiaddrForPdsEndpoint returns null when no multiaddrs stored", () => { storage.upsertState({ did: "did:plc:test1", pdsEndpoint: "https://pds.example.com", }); const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); expect(ma).toBeNull(); }); it("getMultiaddrForPdsEndpoint falls back to first addr when none have /p2p/", () => { storage.upsertState({ did: 
"did:plc:test1", pdsEndpoint: "https://pds.example.com", }); storage.updatePeerInfo("did:plc:test1", "12D3KooWTest", [ "/ip4/127.0.0.1/tcp/4001", ]); const ma = storage.getMultiaddrForPdsEndpoint("https://pds.example.com"); expect(ma).toBe("/ip4/127.0.0.1/tcp/4001"); }); it("getAllStates returns all entries sorted by DID", () => { storage.upsertState({ did: "did:plc:bbb", pdsEndpoint: "https://b.example.com", }); storage.upsertState({ did: "did:plc:aaa", pdsEndpoint: "https://a.example.com", }); const states = storage.getAllStates(); expect(states).toHaveLength(2); expect(states[0]!.did).toBe("did:plc:aaa"); expect(states[1]!.did).toBe("did:plc:bbb"); }); }); // ============================================ // Record Publishing (peer identity + manifests) // ============================================ describe("Record publishing", () => { let tmpDir: string; let db: InstanceType; let ipfsService: IpfsService; let repoManager: RepoManager; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "repl-publish-test-")); const config = testConfig(tmpDir, ["did:plc:remote1", "did:plc:remote2"]); db = new Database(join(tmpDir, "test.db")); ipfsService = new IpfsService({ db, networking: false, }); await ipfsService.start(); repoManager = new RepoManager(db, config); repoManager.init(undefined, ipfsService, ipfsService); }); afterEach(async () => { if (ipfsService.isRunning()) { await ipfsService.stop(); } db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("peer identity record not created when networking is off", async () => { // getPeerId() returns null when networking=false expect(ipfsService.getPeerId()).toBeNull(); // Simulate peer identity publish: no record created because getPeerId() is null const peerId = ipfsService.getPeerId(); if (peerId) { await repoManager.putRecord(PEER_NSID, "self", { $type: PEER_NSID, peerId, multiaddrs: [], createdAt: new Date().toISOString(), }); } // No record should have been created const 
record = await repoManager.getRecord(PEER_NSID, "self"); expect(record).toBeNull(); }); it("syncManifests creates manifest records for each configured DID", async () => { const dids = ["did:plc:remote1", "did:plc:remote2"]; for (const did of dids) { const rkey = didToRkey(did); await repoManager.putRecord(MANIFEST_NSID, rkey, { $type: MANIFEST_NSID, subject: did, status: "active", lastSyncRev: null, lastSyncAt: null, createdAt: new Date().toISOString(), }); } // Both should be readable for (const did of dids) { const rkey = didToRkey(did); const result = await repoManager.getRecord(MANIFEST_NSID, rkey); expect(result).not.toBeNull(); expect((result!.record as Record).subject).toBe(did); expect((result!.record as Record).status).toBe("active"); } }); it("syncManifests is idempotent", async () => { const did = "did:plc:remote1"; const rkey = didToRkey(did); const manifest = { $type: MANIFEST_NSID, subject: did, status: "active", lastSyncRev: null, lastSyncAt: null, createdAt: new Date().toISOString(), }; // Write twice await repoManager.putRecord(MANIFEST_NSID, rkey, manifest); await repoManager.putRecord(MANIFEST_NSID, rkey, manifest); // Should still be readable, no error const result = await repoManager.getRecord(MANIFEST_NSID, rkey); expect(result).not.toBeNull(); }); }); // ============================================ // RepoFetcher // ============================================ describe("RepoFetcher", () => { it("resolvePds with mock DidResolver returning test DID document", async () => { const mockDidResolver = { resolve: async (did: string) => { if (did === "did:plc:test1") { return { id: "did:plc:test1", service: [ { id: "#atproto_pds", type: "AtprotoPersonalDataServer", serviceEndpoint: "https://pds.example.com", }, ], }; } return null; }, }; const fetcher = new RepoFetcher(mockDidResolver as any); const pds = await fetcher.resolvePds("did:plc:test1"); expect(pds).toBe("https://pds.example.com"); }); it("returns null for unresolvable DID", async () => { 
const mockDidResolver = { resolve: async () => null, }; const fetcher = new RepoFetcher(mockDidResolver as any); const pds = await fetcher.resolvePds("did:plc:unknown"); expect(pds).toBeNull(); }); }); describe("extractPdsEndpoint", () => { it("extracts service endpoint from DID document", () => { const doc = { id: "did:plc:test1", service: [ { id: "#atproto_pds", type: "AtprotoPersonalDataServer", serviceEndpoint: "https://pds.example.com", }, ], }; expect(extractPdsEndpoint(doc as any)).toBe("https://pds.example.com"); }); it("returns null when no service array", () => { const doc = { id: "did:plc:test1" }; expect(extractPdsEndpoint(doc as any)).toBeNull(); }); it("returns null when no matching service", () => { const doc = { id: "did:plc:test1", service: [ { id: "#other", type: "Other", serviceEndpoint: "https://other.example.com", }, ], }; expect(extractPdsEndpoint(doc as any)).toBeNull(); }); }); // ============================================ // PeerDiscovery // ============================================ describe("PeerDiscovery", () => { it("returns null for unresolvable DID", async () => { const mockFetcher = { resolvePds: async () => null, fetchRecord: async () => null, listRecords: async () => [], }; const discovery = new PeerDiscovery(mockFetcher as any); const result = await discovery.discoverPeer("did:plc:unknown"); expect(result).toBeNull(); }); it("returns pdsEndpoint with null peerId when no peer record", async () => { const mockFetcher = { resolvePds: async () => "https://pds.example.com", fetchRecord: async () => null, listRecords: async () => [], }; const discovery = new PeerDiscovery(mockFetcher as any); const result = await discovery.discoverPeer("did:plc:test1"); expect(result).not.toBeNull(); expect(result!.pdsEndpoint).toBe("https://pds.example.com"); expect(result!.peerId).toBeNull(); expect(result!.multiaddrs).toEqual([]); }); it("returns peer info when record exists", async () => { const mockFetcher = { resolvePds: async () => 
"https://pds.example.com", fetchRecord: async () => ({ $type: PEER_NSID, peerId: "12D3KooWTest", multiaddrs: ["/ip4/127.0.0.1/tcp/4001"], createdAt: new Date().toISOString(), }), listRecords: async () => [], }; const discovery = new PeerDiscovery(mockFetcher as any); const result = await discovery.discoverPeer("did:plc:test1"); expect(result).not.toBeNull(); expect(result!.peerId).toBe("12D3KooWTest"); expect(result!.multiaddrs).toEqual(["/ip4/127.0.0.1/tcp/4001"]); }); }); // ============================================ // BlockVerifier // ============================================ describe("BlockVerifier", () => { let tmpDir: string; let db: InstanceType; let ipfsService: IpfsService; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "verifier-test-")); db = new Database(join(tmpDir, "test.db")); ipfsService = new IpfsService({ db, networking: false, }); await ipfsService.start(); }); afterEach(async () => { if (ipfsService.isRunning()) { await ipfsService.stop(); } db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("all blocks available reports 100%", async () => { const verifier = new BlockVerifier(ipfsService); const cids: string[] = []; for (let i = 0; i < 3; i++) { const bytes = new TextEncoder().encode(`block-${i}`); const cidStr = await makeCidStr(bytes); await ipfsService.putBlock(cidStr, bytes); cids.push(cidStr); } const result = await verifier.verifyBlockAvailability(cids, 10); expect(result.checked).toBe(3); expect(result.available).toBe(3); expect(result.missing).toEqual([]); }); it("some blocks missing reports correctly", async () => { const verifier = new BlockVerifier(ipfsService); const bytes1 = new TextEncoder().encode("present"); const cid1 = await makeCidStr(bytes1); await ipfsService.putBlock(cid1, bytes1); const bytes2 = new TextEncoder().encode("missing"); const cid2 = await makeCidStr(bytes2); // Don't store cid2 const result = await verifier.verifyBlockAvailability( [cid1, cid2], 10, ); 
expect(result.checked).toBe(2); expect(result.available).toBe(1); expect(result.missing).toContain(cid2); }); it("sample size > array length checks all", async () => { const verifier = new BlockVerifier(ipfsService); const bytes = new TextEncoder().encode("single"); const cidStr = await makeCidStr(bytes); await ipfsService.putBlock(cidStr, bytes); const result = await verifier.verifyBlockAvailability([cidStr], 100); expect(result.checked).toBe(1); expect(result.available).toBe(1); }); it("empty array returns zeros", async () => { const verifier = new BlockVerifier(ipfsService); const result = await verifier.verifyBlockAvailability([]); expect(result.checked).toBe(0); expect(result.available).toBe(0); expect(result.missing).toEqual([]); }); }); // ============================================ // Integration: repo sync (two in-process repos) // ============================================ describe("Integration: repo sync via CAR roundtrip", () => { let tmpDir: string; let sourceDb: InstanceType; let replicaDb: InstanceType; let sourceIpfs: IpfsService; let replicaIpfs: IpfsService; let sourceRepo: RepoManager; let replicaRepo: RepoManager; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "repl-integration-test-")); // Source setup const sourceConfig = testConfig(join(tmpDir, "source"), []); sourceDb = new Database(join(tmpDir, "source.db")); sourceIpfs = new IpfsService({ db: sourceDb, networking: false, }); await sourceIpfs.start(); sourceRepo = new RepoManager(sourceDb, sourceConfig); sourceRepo.init(undefined, sourceIpfs, sourceIpfs); // Replica setup (different DID for the local node, but will replicate source's data) const replicaConfig = testConfig(join(tmpDir, "replica"), [ sourceConfig.DID!, ]); replicaConfig.DID = "did:plc:replica456"; replicaConfig.SIGNING_KEY = "0000000000000000000000000000000000000000000000000000000000000002"; replicaDb = new Database(join(tmpDir, "replica.db")); replicaIpfs = new IpfsService({ db: replicaDb, networking: false, 
}); await replicaIpfs.start(); replicaRepo = new RepoManager(replicaDb, replicaConfig); replicaRepo.init(undefined, replicaIpfs, replicaIpfs); }); afterEach(async () => { if (sourceIpfs.isRunning()) await sourceIpfs.stop(); if (replicaIpfs.isRunning()) await replicaIpfs.stop(); sourceDb.close(); replicaDb.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("source records can be replicated via CAR export/import to IPFS", async () => { // 1. Create records in source await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "Hello from source!", createdAt: new Date().toISOString(), }); await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "Second post", createdAt: new Date().toISOString(), }); // 2. Export as CAR const carBytes = await sourceRepo.getRepoCar(); expect(carBytes.length).toBeGreaterThan(0); // 3. Parse CAR on replica side const { root, blocks } = await readCarWithRoot(carBytes); expect(root).toBeDefined(); // 4. Store blocks in replica's IPFS await replicaIpfs.putBlocks(blocks); // 5. Verify blocks are retrievable from replica's IPFS const internalMap = ( blocks as unknown as { map: Map } ).map; expect(internalMap).toBeDefined(); expect(internalMap.size).toBeGreaterThan(0); for (const cidStr of internalMap.keys()) { const has = await replicaIpfs.hasBlock(cidStr); expect(has).toBe(true); } // 6. Verify block availability const verifier = new BlockVerifier(replicaIpfs); const cidStrs = Array.from(internalMap.keys()); const verification = await verifier.verifyBlockAvailability(cidStrs); expect(verification.available).toBe(verification.checked); expect(verification.missing).toEqual([]); }); it("tracks block CIDs for verification", async () => { // 1. Create records in source await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "tracking test", createdAt: new Date().toISOString(), }); // 2. 
Export as CAR, parse, store blocks const carBytes = await sourceRepo.getRepoCar(); const { blocks } = await readCarWithRoot(carBytes); await replicaIpfs.putBlocks(blocks); // 3. Collect CID strings const internalMap = ( blocks as unknown as { map: Map } ).map; const cidStrs = Array.from(internalMap.keys()); // 4. Track blocks in sync storage const syncStorage = new SyncStorage(replicaDb); syncStorage.initSchema(); syncStorage.trackBlocks("did:plc:test123", cidStrs); // 5. Verify tracking expect(syncStorage.getBlockCount("did:plc:test123")).toBe( cidStrs.length, ); const stored = syncStorage.getBlockCids("did:plc:test123"); expect(stored.sort()).toEqual(cidStrs.sort()); // 6. Tracking is idempotent syncStorage.trackBlocks("did:plc:test123", cidStrs); expect(syncStorage.getBlockCount("did:plc:test123")).toBe( cidStrs.length, ); // 7. Clear blocks syncStorage.clearBlocks("did:plc:test123"); expect(syncStorage.getBlockCount("did:plc:test123")).toBe(0); }); it("manifest record updated with sync rev after replication", async () => { // Create a manifest record in replica const sourceDid = "did:plc:test123"; const rkey = didToRkey(sourceDid); await replicaRepo.putRecord(MANIFEST_NSID, rkey, { $type: MANIFEST_NSID, subject: sourceDid, status: "active", lastSyncRev: null, lastSyncAt: null, createdAt: new Date().toISOString(), }); // Simulate sync: create records in source, export, import to IPFS await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "test post", createdAt: new Date().toISOString(), }); const carBytes = await sourceRepo.getRepoCar(); const { root, blocks } = await readCarWithRoot(carBytes); await replicaIpfs.putBlocks(blocks); // Update manifest with sync rev const syncRev = root.toString(); await replicaRepo.putRecord(MANIFEST_NSID, rkey, { $type: MANIFEST_NSID, subject: sourceDid, status: "active", lastSyncRev: syncRev, lastSyncAt: new Date().toISOString(), createdAt: new Date().toISOString(), }); // Verify 
manifest was updated const result = await replicaRepo.getRecord(MANIFEST_NSID, rkey); expect(result).not.toBeNull(); const record = result!.record as Record; expect(record.lastSyncRev).toBe(syncRev); expect(record.lastSyncAt).not.toBeNull(); expect(record.status).toBe("active"); }); }); // ============================================ // SyncStorage: Block Tracking // ============================================ describe("SyncStorage block tracking", () => { let tmpDir: string; let db: InstanceType; let storage: SyncStorage; beforeEach(() => { tmpDir = mkdtempSync(join(tmpdir(), "block-tracking-test-")); db = new Database(join(tmpDir, "test.db")); storage = new SyncStorage(db); storage.initSchema(); }); afterEach(() => { db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("tracks blocks for a DID", () => { storage.trackBlocks("did:plc:a", ["cid1", "cid2", "cid3"]); expect(storage.getBlockCount("did:plc:a")).toBe(3); expect(storage.getBlockCids("did:plc:a").sort()).toEqual([ "cid1", "cid2", "cid3", ]); }); it("ignores duplicate CIDs", () => { storage.trackBlocks("did:plc:a", ["cid1", "cid2"]); storage.trackBlocks("did:plc:a", ["cid2", "cid3"]); expect(storage.getBlockCount("did:plc:a")).toBe(3); }); it("tracks blocks per-DID independently", () => { storage.trackBlocks("did:plc:a", ["cid1", "cid2"]); storage.trackBlocks("did:plc:b", ["cid2", "cid3"]); expect(storage.getBlockCount("did:plc:a")).toBe(2); expect(storage.getBlockCount("did:plc:b")).toBe(2); }); it("clears blocks for a specific DID", () => { storage.trackBlocks("did:plc:a", ["cid1"]); storage.trackBlocks("did:plc:b", ["cid2"]); storage.clearBlocks("did:plc:a"); expect(storage.getBlockCount("did:plc:a")).toBe(0); expect(storage.getBlockCount("did:plc:b")).toBe(1); }); it("returns 0 count for unknown DID", () => { expect(storage.getBlockCount("did:plc:unknown")).toBe(0); }); it("returns empty array for unknown DID", () => { 
expect(storage.getBlockCids("did:plc:unknown")).toEqual([]);
  });
  it("handles empty CID array", () => {
    storage.trackBlocks("did:plc:a", []);
    expect(storage.getBlockCount("did:plc:a")).toBe(0);
  });
});

// ============================================
// RemoteVerifier
// ============================================
describe("RemoteVerifier", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "remote-verifier-test-"));
    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({ db, networking: false });
    await ipfsService.start();
  });
  afterEach(async () => {
    if (ipfsService.isRunning()) {
      await ipfsService.stop();
    }
    db.close();
    try {
      rmSync(tmpDir, { recursive: true, force: true });
    } catch {}
  });
  it("Layer 0: passes when local root matches remote getHead", async () => {
    const bytes = new TextEncoder().encode("root-block");
    const cidStr = await makeCidStr(bytes);
    await ipfsService.putBlock(cidStr, bytes);
    const mockFetch = async (url: string | URL | Request) => {
      const urlStr = typeof url === "string" ? url : url.toString();
      if (urlStr.includes("getHead")) {
        return new Response(JSON.stringify({ root: cidStr }), {
          status: 200,
          headers: { "Content-Type": "application/json" },
        });
      }
      return new Response(null, { status: 404 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      { raslSampleSize: 10 },
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      cidStr,
      [],
    );
    const layer0 = result.layers.find((l) => l.layer === 0);
    expect(layer0).toBeDefined();
    expect(layer0!.passed).toBe(true);
    expect(layer0!.available).toBe(1);
  });
  it("Layer 0: fails when commit root returns 404", async () => {
    const bytes = new TextEncoder().encode("missing-root");
    const cidStr = await makeCidStr(bytes);
    const mockFetch = async () => {
      return new Response(null, { status: 404 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      {},
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      cidStr,
      [],
    );
    const layer0 = result.layers.find((l) => l.layer === 0);
    expect(layer0!.passed).toBe(false);
    expect(layer0!.missing).toContain(cidStr);
  });
  it("Layer 0: fails on network error", async () => {
    const bytes = new TextEncoder().encode("error-root");
    const cidStr = await makeCidStr(bytes);
    const mockFetch = async () => {
      throw new Error("Connection refused");
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      {},
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      cidStr,
      [],
    );
    const layer0 = result.layers.find((l) => l.layer === 0);
    expect(layer0!.passed).toBe(false);
    expect(layer0!.error).toBe("Connection refused");
  });
  it("Layer 1: passes when all sampled blocks match local", async () => {
    const blocks: Array<{ cid: string; bytes: Uint8Array }> = [];
    for (let i = 0; i < 5; i++) {
      const bytes = new TextEncoder().encode(`rasl-block-${i}`);
      const cidStr = await makeCidStr(bytes);
      await ipfsService.putBlock(cidStr, bytes);
      blocks.push({ cid: cidStr, bytes });
    }
    const mockFetch = async (url: string | URL | Request) => {
      const urlStr = typeof url === "string" ? url : url.toString();
      const block = blocks.find((b) => urlStr.includes(b.cid));
      if (block) {
        return new Response(block.bytes, { status: 200 });
      }
      return new Response(null, { status: 404 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      { raslSampleSize: 100 },
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      null,
      blocks.map((b) => b.cid),
    );
    const layer1 = result.layers.find((l) => l.layer === 1);
    expect(layer1).toBeDefined();
    expect(layer1!.passed).toBe(true);
    expect(layer1!.checked).toBe(5);
    expect(layer1!.available).toBe(5);
  });
  it("Layer 1: fails when some blocks missing locally", async () => {
    const presentBytes = new TextEncoder().encode("present-block");
    const presentCid = await makeCidStr(presentBytes);
    await ipfsService.putBlock(presentCid, presentBytes);
    const missingBytes = new TextEncoder().encode("missing-block");
    const missingCid = await makeCidStr(missingBytes);
    // Don't store missingCid — it's tracked but not in blockstore
    const mockFetch = async () => new Response(null, { status: 404 });
    const verifier = new RemoteVerifier(
      ipfsService,
      { raslSampleSize: 100 },
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      null,
      [presentCid, missingCid],
    );
    const layer1 = result.layers.find((l) => l.layer === 1);
    expect(layer1!.passed).toBe(false);
    expect(layer1!.available).toBe(1);
    expect(layer1!.missing).toContain(missingCid);
  });
  it("combined: overallPassed is true when all layers pass", async () => {
    const bytes = new TextEncoder().encode("all-pass");
    const cidStr = await makeCidStr(bytes);
    await ipfsService.putBlock(cidStr, bytes);
    const mockFetch = async (url: string | URL | Request) => {
      const urlStr = typeof url === "string" ? url : url.toString();
      if (urlStr.includes("getHead")) {
        return new Response(JSON.stringify({ root: cidStr }), {
          status: 200,
          headers: { "Content-Type": "application/json" },
        });
      }
      return new Response(null, { status: 404 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      { raslSampleSize: 100 },
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      cidStr,
      [cidStr],
    );
    expect(result.overallPassed).toBe(true);
    expect(result.did).toBe("did:plc:test");
    expect(result.pdsEndpoint).toBe("https://pds.example.com");
    expect(result.layers.length).toBe(4); // 0, 1, 2 (stub), 3 (stub)
  });
  it("combined: overallPassed is false when any layer fails", async () => {
    const bytes = new TextEncoder().encode("fail-test");
    const cidStr = await makeCidStr(bytes);
    const mockFetch = async () => {
      return new Response(null, { status: 404 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      {},
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      cidStr,
      [cidStr],
    );
    expect(result.overallPassed).toBe(false);
  });
  it("Layer 0: fails gracefully when rootCid is null", async () => {
    const mockFetch = async () => {
      return new Response(null, { status: 404 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      {},
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      null,
      [],
    );
    const layer0 = result.layers.find((l) => l.layer === 0);
    expect(layer0).toBeDefined();
    expect(layer0!.passed).toBe(false);
    expect(layer0!.error).toContain("no local root CID");
  });
  it("skips Layer 1 when blockCids is empty", async () => {
    const mockFetch = async () => {
      return new Response(null, { status: 200 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      {},
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      null,
      [],
    );
    expect(result.layers.find((l) => l.layer === 1)).toBeUndefined();
  });
  it("Layer 2 and 3 are stubs", async () => {
    const mockFetch = async () => {
      return new Response(null, { status: 200 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      {},
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      null,
      [],
    );
    const layer2 = result.layers.find((l) => l.layer === 2);
    expect(layer2).toBeDefined();
    expect(layer2!.checked).toBe(0);
    expect(layer2!.error).toContain("not implemented");
    const layer3 = result.layers.find((l) => l.layer === 3);
    expect(layer3).toBeDefined();
    expect(layer3!.checked).toBe(0);
    expect(layer3!.error).toContain("not implemented");
  });
  it("respects raslSampleSize config", async () => {
    const blocks: Array<{ cid: string; bytes: Uint8Array }> = [];
    for (let i = 0; i < 20; i++) {
      const bytes = new TextEncoder().encode(`sample-block-${i}`);
      const cidStr = await makeCidStr(bytes);
      await ipfsService.putBlock(cidStr, bytes);
      blocks.push({ cid: cidStr, bytes });
    }
    const mockFetch = async (url: string | URL | Request) => {
      const urlStr = typeof url === "string" ?
url : url.toString();
      const block = blocks.find((b) => urlStr.includes(b.cid));
      if (block) {
        return new Response(block.bytes, { status: 200 });
      }
      return new Response(null, { status: 404 });
    };
    const verifier = new RemoteVerifier(
      ipfsService,
      { raslSampleSize: 5 },
      mockFetch as unknown as typeof fetch,
    );
    const result = await verifier.verifyPeer(
      "did:plc:test",
      "https://pds.example.com",
      null,
      blocks.map((b) => b.cid),
    );
    const layer1 = result.layers.find((l) => l.layer === 1);
    expect(layer1!.checked).toBe(5);
  });
});

// ============================================
// IpfsReadableBlockstore
// ============================================
describe("IpfsReadableBlockstore", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let readableBlockstore: IpfsReadableBlockstore;
  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "ipfs-readable-bs-test-"));
    db = new Database(join(tmpDir, "test.db"));
    ipfsService = new IpfsService({ db, networking: false });
    await ipfsService.start();
    readableBlockstore = new IpfsReadableBlockstore(ipfsService);
  });
  afterEach(async () => {
    if (ipfsService.isRunning()) await ipfsService.stop();
    db.close();
    try {
      rmSync(tmpDir, { recursive: true, force: true });
    } catch {}
  });
  it("getBytes roundtrip", async () => {
    const bytes = new TextEncoder().encode("readable-test");
    const cidStr = await makeCidStr(bytes);
    await ipfsService.putBlock(cidStr, bytes);
    const { CID: MfCID } = await import("multiformats");
    const cid = MfCID.parse(cidStr);
    const result = await readableBlockstore.getBytes(cid);
    expect(result).not.toBeNull();
    expect(Buffer.from(result!)).toEqual(Buffer.from(bytes));
  });
  it("has returns true/false correctly", async () => {
    const bytes = new TextEncoder().encode("has-test");
    const cidStr = await makeCidStr(bytes);
    await ipfsService.putBlock(cidStr, bytes);
    const { CID: MfCID } = await import("multiformats");
    const present = MfCID.parse(cidStr);
    expect(await readableBlockstore.has(present)).toBe(true);
    const missingBytes = new TextEncoder().encode("missing-test");
    const missingCidStr = await makeCidStr(missingBytes);
    const missing = MfCID.parse(missingCidStr);
    expect(await readableBlockstore.has(missing)).toBe(false);
  });
  it("getBlocks returns blocks + missing", async () => {
    const bytes1 = new TextEncoder().encode("block-a");
    const cidStr1 = await makeCidStr(bytes1);
    await ipfsService.putBlock(cidStr1, bytes1);
    const bytes2 = new TextEncoder().encode("block-b-missing");
    const cidStr2 = await makeCidStr(bytes2);
    const { CID: MfCID } = await import("multiformats");
    const cid1 = MfCID.parse(cidStr1);
    const cid2 = MfCID.parse(cidStr2);
    const { blocks, missing } = await readableBlockstore.getBlocks([
      cid1,
      cid2,
    ]);
    expect(blocks.has(cid1)).toBe(true);
    expect(missing).toHaveLength(1);
    expect(missing[0]!.toString()).toBe(cidStr2);
  });
});

// ============================================
// ReplicatedRepoReader
// ============================================
describe("ReplicatedRepoReader", () => {
  let tmpDir: string;
  let sourceDb: InstanceType<typeof Database>;
  let replicaDb: InstanceType<typeof Database>;
  let sourceIpfs: IpfsService;
  let replicaIpfs: IpfsService;
  let sourceRepo: RepoManager;
  let syncStorage: SyncStorage;
  let reader: ReplicatedRepoReader;
  const sourceDid = "did:plc:test123";
  beforeEach(async () => {
    tmpDir = mkdtempSync(join(tmpdir(), "replicated-reader-test-"));
    // Source setup
    const sourceConfig = testConfig(join(tmpDir, "source"), []);
    sourceDb = new Database(join(tmpDir, "source.db"));
    sourceIpfs = new IpfsService({ db: sourceDb, networking: false });
    await sourceIpfs.start();
    sourceRepo = new RepoManager(sourceDb, sourceConfig);
    sourceRepo.init(undefined, sourceIpfs, sourceIpfs);
    // Replica IPFS + sync storage
    replicaDb = new Database(join(tmpDir, "replica.db"));
    replicaIpfs = new IpfsService({ db: replicaDb, networking: false });
    await replicaIpfs.start();
    syncStorage = new SyncStorage(replicaDb);
    syncStorage.initSchema();
    reader = new ReplicatedRepoReader(replicaIpfs, syncStorage);
  });
  afterEach(async () => {
    if (sourceIpfs.isRunning()) await sourceIpfs.stop();
    if (replicaIpfs.isRunning()) await replicaIpfs.stop();
    sourceDb.close();
    replicaDb.close();
    try {
      rmSync(tmpDir, { recursive: true, force: true });
    } catch {}
  });
  /** Helper: create records in source, export CAR, store blocks in replica IPFS, record root_cid. */
  async function replicateSource(): Promise<{ rootCid: string; rev: string }> {
    const carBytes = await sourceRepo.getRepoCar();
    const { root, blocks } = await readCarWithRoot(carBytes);
    await replicaIpfs.putBlocks(blocks);
    const rootCidStr = root.toString();
    // Extract rev from commit block
    const internalMap = (
      blocks as unknown as { map: Map<string, Uint8Array> }
    ).map;
    let rev = rootCidStr;
    const commitBytes = internalMap?.get(rootCidStr);
    if (commitBytes) {
      const commitObj = cborDecode(commitBytes) as Record<string, unknown>;
      if (typeof commitObj.rev === "string") {
        rev = commitObj.rev;
      }
    }
    // Record in sync storage
    syncStorage.upsertState({
      did: sourceDid,
      pdsEndpoint: "https://pds.example.com",
    });
    syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr);
    return { rootCid: rootCidStr, rev };
  }
  it("returns null for unknown DID", async () => {
    const result = await reader.getRecord(
      "did:plc:unknown",
      "app.bsky.feed.post",
      "abc",
    );
    expect(result).toBeNull();
    expect(reader.isReplicatedDid("did:plc:unknown")).toBe(false);
  });
  it("getRecord returns record with correct CID and value", async () => {
    await sourceRepo.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "Hello replicated!",
      createdAt: "2025-01-01T00:00:00.000Z",
    });
    await replicateSource();
    // List records from source to get the rkey
    const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", {
      limit: 10,
    });
    const firstRecord = sourceRecords.records[0]!;
    const rkey = firstRecord.uri.split("/").pop()!;
    const result = await reader.getRecord(
      sourceDid,
      "app.bsky.feed.post",
      rkey,
    );
expect(result).not.toBeNull(); expect(result!.cid).toBeDefined(); expect(typeof result!.cid).toBe("string"); const value = result!.value as Record; expect(value.text).toBe("Hello replicated!"); }); it("listRecords returns records with cursor support", async () => { // Create multiple records for (let i = 0; i < 5; i++) { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: `Post ${i}`, createdAt: new Date().toISOString(), }); } await replicateSource(); // First page const page1 = await reader.listRecords(sourceDid, "app.bsky.feed.post", { limit: 3, }); expect(page1.records).toHaveLength(3); expect(page1.cursor).toBeDefined(); // Second page const page2 = await reader.listRecords(sourceDid, "app.bsky.feed.post", { limit: 3, cursor: page1.cursor, }); expect(page2.records).toHaveLength(2); // All records have correct URIs for (const rec of [...page1.records, ...page2.records]) { expect(rec.uri).toMatch( new RegExp(`^at://${sourceDid}/app\\.bsky\\.feed\\.post/`), ); expect(rec.cid).toBeDefined(); expect(rec.value).toBeDefined(); } }); it("describeRepo returns collections list", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "post", createdAt: new Date().toISOString(), }); await sourceRepo.putRecord("app.bsky.actor.profile", "self", { $type: "app.bsky.actor.profile", displayName: "Test", }); await replicateSource(); const result = await reader.describeRepo(sourceDid); expect(result).not.toBeNull(); expect(result!.did).toBe(sourceDid); expect(result!.collections).toContain("app.bsky.feed.post"); expect(result!.collections).toContain("app.bsky.actor.profile"); }); it("getRepoStatus returns rev and root CID", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "status test", createdAt: new Date().toISOString(), }); const { rootCid, rev } = await replicateSource(); const status = 
reader.getRepoStatus(sourceDid); expect(status).not.toBeNull(); expect(status!.did).toBe(sourceDid); expect(status!.rootCid).toBe(rootCid); expect(status!.rev).toBe(rev); expect(status!.active).toBe(true); }); it("cache invalidation works", async () => { // First sync await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "First post", createdAt: new Date().toISOString(), }); await replicateSource(); const result1 = await reader.listRecords( sourceDid, "app.bsky.feed.post", { limit: 100 }, ); expect(result1.records).toHaveLength(1); // Add another record and re-sync await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "Second post", createdAt: new Date().toISOString(), }); reader.invalidateCache(sourceDid); await replicateSource(); const result2 = await reader.listRecords( sourceDid, "app.bsky.feed.post", { limit: 100 }, ); expect(result2.records).toHaveLength(2); }); }); // ============================================ // XRPC integration (replicated repos) // ============================================ describe("XRPC integration: replicated repos", () => { let tmpDir: string; let sourceDb: InstanceType; let replicaDb: InstanceType; let sourceIpfs: IpfsService; let replicaIpfs: IpfsService; let sourceRepo: RepoManager; let replicaRepo: RepoManager; let syncStorage: SyncStorage; let reader: ReplicatedRepoReader; let app: ReturnType; const sourceDid = "did:plc:test123"; const replicaDid = "did:plc:replica456"; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "xrpc-replicated-test-")); // Source setup const sourceConfig = testConfig(join(tmpDir, "source"), []); sourceDb = new Database(join(tmpDir, "source.db")); sourceIpfs = new IpfsService({ db: sourceDb, networking: false, }); await sourceIpfs.start(); sourceRepo = new RepoManager(sourceDb, sourceConfig); sourceRepo.init(undefined, sourceIpfs, sourceIpfs); // Replica setup const replicaConfig = 
testConfig(join(tmpDir, "replica"), [sourceDid]); replicaConfig.DID = replicaDid; replicaConfig.SIGNING_KEY = "0000000000000000000000000000000000000000000000000000000000000002"; replicaDb = new Database(join(tmpDir, "replica.db")); replicaIpfs = new IpfsService({ db: replicaDb, networking: false, }); await replicaIpfs.start(); replicaRepo = new RepoManager(replicaDb, replicaConfig); replicaRepo.init(undefined, replicaIpfs, replicaIpfs); syncStorage = new SyncStorage(replicaDb); syncStorage.initSchema(); reader = new ReplicatedRepoReader(replicaIpfs, syncStorage); const firehose = new Firehose(replicaRepo); app = createApp( replicaConfig, firehose, replicaIpfs, replicaIpfs, undefined, undefined, reader, replicaRepo, ); }); afterEach(async () => { if (sourceIpfs.isRunning()) await sourceIpfs.stop(); if (replicaIpfs.isRunning()) await replicaIpfs.stop(); sourceDb.close(); replicaDb.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); async function replicateSource(): Promise { const carBytes = await sourceRepo.getRepoCar(); const { root, blocks } = await readCarWithRoot(carBytes); await replicaIpfs.putBlocks(blocks); const rootCidStr = root.toString(); const internalMap = ( blocks as unknown as { map: Map } ).map; let rev = rootCidStr; const commitBytes = internalMap?.get(rootCidStr); if (commitBytes) { const commitObj = cborDecode(commitBytes) as Record; if (typeof commitObj.rev === "string") { rev = commitObj.rev; } } syncStorage.upsertState({ did: sourceDid, pdsEndpoint: "https://pds.example.com", }); syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); reader.invalidateCache(sourceDid); } it("GET getRecord for replicated DID → 200", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "XRPC replicated test", createdAt: "2025-01-01T00:00:00.000Z", }); await replicateSource(); const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", { limit: 10, }); const 
rkey = sourceRecords.records[0]!.uri.split("/").pop()!; const res = await app.request( `/xrpc/com.atproto.repo.getRecord?repo=${sourceDid}&collection=app.bsky.feed.post&rkey=${rkey}`, undefined, {}, ); expect(res.status).toBe(200); const json = (await res.json()) as { uri: string; cid: string; value: Record; }; expect(json.uri).toContain(sourceDid); expect(json.value.text).toBe("XRPC replicated test"); }); it("GET listRecords for replicated DID → 200", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "list test 1", createdAt: new Date().toISOString(), }); await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "list test 2", createdAt: new Date().toISOString(), }); await replicateSource(); const res = await app.request( `/xrpc/com.atproto.repo.listRecords?repo=${sourceDid}&collection=app.bsky.feed.post`, undefined, {}, ); expect(res.status).toBe(200); const json = (await res.json()) as { records: Array<{ uri: string; value: Record }>; }; expect(json.records.length).toBe(2); }); it("GET describeRepo for replicated DID → 200", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "describe test", createdAt: new Date().toISOString(), }); await replicateSource(); const res = await app.request( `/xrpc/com.atproto.repo.describeRepo?repo=${sourceDid}`, undefined, {}, ); expect(res.status).toBe(200); const json = (await res.json()) as { did: string; collections: string[]; }; expect(json.did).toBe(sourceDid); expect(json.collections).toContain("app.bsky.feed.post"); }); it("non-replicated foreign DID → 404", async () => { const res = await app.request( `/xrpc/com.atproto.repo.getRecord?repo=did:plc:nonexistent&collection=app.bsky.feed.post&rkey=abc`, undefined, {}, ); expect(res.status).toBe(404); }); it("local DID still works via existing path", async () => { await 
replicaRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "local post", createdAt: new Date().toISOString(), }); const records = await replicaRepo.listRecords("app.bsky.feed.post", { limit: 10, }); const rkey = records.records[0]!.uri.split("/").pop()!; const res = await app.request( `/xrpc/com.atproto.repo.getRecord?repo=${replicaDid}&collection=app.bsky.feed.post&rkey=${rkey}`, undefined, {}, ); expect(res.status).toBe(200); const json = (await res.json()) as { uri: string; value: Record; }; expect(json.value.text).toBe("local post"); }); }); // ============================================ // root_cid + rev extraction // ============================================ describe("root_cid + rev extraction", () => { let tmpDir: string; let sourceDb: InstanceType; let sourceIpfs: IpfsService; let sourceRepo: RepoManager; let replicaDb: InstanceType; let replicaIpfs: IpfsService; let syncStorage: SyncStorage; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "rev-extraction-test-")); const sourceConfig = testConfig(join(tmpDir, "source"), []); sourceDb = new Database(join(tmpDir, "source.db")); sourceIpfs = new IpfsService({ db: sourceDb, networking: false, }); await sourceIpfs.start(); sourceRepo = new RepoManager(sourceDb, sourceConfig); sourceRepo.init(undefined, sourceIpfs, sourceIpfs); replicaDb = new Database(join(tmpDir, "replica.db")); replicaIpfs = new IpfsService({ db: replicaDb, networking: false, }); await replicaIpfs.start(); syncStorage = new SyncStorage(replicaDb); syncStorage.initSchema(); }); afterEach(async () => { if (sourceIpfs.isRunning()) await sourceIpfs.stop(); if (replicaIpfs.isRunning()) await replicaIpfs.stop(); sourceDb.close(); replicaDb.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("after sync, replication_state has both root_cid and last_sync_rev (actual TID)", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: 
"app.bsky.feed.post", text: "rev test", createdAt: new Date().toISOString(), }); const carBytes = await sourceRepo.getRepoCar(); const { root, blocks } = await readCarWithRoot(carBytes); await replicaIpfs.putBlocks(blocks); const rootCidStr = root.toString(); const internalMap = ( blocks as unknown as { map: Map } ).map; let rev = rootCidStr; const commitBytes = internalMap?.get(rootCidStr); if (commitBytes) { const commitObj = cborDecode(commitBytes) as Record; if (typeof commitObj.rev === "string") { rev = commitObj.rev; } } const did = "did:plc:test123"; syncStorage.upsertState({ did, pdsEndpoint: "https://pds.example.com", }); syncStorage.updateSyncProgress(did, rev, rootCidStr); const state = syncStorage.getState(did); expect(state).not.toBeNull(); expect(state!.rootCid).toBe(rootCidStr); expect(state!.lastSyncRev).toBe(rev); // Rev should be a TID (not a CID) expect(state!.lastSyncRev).not.toBe(rootCidStr); }); it("root_cid is a valid CID string and last_sync_rev is a TID", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "cid validation test", createdAt: new Date().toISOString(), }); const carBytes = await sourceRepo.getRepoCar(); const { root, blocks } = await readCarWithRoot(carBytes); await replicaIpfs.putBlocks(blocks); const rootCidStr = root.toString(); const internalMap = ( blocks as unknown as { map: Map } ).map; let rev = rootCidStr; const commitBytes = internalMap?.get(rootCidStr); if (commitBytes) { const commitObj = cborDecode(commitBytes) as Record; if (typeof commitObj.rev === "string") { rev = commitObj.rev; } } // root_cid should be parseable as a CID const { CID: MfCID } = await import("multiformats"); expect(() => MfCID.parse(rootCidStr)).not.toThrow(); // rev should be a TID (base32-sortable, 13 chars) expect(rev).toMatch(/^[a-z2-7]{13}$/); }); }); // ============================================ // SyncStorage: Peer Endpoint Tracking // 
============================================ describe("SyncStorage peer endpoint tracking", () => { let tmpDir: string; let db: InstanceType; let storage: SyncStorage; beforeEach(() => { tmpDir = mkdtempSync(join(tmpdir(), "peer-endpoint-test-")); db = new Database(join(tmpDir, "test.db")); storage = new SyncStorage(db); storage.initSchema(); }); afterEach(() => { db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("upsertPeerEndpoint + getPeerEndpoints returns entries", () => { storage.upsertPeerEndpoint( "did:plc:target1", "did:plc:peer1", "https://peer1.example.com", "rev123", ); const peers = storage.getPeerEndpoints("did:plc:target1"); expect(peers).toHaveLength(1); expect(peers[0]!.peerDid).toBe("did:plc:peer1"); expect(peers[0]!.pdsEndpoint).toBe("https://peer1.example.com"); expect(peers[0]!.lastSyncRev).toBe("rev123"); }); it("duplicate peer_did+target_did upsert updates existing", () => { storage.upsertPeerEndpoint( "did:plc:target1", "did:plc:peer1", "https://old.example.com", "rev1", ); storage.upsertPeerEndpoint( "did:plc:target1", "did:plc:peer1", "https://new.example.com", "rev2", ); const peers = storage.getPeerEndpoints("did:plc:target1"); expect(peers).toHaveLength(1); expect(peers[0]!.pdsEndpoint).toBe("https://new.example.com"); expect(peers[0]!.lastSyncRev).toBe("rev2"); }); it("getPeerEndpoints returns empty for unknown DID", () => { const peers = storage.getPeerEndpoints("did:plc:unknown"); expect(peers).toEqual([]); }); it("clearPeerEndpoints removes all entries for a DID", () => { storage.upsertPeerEndpoint( "did:plc:target1", "did:plc:peer1", "https://peer1.example.com", null, ); storage.upsertPeerEndpoint( "did:plc:target1", "did:plc:peer2", "https://peer2.example.com", null, ); storage.upsertPeerEndpoint( "did:plc:target2", "did:plc:peer1", "https://peer1.example.com", null, ); storage.clearPeerEndpoints("did:plc:target1"); expect(storage.getPeerEndpoints("did:plc:target1")).toEqual([]); // Other target 
DID's entries should remain expect(storage.getPeerEndpoints("did:plc:target2")).toHaveLength(1); }); }); // ============================================ // XRPC: getRepo / getBlocks for replicated DIDs // ============================================ describe("XRPC: getRepo + getBlocks for replicated DIDs", () => { let tmpDir: string; let sourceDb: InstanceType; let replicaDb: InstanceType; let sourceIpfs: IpfsService; let replicaIpfs: IpfsService; let sourceRepo: RepoManager; let replicaRepo: RepoManager; let syncStorage: SyncStorage; let app: ReturnType; const sourceDid = "did:plc:test123"; const replicaDid = "did:plc:replica456"; let trackedCids: string[] = []; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "xrpc-getrepo-test-")); // Source setup const sourceConfig = testConfig(join(tmpDir, "source"), []); sourceDb = new Database(join(tmpDir, "source.db")); sourceIpfs = new IpfsService({ db: sourceDb, networking: false, }); await sourceIpfs.start(); sourceRepo = new RepoManager(sourceDb, sourceConfig); sourceRepo.init(undefined, sourceIpfs, sourceIpfs); // Replica setup const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]); replicaConfig.DID = replicaDid; replicaConfig.SIGNING_KEY = "0000000000000000000000000000000000000000000000000000000000000002"; replicaDb = new Database(join(tmpDir, "replica.db")); replicaIpfs = new IpfsService({ db: replicaDb, networking: false, }); await replicaIpfs.start(); replicaRepo = new RepoManager(replicaDb, replicaConfig); replicaRepo.init(undefined, replicaIpfs, replicaIpfs); syncStorage = new SyncStorage(replicaDb); syncStorage.initSchema(); // Create a mock replicationManager with getSyncStorage const mockReplicationManager = { getSyncStorage: () => syncStorage, } as unknown as import("./replication-manager.js").ReplicationManager; const firehose = new Firehose(replicaRepo); app = createApp( replicaConfig, firehose, replicaIpfs, // blockStore replicaIpfs, // networkService undefined, // blobStore 
mockReplicationManager, undefined, // replicatedRepoReader replicaRepo, ); }); afterEach(async () => { if (sourceIpfs.isRunning()) await sourceIpfs.stop(); if (replicaIpfs.isRunning()) await replicaIpfs.stop(); sourceDb.close(); replicaDb.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); async function replicateSource(): Promise { const carBytes = await sourceRepo.getRepoCar(); const { root, blocks } = await readCarWithRoot(carBytes); await replicaIpfs.putBlocks(blocks); const rootCidStr = root.toString(); const internalMap = ( blocks as unknown as { map: Map } ).map; let rev = rootCidStr; const commitBytes = internalMap?.get(rootCidStr); if (commitBytes) { const commitObj = cborDecode(commitBytes) as Record; if (typeof commitObj.rev === "string") { rev = commitObj.rev; } } // Track block CIDs trackedCids = Array.from(internalMap.keys()); syncStorage.upsertState({ did: sourceDid, pdsEndpoint: "https://pds.example.com", }); syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); syncStorage.trackBlocks(sourceDid, trackedCids); } it("getRepo serves replicated DID repo as valid CAR", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "replicated getRepo test", createdAt: new Date().toISOString(), }); await replicateSource(); const res = await app.request( `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`, undefined, {}, ); expect(res.status).toBe(200); expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car"); // Parse the CAR and verify it has a valid root + blocks const carBytes = new Uint8Array(await res.arrayBuffer()); const { root, blocks } = await readCarWithRoot(carBytes); expect(root).toBeDefined(); const internalMap = ( blocks as unknown as { map: Map } ).map; expect(internalMap.size).toBeGreaterThan(0); }); it("getRepo returns 404 for non-replicated DID", async () => { const res = await app.request( 
`/xrpc/com.atproto.sync.getRepo?did=did:plc:nonexistent`, undefined, {}, ); expect(res.status).toBe(404); }); it("getRepo returns 404 for replicated DID with no rootCid yet", async () => { // Create state without rootCid syncStorage.upsertState({ did: sourceDid, pdsEndpoint: "https://pds.example.com", }); const res = await app.request( `/xrpc/com.atproto.sync.getRepo?did=${sourceDid}`, undefined, {}, ); expect(res.status).toBe(404); }); it("getBlocks serves requested blocks for replicated DID", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "replicated getBlocks test", createdAt: new Date().toISOString(), }); await replicateSource(); // Request a subset of tracked CIDs const requestCids = trackedCids.slice(0, 2); const cidsQuery = requestCids.map((c) => `cids=${c}`).join("&"); const res = await app.request( `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&${cidsQuery}`, undefined, {}, ); expect(res.status).toBe(200); expect(res.headers.get("Content-Type")).toBe("application/vnd.ipld.car"); const carBytes = new Uint8Array(await res.arrayBuffer()); const { blocks } = await readCarWithRoot(carBytes); const internalMap = ( blocks as unknown as { map: Map } ).map; // Should contain the requested blocks for (const cid of requestCids) { expect(internalMap.has(cid)).toBe(true); } }); it("getBlocks returns 404 for non-tracked DID", async () => { const res = await app.request( `/xrpc/com.atproto.sync.getBlocks?did=did:plc:nonexistent&cids=bafytest`, undefined, {}, ); expect(res.status).toBe(404); }); it("getBlocks only serves blocks tracked for the requested DID", async () => { await sourceRepo.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "security test", createdAt: new Date().toISOString(), }); await replicateSource(); // Store a block that is NOT tracked for sourceDid const untrackedBytes = new TextEncoder().encode("untracked-block"); const untrackedCid = await 
makeCidStr(untrackedBytes); await replicaIpfs.putBlock(untrackedCid, untrackedBytes); // Request the untracked CID for the tracked DID const res = await app.request( `/xrpc/com.atproto.sync.getBlocks?did=${sourceDid}&cids=${untrackedCid}`, undefined, {}, ); expect(res.status).toBe(200); // The CAR should be returned but the untracked block should not be in it const carBytes = new Uint8Array(await res.arrayBuffer()); const { blocks } = await readCarWithRoot(carBytes); const internalMap = ( blocks as unknown as { map: Map } ).map; expect(internalMap.has(untrackedCid)).toBe(false); }); }); // ============================================ // Peer fallback sync // ============================================ describe("Peer fallback in syncDid", () => { let tmpDir: string; let db: InstanceType; let ipfsService: IpfsService; let repoManager: RepoManager; let syncStorage: SyncStorage; const localDid = "did:plc:local"; const remoteDid = "did:plc:remote1"; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "peer-fallback-test-")); const config = testConfig(join(tmpDir, "data"), [remoteDid]); config.DID = localDid; db = new Database(join(tmpDir, "test.db")); ipfsService = new IpfsService({ db, networking: false, }); await ipfsService.start(); repoManager = new RepoManager(db, config); repoManager.init(undefined, ipfsService, ipfsService); syncStorage = new SyncStorage(db); syncStorage.initSchema(); }); afterEach(async () => { if (ipfsService.isRunning()) await ipfsService.stop(); db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("source PDS fails → fetches from peer endpoint successfully", async () => { // Create a real CAR from the local repo to serve as peer response await repoManager.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "peer fallback test", createdAt: new Date().toISOString(), }); const carBytes = await repoManager.getRepoCar(); // Set up sync state syncStorage.upsertState({ did: 
 remoteDid,
      pdsEndpoint: "https://source-pds.example.com",
    });
    // Register a peer endpoint
    syncStorage.upsertPeerEndpoint(
      remoteDid,
      "did:plc:peer1",
      "https://peer1.example.com",
      "rev1",
    );
    // Mock RepoFetcher: source fails, peer succeeds
    // Minimal DID-doc resolver that always points at the (failing) source PDS.
    const mockDidResolver = {
      resolve: async (did: string) => ({
        id: did,
        service: [
          {
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: "https://source-pds.example.com",
          },
        ],
      }),
    };
    const { RepoFetcher: RF } = await import("./repo-fetcher.js");
    const fetcher = new RF(mockDidResolver as any);
    const originalFetchRepo = fetcher.fetchRepo.bind(fetcher);
    let callCount = 0;
    // Stub fetchRepo per endpoint: the source endpoint throws, the peer
    // endpoint serves the real CAR built above; anything else delegates.
    fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => {
      callCount++;
      if (endpoint === "https://source-pds.example.com") {
        throw new Error("Source PDS unreachable");
      }
      if (endpoint === "https://peer1.example.com") {
        return carBytes;
      }
      return originalFetchRepo(endpoint, did, since);
    };
    // Build a real ReplicationManager, then swap its private collaborators
    // (repoFetcher / syncStorage) for the mocks via an `any` cast.
    const { ReplicationManager: RM } = await import("./replication-manager.js");
    const manager = new RM(
      db,
      testConfig(join(tmpDir, "data"), [remoteDid]),
      repoManager,
      ipfsService,
      ipfsService,
      mockDidResolver as any,
    );
    // Replace the internal repoFetcher with our mock
    (manager as any).repoFetcher = fetcher;
    (manager as any).syncStorage = syncStorage;
    // Should succeed via peer fallback
    await manager.syncDid(remoteDid);
    // Source PDS was tried (at least once), and peer was used
    expect(callCount).toBeGreaterThanOrEqual(2);
    // Verify sync state was updated
    const state = syncStorage.getState(remoteDid);
    expect(state).not.toBeNull();
    expect(state!.status).toBe("synced");
  });

  it("source PDS fails + all peers fail → throws original error", async () => {
    syncStorage.upsertState({
      did: remoteDid,
      pdsEndpoint: "https://source-pds.example.com",
    });
    // Register a peer that will also fail
    syncStorage.upsertPeerEndpoint(
      remoteDid,
      "did:plc:peer1",
      "https://peer1.example.com",
      null,
    );
    const mockDidResolver = {
      resolve: async (did: string) => ({
        id: did,
        service: [
          {
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: "https://source-pds.example.com",
          },
        ],
      }),
    };
    const { RepoFetcher: RF } = await import("./repo-fetcher.js");
    const fetcher = new RF(mockDidResolver as any);
    // Every endpoint (source and peers alike) fails with the same error.
    fetcher.fetchRepo = async () => {
      throw new Error("Connection refused");
    };
    const { ReplicationManager: RM } = await import("./replication-manager.js");
    const manager = new RM(
      db,
      testConfig(join(tmpDir, "data"), [remoteDid]),
      repoManager,
      ipfsService,
      ipfsService,
      mockDidResolver as any,
    );
    (manager as any).repoFetcher = fetcher;
    (manager as any).syncStorage = syncStorage;
    // Should throw the original error
    await expect(manager.syncDid(remoteDid)).rejects.toThrow("Connection refused");
  });

  it("source PDS fails without since → tries peers without since", async () => {
    // Create a real CAR
    await repoManager.createRecord("app.bsky.feed.post", undefined, {
      $type: "app.bsky.feed.post",
      text: "no-since test",
      createdAt: new Date().toISOString(),
    });
    const carBytes = await repoManager.getRepoCar();
    // Set up sync state WITHOUT a previous rev (no since)
    syncStorage.upsertState({
      did: remoteDid,
      pdsEndpoint: "https://source-pds.example.com",
    });
    syncStorage.upsertPeerEndpoint(
      remoteDid,
      "did:plc:peer1",
      "https://peer1.example.com",
      null,
    );
    const mockDidResolver = {
      resolve: async (did: string) => ({
        id: did,
        service: [
          {
            id: "#atproto_pds",
            type: "AtprotoPersonalDataServer",
            serviceEndpoint: "https://source-pds.example.com",
          },
        ],
      }),
    };
    const { RepoFetcher: RF } = await import("./repo-fetcher.js");
    const fetcher = new RF(mockDidResolver as any);
    // Sentinel lets us distinguish "never called" from "called with undefined".
    let peerSincePassed: string | undefined = "NOT_CALLED";
    fetcher.fetchRepo = async (endpoint: string, did: string, since?: string) => {
      if (endpoint === "https://source-pds.example.com") {
        throw new Error("Source PDS down");
      }
      if (endpoint === "https://peer1.example.com") {
        peerSincePassed = since;
        return carBytes;
      }
      throw new Error("Unknown endpoint");
    };
    const { ReplicationManager: RM } = await import("./replication-manager.js");
    const manager = new RM(
      db,
      testConfig(join(tmpDir, "data"), [remoteDid]),
      repoManager,
      ipfsService,
      ipfsService,
      mockDidResolver as any,
    );
    (manager as any).repoFetcher = fetcher;
    (manager as any).syncStorage = syncStorage;
    await manager.syncDid(remoteDid);
    // Peer should have been called without `since` (undefined)
    expect(peerSincePassed).toBeUndefined();
  });
});