atproto user agency toolkit for individuals and groups
1/**
2 * E2E sync integration tests: full syncDid() pipeline against mock PDS accounts.
3 *
4 * Uses createTestRepo() + startMockPds() to spin up tiny in-process PDS servers,
5 * then verifies ReplicationManager.syncDid() stores blocks, updates state, etc.
6 */
7
8import { describe, it, expect, beforeEach, afterEach } from "vitest";
9import { mkdtempSync, rmSync } from "node:fs";
10import { tmpdir } from "node:os";
11import { join } from "node:path";
12import Database from "better-sqlite3";
13import { readCarWithRoot } from "@atproto/repo";
14
15import { IpfsService, type NetworkService } from "../ipfs.js";
16import { RepoManager } from "../repo-manager.js";
17import type { Config } from "../config.js";
18import { ReplicationManager } from "./replication-manager.js";
19import {
20 createTestRepo,
21 startMockPds,
22 createMockDidResolver,
23 type MockPds,
24} from "./test-helpers.js";
25
26const TEST_DID = "did:plc:testuser1";
27const TEST_DID_2 = "did:plc:testuser2";
28
29function testConfig(dataDir: string, replicateDids: string[] = []): Config {
30 return {
31 DID: "did:plc:localnode",
32 HANDLE: "local.test",
33 PDS_HOSTNAME: "local.test",
34 AUTH_TOKEN: "test-auth-token",
35 SIGNING_KEY:
36 "0000000000000000000000000000000000000000000000000000000000000001",
37 SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
38 JWT_SECRET: "test-jwt-secret",
39 PASSWORD_HASH: "$2a$10$test",
40 DATA_DIR: dataDir,
41 PORT: 3000,
42 IPFS_ENABLED: true,
43 IPFS_NETWORKING: false,
44 REPLICATE_DIDS: replicateDids,
45 FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
46 FIREHOSE_ENABLED: false,
47 RATE_LIMIT_ENABLED: false,
48 RATE_LIMIT_READ_PER_MIN: 300,
49 RATE_LIMIT_SYNC_PER_MIN: 30,
50 RATE_LIMIT_SESSION_PER_MIN: 10,
51 RATE_LIMIT_WRITE_PER_MIN: 200,
52 RATE_LIMIT_CHALLENGE_PER_MIN: 20,
53 RATE_LIMIT_MAX_CONNECTIONS: 100,
54 RATE_LIMIT_FIREHOSE_PER_IP: 3,
55 OAUTH_ENABLED: false, PUBLIC_URL: "http://localhost:3000",
56 };
57}
58
59const mockNetworkService: NetworkService = {
60 provideBlocks: async () => {},
61 publishCommitNotification: async () => {},
62 onCommitNotification: () => {},
63 subscribeCommitTopics: () => {},
64 unsubscribeCommitTopics: () => {},
65 getPeerId: () => null,
66 getMultiaddrs: () => [],
67 getConnectionCount: () => 0,
68 getRemoteAddrs: () => [],
69 publishIdentityNotification: async () => {},
70 onIdentityNotification: () => {},
71 subscribeIdentityTopics: () => {},
72 unsubscribeIdentityTopics: () => {},
73 provideForDid: async () => {},
74 findProvidersForDid: async () => [],
75};
76
77describe("E2E sync integration", () => {
78 let tmpDir: string;
79 let db: InstanceType<typeof Database>;
80 let ipfsService: IpfsService;
81 let repoManager: RepoManager;
82 let mockPds: MockPds;
83 let replicationManager: ReplicationManager;
84
85 afterEach(async () => {
86 replicationManager?.stop();
87 await mockPds?.close();
88 if (ipfsService?.isRunning()) await ipfsService.stop();
89 db?.close();
90 if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
91 });
92
93 /**
94 * Set up a full test environment with mock PDS serving the given accounts.
95 */
96 async function setup(opts: {
97 dids: string[];
98 accounts: Array<{ did: string; carBytes: Uint8Array; blobs?: Map<string, Uint8Array> }>;
99 }) {
100 tmpDir = mkdtempSync(join(tmpdir(), "e2e-sync-"));
101 db = new Database(join(tmpDir, "test.db"));
102
103 const config = testConfig(tmpDir, opts.dids);
104
105 repoManager = new RepoManager(db, config);
106 repoManager.init();
107
108 ipfsService = new IpfsService({
109 db,
110 networking: false,
111 });
112 await ipfsService.start();
113
114 mockPds = await startMockPds(opts.accounts);
115
116 const didMapping: Record<string, string> = {};
117 for (const did of opts.dids) {
118 didMapping[did] = mockPds.url;
119 }
120 const mockResolver = createMockDidResolver(didMapping);
121
122 replicationManager = new ReplicationManager(
123 db,
124 config,
125 repoManager,
126 ipfsService,
127 mockNetworkService,
128 mockResolver,
129 );
130 await replicationManager.init();
131
132 return { config, mockResolver };
133 }
134
135 it("syncs an empty account (commit block only)", async () => {
136 const carBytes = await createTestRepo(TEST_DID);
137 await setup({
138 dids: [TEST_DID],
139 accounts: [{ did: TEST_DID, carBytes }],
140 });
141
142 await replicationManager.syncDid(TEST_DID);
143
144 const states = replicationManager.getSyncStates();
145 const state = states.find((s) => s.did === TEST_DID);
146 expect(state).toBeDefined();
147 expect(state!.lastSyncRev).toBeTruthy();
148 expect(state!.rootCid).toBeTruthy();
149 expect(state!.pdsEndpoint).toBe(mockPds.url);
150 }, 15_000);
151
152 it("syncs an account with records", async () => {
153 const records = [
154 { collection: "app.bsky.feed.post", rkey: "post1", record: { text: "Hello world", createdAt: new Date().toISOString() } },
155 { collection: "app.bsky.feed.post", rkey: "post2", record: { text: "Second post", createdAt: new Date().toISOString() } },
156 { collection: "app.bsky.actor.profile", rkey: "self", record: { displayName: "Test User" } },
157 ];
158 const carBytes = await createTestRepo(TEST_DID, records);
159 await setup({
160 dids: [TEST_DID],
161 accounts: [{ did: TEST_DID, carBytes }],
162 });
163
164 await replicationManager.syncDid(TEST_DID);
165
166 const states = replicationManager.getSyncStates();
167 const state = states.find((s) => s.did === TEST_DID);
168 expect(state).toBeDefined();
169 expect(state!.lastSyncRev).toBeTruthy();
170
171 // Verify blocks were stored: parse original CAR and check each block exists
172 const { blocks } = await readCarWithRoot(carBytes);
173 const internalMap = (blocks as unknown as { map: Map<string, Uint8Array> }).map;
174 for (const [cidStr] of internalMap.entries()) {
175 const hasIt = await ipfsService.hasBlock(cidStr);
176 expect(hasIt, `block ${cidStr} should be stored`).toBe(true);
177 }
178 }, 15_000);
179
180 it("syncs multiple accounts", async () => {
181 const car1 = await createTestRepo(TEST_DID, [
182 { collection: "app.bsky.feed.post", rkey: "a", record: { text: "User 1" } },
183 ]);
184 const car2 = await createTestRepo(TEST_DID_2, [
185 { collection: "app.bsky.feed.post", rkey: "b", record: { text: "User 2" } },
186 ]);
187
188 await setup({
189 dids: [TEST_DID, TEST_DID_2],
190 accounts: [
191 { did: TEST_DID, carBytes: car1 },
192 { did: TEST_DID_2, carBytes: car2 },
193 ],
194 });
195
196 await replicationManager.syncAll();
197
198 const states = replicationManager.getSyncStates();
199 const state1 = states.find((s) => s.did === TEST_DID);
200 const state2 = states.find((s) => s.did === TEST_DID_2);
201 expect(state1?.lastSyncRev).toBeTruthy();
202 expect(state2?.lastSyncRev).toBeTruthy();
203 }, 15_000);
204
205 it("adds a DID via admin API and syncs", async () => {
206 const carBytes = await createTestRepo(TEST_DID, [
207 { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Admin added" } },
208 ]);
209
210 // Start with no replicate DIDs — we'll add via admin API
211 tmpDir = mkdtempSync(join(tmpdir(), "e2e-sync-"));
212 db = new Database(join(tmpDir, "test.db"));
213 const config = testConfig(tmpDir, []);
214 repoManager = new RepoManager(db, config);
215 repoManager.init();
216 ipfsService = new IpfsService({
217 db,
218 networking: false,
219 });
220 await ipfsService.start();
221 mockPds = await startMockPds([{ did: TEST_DID, carBytes }]);
222 const mockResolver = createMockDidResolver({ [TEST_DID]: mockPds.url });
223
224 replicationManager = new ReplicationManager(
225 db,
226 config,
227 repoManager,
228 ipfsService,
229 mockNetworkService,
230 mockResolver,
231 );
232 await replicationManager.init();
233
234 // Add DID via admin interface
235 const result = await replicationManager.addDid(TEST_DID);
236 expect(result.status).toBe("added");
237
238 // addDid fires syncDid in background — wait a bit then check
239 await new Promise((r) => setTimeout(r, 2000));
240
241 const states = replicationManager.getSyncStates();
242 const state = states.find((s) => s.did === TEST_DID);
243 expect(state).toBeDefined();
244 expect(state!.pdsEndpoint).toBe(mockPds.url);
245 // The sync may or may not have completed yet, but the DID should be tracked
246 expect(replicationManager.getReplicateDids()).toContain(TEST_DID);
247 }, 15_000);
248
249 it("persists sync state across ReplicationManager restarts", async () => {
250 const carBytes = await createTestRepo(TEST_DID, [
251 { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Persistent" } },
252 ]);
253 await setup({
254 dids: [TEST_DID],
255 accounts: [{ did: TEST_DID, carBytes }],
256 });
257
258 await replicationManager.syncDid(TEST_DID);
259
260 const statesBefore = replicationManager.getSyncStates();
261 const stateBefore = statesBefore.find((s) => s.did === TEST_DID);
262 expect(stateBefore?.lastSyncRev).toBeTruthy();
263
264 // Stop and create a new ReplicationManager on the same DB
265 replicationManager.stop();
266
267 const config2 = testConfig(tmpDir, [TEST_DID]);
268 const mockResolver2 = createMockDidResolver({ [TEST_DID]: mockPds.url });
269 const replicationManager2 = new ReplicationManager(
270 db,
271 config2,
272 repoManager,
273 ipfsService,
274 mockNetworkService,
275 mockResolver2,
276 );
277 await replicationManager2.init();
278
279 const statesAfter = replicationManager2.getSyncStates();
280 const stateAfter = statesAfter.find((s) => s.did === TEST_DID);
281 expect(stateAfter).toBeDefined();
282 expect(stateAfter!.lastSyncRev).toBe(stateBefore!.lastSyncRev);
283 expect(stateAfter!.rootCid).toBe(stateBefore!.rootCid);
284
285 replicationManager2.stop();
286 // Replace reference so afterEach doesn't double-stop
287 replicationManager = replicationManager2;
288 }, 15_000);
289
290 it("handles sync of account with blobs", async () => {
291 // Create a blob and a record that references it
292 const blobBytes = new TextEncoder().encode("fake image data for testing");
293 const { create: createCid, CODEC_RAW, toString: cidToString } = await import("@atcute/cid");
294 const blobCid = cidToString(await createCid(CODEC_RAW, blobBytes));
295
296 const carBytes = await createTestRepo(TEST_DID, [
297 {
298 collection: "app.bsky.feed.post",
299 rkey: "with-blob",
300 record: {
301 text: "Post with image",
302 embed: {
303 $type: "app.bsky.embed.images",
304 images: [
305 {
306 alt: "test",
307 image: {
308 $type: "blob",
309 ref: { $link: blobCid },
310 mimeType: "image/jpeg",
311 size: blobBytes.length,
312 },
313 },
314 ],
315 },
316 },
317 },
318 ]);
319
320 const blobs = new Map<string, Uint8Array>();
321 blobs.set(blobCid, blobBytes);
322
323 await setup({
324 dids: [TEST_DID],
325 accounts: [{ did: TEST_DID, carBytes, blobs }],
326 });
327
328 await replicationManager.syncDid(TEST_DID);
329
330 const states = replicationManager.getSyncStates();
331 const state = states.find((s) => s.did === TEST_DID);
332 expect(state).toBeDefined();
333 expect(state!.lastSyncRev).toBeTruthy();
334
335 // Note: blob sync requires ReplicatedRepoReader which we don't set up here.
336 // The main syncDid pipeline (block storage) should still complete successfully.
337 }, 15_000);
338
339 it("sync completes within reasonable time for small repos", async () => {
340 const carBytes = await createTestRepo(TEST_DID, [
341 { collection: "app.bsky.feed.post", rkey: "t1", record: { text: "Timing test" } },
342 ]);
343 await setup({
344 dids: [TEST_DID],
345 accounts: [{ did: TEST_DID, carBytes }],
346 });
347
348 const start = Date.now();
349 await replicationManager.syncDid(TEST_DID);
350 const elapsed = Date.now() - start;
351
352 // A tiny mock repo should sync in well under 5 seconds
353 expect(elapsed).toBeLessThan(5000);
354 }, 15_000);
355});