WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at main 663 lines 22 kB view raw
import type { Database } from "@atbb/db";
import { forums, backfillProgress, backfillErrors, users } from "@atbb/db";
import { eq, asc, gt } from "drizzle-orm";
import { AtpAgent } from "@atproto/api";
import { CursorManager } from "./cursor-manager.js";
import type { AppConfig } from "./config.js";
import type { Indexer } from "./indexer.js";
import { isProgrammingError } from "./errors.js";
import type { Logger } from "@atbb/logger";

/**
 * Collections owned by the Forum DID, listed in sync order.
 * Order matters: forum-owned records are synced first, and in this order,
 * because of FK dependencies between them.
 */
export const FORUM_OWNED_COLLECTIONS = [
  "space.atbb.forum.forum",
  "space.atbb.forum.category",
  "space.atbb.forum.board",
  "space.atbb.forum.role",
  "space.atbb.modAction",
  "space.atbb.forum.theme",
  "space.atbb.forum.themePolicy",
] as const;

/**
 * Collections owned by individual users. Synced per known user DID during
 * CatchUp backfills (and resumes), after the forum-owned collections.
 */
export const USER_OWNED_COLLECTIONS = [
  "space.atbb.membership",
  "space.atbb.post",
] as const;

/**
 * Maps AT Proto collection NSIDs to the name of the Indexer method that
 * indexes a create event for that collection.
 */
const COLLECTION_HANDLER_MAP: Record<string, string> = {
  "space.atbb.post": "handlePostCreate",
  "space.atbb.forum.forum": "handleForumCreate",
  "space.atbb.forum.category": "handleCategoryCreate",
  "space.atbb.forum.board": "handleBoardCreate",
  "space.atbb.forum.role": "handleRoleCreate",
  "space.atbb.membership": "handleMembershipCreate",
  "space.atbb.modAction": "handleModActionCreate",
  "space.atbb.forum.theme": "handleThemeCreate",
  "space.atbb.forum.themePolicy": "handleThemePolicyCreate",
};

/**
 * Backfill decision produced by checkIfNeeded(). The value is also persisted
 * as the progress row's backfillType.
 */
export enum BackfillStatus {
  NotNeeded = "not_needed",
  CatchUp = "catch_up",
  FullSync = "full_sync",
}

/** Summary of a finished backfill run, returned by performBackfill/resumeBackfill. */
export interface BackfillResult {
  backfillId: bigint;
  type: BackfillStatus;
  didsProcessed: number;
  recordsIndexed: number;
  errors: number;
  durationMs: number;
}

/** Counters for one (DID, collection) sync performed by syncRepoRecords. */
export interface SyncStats {
  recordsFound: number;
  recordsIndexed: number;
  errors: number;
}

/** Prefix for log event names: fresh runs log under "backfill", resumed runs under "backfill.resume". */
type BackfillEventPrefix = "backfill" | "backfill.resume";

/** Running totals carried across the phases of one backfill run. */
interface BackfillTotals {
  didsProcessed: number;
  recordsIndexed: number;
  errors: number;
}

/** Options for BackfillManager.syncUserBatches. */
interface UserBatchOptions {
  eventPrefix: BackfillEventPrefix;
  /** When set, a "backfill.progress" line is logged after every batch. */
  progress?: { type: BackfillStatus; didsTotal: number; startTime: number };
}

export class BackfillManager {
  private cursorManager: CursorManager;
  private isRunning = false;
  private indexer: Indexer | null = null;

  constructor(
    private db: Database,
    private config: AppConfig,
    private logger: Logger,
  ) {
    this.cursorManager = new CursorManager(db, logger);
  }

  /**
   * Inject the Indexer instance. Called during AppContext wiring.
   */
  setIndexer(indexer: Indexer): void {
    this.indexer = indexer;
  }

