/** * E2E sync integration tests: full syncDid() pipeline against mock PDS accounts. * * Uses createTestRepo() + startMockPds() to spin up tiny in-process PDS servers, * then verifies ReplicationManager.syncDid() stores blocks, updates state, etc. */ import { describe, it, expect, beforeEach, afterEach } from "vitest"; import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import Database from "better-sqlite3"; import { readCarWithRoot } from "@atproto/repo"; import { IpfsService, type NetworkService } from "../ipfs.js"; import { RepoManager } from "../repo-manager.js"; import type { Config } from "../config.js"; import { ReplicationManager } from "./replication-manager.js"; import { createTestRepo, startMockPds, createMockDidResolver, type MockPds, } from "./test-helpers.js"; const TEST_DID = "did:plc:testuser1"; const TEST_DID_2 = "did:plc:testuser2"; function testConfig(dataDir: string, replicateDids: string[] = []): Config { return { DID: "did:plc:localnode", HANDLE: "local.test", PDS_HOSTNAME: "local.test", AUTH_TOKEN: "test-auth-token", SIGNING_KEY: "0000000000000000000000000000000000000000000000000000000000000001", SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", JWT_SECRET: "test-jwt-secret", PASSWORD_HASH: "$2a$10$test", DATA_DIR: dataDir, PORT: 3000, IPFS_ENABLED: true, IPFS_NETWORKING: false, REPLICATE_DIDS: replicateDids, FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", FIREHOSE_ENABLED: false, RATE_LIMIT_ENABLED: false, RATE_LIMIT_READ_PER_MIN: 300, RATE_LIMIT_SYNC_PER_MIN: 30, RATE_LIMIT_SESSION_PER_MIN: 10, RATE_LIMIT_WRITE_PER_MIN: 200, RATE_LIMIT_CHALLENGE_PER_MIN: 20, RATE_LIMIT_MAX_CONNECTIONS: 100, RATE_LIMIT_FIREHOSE_PER_IP: 3, OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000", }; } const mockNetworkService: NetworkService = { provideBlocks: async () => {}, publishCommitNotification: async () => {}, onCommitNotification: () => {}, subscribeCommitTopics: () => {}, unsubscribeCommitTopics: () => {}, getPeerId: () => null, getMultiaddrs: () => [], getConnectionCount: () => 0, getRemoteAddrs: () => [], publishIdentityNotification: async () => {}, onIdentityNotification: () => {}, subscribeIdentityTopics: () => {}, unsubscribeIdentityTopics: () => {}, provideForDid: async () => {}, findProvidersForDid: async () => [], }; describe("E2E sync integration", () => { let tmpDir: string; let db: InstanceType; let ipfsService: IpfsService; let repoManager: RepoManager; let mockPds: MockPds; let replicationManager: ReplicationManager; afterEach(async () => { replicationManager?.stop(); await mockPds?.close(); if (ipfsService?.isRunning()) await ipfsService.stop(); db?.close(); if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); }); /** * Set up a full test environment with mock PDS serving the given accounts. */ async function setup(opts: { dids: string[]; accounts: Array<{ did: string; carBytes: Uint8Array; blobs?: Map }>; }) { tmpDir = mkdtempSync(join(tmpdir(), "e2e-sync-")); db = new Database(join(tmpDir, "test.db")); const config = testConfig(tmpDir, opts.dids); repoManager = new RepoManager(db, config); repoManager.init(); ipfsService = new IpfsService({ db, networking: false, }); await ipfsService.start(); mockPds = await startMockPds(opts.accounts); const didMapping: Record = {}; for (const did of opts.dids) { didMapping[did] = mockPds.url; } const mockResolver = createMockDidResolver(didMapping); replicationManager = new ReplicationManager( db, config, repoManager, ipfsService, mockNetworkService, mockResolver, ); await replicationManager.init(); return { config, mockResolver }; } it("syncs an empty account (commit block only)", async () => { const carBytes = await createTestRepo(TEST_DID); await setup({ dids: [TEST_DID], accounts: [{ did: TEST_DID, carBytes }], }); await replicationManager.syncDid(TEST_DID); const states = replicationManager.getSyncStates(); const state = states.find((s) => s.did === TEST_DID); expect(state).toBeDefined(); expect(state!.lastSyncRev).toBeTruthy(); expect(state!.rootCid).toBeTruthy(); expect(state!.pdsEndpoint).toBe(mockPds.url); }, 15_000); it("syncs an account with records", async () => { const records = [ { collection: "app.bsky.feed.post", rkey: "post1", record: { text: "Hello world", createdAt: new Date().toISOString() } }, { collection: "app.bsky.feed.post", rkey: "post2", record: { text: "Second post", createdAt: new Date().toISOString() } }, { collection: "app.bsky.actor.profile", rkey: "self", record: { displayName: "Test User" } }, ]; const carBytes = await createTestRepo(TEST_DID, records); await setup({ dids: [TEST_DID], accounts: [{ did: TEST_DID, carBytes }], }); await replicationManager.syncDid(TEST_DID); const states = replicationManager.getSyncStates(); const state = states.find((s) => s.did === TEST_DID); expect(state).toBeDefined(); expect(state!.lastSyncRev).toBeTruthy(); // Verify blocks were stored: parse original CAR and check each block exists const { blocks } = await readCarWithRoot(carBytes); const internalMap = (blocks as unknown as { map: Map }).map; for (const [cidStr] of internalMap.entries()) { const hasIt = await ipfsService.hasBlock(cidStr); expect(hasIt, `block ${cidStr} should be stored`).toBe(true); } }, 15_000); it("syncs multiple accounts", async () => { const car1 = await createTestRepo(TEST_DID, [ { collection: "app.bsky.feed.post", rkey: "a", record: { text: "User 1" } }, ]); const car2 = await createTestRepo(TEST_DID_2, [ { collection: "app.bsky.feed.post", rkey: "b", record: { text: "User 2" } }, ]); await setup({ dids: [TEST_DID, TEST_DID_2], accounts: [ { did: TEST_DID, carBytes: car1 }, { did: TEST_DID_2, carBytes: car2 }, ], }); await replicationManager.syncAll(); const states = replicationManager.getSyncStates(); const state1 = states.find((s) => s.did === TEST_DID); const state2 = states.find((s) => s.did === TEST_DID_2); expect(state1?.lastSyncRev).toBeTruthy(); expect(state2?.lastSyncRev).toBeTruthy(); }, 15_000); it("adds a DID via admin API and syncs", async () => { const carBytes = await createTestRepo(TEST_DID, [ { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Admin added" } }, ]); // Start with no replicate DIDs — we'll add via admin API tmpDir = mkdtempSync(join(tmpdir(), "e2e-sync-")); db = new Database(join(tmpDir, "test.db")); const config = testConfig(tmpDir, []); repoManager = new RepoManager(db, config); repoManager.init(); ipfsService = new IpfsService({ db, networking: false, }); await ipfsService.start(); mockPds = await startMockPds([{ did: TEST_DID, carBytes }]); const mockResolver = createMockDidResolver({ [TEST_DID]: mockPds.url }); replicationManager = new ReplicationManager( db, config, repoManager, ipfsService, mockNetworkService, mockResolver, ); await replicationManager.init(); // Add DID via admin interface const result = await replicationManager.addDid(TEST_DID); expect(result.status).toBe("added"); // addDid fires syncDid in background — wait a bit then check await new Promise((r) => setTimeout(r, 2000)); const states = replicationManager.getSyncStates(); const state = states.find((s) => s.did === TEST_DID); expect(state).toBeDefined(); expect(state!.pdsEndpoint).toBe(mockPds.url); // The sync may or may not have completed yet, but the DID should be tracked expect(replicationManager.getReplicateDids()).toContain(TEST_DID); }, 15_000); it("persists sync state across ReplicationManager restarts", async () => { const carBytes = await createTestRepo(TEST_DID, [ { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Persistent" } }, ]); await setup({ dids: [TEST_DID], accounts: [{ did: TEST_DID, carBytes }], }); await replicationManager.syncDid(TEST_DID); const statesBefore = replicationManager.getSyncStates(); const stateBefore = statesBefore.find((s) => s.did === TEST_DID); expect(stateBefore?.lastSyncRev).toBeTruthy(); // Stop and create a new ReplicationManager on the same DB replicationManager.stop(); const config2 = testConfig(tmpDir, [TEST_DID]); const mockResolver2 = createMockDidResolver({ [TEST_DID]: mockPds.url }); const replicationManager2 = new ReplicationManager( db, config2, repoManager, ipfsService, mockNetworkService, mockResolver2, ); await replicationManager2.init(); const statesAfter = replicationManager2.getSyncStates(); const stateAfter = statesAfter.find((s) => s.did === TEST_DID); expect(stateAfter).toBeDefined(); expect(stateAfter!.lastSyncRev).toBe(stateBefore!.lastSyncRev); expect(stateAfter!.rootCid).toBe(stateBefore!.rootCid); replicationManager2.stop(); // Replace reference so afterEach doesn't double-stop replicationManager = replicationManager2; }, 15_000); it("handles sync of account with blobs", async () => { // Create a blob and a record that references it const blobBytes = new TextEncoder().encode("fake image data for testing"); const { create: createCid, CODEC_RAW, toString: cidToString } = await import("@atcute/cid"); const blobCid = cidToString(await createCid(CODEC_RAW, blobBytes)); const carBytes = await createTestRepo(TEST_DID, [ { collection: "app.bsky.feed.post", rkey: "with-blob", record: { text: "Post with image", embed: { $type: "app.bsky.embed.images", images: [ { alt: "test", image: { $type: "blob", ref: { $link: blobCid }, mimeType: "image/jpeg", size: blobBytes.length, }, }, ], }, }, }, ]); const blobs = new Map(); blobs.set(blobCid, blobBytes); await setup({ dids: [TEST_DID], accounts: [{ did: TEST_DID, carBytes, blobs }], }); await replicationManager.syncDid(TEST_DID); const states = replicationManager.getSyncStates(); const state = states.find((s) => s.did === TEST_DID); expect(state).toBeDefined(); expect(state!.lastSyncRev).toBeTruthy(); // Note: blob sync requires ReplicatedRepoReader which we don't set up here. // The main syncDid pipeline (block storage) should still complete successfully. }, 15_000); it("sync completes within reasonable time for small repos", async () => { const carBytes = await createTestRepo(TEST_DID, [ { collection: "app.bsky.feed.post", rkey: "t1", record: { text: "Timing test" } }, ]); await setup({ dids: [TEST_DID], accounts: [{ did: TEST_DID, carBytes }], }); const start = Date.now(); await replicationManager.syncDid(TEST_DID); const elapsed = Date.now() - start; // A tiny mock repo should sync in well under 5 seconds expect(elapsed).toBeLessThan(5000); }, 15_000); });