atproto user agency toolkit for individuals and groups
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

Serve replicated records via XRPC read endpoints

Replicated repos' blocks are in IPFS but weren't queryable via standard
atproto endpoints. This adds an IpfsReadableBlockstore adapter and
ReplicatedRepoReader service that loads ReadableRepo instances on demand,
making getRecord, listRecords, and describeRepo work for replicated DIDs.

Also fixes rev extraction: syncDid() now decodes the commit block CBOR to
get the actual TID rev (instead of storing the root CID as rev), and stores
both root_cid and rev separately in replication_state.

+1117 -34
+25 -2
src/index.ts
··· 7 7 import type { BlockStore, NetworkService } from "./ipfs.js"; 8 8 import type { BlobStore } from "./blobs.js"; 9 9 import type { ReplicationManager } from "./replication/replication-manager.js"; 10 + import type { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js"; 10 11 import * as sync from "./xrpc/sync.js"; 11 12 import * as repo from "./xrpc/repo.js"; 12 13 import * as server from "./xrpc/server.js"; ··· 25 26 networkService?: NetworkService, 26 27 blobStore?: BlobStore, 27 28 replicationManager?: ReplicationManager, 29 + replicatedRepoReader?: ReplicatedRepoReader, 28 30 ) { 29 31 const app = new Hono<{ Bindings: Config }>(); 30 32 ··· 201 203 sync.getRepo(c, repoManager), 202 204 ); 203 205 app.get("/xrpc/com.atproto.sync.getRepoStatus", (c) => 204 - sync.getRepoStatus(c, repoManager), 206 + sync.getRepoStatus(c, repoManager, replicatedRepoReader), 205 207 ); 206 208 app.get("/xrpc/com.atproto.sync.getBlocks", (c) => 207 209 sync.getBlocks(c, repoManager), ··· 210 212 sync.getBlob(c, repoManager), 211 213 ); 212 214 app.get("/xrpc/com.atproto.sync.listRepos", (c) => 213 - sync.listRepos(c, repoManager), 215 + sync.listRepos(c, repoManager, replicatedRepoReader), 214 216 ); 215 217 app.get("/xrpc/com.atproto.sync.listBlobs", (c) => 216 218 sync.listBlobs(c, repoManager), ··· 230 232 if (!requestedRepo || requestedRepo === config.DID) { 231 233 return repo.describeRepo(c, repoManager); 232 234 } 235 + if ( 236 + replicatedRepoReader && 237 + requestedRepo && 238 + replicatedRepoReader.isReplicatedDid(requestedRepo) 239 + ) { 240 + return repo.describeRepoReplicated(c, replicatedRepoReader, requestedRepo); 241 + } 233 242 await next(); 234 243 }); 235 244 ··· 238 247 if (!requestedRepo || requestedRepo === config.DID) { 239 248 return repo.getRecord(c, repoManager); 240 249 } 250 + if ( 251 + replicatedRepoReader && 252 + requestedRepo && 253 + replicatedRepoReader.isReplicatedDid(requestedRepo) 254 + ) { 255 + return repo.getRecordReplicated(c, replicatedRepoReader, requestedRepo); 256 + } 241 257 await next(); 242 258 }); 243 259 ··· 245 261 const requestedRepo = c.req.query("repo"); 246 262 if (!requestedRepo || requestedRepo === config.DID) { 247 263 return repo.listRecords(c, repoManager); 264 + } 265 + if ( 266 + replicatedRepoReader && 267 + requestedRepo && 268 + replicatedRepoReader.isReplicatedDid(requestedRepo) 269 + ) { 270 + return repo.listRecordsReplicated(c, replicatedRepoReader, requestedRepo); 248 271 } 249 272 await next(); 250 273 });
+36
src/replication/ipfs-readable-blockstore.ts
··· 1 + /** 2 + * Adapter: wraps our BlockStore interface to implement @atproto/repo's 3 + * ReadableBlockstore abstract class (getBytes, has, getBlocks). 4 + */ 5 + 6 + import { CID } from "multiformats"; 7 + import { ReadableBlockstore, BlockMap } from "@atproto/repo"; 8 + import type { BlockStore } from "../ipfs.js"; 9 + 10 + export class IpfsReadableBlockstore extends ReadableBlockstore { 11 + constructor(private blockStore: BlockStore) { 12 + super(); 13 + } 14 + 15 + async getBytes(cid: CID): Promise<Uint8Array | null> { 16 + return this.blockStore.getBlock(cid.toString()); 17 + } 18 + 19 + async has(cid: CID): Promise<boolean> { 20 + return this.blockStore.hasBlock(cid.toString()); 21 + } 22 + 23 + async getBlocks(cids: CID[]): Promise<{ blocks: BlockMap; missing: CID[] }> { 24 + const blocks = new BlockMap(); 25 + const missing: CID[] = []; 26 + for (const cid of cids) { 27 + const bytes = await this.blockStore.getBlock(cid.toString()); 28 + if (bytes) { 29 + blocks.set(cid, bytes); 30 + } else { 31 + missing.push(cid); 32 + } 33 + } 34 + return { blocks, missing }; 35 + } 36 + }
+206
src/replication/replicated-repo-reader.ts
··· 1 + /** 2 + * Service that loads and caches ReadableRepo instances for replicated DIDs. 3 + * Provides read-only access to replicated repo data via IPFS blocks. 4 + */ 5 + 6 + import { CID } from "multiformats"; 7 + import { ReadableRepo } from "@atproto/repo/dist/readable-repo.js"; 8 + import type { BlockStore } from "../ipfs.js"; 9 + import { IpfsReadableBlockstore } from "./ipfs-readable-blockstore.js"; 10 + import type { SyncStorage } from "./sync-storage.js"; 11 + 12 + export class ReplicatedRepoReader { 13 + private cache = new Map<string, ReadableRepo>(); 14 + private readableBlockstore: IpfsReadableBlockstore; 15 + 16 + constructor( 17 + blockStore: BlockStore, 18 + private syncStorage: SyncStorage, 19 + ) { 20 + this.readableBlockstore = new IpfsReadableBlockstore(blockStore); 21 + } 22 + 23 + /** 24 + * Check if a DID is tracked for replication. 25 + */ 26 + isReplicatedDid(did: string): boolean { 27 + const state = this.syncStorage.getState(did); 28 + return state !== null; 29 + } 30 + 31 + /** 32 + * Load a ReadableRepo for a replicated DID (cached until invalidated). 33 + */ 34 + async getRepo(did: string): Promise<ReadableRepo | null> { 35 + const cached = this.cache.get(did); 36 + if (cached) return cached; 37 + 38 + const state = this.syncStorage.getState(did); 39 + if (!state?.rootCid) return null; 40 + 41 + try { 42 + const commitCid = CID.parse(state.rootCid); 43 + const repo = await ReadableRepo.load( 44 + this.readableBlockstore, 45 + commitCid, 46 + ); 47 + this.cache.set(did, repo); 48 + return repo; 49 + } catch { 50 + return null; 51 + } 52 + } 53 + 54 + /** 55 + * Get a single record from a replicated repo. 56 + */ 57 + async getRecord( 58 + did: string, 59 + collection: string, 60 + rkey: string, 61 + ): Promise<{ cid: string; value: unknown } | null> { 62 + const repo = await this.getRepo(did); 63 + if (!repo) return null; 64 + 65 + // Get the record CID from the MST 66 + const key = `${collection}/${rkey}`; 67 + const recordCid = await repo.data.get(key); 68 + if (!recordCid) return null; 69 + 70 + // Read the record value 71 + const value = await repo.getRecord(collection, rkey); 72 + if (value === null) return null; 73 + 74 + return { cid: recordCid.toString(), value }; 75 + } 76 + 77 + /** 78 + * List records from a collection in a replicated repo. 79 + */ 80 + async listRecords( 81 + did: string, 82 + collection: string, 83 + opts: { limit?: number; cursor?: string; reverse?: boolean } = {}, 84 + ): Promise<{ 85 + records: Array<{ uri: string; cid: string; value: unknown }>; 86 + cursor?: string; 87 + }> { 88 + const repo = await this.getRepo(did); 89 + if (!repo) return { records: [] }; 90 + 91 + const limit = opts.limit ?? 50; 92 + const prefix = `${collection}/`; 93 + const records: Array<{ uri: string; cid: string; value: unknown }> = []; 94 + 95 + // Use walkRecords with a starting key for cursor support 96 + const startKey = opts.cursor 97 + ? `${collection}/${opts.cursor}` 98 + : collection; 99 + 100 + // Collect matching records via MST list 101 + // MST.list(count, after, before) returns Leaf[] sorted by key 102 + const leaves = opts.reverse 103 + ? await repo.data.list(undefined, undefined, `${collection}0`) 104 + : await repo.data.list( 105 + undefined, 106 + opts.cursor ? `${collection}/${opts.cursor}` : undefined, 107 + ); 108 + 109 + for (const leaf of leaves) { 110 + if (records.length >= limit) break; 111 + if (!leaf.key.startsWith(prefix)) { 112 + // If we've passed the collection prefix, stop 113 + if (leaf.key > prefix + "\xff") break; 114 + continue; 115 + } 116 + 117 + const rkey = leaf.key.slice(prefix.length); 118 + const value = await this.readableBlockstore.attemptReadRecord( 119 + leaf.value, 120 + ); 121 + if (value !== null) { 122 + records.push({ 123 + uri: `at://${did}/${collection}/${rkey}`, 124 + cid: leaf.value.toString(), 125 + value, 126 + }); 127 + } 128 + } 129 + 130 + if (opts.reverse) { 131 + records.reverse(); 132 + } 133 + 134 + const result: { 135 + records: Array<{ uri: string; cid: string; value: unknown }>; 136 + cursor?: string; 137 + } = { records }; 138 + 139 + if (records.length === limit) { 140 + const lastRecord = records[records.length - 1]!; 141 + const lastUri = lastRecord.uri; 142 + // cursor is the rkey of the last record 143 + result.cursor = lastUri.split("/").pop()!; 144 + } 145 + 146 + return result; 147 + } 148 + 149 + /** 150 + * Describe a replicated repo (list collections). 151 + */ 152 + async describeRepo( 153 + did: string, 154 + ): Promise<{ did: string; collections: string[] } | null> { 155 + const repo = await this.getRepo(did); 156 + if (!repo) return null; 157 + 158 + const collections = new Set<string>(); 159 + for await (const entry of repo.walkRecords()) { 160 + collections.add(entry.collection); 161 + } 162 + 163 + return { did, collections: Array.from(collections).sort() }; 164 + } 165 + 166 + /** 167 + * Get repo status for a replicated DID. 168 + */ 169 + getRepoStatus( 170 + did: string, 171 + ): { did: string; rev: string | null; rootCid: string | null; active: boolean } | null { 172 + const state = this.syncStorage.getState(did); 173 + if (!state) return null; 174 + 175 + return { 176 + did, 177 + rev: state.lastSyncRev, 178 + rootCid: state.rootCid, 179 + active: state.status === "synced", 180 + }; 181 + } 182 + 183 + /** 184 + * Get repo statuses for all replicated DIDs. 185 + */ 186 + getAllRepoStatuses(): Array<{ 187 + did: string; 188 + rev: string | null; 189 + rootCid: string | null; 190 + active: boolean; 191 + }> { 192 + return this.syncStorage.getAllStates().map((state) => ({ 193 + did: state.did, 194 + rev: state.lastSyncRev, 195 + rootCid: state.rootCid, 196 + active: state.status === "synced", 197 + })); 198 + } 199 + 200 + /** 201 + * Invalidate the cached ReadableRepo for a DID (call after sync). 202 + */ 203 + invalidateCache(did: string): void { 204 + this.cache.delete(did); 205 + } 206 + }
+36 -5
src/replication/replication-manager.ts
··· 9 9 import type { BlockStore, NetworkService } from "../ipfs.js"; 10 10 import type { DidResolver } from "../did-resolver.js"; 11 11 import { readCarWithRoot } from "@atproto/repo"; 12 + import { decode as cborDecode } from "../cbor-compat.js"; 13 + import type { ReplicatedRepoReader } from "./replicated-repo-reader.js"; 12 14 13 15 import { 14 16 PEER_NSID, ··· 50 52 private networkService: NetworkService, 51 53 private didResolver: DidResolver, 52 54 verificationConfig?: Partial<VerificationConfig>, 55 + private replicatedRepoReader?: ReplicatedRepoReader, 53 56 ) { 54 57 this.syncStorage = new SyncStorage(db); 55 58 this.repoFetcher = new RepoFetcher(didResolver); ··· 231 234 } 232 235 this.syncStorage.updateVerifiedAt(did); 233 236 234 - // 8. Determine rev from the root commit 235 - // The root CID string serves as a rev identifier when we can't extract the actual rev 236 - const rev = root.toString(); 237 + // 8. Extract actual rev from the commit block 238 + const rootCidStr = root.toString(); 239 + let rev = rootCidStr; // fallback 240 + const commitBytes = internalMap?.get(rootCidStr); 241 + if (commitBytes) { 242 + try { 243 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 244 + if (typeof commitObj.rev === "string") { 245 + rev = commitObj.rev; 246 + } 247 + } catch { 248 + // If CBOR decode fails, fall back to root CID as rev 249 + } 250 + } 251 + 252 + // 9. Update sync state with both rev and root CID 253 + this.syncStorage.updateSyncProgress(did, rev, rootCidStr); 237 254 238 - // 9. Update sync state 239 - this.syncStorage.updateSyncProgress(did, rev); 255 + // 9b. Invalidate cached ReadableRepo so it reloads with new root 256 + this.replicatedRepoReader?.invalidateCache(did); 240 257 241 258 // 10. Update manifest record 242 259 const rkey = didToRkey(did); ··· 355 372 } 356 373 357 374 return result; 375 + } 376 + 377 + /** 378 + * Get the underlying SyncStorage instance. 379 + */ 380 + getSyncStorage(): SyncStorage { 381 + return this.syncStorage; 382 + } 383 + 384 + /** 385 + * Set the replicated repo reader for cache invalidation after sync. 386 + */ 387 + setReplicatedRepoReader(reader: ReplicatedRepoReader): void { 388 + this.replicatedRepoReader = reader; 358 389 } 359 390 360 391 /**
+636
src/replication/replication.test.ts
··· 25 25 import { BlockVerifier, RemoteVerifier } from "./verification.js"; 26 26 import { RepoFetcher, extractPdsEndpoint } from "./repo-fetcher.js"; 27 27 import { PeerDiscovery } from "./peer-discovery.js"; 28 + import { IpfsReadableBlockstore } from "./ipfs-readable-blockstore.js"; 29 + import { ReplicatedRepoReader } from "./replicated-repo-reader.js"; 30 + import { decode as cborDecode } from "../cbor-compat.js"; 31 + import { Firehose } from "../firehose.js"; 32 + import { createApp } from "../index.js"; 28 33 29 34 /** Create a CID string from raw bytes using SHA-256. */ 30 35 async function makeCidStr(bytes: Uint8Array): Promise<string> { ··· 1118 1123 expect(layer1!.checked).toBe(5); 1119 1124 }); 1120 1125 }); 1126 + 1127 + // ============================================ 1128 + // IpfsReadableBlockstore 1129 + // ============================================ 1130 + 1131 + describe("IpfsReadableBlockstore", () => { 1132 + let tmpDir: string; 1133 + let ipfsService: IpfsService; 1134 + let readableBlockstore: IpfsReadableBlockstore; 1135 + 1136 + beforeEach(async () => { 1137 + tmpDir = mkdtempSync(join(tmpdir(), "ipfs-readable-bs-test-")); 1138 + ipfsService = new IpfsService({ 1139 + blocksPath: join(tmpDir, "blocks"), 1140 + datastorePath: join(tmpDir, "datastore"), 1141 + networking: false, 1142 + }); 1143 + await ipfsService.start(); 1144 + readableBlockstore = new IpfsReadableBlockstore(ipfsService); 1145 + }); 1146 + 1147 + afterEach(async () => { 1148 + if (ipfsService.isRunning()) await ipfsService.stop(); 1149 + rmSync(tmpDir, { recursive: true, force: true }); 1150 + }); 1151 + 1152 + it("getBytes roundtrip", async () => { 1153 + const bytes = new TextEncoder().encode("readable-test"); 1154 + const cidStr = await makeCidStr(bytes); 1155 + await ipfsService.putBlock(cidStr, bytes); 1156 + 1157 + const { CID: MfCID } = await import("multiformats"); 1158 + const cid = MfCID.parse(cidStr); 1159 + const result = await readableBlockstore.getBytes(cid); 1160 + expect(result).not.toBeNull(); 1161 + expect(Buffer.from(result!)).toEqual(Buffer.from(bytes)); 1162 + }); 1163 + 1164 + it("has returns true/false correctly", async () => { 1165 + const bytes = new TextEncoder().encode("has-test"); 1166 + const cidStr = await makeCidStr(bytes); 1167 + await ipfsService.putBlock(cidStr, bytes); 1168 + 1169 + const { CID: MfCID } = await import("multiformats"); 1170 + const present = MfCID.parse(cidStr); 1171 + expect(await readableBlockstore.has(present)).toBe(true); 1172 + 1173 + const missingBytes = new TextEncoder().encode("missing-test"); 1174 + const missingCidStr = await makeCidStr(missingBytes); 1175 + const missing = MfCID.parse(missingCidStr); 1176 + expect(await readableBlockstore.has(missing)).toBe(false); 1177 + }); 1178 + 1179 + it("getBlocks returns blocks + missing", async () => { 1180 + const bytes1 = new TextEncoder().encode("block-a"); 1181 + const cidStr1 = await makeCidStr(bytes1); 1182 + await ipfsService.putBlock(cidStr1, bytes1); 1183 + 1184 + const bytes2 = new TextEncoder().encode("block-b-missing"); 1185 + const cidStr2 = await makeCidStr(bytes2); 1186 + 1187 + const { CID: MfCID } = await import("multiformats"); 1188 + const cid1 = MfCID.parse(cidStr1); 1189 + const cid2 = MfCID.parse(cidStr2); 1190 + 1191 + const { blocks, missing } = await readableBlockstore.getBlocks([ 1192 + cid1, 1193 + cid2, 1194 + ]); 1195 + expect(blocks.has(cid1)).toBe(true); 1196 + expect(missing).toHaveLength(1); 1197 + expect(missing[0]!.toString()).toBe(cidStr2); 1198 + }); 1199 + }); 1200 + 1201 + // ============================================ 1202 + // ReplicatedRepoReader 1203 + // ============================================ 1204 + 1205 + describe("ReplicatedRepoReader", () => { 1206 + let tmpDir: string; 1207 + let sourceDb: InstanceType<typeof Database>; 1208 + let replicaDb: InstanceType<typeof Database>; 1209 + let sourceIpfs: IpfsService; 1210 + let replicaIpfs: IpfsService; 1211 + let sourceRepo: RepoManager; 1212 + let syncStorage: SyncStorage; 1213 + let reader: ReplicatedRepoReader; 1214 + const sourceDid = "did:plc:test123"; 1215 + 1216 + beforeEach(async () => { 1217 + tmpDir = mkdtempSync(join(tmpdir(), "replicated-reader-test-")); 1218 + 1219 + // Source setup 1220 + const sourceConfig = testConfig(join(tmpDir, "source"), []); 1221 + sourceDb = new Database(join(tmpDir, "source.db")); 1222 + sourceIpfs = new IpfsService({ 1223 + blocksPath: join(tmpDir, "source-ipfs-blocks"), 1224 + datastorePath: join(tmpDir, "source-ipfs-datastore"), 1225 + networking: false, 1226 + }); 1227 + await sourceIpfs.start(); 1228 + sourceRepo = new RepoManager(sourceDb, sourceConfig); 1229 + sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1230 + 1231 + // Replica IPFS + sync storage 1232 + replicaDb = new Database(join(tmpDir, "replica.db")); 1233 + replicaIpfs = new IpfsService({ 1234 + blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1235 + datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1236 + networking: false, 1237 + }); 1238 + await replicaIpfs.start(); 1239 + 1240 + syncStorage = new SyncStorage(replicaDb); 1241 + syncStorage.initSchema(); 1242 + 1243 + reader = new ReplicatedRepoReader(replicaIpfs, syncStorage); 1244 + }); 1245 + 1246 + afterEach(async () => { 1247 + if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1248 + if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1249 + sourceDb.close(); 1250 + replicaDb.close(); 1251 + rmSync(tmpDir, { recursive: true, force: true }); 1252 + }); 1253 + 1254 + /** Helper: create records in source, export CAR, store blocks in replica IPFS, record root_cid. */ 1255 + async function replicateSource(): Promise<{ rootCid: string; rev: string }> { 1256 + const carBytes = await sourceRepo.getRepoCar(); 1257 + const { root, blocks } = await readCarWithRoot(carBytes); 1258 + await replicaIpfs.putBlocks(blocks); 1259 + 1260 + const rootCidStr = root.toString(); 1261 + 1262 + // Extract rev from commit block 1263 + const internalMap = ( 1264 + blocks as unknown as { map: Map<string, Uint8Array> } 1265 + ).map; 1266 + let rev = rootCidStr; 1267 + const commitBytes = internalMap?.get(rootCidStr); 1268 + if (commitBytes) { 1269 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1270 + if (typeof commitObj.rev === "string") { 1271 + rev = commitObj.rev; 1272 + } 1273 + } 1274 + 1275 + // Record in sync storage 1276 + syncStorage.upsertState({ 1277 + did: sourceDid, 1278 + pdsEndpoint: "https://pds.example.com", 1279 + }); 1280 + syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); 1281 + 1282 + return { rootCid: rootCidStr, rev }; 1283 + } 1284 + 1285 + it("returns null for unknown DID", async () => { 1286 + const result = await reader.getRecord( 1287 + "did:plc:unknown", 1288 + "app.bsky.feed.post", 1289 + "abc", 1290 + ); 1291 + expect(result).toBeNull(); 1292 + expect(reader.isReplicatedDid("did:plc:unknown")).toBe(false); 1293 + }); 1294 + 1295 + it("getRecord returns record with correct CID and value", async () => { 1296 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1297 + $type: "app.bsky.feed.post", 1298 + text: "Hello replicated!", 1299 + createdAt: "2025-01-01T00:00:00.000Z", 1300 + }); 1301 + await replicateSource(); 1302 + 1303 + // List records from source to get the rkey 1304 + const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", { 1305 + limit: 10, 1306 + }); 1307 + const firstRecord = sourceRecords.records[0]!; 1308 + const rkey = firstRecord.uri.split("/").pop()!; 1309 + 1310 + const result = await reader.getRecord( 1311 + sourceDid, 1312 + "app.bsky.feed.post", 1313 + rkey, 1314 + ); 1315 + expect(result).not.toBeNull(); 1316 + expect(result!.cid).toBeDefined(); 1317 + expect(typeof result!.cid).toBe("string"); 1318 + const value = result!.value as Record<string, unknown>; 1319 + expect(value.text).toBe("Hello replicated!"); 1320 + }); 1321 + 1322 + it("listRecords returns records with cursor support", async () => { 1323 + // Create multiple records 1324 + for (let i = 0; i < 5; i++) { 1325 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1326 + $type: "app.bsky.feed.post", 1327 + text: `Post ${i}`, 1328 + createdAt: new Date().toISOString(), 1329 + }); 1330 + } 1331 + await replicateSource(); 1332 + 1333 + // First page 1334 + const page1 = await reader.listRecords(sourceDid, "app.bsky.feed.post", { 1335 + limit: 3, 1336 + }); 1337 + expect(page1.records).toHaveLength(3); 1338 + expect(page1.cursor).toBeDefined(); 1339 + 1340 + // Second page 1341 + const page2 = await reader.listRecords(sourceDid, "app.bsky.feed.post", { 1342 + limit: 3, 1343 + cursor: page1.cursor, 1344 + }); 1345 + expect(page2.records).toHaveLength(2); 1346 + 1347 + // All records have correct URIs 1348 + for (const rec of [...page1.records, ...page2.records]) { 1349 + expect(rec.uri).toMatch( 1350 + new RegExp(`^at://${sourceDid}/app\\.bsky\\.feed\\.post/`), 1351 + ); 1352 + expect(rec.cid).toBeDefined(); 1353 + expect(rec.value).toBeDefined(); 1354 + } 1355 + }); 1356 + 1357 + it("describeRepo returns collections list", async () => { 1358 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1359 + $type: "app.bsky.feed.post", 1360 + text: "post", 1361 + createdAt: new Date().toISOString(), 1362 + }); 1363 + await sourceRepo.putRecord("app.bsky.actor.profile", "self", { 1364 + $type: "app.bsky.actor.profile", 1365 + displayName: "Test", 1366 + }); 1367 + await replicateSource(); 1368 + 1369 + const result = await reader.describeRepo(sourceDid); 1370 + expect(result).not.toBeNull(); 1371 + expect(result!.did).toBe(sourceDid); 1372 + expect(result!.collections).toContain("app.bsky.feed.post"); 1373 + expect(result!.collections).toContain("app.bsky.actor.profile"); 1374 + }); 1375 + 1376 + it("getRepoStatus returns rev and root CID", async () => { 1377 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1378 + $type: "app.bsky.feed.post", 1379 + text: "status test", 1380 + createdAt: new Date().toISOString(), 1381 + }); 1382 + const { rootCid, rev } = await replicateSource(); 1383 + 1384 + const status = reader.getRepoStatus(sourceDid); 1385 + expect(status).not.toBeNull(); 1386 + expect(status!.did).toBe(sourceDid); 1387 + expect(status!.rootCid).toBe(rootCid); 1388 + expect(status!.rev).toBe(rev); 1389 + expect(status!.active).toBe(true); 1390 + }); 1391 + 1392 + it("cache invalidation works", async () => { 1393 + // First sync 1394 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1395 + $type: "app.bsky.feed.post", 1396 + text: "First post", 1397 + createdAt: new Date().toISOString(), 1398 + }); 1399 + await replicateSource(); 1400 + 1401 + const result1 = await reader.listRecords( 1402 + sourceDid, 1403 + "app.bsky.feed.post", 1404 + { limit: 100 }, 1405 + ); 1406 + expect(result1.records).toHaveLength(1); 1407 + 1408 + // Add another record and re-sync 1409 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1410 + $type: "app.bsky.feed.post", 1411 + text: "Second post", 1412 + createdAt: new Date().toISOString(), 1413 + }); 1414 + reader.invalidateCache(sourceDid); 1415 + await replicateSource(); 1416 + 1417 + const result2 = await reader.listRecords( 1418 + sourceDid, 1419 + "app.bsky.feed.post", 1420 + { limit: 100 }, 1421 + ); 1422 + expect(result2.records).toHaveLength(2); 1423 + }); 1424 + }); 1425 + 1426 + // ============================================ 1427 + // XRPC integration (replicated repos) 1428 + // ============================================ 1429 + 1430 + describe("XRPC integration: replicated repos", () => { 1431 + let tmpDir: string; 1432 + let sourceDb: InstanceType<typeof Database>; 1433 + let replicaDb: InstanceType<typeof Database>; 1434 + let sourceIpfs: IpfsService; 1435 + let replicaIpfs: IpfsService; 1436 + let sourceRepo: RepoManager; 1437 + let replicaRepo: RepoManager; 1438 + let syncStorage: SyncStorage; 1439 + let reader: ReplicatedRepoReader; 1440 + let app: ReturnType<typeof createApp>; 1441 + const sourceDid = "did:plc:test123"; 1442 + const replicaDid = "did:plc:replica456"; 1443 + 1444 + beforeEach(async () => { 1445 + tmpDir = mkdtempSync(join(tmpdir(), "xrpc-replicated-test-")); 1446 + 1447 + // Source setup 1448 + const sourceConfig = testConfig(join(tmpDir, "source"), []); 1449 + sourceDb = new Database(join(tmpDir, "source.db")); 1450 + sourceIpfs = new IpfsService({ 1451 + blocksPath: join(tmpDir, "source-ipfs-blocks"), 1452 + datastorePath: join(tmpDir, "source-ipfs-datastore"), 1453 + networking: false, 1454 + }); 1455 + await sourceIpfs.start(); 1456 + sourceRepo = new RepoManager(sourceDb, sourceConfig); 1457 + sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1458 + 1459 + // Replica setup 1460 + const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]); 1461 + replicaConfig.DID = replicaDid; 1462 + replicaConfig.SIGNING_KEY = 1463 + "0000000000000000000000000000000000000000000000000000000000000002"; 1464 + replicaDb = new Database(join(tmpDir, "replica.db")); 1465 + replicaIpfs = new IpfsService({ 1466 + blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1467 + datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1468 + networking: false, 1469 + }); 1470 + await replicaIpfs.start(); 1471 + replicaRepo = new RepoManager(replicaDb, replicaConfig); 1472 + replicaRepo.init(undefined, replicaIpfs, replicaIpfs); 1473 + 1474 + syncStorage = new SyncStorage(replicaDb); 1475 + syncStorage.initSchema(); 1476 + reader = new ReplicatedRepoReader(replicaIpfs, syncStorage); 1477 + 1478 + const firehose = new Firehose(replicaRepo); 1479 + app = createApp( 1480 + replicaConfig, 1481 + replicaRepo, 1482 + firehose, 1483 + replicaIpfs, 1484 + replicaIpfs, 1485 + undefined, 1486 + undefined, 1487 + reader, 1488 + ); 1489 + }); 1490 + 1491 + afterEach(async () => { 1492 + if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1493 + if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1494 + sourceDb.close(); 1495 + replicaDb.close(); 1496 + rmSync(tmpDir, { recursive: true, force: true }); 1497 + }); 1498 + 1499 + async function replicateSource(): Promise<void> { 1500 + const carBytes = await sourceRepo.getRepoCar(); 1501 + const { root, blocks } = await readCarWithRoot(carBytes); 1502 + await replicaIpfs.putBlocks(blocks); 1503 + 1504 + const rootCidStr = root.toString(); 1505 + const internalMap = ( 1506 + blocks as unknown as { map: Map<string, Uint8Array> } 1507 + ).map; 1508 + let rev = rootCidStr; 1509 + const commitBytes = internalMap?.get(rootCidStr); 1510 + if (commitBytes) { 1511 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1512 + if (typeof commitObj.rev === "string") { 1513 + rev = commitObj.rev; 1514 + } 1515 + } 1516 + 1517 + syncStorage.upsertState({ 1518 + did: sourceDid, 1519 + pdsEndpoint: "https://pds.example.com", 1520 + }); 1521 + syncStorage.updateSyncProgress(sourceDid, rev, rootCidStr); 1522 + reader.invalidateCache(sourceDid); 1523 + } 1524 + 1525 + it("GET getRecord for replicated DID → 200", async () => { 1526 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1527 + $type: "app.bsky.feed.post", 1528 + text: "XRPC replicated test", 1529 + createdAt: "2025-01-01T00:00:00.000Z", 1530 + }); 1531 + await replicateSource(); 1532 + 1533 + const sourceRecords = await sourceRepo.listRecords("app.bsky.feed.post", { 1534 + limit: 10, 1535 + }); 1536 + const rkey = sourceRecords.records[0]!.uri.split("/").pop()!; 1537 + 1538 + const res = await app.request( 1539 + `/xrpc/com.atproto.repo.getRecord?repo=${sourceDid}&collection=app.bsky.feed.post&rkey=${rkey}`, 1540 + undefined, 1541 + {}, 1542 + ); 1543 + expect(res.status).toBe(200); 1544 + 1545 + const json = (await res.json()) as { 1546 + uri: string; 1547 + cid: string; 1548 + value: Record<string, unknown>; 1549 + }; 1550 + expect(json.uri).toContain(sourceDid); 1551 + expect(json.value.text).toBe("XRPC replicated test"); 1552 + }); 1553 + 1554 + it("GET listRecords for replicated DID → 200", async () => { 1555 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1556 + $type: "app.bsky.feed.post", 1557 + text: "list test 1", 1558 + createdAt: new Date().toISOString(), 1559 + }); 1560 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1561 + $type: "app.bsky.feed.post", 1562 + text: "list test 2", 1563 + createdAt: new Date().toISOString(), 1564 + }); 1565 + await replicateSource(); 1566 + 1567 + const res = await app.request( 1568 + `/xrpc/com.atproto.repo.listRecords?repo=${sourceDid}&collection=app.bsky.feed.post`, 1569 + undefined, 1570 + {}, 1571 + ); 1572 + expect(res.status).toBe(200); 1573 + 1574 + const json = (await res.json()) as { 1575 + records: Array<{ uri: string; value: Record<string, unknown> }>; 1576 + }; 1577 + expect(json.records.length).toBe(2); 1578 + }); 1579 + 1580 + it("GET describeRepo for replicated DID → 200", async () => { 1581 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1582 + $type: "app.bsky.feed.post", 1583 + text: "describe test", 1584 + createdAt: new Date().toISOString(), 1585 + }); 1586 + await replicateSource(); 1587 + 1588 + const res = await app.request( 1589 + `/xrpc/com.atproto.repo.describeRepo?repo=${sourceDid}`, 1590 + undefined, 1591 + {}, 1592 + ); 1593 + expect(res.status).toBe(200); 1594 + 1595 + const json = (await res.json()) as { 1596 + did: string; 1597 + collections: string[]; 1598 + }; 1599 + expect(json.did).toBe(sourceDid); 1600 + expect(json.collections).toContain("app.bsky.feed.post"); 1601 + }); 1602 + 1603 + it("non-replicated foreign DID → 404", async () => { 1604 + const res = await app.request( 1605 + `/xrpc/com.atproto.repo.getRecord?repo=did:plc:nonexistent&collection=app.bsky.feed.post&rkey=abc`, 1606 + undefined, 1607 + {}, 1608 + ); 1609 + expect(res.status).toBe(404); 1610 + }); 1611 + 1612 + it("local DID still works via existing path", async () => { 1613 + await replicaRepo.createRecord("app.bsky.feed.post", undefined, { 1614 + $type: "app.bsky.feed.post", 1615 + text: "local post", 1616 + createdAt: new Date().toISOString(), 1617 + }); 1618 + 1619 + const records = await replicaRepo.listRecords("app.bsky.feed.post", { 1620 + limit: 10, 1621 + }); 1622 + const rkey = records.records[0]!.uri.split("/").pop()!; 1623 + 1624 + const res = await app.request( 1625 + `/xrpc/com.atproto.repo.getRecord?repo=${replicaDid}&collection=app.bsky.feed.post&rkey=${rkey}`, 1626 + undefined, 1627 + {}, 1628 + ); 1629 + expect(res.status).toBe(200); 1630 + 1631 + const json = (await res.json()) as { 1632 + uri: string; 1633 + value: Record<string, unknown>; 1634 + }; 1635 + expect(json.value.text).toBe("local post"); 1636 + }); 1637 + }); 1638 + 1639 + // ============================================ 1640 + // root_cid + rev extraction 1641 + // ============================================ 1642 + 1643 + describe("root_cid + rev extraction", () => { 1644 + let tmpDir: string; 1645 + let sourceDb: InstanceType<typeof Database>; 1646 + let sourceIpfs: IpfsService; 1647 + let sourceRepo: RepoManager; 1648 + let replicaDb: InstanceType<typeof Database>; 1649 + let replicaIpfs: IpfsService; 1650 + let syncStorage: SyncStorage; 1651 + 1652 + beforeEach(async () => { 1653 + tmpDir = mkdtempSync(join(tmpdir(), "rev-extraction-test-")); 1654 + 1655 + const sourceConfig = testConfig(join(tmpDir, "source"), []); 1656 + sourceDb = new Database(join(tmpDir, "source.db")); 1657 + sourceIpfs = new IpfsService({ 1658 + blocksPath: join(tmpDir, "source-ipfs-blocks"), 1659 + datastorePath: join(tmpDir, "source-ipfs-datastore"), 1660 + networking: false, 1661 + }); 1662 + await sourceIpfs.start(); 1663 + sourceRepo = new RepoManager(sourceDb, sourceConfig); 1664 + sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 1665 + 1666 + replicaDb = new Database(join(tmpDir, "replica.db")); 1667 + replicaIpfs = new IpfsService({ 1668 + blocksPath: join(tmpDir, "replica-ipfs-blocks"), 1669 + datastorePath: join(tmpDir, "replica-ipfs-datastore"), 1670 + networking: false, 1671 + }); 1672 + await replicaIpfs.start(); 1673 + 1674 + syncStorage = new SyncStorage(replicaDb); 1675 + syncStorage.initSchema(); 1676 + }); 1677 + 1678 + afterEach(async () => { 1679 + if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 1680 + if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 1681 + sourceDb.close(); 1682 + replicaDb.close(); 1683 + rmSync(tmpDir, { recursive: true, force: true }); 1684 + }); 1685 + 1686 + it("after sync, replication_state has both root_cid and last_sync_rev (actual TID)", async () => { 1687 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1688 + $type: "app.bsky.feed.post", 1689 + text: "rev test", 1690 + createdAt: new Date().toISOString(), 1691 + }); 1692 + 1693 + const carBytes = await sourceRepo.getRepoCar(); 1694 + const { root, blocks } = await readCarWithRoot(carBytes); 1695 + await replicaIpfs.putBlocks(blocks); 1696 + 1697 + const rootCidStr = root.toString(); 1698 + const internalMap = ( 1699 + blocks as unknown as { map: Map<string, Uint8Array> } 1700 + ).map; 1701 + let rev = rootCidStr; 1702 + const commitBytes = internalMap?.get(rootCidStr); 1703 + if (commitBytes) { 1704 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1705 + if (typeof commitObj.rev === "string") { 1706 + rev = commitObj.rev; 1707 + } 1708 + } 1709 + 1710 + const did = "did:plc:test123"; 1711 + syncStorage.upsertState({ 1712 + did, 1713 + pdsEndpoint: "https://pds.example.com", 1714 + }); 1715 + syncStorage.updateSyncProgress(did, rev, rootCidStr); 1716 + 1717 + const state = syncStorage.getState(did); 1718 + expect(state).not.toBeNull(); 1719 + expect(state!.rootCid).toBe(rootCidStr); 1720 + expect(state!.lastSyncRev).toBe(rev); 1721 + // Rev should be a TID (not a CID) 1722 + expect(state!.lastSyncRev).not.toBe(rootCidStr); 1723 + }); 1724 + 1725 + it("root_cid is a valid CID string and last_sync_rev is a TID", async () => { 1726 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 1727 + $type: "app.bsky.feed.post", 1728 + text: "cid validation test", 1729 + createdAt: new Date().toISOString(), 1730 + }); 1731 + 1732 + const carBytes = await sourceRepo.getRepoCar(); 1733 + const { root, blocks } = await readCarWithRoot(carBytes); 1734 + await replicaIpfs.putBlocks(blocks); 1735 + 1736 + const rootCidStr = root.toString(); 1737 + const internalMap = ( 1738 + blocks as unknown as { map: Map<string, Uint8Array> } 1739 + ).map; 1740 + let rev = rootCidStr; 1741 + const commitBytes = internalMap?.get(rootCidStr); 1742 + if (commitBytes) { 1743 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 1744 + if (typeof commitObj.rev === "string") { 1745 + rev = commitObj.rev; 1746 + } 1747 + } 1748 + 1749 + // root_cid should be parseable as a CID 1750 + const { CID: MfCID } = await import("multiformats"); 1751 + expect(() => MfCID.parse(rootCidStr)).not.toThrow(); 1752 + 1753 + // rev should be a TID (base32-sortable, 13 chars) 1754 + expect(rev).toMatch(/^[a-z2-7]{13}$/); 1755 + }); 1756 + });
+16 -3
src/replication/sync-storage.ts
··· 20 20 peer_id TEXT, 21 21 peer_info_fetched_at TEXT, 22 22 last_sync_rev TEXT, 23 + root_cid TEXT, 23 24 last_sync_at TEXT, 24 25 last_verified_at TEXT, 25 26 status TEXT NOT NULL DEFAULT 'pending', ··· 32 33 PRIMARY KEY (did, cid) 33 34 ); 34 35 `); 36 + 37 + // Migration: add root_cid column if missing (for existing databases) 38 + const columns = this.db 39 + .prepare("PRAGMA table_info(replication_state)") 40 + .all() as Array<{ name: string }>; 41 + if (!columns.some((c) => c.name === "root_cid")) { 42 + this.db.exec( 43 + "ALTER TABLE replication_state ADD COLUMN root_cid TEXT", 44 + ); 45 + } 35 46 } 36 47 37 48 /** ··· 84 95 /** 85 96 * Update sync progress after a successful sync. 86 97 */ 87 - updateSyncProgress(did: string, rev: string): void { 98 + updateSyncProgress(did: string, rev: string, rootCid?: string): void { 88 99 this.db 89 100 .prepare( 90 101 `UPDATE replication_state 91 - SET last_sync_rev = ?, last_sync_at = datetime('now'), 102 + SET last_sync_rev = ?, root_cid = COALESCE(?, root_cid), 103 + last_sync_at = datetime('now'), 92 104 status = 'synced', error_message = NULL 93 105 WHERE did = ?`, 94 106 ) 95 - .run(rev, did); 107 + .run(rev, rootCid ?? null, did); 96 108 } 97 109 98 110 /** ··· 205 217 peerId: (row.peer_id as string) ?? null, 206 218 peerInfoFetchedAt: (row.peer_info_fetched_at as string) ?? null, 207 219 lastSyncRev: (row.last_sync_rev as string) ?? null, 220 + rootCid: (row.root_cid as string) ?? null, 208 221 lastSyncAt: (row.last_sync_at as string) ?? null, 209 222 lastVerifiedAt: (row.last_verified_at as string) ?? null, 210 223 status: row.status as SyncState["status"],
+1
src/replication/types.ts
··· 31 31 peerId: string | null; 32 32 peerInfoFetchedAt: string | null; 33 33 lastSyncRev: string | null; 34 + rootCid: string | null; 34 35 lastSyncAt: string | null; 35 36 lastVerifiedAt: string | null; 36 37 status: "pending" | "syncing" | "synced" | "error";
+9 -2
src/server.ts
··· 15 15 import { DidResolver } from "./did-resolver.js"; 16 16 import { InMemoryDidCache } from "./did-cache.js"; 17 17 import { ReplicationManager } from "./replication/replication-manager.js"; 18 + import { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js"; 18 19 19 20 // Load configuration 20 21 const config = loadConfig(); ··· 57 58 didCache: new InMemoryDidCache(), 58 59 }); 59 60 60 - // Initialize replication manager (if IPFS enabled and DIDs configured) 61 + // Initialize replication manager and replicated repo reader (if IPFS enabled and DIDs configured) 61 62 let replicationManager: ReplicationManager | undefined; 63 + let replicatedRepoReader: ReplicatedRepoReader | undefined; 62 64 if (ipfsService && config.REPLICATE_DIDS.length > 0) { 63 65 replicationManager = new ReplicationManager( 64 66 db, ··· 68 70 ipfsService, 69 71 didResolver, 70 72 ); 73 + replicatedRepoReader = new ReplicatedRepoReader( 74 + ipfsService, 75 + replicationManager.getSyncStorage(), 76 + ); 77 + replicationManager.setReplicatedRepoReader(replicatedRepoReader); 71 78 } 72 79 73 80 // Create Hono app 74 - const app = createApp(config, repoManager, firehose, ipfsService, ipfsService, blobStore, replicationManager); 81 + const app = createApp(config, repoManager, firehose, ipfsService, ipfsService, blobStore, replicationManager, replicatedRepoReader); 75 82 76 83 // Create HTTP server using @hono/node-server's request listener 77 84 const requestListener = getRequestListener(app.fetch);
+98
src/xrpc/repo.ts
··· 4 4 import type { AppEnv, AuthedAppEnv } from "../types.js"; 5 5 import { validator } from "../validation.js"; 6 6 import { detectContentType } from "../format.js"; 7 + import type { ReplicatedRepoReader } from "../replication/replicated-repo-reader.js"; 7 8 8 9 function invalidRecordError( 9 10 c: Context<AuthedAppEnv>, ··· 511 512 512 513 return c.json(result); 513 514 } 515 + 516 + // ============================================ 517 + // Replicated repo handlers 518 + // ============================================ 519 + 520 + export async function getRecordReplicated( 521 + c: Context<AppEnv>, 522 + reader: ReplicatedRepoReader, 523 + did: string, 524 + ): Promise<Response> { 525 + const collection = c.req.query("collection"); 526 + const rkey = c.req.query("rkey"); 527 + 528 + if (!collection || !rkey) { 529 + return c.json( 530 + { 531 + error: "InvalidRequest", 532 + message: "Missing required parameters: repo, collection, rkey", 533 + }, 534 + 400, 535 + ); 536 + } 537 + 538 + const result = await reader.getRecord(did, collection, rkey); 539 + 540 + if (!result) { 541 + return c.json( 542 + { 543 + error: "RecordNotFound", 544 + message: `Record not found: ${collection}/${rkey}`, 545 + }, 546 + 404, 547 + ); 548 + } 549 + 550 + return c.json({ 551 + uri: `at://${did}/${collection}/${rkey}`, 552 + cid: result.cid, 553 + value: result.value, 554 + }); 555 + } 556 + 557 + export async function listRecordsReplicated( 558 + c: Context<AppEnv>, 559 + reader: ReplicatedRepoReader, 560 + did: string, 561 + ): Promise<Response> { 562 + const collection = c.req.query("collection"); 563 + const limitStr = c.req.query("limit"); 564 + const cursor = c.req.query("cursor"); 565 + const reverseStr = c.req.query("reverse"); 566 + 567 + if (!collection) { 568 + return c.json( 569 + { 570 + error: "InvalidRequest", 571 + message: "Missing required parameters: repo, collection", 572 + }, 573 + 400, 574 + ); 575 + } 576 + 577 + const limit = Math.min(limitStr ? Number.parseInt(limitStr, 10) : 50, 100); 578 + const reverse = reverseStr === "true"; 579 + 580 + const result = await reader.listRecords(did, collection, { 581 + limit, 582 + cursor: cursor || undefined, 583 + reverse, 584 + }); 585 + 586 + return c.json(result); 587 + } 588 + 589 + export async function describeRepoReplicated( 590 + c: Context<AppEnv>, 591 + reader: ReplicatedRepoReader, 592 + did: string, 593 + ): Promise<Response> { 594 + const result = await reader.describeRepo(did); 595 + 596 + if (!result) { 597 + return c.json( 598 + { 599 + error: "RepoNotFound", 600 + message: `Repository not found: ${did}`, 601 + }, 602 + 404, 603 + ); 604 + } 605 + 606 + return c.json({ 607 + did: result.did, 608 + collections: result.collections, 609 + handleIsCorrect: false, 610 + }); 611 + }
+54 -22
src/xrpc/sync.ts
··· 3 3 import type { RepoManager } from "../repo-manager.js"; 4 4 import type { AppEnv } from "../types.js"; 5 5 import { detectContentType } from "../format.js"; 6 + import type { ReplicatedRepoReader } from "../replication/replicated-repo-reader.js"; 6 7 7 8 export async function getRepo( 8 9 c: Context<AppEnv>, ··· 45 46 export async function getRepoStatus( 46 47 c: Context<AppEnv>, 47 48 repoManager: RepoManager, 49 + replicatedRepoReader?: ReplicatedRepoReader, 48 50 ): Promise<Response> { 49 51 const did = c.req.query("did"); 50 52 ··· 62 64 ); 63 65 } 64 66 65 - if (did !== c.env.DID) { 66 - return c.json( 67 - { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 68 - 404, 69 - ); 67 + if (did === c.env.DID) { 68 + const data = await repoManager.getRepoStatus(); 69 + return c.json({ 70 + did: data.did, 71 + active: true, 72 + status: "active", 73 + rev: data.rev, 74 + }); 70 75 } 71 76 72 - const data = await repoManager.getRepoStatus(); 77 + // Check replicated repos 78 + if (replicatedRepoReader) { 79 + const status = replicatedRepoReader.getRepoStatus(did); 80 + if (status) { 81 + return c.json({ 82 + did: status.did, 83 + active: status.active, 84 + status: status.active ? "active" : "deactivated", 85 + rev: status.rev, 86 + }); 87 + } 88 + } 73 89 74 - return c.json({ 75 - did: data.did, 76 - active: true, 77 - status: "active", 78 - rev: data.rev, 79 - }); 90 + return c.json( 91 + { error: "RepoNotFound", message: `Repository not found for DID: ${did}` }, 92 + 404, 93 + ); 80 94 } 81 95 82 96 export async function listRepos( 83 97 c: Context<AppEnv>, 84 98 repoManager: RepoManager, 99 + replicatedRepoReader?: ReplicatedRepoReader, 85 100 ): Promise<Response> { 86 101 const data = await repoManager.getRepoStatus(); 87 102 88 - return c.json({ 89 - repos: [ 90 - { 91 - did: data.did, 92 - head: data.head, 93 - rev: data.rev, 94 - active: true, 95 - }, 96 - ], 97 - }); 103 + const repos: Array<{ 104 + did: string; 105 + head?: string; 106 + rev: string | null; 107 + active: boolean; 108 + }> = [ 109 + { 110 + did: data.did, 111 + head: data.head, 112 + rev: data.rev, 113 + active: true, 114 + }, 115 + ]; 116 + 117 + // Include replicated repos if reader is available 118 + if (replicatedRepoReader) { 119 + for (const status of replicatedRepoReader.getAllRepoStatuses()) { 120 + repos.push({ 121 + did: status.did, 122 + head: status.rootCid ?? undefined, 123 + rev: status.rev, 124 + active: status.active, 125 + }); 126 + } 127 + } 128 + 129 + return c.json({ repos }); 98 130 } 99 131 100 132 export async function listBlobs(