  /**
   * Sync all records from a single (DID, collection) pair via listRecords,
   * feeding each record through the matching Indexer create handler.
   *
   * Pages of 100 records are fetched until the PDS stops returning a cursor,
   * pausing between pages according to config.backfillRateLimit.
   *
   * Failures are counted in the returned `errors` and logged rather than thrown:
   * - an unknown collection or missing Indexer → errors = 1, nothing fetched;
   * - a failing record handler → that record is skipped;
   * - a failing page fetch → sync of this pair stops there;
   * - a repeated (stalled) cursor → sync of this pair stops there.
   * Errors classified by isProgrammingError are rethrown.
   */
  async syncRepoRecords(
    did: string,
    collection: string,
    agent: AtpAgent
  ): Promise<SyncStats> {
    const stats: SyncStats = { recordsFound: 0, recordsIndexed: 0, errors: 0 };
    const handlerName = COLLECTION_HANDLER_MAP[collection];

    if (!handlerName || !this.indexer) {
      this.logger.error("backfill.sync_skipped", {
        event: "backfill.sync_skipped",
        did,
        collection,
        reason: !handlerName ? "unknown_collection" : "indexer_not_set",
      });
      stats.errors = 1;
      return stats;
    }

    // Handlers are looked up by name, so the compiler cannot check this access;
    // a missing method throws a TypeError from .bind() before anything is fetched.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const handler = (this.indexer as any)[handlerName].bind(this.indexer);

    // backfillRateLimit is page fetches per second. A non-positive value
    // disables the pause instead of yielding an Infinity/negative delay.
    const rateLimit = this.config.backfillRateLimit;
    const delayMs = rateLimit > 0 ? 1000 / rateLimit : 0;
    let cursor: string | undefined;

    try {
      do {
        const response = await agent.com.atproto.repo.listRecords({
          repo: did,
          collection,
          limit: 100,
          cursor,
        });

        const records = response.data.records;
        stats.recordsFound += records.length;

        for (const record of records) {
          try {
            // The rkey is the last path segment of the record's at:// URI.
            const rkey = record.uri.split("/").pop()!;
            const event = {
              did,
              commit: { rkey, cid: record.cid, record: record.value },
            };
            await handler(event);
            stats.recordsIndexed++;
          } catch (error) {
            if (isProgrammingError(error)) throw error;
            stats.errors++;
            this.logger.error("backfill.record_error", {
              event: "backfill.record_error",
              did,
              collection,
              uri: record.uri,
              error: error instanceof Error ? error.message : String(error),
            });
          }
        }

        const nextCursor = response.data.cursor;

        // Defensive: a PDS that keeps handing back the same cursor would make
        // this loop refetch (and re-index) one page forever.
        if (nextCursor !== undefined && nextCursor === cursor) {
          stats.errors++;
          this.logger.error("backfill.pds_cursor_stalled", {
            event: "backfill.pds_cursor_stalled",
            did,
            collection,
            cursor: nextCursor,
          });
          break;
        }

        cursor = nextCursor;

        // Rate limiting: delay between page fetches
        if (cursor && delayMs > 0) {
          await new Promise((resolve) => setTimeout(resolve, delayMs));
        }
      } while (cursor);
    } catch (error) {
      if (isProgrammingError(error)) throw error;
      stats.errors++;
      this.logger.error("backfill.pds_error", {
        event: "backfill.pds_error",
        did,
        collection,
        error: error instanceof Error ? error.message : String(error),
      });
    }

    return stats;
  }

