/** * Tests for incremental sync: * - SyncStorage.getRootCidForRev() * - generateCarForDid() with since parameter (incremental CAR) */ import { describe, it, expect, beforeEach, afterEach } from "vitest"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import Database from "better-sqlite3"; import { IpfsService } from "../ipfs.js"; import { RepoManager } from "../repo-manager.js"; import type { Config } from "../config.js"; import { readCarWithRoot } from "@atproto/repo"; import { SyncStorage } from "./sync-storage.js"; import { generateCarForDid } from "../xrpc/sync.js"; function testConfig(dataDir: string): Config { return { DID: "did:plc:test123", HANDLE: "test.example.com", PDS_HOSTNAME: "test.example.com", AUTH_TOKEN: "test-auth-token", SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001", SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", JWT_SECRET: "test-jwt-secret", PASSWORD_HASH: "$2a$10$test", DATA_DIR: dataDir, PORT: 3000, IPFS_ENABLED: true, IPFS_NETWORKING: false, REPLICATE_DIDS: [], FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", FIREHOSE_ENABLED: false, RATE_LIMIT_ENABLED: false, RATE_LIMIT_READ_PER_MIN: 300, RATE_LIMIT_SYNC_PER_MIN: 30, RATE_LIMIT_SESSION_PER_MIN: 10, RATE_LIMIT_WRITE_PER_MIN: 200, RATE_LIMIT_CHALLENGE_PER_MIN: 20, RATE_LIMIT_MAX_CONNECTIONS: 100, RATE_LIMIT_FIREHOSE_PER_IP: 3, OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", }; } const TEST_DID = "did:plc:testrepo"; // ============================================ // SyncStorage.getRootCidForRev() // ============================================ describe("SyncStorage.getRootCidForRev", () => { let tmpDir: string; let db: InstanceType; let storage: SyncStorage; beforeEach(() => { tmpDir = mkdtempSync(join(tmpdir(), "incremental-sync-test-")); db = new Database(join(tmpDir, "test.db")); storage = new SyncStorage(db); storage.initSchema(); }); afterEach(() => { db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); it("returns root_cid for a known successful sync", () => { const eventId = storage.startSyncEvent(TEST_DID, "pds"); storage.completeSyncEvent(eventId, { status: "success", rev: "rev-001", rootCid: "bafyreiabc123", }); const result = storage.getRootCidForRev(TEST_DID, "rev-001"); expect(result).toBe("bafyreiabc123"); }); it("returns null for unknown rev", () => { const eventId = storage.startSyncEvent(TEST_DID, "pds"); storage.completeSyncEvent(eventId, { status: "success", rev: "rev-001", rootCid: "bafyreiabc123", }); const result = storage.getRootCidForRev(TEST_DID, "rev-unknown"); expect(result).toBeNull(); }); it("returns null for unknown DID", () => { const eventId = storage.startSyncEvent(TEST_DID, "pds"); storage.completeSyncEvent(eventId, { status: "success", rev: "rev-001", rootCid: "bafyreiabc123", }); const result = storage.getRootCidForRev("did:plc:unknown", "rev-001"); expect(result).toBeNull(); }); it("excludes failed syncs", () => { const eventId = storage.startSyncEvent(TEST_DID, "pds"); storage.completeSyncEvent(eventId, { status: "error", rev: "rev-001", rootCid: "bafyreiabc123", errorMessage: "fetch failed", }); const result = storage.getRootCidForRev(TEST_DID, "rev-001"); expect(result).toBeNull(); }); it("excludes syncs without root_cid", () => { const eventId = storage.startSyncEvent(TEST_DID, "pds"); storage.completeSyncEvent(eventId, { status: "success", rev: "rev-001", }); const result = storage.getRootCidForRev(TEST_DID, "rev-001"); expect(result).toBeNull(); }); it("returns most recent root_cid when multiple syncs exist for same rev", () => { const event1 = storage.startSyncEvent(TEST_DID, "pds"); storage.completeSyncEvent(event1, { status: "success", rev: "rev-001", rootCid: "bafyreiold", }); const event2 = storage.startSyncEvent(TEST_DID, "pds"); storage.completeSyncEvent(event2, { status: "success", rev: "rev-001", rootCid: "bafyreinew", }); const result = storage.getRootCidForRev(TEST_DID, "rev-001"); expect(result).toBe("bafyreinew"); }); }); // ============================================ // Incremental CAR generation // ============================================ describe("Incremental CAR generation", () => { let tmpDir: string; let db: InstanceType; let ipfsService: IpfsService; let repoManager: RepoManager; let syncStorage: SyncStorage; beforeEach(async () => { tmpDir = mkdtempSync(join(tmpdir(), "incremental-car-test-")); const config = testConfig(tmpDir); db = new Database(join(tmpDir, "test.db")); ipfsService = new IpfsService({ db, networking: false, }); await ipfsService.start(); repoManager = new RepoManager(db, config); repoManager.init(undefined, ipfsService, ipfsService); syncStorage = new SyncStorage(db); syncStorage.initSchema(); }); afterEach(async () => { if (ipfsService.isRunning()) { await ipfsService.stop(); } db.close(); try { rmSync(tmpDir, { recursive: true, force: true }); } catch {} }); /** * Helper: snapshot current repo state into IPFS + SyncStorage. * Returns { rev, rootCid, blockCids }. */ async function snapshotRepo(): Promise<{ rev: string; rootCid: string; blockCids: string[] }> { const status = await repoManager.getRepoStatus(); const carBytes = await repoManager.getRepoCar(); const { root, blocks } = await readCarWithRoot(carBytes); await ipfsService.putBlocks(blocks); const rootCid = root.toString(); const blockCids: string[] = []; const blockMap = (blocks as unknown as { map: Map }).map; for (const [cidStr] of blockMap) { blockCids.push(cidStr); } // Track in sync storage syncStorage.upsertState({ did: TEST_DID, pdsEndpoint: "https://pds.example.com" }); syncStorage.updateSyncProgress(TEST_DID, status.rev, rootCid); syncStorage.clearBlocks(TEST_DID); syncStorage.trackBlocks(TEST_DID, blockCids); // Record in sync history const eventId = syncStorage.startSyncEvent(TEST_DID, "test"); syncStorage.completeSyncEvent(eventId, { status: "success", rev: status.rev, rootCid, blocksAdded: blockCids.length, }); return { rev: status.rev, rootCid, blockCids }; } it("returns smaller CAR for incremental sync after changes", async () => { // Create initial records await repoManager.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "First post", createdAt: "2025-01-01T00:00:00.000Z", }); await repoManager.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "Second post", createdAt: "2025-01-01T00:00:01.000Z", }); const snapshot1 = await snapshotRepo(); // Add more records await repoManager.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "Third post", createdAt: "2025-01-01T00:00:02.000Z", }); const snapshot2 = await snapshotRepo(); // Full CAR (no since) const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage); expect(fullCar).not.toBeNull(); // Incremental CAR (since first snapshot) const incrementalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot1.rev); expect(incrementalCar).not.toBeNull(); // Incremental should be smaller than full expect(incrementalCar!.length).toBeLessThan(fullCar!.length); }); it("falls back to full CAR for unknown since rev", async () => { await repoManager.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "Hello", createdAt: "2025-01-01T00:00:00.000Z", }); await snapshotRepo(); // Full CAR const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage); expect(fullCar).not.toBeNull(); // Incremental with unknown rev → should fall back to full const fallbackCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, "unknown-rev"); expect(fallbackCar).not.toBeNull(); // Should be same size as full (fallback) expect(fallbackCar!.length).toBe(fullCar!.length); }); it("returns minimal CAR when since matches current rev", async () => { await repoManager.createRecord("app.bsky.feed.post", undefined, { $type: "app.bsky.feed.post", text: "Hello", createdAt: "2025-01-01T00:00:00.000Z", }); const snapshot = await snapshotRepo(); // Full CAR const fullCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage); expect(fullCar).not.toBeNull(); // Incremental with since = current rev → minimal CAR (just commit block) const minimalCar = await generateCarForDid(TEST_DID, ipfsService, syncStorage, snapshot.rev); expect(minimalCar).not.toBeNull(); // Minimal should be much smaller than full expect(minimalCar!.length).toBeLessThan(fullCar!.length); // Parse the minimal CAR to verify it has blocks const { root } = await readCarWithRoot(minimalCar!); expect(root.toString()).toBe(snapshot.rootCid); }); it("returns null for DID with no sync state", async () => { const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage); expect(result).toBeNull(); }); it("returns null for DID with no sync state even with since", async () => { const result = await generateCarForDid("did:plc:nonexistent", ipfsService, syncStorage, "some-rev"); expect(result).toBeNull(); }); });