atproto user agency toolkit for individuals and groups
at main 355 lines 11 kB view raw
/**
 * E2E sync integration tests: full syncDid() pipeline against mock PDS accounts.
 *
 * Uses createTestRepo() + startMockPds() to spin up tiny in-process PDS servers,
 * then verifies ReplicationManager.syncDid() stores blocks, updates state, etc.
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import Database from "better-sqlite3";
import { readCarWithRoot } from "@atproto/repo";

import { IpfsService, type NetworkService } from "../ipfs.js";
import { RepoManager } from "../repo-manager.js";
import type { Config } from "../config.js";
import { ReplicationManager } from "./replication-manager.js";
import {
  createTestRepo,
  startMockPds,
  createMockDidResolver,
  type MockPds,
} from "./test-helpers.js";

const TEST_DID = "did:plc:testuser1";
const TEST_DID_2 = "did:plc:testuser2";

/**
 * Build a fixed Config for tests.
 *
 * Every field is a hard-coded fixture value except the two that come from the
 * arguments.
 *
 * @param dataDir - Stored as `DATA_DIR`.
 * @param replicateDids - Stored (by reference) as `REPLICATE_DIDS`; defaults
 *   to an empty list.
 * @returns The assembled config object.
 */
function testConfig(dataDir: string, replicateDids: string[] = []): Config {
  const fixture: Config = {
    // Identity and credentials (dummy values).
    DID: "did:plc:localnode",
    HANDLE: "local.test",
    PDS_HOSTNAME: "local.test",
    AUTH_TOKEN: "test-auth-token",
    SIGNING_KEY:
      "0000000000000000000000000000000000000000000000000000000000000001",
    SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr",
    JWT_SECRET: "test-jwt-secret",
    PASSWORD_HASH: "$2a$10$test",

    // Storage and server.
    DATA_DIR: dataDir,
    PORT: 3000,

    // IPFS, replication and firehose switches.
    IPFS_ENABLED: true,
    IPFS_NETWORKING: false,
    REPLICATE_DIDS: replicateDids,
    FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos",
    FIREHOSE_ENABLED: false,

    // Rate limiting is switched off; the limits are still populated.
    RATE_LIMIT_ENABLED: false,
    RATE_LIMIT_READ_PER_MIN: 300,
    RATE_LIMIT_SYNC_PER_MIN: 30,
    RATE_LIMIT_SESSION_PER_MIN: 10,
    RATE_LIMIT_WRITE_PER_MIN: 200,
    RATE_LIMIT_CHALLENGE_PER_MIN: 20,
    RATE_LIMIT_MAX_CONNECTIONS: 100,
    RATE_LIMIT_FIREHOSE_PER_IP: 3,

    // OAuth.
    OAUTH_ENABLED: false,
    PUBLIC_URL: "http://localhost:3000",
  };
  return fixture;
}

/**
 * No-op NetworkService stand-in: notification/subscription/provide calls do
 * nothing and every query returns an empty result (`null`, `[]` or `0`).
 */
const mockNetworkService: NetworkService = {
  provideBlocks: async () => {},
  publishCommitNotification: async () => {},
  onCommitNotification: () => {},
  subscribeCommitTopics: () => {},
  unsubscribeCommitTopics: () => {},
  getPeerId: () => null,
  getMultiaddrs: () => [],
  getConnectionCount: () => 0,
  getRemoteAddrs: () => [],
  publishIdentityNotification: async () => {},
  onIdentityNotification: () => {},
  subscribeIdentityTopics: () => {},
  unsubscribeIdentityTopics: () => {},
  provideForDid: async () => {},
  findProvidersForDid: async () => [],
};

/** Per-test timeout passed to every `it()` in this suite. */
const TEST_TIMEOUT_MS = 15_000;

/** How long the admin-API test waits for the background sync started by addDid(). */
const BACKGROUND_SYNC_GRACE_MS = 2000;

/**
 * Look up the sync state recorded for `did`, failing the test if none exists.
 *
 * Replaces the repeated `find` + `toBeDefined()` + non-null `!` pattern with a
 * single narrowing check.
 *
 * @param manager - Manager whose sync states are inspected.
 * @param did - DID to look up.
 * @returns The matching entry from `manager.getSyncStates()`.
 * @throws Error when no entry for `did` is present.
 */
function requireSyncState(manager: ReplicationManager, did: string) {
  const state = manager.getSyncStates().find((s) => s.did === did);
  if (state === undefined) {
    throw new Error(`expected a sync state for ${did}, found none`);
  }
  return state;
}