  /**
   * Determine if backfill is needed based on cursor state and DB contents.
   *
   * - No cursor → FullSync.
   * - Cursor present but no forum row with rkey "self" → FullSync (DB and
   *   cursor disagree); a failed lookup also falls back to FullSync.
   * - Cursor older than backfillCursorMaxAgeHours → CatchUp.
   * - Otherwise → NotNeeded.
   *
   * Errors classified by isProgrammingError are rethrown rather than being
   * masked as a FullSync decision.
   */
  async checkIfNeeded(cursor: bigint | null): Promise<BackfillStatus> {
    // No cursor at all → first startup or wiped cursor
    if (cursor === null) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "no_cursor",
      });
      return BackfillStatus.FullSync;
    }

    // Check if DB has forum data (consistency check)
    let forum: { rkey: string } | undefined;
    try {
      const results = await this.db
        .select()
        .from(forums)
        .where(eq(forums.rkey, "self"))
        .limit(1);
      forum = results[0];
    } catch (error) {
      if (isProgrammingError(error)) throw error;
      this.logger.error("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "db_query_failed",
        error: error instanceof Error ? error.message : String(error),
      });
      return BackfillStatus.FullSync;
    }

    if (!forum) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "db_inconsistency",
        cursorTimestamp: cursor.toString(),
      });
      return BackfillStatus.FullSync;
    }

    // Check cursor age. NOTE(review): the `!` assumes getCursorAgeHours only
    // returns null for a null cursor, which was handled above — confirm.
    const ageHours = this.cursorManager.getCursorAgeHours(cursor)!;
    if (ageHours > this.config.backfillCursorMaxAgeHours) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.CatchUp,
        reason: "cursor_too_old",
        cursorAgeHours: Math.round(ageHours),
        thresholdHours: this.config.backfillCursorMaxAgeHours,
        cursorTimestamp: cursor.toString(),
      });
      return BackfillStatus.CatchUp;
    }

    this.logger.info("backfill.decision", {
      event: "backfill.decision",
      status: BackfillStatus.NotNeeded,
      reason: "cursor_fresh",
      cursorAgeHours: Math.round(ageHours),
    });
    return BackfillStatus.NotNeeded;
  }

  /**
   * Check if a backfill is currently running.
   */
  getIsRunning(): boolean {
    return this.isRunning;
  }

  /**
   * Create an AtpAgent pointed at the forum's PDS.
   * Extracted as a private method for test mocking.
   */
  private createAgentForPds(): AtpAgent {
    return new AtpAgent({ service: this.config.pdsUrl });
  }

  /**
   * Create an "in_progress" progress row and return its ID.
   * Use this before performBackfill when you need the ID immediately (e.g., for a 202 response).
   * Pass the returned ID as existingRowId to performBackfill to skip duplicate row creation.
   */
  async prepareBackfillRow(type: BackfillStatus): Promise<bigint> {
    const [row] = await this.db
      .insert(backfillProgress)
      .values({
        status: "in_progress",
        backfillType: type,
        startedAt: new Date(),
      })
      .returning({ id: backfillProgress.id });
    return row.id;
  }

  /**
   * Query the backfill_progress table for any row with status = 'in_progress'.
   * Returns the first such row, or null if none exists. A failed query is
   * logged and treated as "none" (programming errors are rethrown).
   */
  async checkForInterruptedBackfill(): Promise<typeof backfillProgress.$inferSelect | null> {
    try {
      const [row] = await this.db
        .select()
        .from(backfillProgress)
        .where(eq(backfillProgress.status, "in_progress"))
        .limit(1);

      return row ?? null;
    } catch (error) {
      if (isProgrammingError(error)) throw error;
      this.logger.error("backfill.check_interrupted.failed", {
        event: "backfill.check_interrupted.failed",
        error: error instanceof Error ? error.message : String(error),
        note: "Could not check for interrupted backfills — assuming none",
      });
      return null;
    }
  }

  /**
   * Resume an interrupted CatchUp backfill from its checkpoint.
   *
   * - With a checkpoint (lastProcessedDid set): only users with DID >
   *   lastProcessedDid are synced; Phase 1 (forum-owned collections) is not re-run.
   * - Without a checkpoint: the previous run stopped before its first user batch
   *   was checkpointed, possibly during Phase 1, so Phase 1 and ALL users are
   *   synced again rather than marking the row completed with no work done.
   *
   * On failure the row is marked "failed" (best-effort) and the error rethrown.
   *
   * @throws Error if a backfill is already running, if the interrupted run was
   *   not a CatchUp (FullSync cannot be resumed), or if a non-checkpoint DB
   *   write fails.
   */
  async resumeBackfill(interrupted: typeof backfillProgress.$inferSelect): Promise<BackfillResult> {
    if (this.isRunning) {
      throw new Error("Backfill is already in progress");
    }

    this.isRunning = true;
    const startTime = Date.now();
    let totals: BackfillTotals = {
      didsProcessed: interrupted.didsProcessed,
      recordsIndexed: interrupted.recordsIndexed,
      errors: 0,
    };

    this.logger.info("backfill.resuming", {
      event: "backfill.resuming",
      backfillId: interrupted.id.toString(),
      lastProcessedDid: interrupted.lastProcessedDid,
      didsProcessed: interrupted.didsProcessed,
      didsTotal: interrupted.didsTotal,
    });

    try {
      const agent = this.createAgentForPds();

      if (interrupted.backfillType !== BackfillStatus.CatchUp) {
        // FullSync cannot be resumed from a checkpoint — it must re-run from scratch
        throw new Error(
          "Interrupted FullSync cannot be resumed. Re-trigger via /api/admin/backfill?force=full_sync."
        );
      }

      const checkpoint = interrupted.lastProcessedDid;

      if (!checkpoint) {
        // No user batch was checkpointed, so Phase 1 may not have finished.
        const forumStats = await this.syncForumCollections(interrupted.id, agent);
        totals.recordsIndexed += forumStats.indexed;
        totals.errors += forumStats.errors;
      }

      // Users after the checkpoint, or every user when there is none.
      // TODO(ATB-13): Paginate for large forums
      const remainingUsers = await this.db
        .select({ did: users.did })
        .from(users)
        .where(checkpoint ? gt(users.did, checkpoint) : undefined)
        .orderBy(asc(users.did));

      totals = await this.syncUserBatches(remainingUsers, interrupted.id, agent, totals, {
        eventPrefix: "backfill.resume",
      });

      // `await` keeps completion failures inside this try so the row is marked failed.
      return await this.completeBackfill(
        interrupted.id,
        BackfillStatus.CatchUp,
        totals,
        startTime,
        "backfill.resume"
      );
    } catch (error) {
      await this.failBackfill(interrupted.id, error, "backfill.resume");
      throw error;
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Execute a backfill operation.
   * Phase 1: Syncs forum-owned collections from the Forum DID.
   * Phase 2 (CatchUp only): Syncs user-owned collections from all known users,
   * checkpointing after every batch so the run can be resumed.
   *
   * On failure the progress row (if created) is marked "failed" (best-effort)
   * and the error rethrown.
   *
   * @param existingRowId - If provided (from prepareBackfillRow), skips creating a new progress row.
   * @throws Error if a backfill is already running, or if a non-checkpoint DB write fails.
   */
  async performBackfill(type: BackfillStatus, existingRowId?: bigint): Promise<BackfillResult> {
    if (this.isRunning) {
      throw new Error("Backfill is already in progress");
    }

    this.isRunning = true;
    const startTime = Date.now();
    let backfillId: bigint | undefined = existingRowId;

    try {
      // Create progress row only if not pre-created by prepareBackfillRow
      if (backfillId === undefined) {
        backfillId = await this.prepareBackfillRow(type);
      }
      const resolvedBackfillId: bigint = backfillId;

      const agent = this.createAgentForPds();

      // Phase 1: Sync forum-owned collections from Forum DID
      const forumStats = await this.syncForumCollections(resolvedBackfillId, agent);
      let totals: BackfillTotals = {
        didsProcessed: 0,
        recordsIndexed: forumStats.indexed,
        errors: forumStats.errors,
      };

      // Phase 2: For CatchUp, sync user-owned records from known DIDs
      if (type === BackfillStatus.CatchUp) {
        // TODO(ATB-13): Paginate for large forums — currently loads all DIDs into memory
        const knownUsers = await this.db
          .select({ did: users.did })
          .from(users)
          .orderBy(asc(users.did));

        const didsTotal = knownUsers.length;

        await this.db
          .update(backfillProgress)
          .set({ didsTotal })
          .where(eq(backfillProgress.id, resolvedBackfillId));

        totals = await this.syncUserBatches(knownUsers, resolvedBackfillId, agent, totals, {
          eventPrefix: "backfill",
          progress: { type, didsTotal, startTime },
        });
      }

      // `await` keeps completion failures inside this try so the row is marked failed.
      return await this.completeBackfill(resolvedBackfillId, type, totals, startTime, "backfill");
    } catch (error) {
      await this.failBackfill(backfillId, error, "backfill");
      throw error;
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Phase 1: sync every FORUM_OWNED_COLLECTIONS entry from the Forum DID in
   * order, writing a backfill_errors row for each collection that had failures.
   * Rejects if writing a backfill_errors row fails.
   */
  private async syncForumCollections(
    backfillId: bigint,
    agent: AtpAgent
  ): Promise<{ indexed: number; errors: number }> {
    let indexed = 0;
    let errors = 0;
    for (const collection of FORUM_OWNED_COLLECTIONS) {
      const stats = await this.syncRepoRecords(this.config.forumDid, collection, agent);
      indexed += stats.recordsIndexed;
      errors += stats.errors;
      if (stats.errors > 0) {
        await this.recordCollectionErrors(backfillId, this.config.forumDid, collection, stats.errors);
      }
    }
    return { indexed, errors };
  }

  /**
   * Sync USER_OWNED_COLLECTIONS for the given DIDs in batches of
   * config.backfillConcurrency, checkpointing didsProcessed / recordsIndexed /
   * lastProcessedDid after every batch.
   *
   * @param userRows - DIDs to process; expected sorted ascending so that
   *   lastProcessedDid is a valid resume point.
   * @param initial - totals to continue counting from.
   * @returns the updated totals.
   *
   * A user whose sync rejects is logged and counted as one error; the rest of
   * the batch still completes. A failed checkpoint write is logged and skipped
   * (programming errors are rethrown).
   */
  private async syncUserBatches(
    userRows: ReadonlyArray<{ did: string }>,
    backfillId: bigint,
    agent: AtpAgent,
    initial: BackfillTotals,
    options: UserBatchOptions
  ): Promise<BackfillTotals> {
    const { eventPrefix, progress } = options;
    let { didsProcessed, recordsIndexed, errors } = initial;

    // A non-integer or < 1 concurrency would give overlapping or empty slices;
    // fall back to 1 so every DID is processed exactly once.
    const configured = this.config.backfillConcurrency;
    const batchSize = Number.isFinite(configured) && configured >= 1 ? Math.floor(configured) : 1;

    for (let start = 0; start < userRows.length; start += batchSize) {
      const batch = userRows.slice(start, start + batchSize);

      // allSettled: one user's failure must not abort the other users in the batch.
      const batchResults = await Promise.allSettled(
        batch.map((user) => this.syncUserRepo(user.did, backfillId, agent))
      );

      // Aggregate results after settlement, including DID for debuggability
      batchResults.forEach((result, i) => {
        if (result.status === "fulfilled") {
          recordsIndexed += result.value.indexed;
          errors += result.value.errors;
        } else {
          errors++;
          this.logger.error(`${eventPrefix}.batch_user_failed`, {
            event: `${eventPrefix}.batch_user_failed`,
            backfillId: backfillId.toString(),
            did: batch[i].did,
            error: result.reason instanceof Error ? result.reason.message : String(result.reason),
          });
        }
      });

      didsProcessed += batch.length;

      try {
        await this.db
          .update(backfillProgress)
          .set({
            didsProcessed,
            recordsIndexed,
            lastProcessedDid: batch[batch.length - 1].did,
          })
          .where(eq(backfillProgress.id, backfillId));
      } catch (checkpointError) {
        if (isProgrammingError(checkpointError)) throw checkpointError;
        this.logger.warn(`${eventPrefix}.checkpoint_failed`, {
          event: `${eventPrefix}.checkpoint_failed`,
          backfillId: backfillId.toString(),
          didsProcessed,
          error: checkpointError instanceof Error ? checkpointError.message : String(checkpointError),
          note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.",
        });
      }

      if (progress) {
        this.logger.info("backfill.progress", {
          event: "backfill.progress",
          backfillId: backfillId.toString(),
          type: progress.type,
          didsProcessed,
          didsTotal: progress.didsTotal,
          recordsIndexed,
          elapsedMs: Date.now() - progress.startTime,
        });
      }
    }

    return { didsProcessed, recordsIndexed, errors };
  }

  /**
   * Sync every USER_OWNED_COLLECTIONS entry for one DID, writing a
   * backfill_errors row for each collection that had failures.
   * Rejects if writing a backfill_errors row fails.
   */
  private async syncUserRepo(
    did: string,
    backfillId: bigint,
    agent: AtpAgent
  ): Promise<{ indexed: number; errors: number }> {
    let indexed = 0;
    let errors = 0;
    for (const collection of USER_OWNED_COLLECTIONS) {
      const stats = await this.syncRepoRecords(did, collection, agent);
      indexed += stats.recordsIndexed;
      if (stats.errors > 0) {
        errors += stats.errors;
        await this.recordCollectionErrors(backfillId, did, collection, stats.errors);
      }
    }
    return { indexed, errors };
  }

  /** Insert one backfill_errors row summarizing `count` failures for a (DID, collection). */
  private async recordCollectionErrors(
    backfillId: bigint,
    did: string,
    collection: string,
    count: number
  ): Promise<void> {
    await this.db.insert(backfillErrors).values({
      backfillId,
      did,
      collection,
      errorMessage: `${count} record(s) failed`,
      createdAt: new Date(),
    });
  }

  /**
   * Mark the progress row "completed" with the final totals, log the
   * `<prefix>.completed` / `<prefix>.completed_with_errors` event, and return
   * the run summary. Rejects if the status update fails.
   */
  private async completeBackfill(
    backfillId: bigint,
    type: BackfillStatus,
    totals: BackfillTotals,
    startTime: number,
    eventPrefix: BackfillEventPrefix
  ): Promise<BackfillResult> {
    await this.db
      .update(backfillProgress)
      .set({
        status: "completed",
        didsProcessed: totals.didsProcessed,
        recordsIndexed: totals.recordsIndexed,
        completedAt: new Date(),
      })
      .where(eq(backfillProgress.id, backfillId));

    const result: BackfillResult = {
      backfillId,
      type,
      didsProcessed: totals.didsProcessed,
      recordsIndexed: totals.recordsIndexed,
      errors: totals.errors,
      durationMs: Date.now() - startTime,
    };

    const completedEvent =
      totals.errors > 0 ? `${eventPrefix}.completed_with_errors` : `${eventPrefix}.completed`;
    this.logger.info(completedEvent, {
      event: completedEvent,
      ...result,
      backfillId: result.backfillId.toString(),
    });

    return result;
  }

  /**
   * Best-effort: mark the progress row (if one was created) as "failed", then
   * log `<prefix>.failed`. A failing status update is logged, not thrown, so
   * the caller can rethrow the original error.
   */
  private async failBackfill(
    backfillId: bigint | undefined,
    error: unknown,
    eventPrefix: BackfillEventPrefix
  ): Promise<void> {
    const message = error instanceof Error ? error.message : String(error);

    if (backfillId !== undefined) {
      try {
        await this.db
          .update(backfillProgress)
          .set({
            status: "failed",
            errorMessage: message,
            completedAt: new Date(),
          })
          .where(eq(backfillProgress.id, backfillId));
      } catch (updateError) {
        this.logger.error(`${eventPrefix}.failed_status_update_error`, {
          event: `${eventPrefix}.failed_status_update_error`,
          backfillId: backfillId.toString(),
          error: updateError instanceof Error ? updateError.message : String(updateError),
        });
      }
    }

    this.logger.error(`${eventPrefix}.failed`, {
      event: `${eventPrefix}.failed`,
      backfillId: backfillId !== undefined ? backfillId.toString() : "not_created",
      error: message,
    });
  }
}