describe("E2E sync integration", () => {
  let tmpDir: string;
  let db: InstanceType<typeof Database>;
  let ipfsService: IpfsService;
  let repoManager: RepoManager;
  let mockPds: MockPds;
  let replicationManager: ReplicationManager;

  afterEach(async () => {
    try {
      replicationManager?.stop();
      await mockPds?.close();
      if (ipfsService?.isRunning()) await ipfsService.stop();
    } finally {
      // Release the database handle and scratch directory even when one of
      // the service shutdown steps above throws.
      db?.close();
      if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
    }
  });

  /**
   * Set up a full test environment with mock PDS serving the given accounts.
   *
   * @param opts.dids - DIDs the mock resolver maps to the mock PDS URL.
   * @param opts.accounts - Accounts handed to startMockPds().
   * @param opts.replicateDids - DIDs placed in the config's REPLICATE_DIDS;
   *   defaults to `opts.dids`. Pass `[]` to start with nothing replicated
   *   while still letting the resolver find `opts.dids`.
   * @returns The config and resolver used to build the ReplicationManager.
   */
  async function setup(opts: {
    dids: string[];
    accounts: Array<{ did: string; carBytes: Uint8Array; blobs?: Map<string, Uint8Array> }>;
    replicateDids?: string[];
  }) {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-sync-"));
    db = new Database(join(tmpDir, "test.db"));

    const config = testConfig(tmpDir, opts.replicateDids ?? opts.dids);

    repoManager = new RepoManager(db, config);
    repoManager.init();

    ipfsService = new IpfsService({
      db,
      networking: false,
    });
    await ipfsService.start();

    mockPds = await startMockPds(opts.accounts);

    // Every requested DID resolves to the single mock PDS.
    const didMapping: Record<string, string> = {};
    for (const did of opts.dids) {
      didMapping[did] = mockPds.url;
    }
    const mockResolver = createMockDidResolver(didMapping);

    replicationManager = new ReplicationManager(
      db,
      config,
      repoManager,
      ipfsService,
      mockNetworkService,
      mockResolver,
    );
    await replicationManager.init();

    return { config, mockResolver };
  }

  it("syncs an empty account (commit block only)", async () => {
    const carBytes = await createTestRepo(TEST_DID);
    await setup({
      dids: [TEST_DID],
      accounts: [{ did: TEST_DID, carBytes }],
    });

    await replicationManager.syncDid(TEST_DID);

    const state = requireSyncState(replicationManager, TEST_DID);
    expect(state.lastSyncRev).toBeTruthy();
    expect(state.rootCid).toBeTruthy();
    expect(state.pdsEndpoint).toBe(mockPds.url);
  }, TEST_TIMEOUT_MS);

  it("syncs an account with records", async () => {
    const records = [
      { collection: "app.bsky.feed.post", rkey: "post1", record: { text: "Hello world", createdAt: new Date().toISOString() } },
      { collection: "app.bsky.feed.post", rkey: "post2", record: { text: "Second post", createdAt: new Date().toISOString() } },
      { collection: "app.bsky.actor.profile", rkey: "self", record: { displayName: "Test User" } },
    ];
    const carBytes = await createTestRepo(TEST_DID, records);
    await setup({
      dids: [TEST_DID],
      accounts: [{ did: TEST_DID, carBytes }],
    });

    await replicationManager.syncDid(TEST_DID);

    const state = requireSyncState(replicationManager, TEST_DID);
    expect(state.lastSyncRev).toBeTruthy();

    // Verify blocks were stored: parse original CAR and check each block exists.
    // Uses BlockMap's public cids() accessor rather than its private map field.
    const { blocks } = await readCarWithRoot(carBytes);
    for (const cid of blocks.cids()) {
      const cidStr = cid.toString();
      const hasIt = await ipfsService.hasBlock(cidStr);
      expect(hasIt, `block ${cidStr} should be stored`).toBe(true);
    }
  }, TEST_TIMEOUT_MS);

  it("syncs multiple accounts", async () => {
    const car1 = await createTestRepo(TEST_DID, [
      { collection: "app.bsky.feed.post", rkey: "a", record: { text: "User 1" } },
    ]);
    const car2 = await createTestRepo(TEST_DID_2, [
      { collection: "app.bsky.feed.post", rkey: "b", record: { text: "User 2" } },
    ]);

    await setup({
      dids: [TEST_DID, TEST_DID_2],
      accounts: [
        { did: TEST_DID, carBytes: car1 },
        { did: TEST_DID_2, carBytes: car2 },
      ],
    });

    await replicationManager.syncAll();

    expect(requireSyncState(replicationManager, TEST_DID).lastSyncRev).toBeTruthy();
    expect(requireSyncState(replicationManager, TEST_DID_2).lastSyncRev).toBeTruthy();
  }, TEST_TIMEOUT_MS);

  it("adds a DID via admin API and syncs", async () => {
    const carBytes = await createTestRepo(TEST_DID, [
      { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Admin added" } },
    ]);

    // Start with no replicate DIDs — we'll add via admin API. The resolver
    // still knows TEST_DID so the added DID can be located.
    await setup({
      dids: [TEST_DID],
      replicateDids: [],
      accounts: [{ did: TEST_DID, carBytes }],
    });

    // Add DID via admin interface
    const result = await replicationManager.addDid(TEST_DID);
    expect(result.status).toBe("added");

    // addDid fires syncDid in background — wait a bit then check
    await new Promise<void>((resolve) => setTimeout(resolve, BACKGROUND_SYNC_GRACE_MS));

    const state = requireSyncState(replicationManager, TEST_DID);
    expect(state.pdsEndpoint).toBe(mockPds.url);
    // The sync may or may not have completed yet, but the DID should be tracked
    expect(replicationManager.getReplicateDids()).toContain(TEST_DID);
  }, TEST_TIMEOUT_MS);

  it("persists sync state across ReplicationManager restarts", async () => {
    const carBytes = await createTestRepo(TEST_DID, [
      { collection: "app.bsky.feed.post", rkey: "p1", record: { text: "Persistent" } },
    ]);
    await setup({
      dids: [TEST_DID],
      accounts: [{ did: TEST_DID, carBytes }],
    });

    await replicationManager.syncDid(TEST_DID);

    const stateBefore = requireSyncState(replicationManager, TEST_DID);
    expect(stateBefore.lastSyncRev).toBeTruthy();

    // Stop and create a new ReplicationManager on the same DB
    replicationManager.stop();

    const restarted = new ReplicationManager(
      db,
      testConfig(tmpDir, [TEST_DID]),
      repoManager,
      ipfsService,
      mockNetworkService,
      createMockDidResolver({ [TEST_DID]: mockPds.url }),
    );
    // Hand the new instance to afterEach immediately: it is then stopped
    // exactly once, including when an assertion below fails, and the
    // already-stopped first instance is not stopped again.
    replicationManager = restarted;
    await restarted.init();

    const stateAfter = requireSyncState(restarted, TEST_DID);
    expect(stateAfter.lastSyncRev).toBe(stateBefore.lastSyncRev);
    expect(stateAfter.rootCid).toBe(stateBefore.rootCid);
  }, TEST_TIMEOUT_MS);

  it("handles sync of account with blobs", async () => {
    // Create a blob and a record that references it
    const blobBytes = new TextEncoder().encode("fake image data for testing");
    const { create: createCid, CODEC_RAW, toString: cidToString } = await import("@atcute/cid");
    const blobCid = cidToString(await createCid(CODEC_RAW, blobBytes));

    const carBytes = await createTestRepo(TEST_DID, [
      {
        collection: "app.bsky.feed.post",
        rkey: "with-blob",
        record: {
          text: "Post with image",
          embed: {
            $type: "app.bsky.embed.images",
            images: [
              {
                alt: "test",
                image: {
                  $type: "blob",
                  ref: { $link: blobCid },
                  mimeType: "image/jpeg",
                  size: blobBytes.length,
                },
              },
            ],
          },
        },
      },
    ]);

    const blobs = new Map<string, Uint8Array>();
    blobs.set(blobCid, blobBytes);

    await setup({
      dids: [TEST_DID],
      accounts: [{ did: TEST_DID, carBytes, blobs }],
    });

    await replicationManager.syncDid(TEST_DID);

    const state = requireSyncState(replicationManager, TEST_DID);
    expect(state.lastSyncRev).toBeTruthy();

    // Note: blob sync requires ReplicatedRepoReader which we don't set up here.
    // The main syncDid pipeline (block storage) should still complete successfully.
  }, TEST_TIMEOUT_MS);

  it("sync completes within reasonable time for small repos", async () => {
    const carBytes = await createTestRepo(TEST_DID, [
      { collection: "app.bsky.feed.post", rkey: "t1", record: { text: "Timing test" } },
    ]);
    await setup({
      dids: [TEST_DID],
      accounts: [{ did: TEST_DID, carBytes }],
    });

    const start = Date.now();
    await replicationManager.syncDid(TEST_DID);
    const elapsed = Date.now() - start;

    // A tiny mock repo should sync in well under 5 seconds
    expect(elapsed).toBeLessThan(5000);
  }, TEST_TIMEOUT_MS);
});