WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import type { Database } from "@atbb/db";
2import { forums, backfillProgress, backfillErrors, users } from "@atbb/db";
3import { eq, asc, gt } from "drizzle-orm";
4import { AtpAgent } from "@atproto/api";
5import { CursorManager } from "./cursor-manager.js";
6import type { AppConfig } from "./config.js";
7import type { Indexer } from "./indexer.js";
8import { isProgrammingError } from "./errors.js";
9import type { Logger } from "@atbb/logger";
10
/**
 * Forum-owned collection NSIDs, synced from the forum's own repo during
 * Phase 1 of performBackfill().
 * Order matters: sync forum-owned records first (FK dependencies).
 */
// These collections define the sync order. Used by performBackfill() in Task 6.
export const FORUM_OWNED_COLLECTIONS = [
  "space.atbb.forum.forum",
  "space.atbb.forum.category",
  "space.atbb.forum.board",
  "space.atbb.forum.role",
  "space.atbb.modAction",
  "space.atbb.forum.theme",
  "space.atbb.forum.themePolicy",
] as const;
25
/**
 * User-owned collection NSIDs, synced from each known user's repo during
 * Phase 2 of a CatchUp backfill (performBackfill / resumeBackfill).
 */
export const USER_OWNED_COLLECTIONS = [
  "space.atbb.membership",
  "space.atbb.post",
] as const;
30
/**
 * Maps AT Proto collection NSIDs to Indexer handler method names.
 * syncRepoRecords() looks handlers up here dynamically by name; a collection
 * missing from this map is skipped and counted as an error.
 * Typed as Record<string, string> (not a literal key union) because callers
 * index it with arbitrary collection strings.
 */
const COLLECTION_HANDLER_MAP: Record<string, string> = {
  "space.atbb.post": "handlePostCreate",
  "space.atbb.forum.forum": "handleForumCreate",
  "space.atbb.forum.category": "handleCategoryCreate",
  "space.atbb.forum.board": "handleBoardCreate",
  "space.atbb.forum.role": "handleRoleCreate",
  "space.atbb.membership": "handleMembershipCreate",
  "space.atbb.modAction": "handleModActionCreate",
  "space.atbb.forum.theme": "handleThemeCreate",
  "space.atbb.forum.themePolicy": "handleThemePolicyCreate",
};
42
/** Outcome of checkIfNeeded(): which kind of backfill (if any) to run. */
export enum BackfillStatus {
  /** Cursor is fresh and the DB has forum data — no backfill required. */
  NotNeeded = "not_needed",
  /** Cursor exists but is older than the configured max age. */
  CatchUp = "catch_up",
  /** No cursor, a failed DB check, or missing forum data — sync from scratch. */
  FullSync = "full_sync",
}
48
/** Summary returned by performBackfill() / resumeBackfill(). */
export interface BackfillResult {
  /** ID of the backfill_progress row that tracked this run. */
  backfillId: bigint;
  /** Which kind of backfill was executed. */
  type: BackfillStatus;
  /** User DIDs processed in Phase 2 (stays 0 when Phase 2 does not run). */
  didsProcessed: number;
  /** Records successfully fed through Indexer handlers. */
  recordsIndexed: number;
  /** Total errors: failed records, PDS/page errors, and failed batch users. */
  errors: number;
  /** Wall-clock duration of the run in milliseconds. */
  durationMs: number;
}
57
/** Per-(DID, collection) counters returned by syncRepoRecords(). */
export interface SyncStats {
  /** Records returned by listRecords across all pages. */
  recordsFound: number;
  /** Records successfully handled by the Indexer. */
  recordsIndexed: number;
  /** Failures: bad records, PDS errors, or a skipped sync (counted as 1). */
  errors: number;
}
63
64export class BackfillManager {
65 private cursorManager: CursorManager;
66 private isRunning = false;
67 private indexer: Indexer | null = null;
68
69 constructor(
70 private db: Database,
71 private config: AppConfig,
72 private logger: Logger,
73 ) {
74 this.cursorManager = new CursorManager(db, logger);
75 }
76
77 /**
78 * Inject the Indexer instance. Called during AppContext wiring.
79 */
80 setIndexer(indexer: Indexer): void {
81 this.indexer = indexer;
82 }
83
84 /**
85 * Sync all records from a single (DID, collection) pair via listRecords.
86 * Feeds each record through the matching Indexer handler.
87 */
88 async syncRepoRecords(
89 did: string,
90 collection: string,
91 agent: AtpAgent
92 ): Promise<SyncStats> {
93 const stats: SyncStats = { recordsFound: 0, recordsIndexed: 0, errors: 0 };
94 const handlerName = COLLECTION_HANDLER_MAP[collection];
95
96 if (!handlerName || !this.indexer) {
97 this.logger.error("backfill.sync_skipped", {
98 event: "backfill.sync_skipped",
99 did,
100 collection,
101 reason: !handlerName ? "unknown_collection" : "indexer_not_set",
102 });
103 stats.errors = 1;
104 return stats;
105 }
106
107 const handler = (this.indexer as any)[handlerName].bind(this.indexer);
108 const delayMs = 1000 / this.config.backfillRateLimit;
109 let cursor: string | undefined;
110
111 try {
112 do {
113 const response = await agent.com.atproto.repo.listRecords({
114 repo: did,
115 collection,
116 limit: 100,
117 cursor,
118 });
119
120 const records = response.data.records;
121 stats.recordsFound += records.length;
122
123 for (const record of records) {
124 try {
125 const rkey = record.uri.split("/").pop()!;
126 const event = {
127 did,
128 commit: { rkey, cid: record.cid, record: record.value },
129 };
130 await handler(event);
131 stats.recordsIndexed++;
132 } catch (error) {
133 if (isProgrammingError(error)) throw error;
134 stats.errors++;
135 this.logger.error("backfill.record_error", {
136 event: "backfill.record_error",
137 did,
138 collection,
139 uri: record.uri,
140 error: error instanceof Error ? error.message : String(error),
141 });
142 }
143 }
144
145 cursor = response.data.cursor;
146
147 // Rate limiting: delay between page fetches
148 if (cursor) {
149 await new Promise((resolve) => setTimeout(resolve, delayMs));
150 }
151 } while (cursor);
152 } catch (error) {
153 if (isProgrammingError(error)) throw error;
154 stats.errors++;
155 this.logger.error("backfill.pds_error", {
156 event: "backfill.pds_error",
157 did,
158 collection,
159 error: error instanceof Error ? error.message : String(error),
160 });
161 }
162
163 return stats;
164 }
165
166 /**
167 * Determine if backfill is needed based on cursor state and DB contents.
168 */
169 async checkIfNeeded(cursor: bigint | null): Promise<BackfillStatus> {
170 // No cursor at all → first startup or wiped cursor
171 if (cursor === null) {
172 this.logger.info("backfill.decision", {
173 event: "backfill.decision",
174 status: BackfillStatus.FullSync,
175 reason: "no_cursor",
176 });
177 return BackfillStatus.FullSync;
178 }
179
180 // Check if DB has forum data (consistency check)
181 let forum: { rkey: string } | undefined;
182 try {
183 const results = await this.db
184 .select()
185 .from(forums)
186 .where(eq(forums.rkey, "self"))
187 .limit(1);
188 forum = results[0];
189 } catch (error) {
190 this.logger.error("backfill.decision", {
191 event: "backfill.decision",
192 status: BackfillStatus.FullSync,
193 reason: "db_query_failed",
194 error: error instanceof Error ? error.message : String(error),
195 });
196 return BackfillStatus.FullSync;
197 }
198
199 if (!forum) {
200 this.logger.info("backfill.decision", {
201 event: "backfill.decision",
202 status: BackfillStatus.FullSync,
203 reason: "db_inconsistency",
204 cursorTimestamp: cursor.toString(),
205 });
206 return BackfillStatus.FullSync;
207 }
208
209 // Check cursor age
210 const ageHours = this.cursorManager.getCursorAgeHours(cursor)!;
211 if (ageHours > this.config.backfillCursorMaxAgeHours) {
212 this.logger.info("backfill.decision", {
213 event: "backfill.decision",
214 status: BackfillStatus.CatchUp,
215 reason: "cursor_too_old",
216 cursorAgeHours: Math.round(ageHours),
217 thresholdHours: this.config.backfillCursorMaxAgeHours,
218 cursorTimestamp: cursor.toString(),
219 });
220 return BackfillStatus.CatchUp;
221 }
222
223 this.logger.info("backfill.decision", {
224 event: "backfill.decision",
225 status: BackfillStatus.NotNeeded,
226 reason: "cursor_fresh",
227 cursorAgeHours: Math.round(ageHours),
228 });
229 return BackfillStatus.NotNeeded;
230 }
231
232 /**
233 * Check if a backfill is currently running.
234 */
235 getIsRunning(): boolean {
236 return this.isRunning;
237 }
238
239 /**
240 * Create an AtpAgent pointed at the forum's PDS.
241 * Extracted as a private method for test mocking.
242 */
243 private createAgentForPds(): AtpAgent {
244 return new AtpAgent({ service: this.config.pdsUrl });
245 }
246
247 /**
248 * Create a progress row and return its ID.
249 * Use this before performBackfill when you need the ID immediately (e.g., for a 202 response).
250 * Pass the returned ID as existingRowId to performBackfill to skip duplicate row creation.
251 */
252 async prepareBackfillRow(type: BackfillStatus): Promise<bigint> {
253 const [row] = await this.db
254 .insert(backfillProgress)
255 .values({
256 status: "in_progress",
257 backfillType: type,
258 startedAt: new Date(),
259 })
260 .returning({ id: backfillProgress.id });
261 return row.id;
262 }
263
264 /**
265 * Query the backfill_progress table for any row with status = 'in_progress'.
266 * Returns the first such row, or null if none exists.
267 */
268 async checkForInterruptedBackfill() {
269 try {
270 const [row] = await this.db
271 .select()
272 .from(backfillProgress)
273 .where(eq(backfillProgress.status, "in_progress"))
274 .limit(1);
275
276 return row ?? null;
277 } catch (error) {
278 if (isProgrammingError(error)) throw error;
279 this.logger.error("backfill.check_interrupted.failed", {
280 event: "backfill.check_interrupted.failed",
281 error: error instanceof Error ? error.message : String(error),
282 note: "Could not check for interrupted backfills — assuming none",
283 });
284 return null;
285 }
286 }
287
288 /**
289 * Resume a CatchUp backfill from its last checkpoint (lastProcessedDid).
290 * Only processes users with DID > lastProcessedDid.
291 * Does NOT re-run Phase 1 (forum-owned collections).
292 */
293 async resumeBackfill(interrupted: typeof backfillProgress.$inferSelect): Promise<BackfillResult> {
294 if (this.isRunning) {
295 throw new Error("Backfill is already in progress");
296 }
297
298 this.isRunning = true;
299 const startTime = Date.now();
300 let totalIndexed = interrupted.recordsIndexed;
301 let totalErrors = 0;
302 let didsProcessed = interrupted.didsProcessed;
303
304 this.logger.info("backfill.resuming", {
305 event: "backfill.resuming",
306 backfillId: interrupted.id.toString(),
307 lastProcessedDid: interrupted.lastProcessedDid,
308 didsProcessed: interrupted.didsProcessed,
309 didsTotal: interrupted.didsTotal,
310 });
311
312 try {
313 const agent = this.createAgentForPds();
314
315 if (interrupted.backfillType !== BackfillStatus.CatchUp) {
316 // FullSync cannot be resumed from a checkpoint — it must re-run from scratch
317 throw new Error(
318 "Interrupted FullSync cannot be resumed. Re-trigger via /api/admin/backfill?force=full_sync."
319 );
320 }
321
322 if (interrupted.lastProcessedDid) {
323 // Resume: fetch users after lastProcessedDid
324 // TODO(ATB-13): Paginate for large forums
325 const remainingUsers = await this.db
326 .select({ did: users.did })
327 .from(users)
328 .where(gt(users.did, interrupted.lastProcessedDid))
329 .orderBy(asc(users.did));
330
331 for (let i = 0; i < remainingUsers.length; i += this.config.backfillConcurrency) {
332 const batch = remainingUsers.slice(i, i + this.config.backfillConcurrency);
333 const backfillId = interrupted.id;
334
335 const batchResults = await Promise.allSettled(
336 batch.map(async (user) => {
337 let userIndexed = 0;
338 let userErrors = 0;
339 for (const collection of USER_OWNED_COLLECTIONS) {
340 const stats = await this.syncRepoRecords(user.did, collection, agent);
341 userIndexed += stats.recordsIndexed;
342 if (stats.errors > 0) {
343 userErrors += stats.errors;
344 await this.db.insert(backfillErrors).values({
345 backfillId,
346 did: user.did,
347 collection,
348 errorMessage: `${stats.errors} record(s) failed`,
349 createdAt: new Date(),
350 });
351 }
352 }
353 return { indexed: userIndexed, errors: userErrors };
354 })
355 );
356
357 // Aggregate results after settlement, including DID for debuggability
358 batchResults.forEach((result, i) => {
359 if (result.status === "fulfilled") {
360 totalIndexed += result.value.indexed;
361 totalErrors += result.value.errors;
362 } else {
363 totalErrors++;
364 this.logger.error("backfill.resume.batch_user_failed", {
365 event: "backfill.resume.batch_user_failed",
366 backfillId: backfillId.toString(),
367 did: batch[i].did,
368 error: result.reason instanceof Error ? result.reason.message : String(result.reason),
369 });
370 }
371 });
372
373 didsProcessed += batch.length;
374
375 try {
376 await this.db
377 .update(backfillProgress)
378 .set({
379 didsProcessed,
380 recordsIndexed: totalIndexed,
381 lastProcessedDid: batch[batch.length - 1].did,
382 })
383 .where(eq(backfillProgress.id, backfillId));
384 } catch (checkpointError) {
385 if (isProgrammingError(checkpointError)) throw checkpointError;
386 this.logger.warn("backfill.resume.checkpoint_failed", {
387 event: "backfill.resume.checkpoint_failed",
388 backfillId: backfillId.toString(),
389 didsProcessed,
390 error: checkpointError instanceof Error ? checkpointError.message : String(checkpointError),
391 note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.",
392 });
393 }
394 }
395 }
396
397 // Mark completed
398 await this.db
399 .update(backfillProgress)
400 .set({
401 status: "completed",
402 didsProcessed,
403 recordsIndexed: totalIndexed,
404 completedAt: new Date(),
405 })
406 .where(eq(backfillProgress.id, interrupted.id));
407
408 const result: BackfillResult = {
409 backfillId: interrupted.id,
410 type: interrupted.backfillType as BackfillStatus,
411 didsProcessed,
412 recordsIndexed: totalIndexed,
413 errors: totalErrors,
414 durationMs: Date.now() - startTime,
415 };
416
417 const resumeEvent = totalErrors > 0 ? "backfill.resume.completed_with_errors" : "backfill.resume.completed";
418 this.logger.info(resumeEvent, {
419 event: resumeEvent,
420 ...result,
421 backfillId: result.backfillId.toString(),
422 });
423
424 return result;
425 } catch (error) {
426 // Best-effort: mark as failed
427 try {
428 await this.db
429 .update(backfillProgress)
430 .set({
431 status: "failed",
432 errorMessage: error instanceof Error ? error.message : String(error),
433 completedAt: new Date(),
434 })
435 .where(eq(backfillProgress.id, interrupted.id));
436 } catch (updateError) {
437 this.logger.error("backfill.resume.failed_status_update_error", {
438 event: "backfill.resume.failed_status_update_error",
439 backfillId: interrupted.id.toString(),
440 error: updateError instanceof Error ? updateError.message : String(updateError),
441 });
442 }
443
444 this.logger.error("backfill.resume.failed", {
445 event: "backfill.resume.failed",
446 backfillId: interrupted.id.toString(),
447 error: error instanceof Error ? error.message : String(error),
448 });
449 throw error;
450 } finally {
451 this.isRunning = false;
452 }
453 }
454
455 /**
456 * Execute a backfill operation.
457 * Phase 1: Syncs forum-owned collections from the Forum DID.
458 * Phase 2 (CatchUp only): Syncs user-owned collections from all known users.
459 *
460 * @param existingRowId - If provided (from prepareBackfillRow), skips creating a new progress row.
461 */
462 async performBackfill(type: BackfillStatus, existingRowId?: bigint): Promise<BackfillResult> {
463 if (this.isRunning) {
464 throw new Error("Backfill is already in progress");
465 }
466
467 this.isRunning = true;
468 const startTime = Date.now();
469 let backfillId: bigint | undefined = existingRowId;
470 let totalIndexed = 0;
471 let totalErrors = 0;
472 let didsProcessed = 0;
473
474 try {
475 // Create progress row only if not pre-created by prepareBackfillRow
476 if (backfillId === undefined) {
477 const [row] = await this.db
478 .insert(backfillProgress)
479 .values({
480 status: "in_progress",
481 backfillType: type,
482 startedAt: new Date(),
483 })
484 .returning({ id: backfillProgress.id });
485 backfillId = row.id;
486 }
487 // Capture in const so TypeScript can narrow through async closures
488 const resolvedBackfillId: bigint = backfillId;
489
490 const agent = this.createAgentForPds();
491
492 // Phase 1: Sync forum-owned collections from Forum DID
493 for (const collection of FORUM_OWNED_COLLECTIONS) {
494 const stats = await this.syncRepoRecords(
495 this.config.forumDid,
496 collection,
497 agent
498 );
499 totalIndexed += stats.recordsIndexed;
500 totalErrors += stats.errors;
501 if (stats.errors > 0) {
502 await this.db.insert(backfillErrors).values({
503 backfillId: resolvedBackfillId,
504 did: this.config.forumDid,
505 collection,
506 errorMessage: `${stats.errors} record(s) failed`,
507 createdAt: new Date(),
508 });
509 }
510 }
511
512 // Phase 2: For CatchUp, sync user-owned records from known DIDs
513 if (type === BackfillStatus.CatchUp) {
514 // TODO(ATB-13): Paginate for large forums — currently loads all DIDs into memory
515 const knownUsers = await this.db
516 .select({ did: users.did })
517 .from(users)
518 .orderBy(asc(users.did));
519
520 const didsTotal = knownUsers.length;
521
522 await this.db
523 .update(backfillProgress)
524 .set({ didsTotal })
525 .where(eq(backfillProgress.id, backfillId));
526
527 // Process in batches of backfillConcurrency
528 for (let i = 0; i < knownUsers.length; i += this.config.backfillConcurrency) {
529 const batch = knownUsers.slice(i, i + this.config.backfillConcurrency);
530
531 const batchResults = await Promise.allSettled(
532 batch.map(async (user) => {
533 let userIndexed = 0;
534 let userErrors = 0;
535 for (const collection of USER_OWNED_COLLECTIONS) {
536 const stats = await this.syncRepoRecords(user.did, collection, agent);
537 userIndexed += stats.recordsIndexed;
538 if (stats.errors > 0) {
539 userErrors += stats.errors;
540 await this.db.insert(backfillErrors).values({
541 backfillId: resolvedBackfillId,
542 did: user.did,
543 collection,
544 errorMessage: `${stats.errors} record(s) failed`,
545 createdAt: new Date(),
546 });
547 }
548 }
549 return { indexed: userIndexed, errors: userErrors };
550 })
551 );
552
553 // Aggregate results after settlement, including DID for debuggability
554 batchResults.forEach((result, i) => {
555 if (result.status === "fulfilled") {
556 totalIndexed += result.value.indexed;
557 totalErrors += result.value.errors;
558 } else {
559 totalErrors++;
560 this.logger.error("backfill.batch_user_failed", {
561 event: "backfill.batch_user_failed",
562 backfillId: resolvedBackfillId.toString(),
563 did: batch[i].did,
564 error: result.reason instanceof Error ? result.reason.message : String(result.reason),
565 });
566 }
567 });
568
569 didsProcessed += batch.length;
570
571 try {
572 await this.db
573 .update(backfillProgress)
574 .set({
575 didsProcessed,
576 recordsIndexed: totalIndexed,
577 lastProcessedDid: batch[batch.length - 1].did,
578 })
579 .where(eq(backfillProgress.id, backfillId));
580 } catch (checkpointError) {
581 if (isProgrammingError(checkpointError)) throw checkpointError;
582 this.logger.warn("backfill.checkpoint_failed", {
583 event: "backfill.checkpoint_failed",
584 backfillId: resolvedBackfillId.toString(),
585 didsProcessed,
586 error: checkpointError instanceof Error ? checkpointError.message : String(checkpointError),
587 note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.",
588 });
589 }
590
591 this.logger.info("backfill.progress", {
592 event: "backfill.progress",
593 backfillId: backfillId.toString(),
594 type,
595 didsProcessed,
596 didsTotal,
597 recordsIndexed: totalIndexed,
598 elapsedMs: Date.now() - startTime,
599 });
600 }
601 }
602
603 // Mark completed
604 await this.db
605 .update(backfillProgress)
606 .set({
607 status: "completed",
608 didsProcessed,
609 recordsIndexed: totalIndexed,
610 completedAt: new Date(),
611 })
612 .where(eq(backfillProgress.id, backfillId));
613
614 const result: BackfillResult = {
615 backfillId: resolvedBackfillId,
616 type,
617 didsProcessed,
618 recordsIndexed: totalIndexed,
619 errors: totalErrors,
620 durationMs: Date.now() - startTime,
621 };
622
623 const completedEvent = totalErrors > 0 ? "backfill.completed_with_errors" : "backfill.completed";
624 this.logger.info(completedEvent, {
625 event: completedEvent,
626 ...result,
627 backfillId: result.backfillId.toString(),
628 });
629
630 return result;
631 } catch (error) {
632 // Best-effort: mark progress row as failed (if it was created)
633 if (backfillId !== undefined) {
634 try {
635 await this.db
636 .update(backfillProgress)
637 .set({
638 status: "failed",
639 errorMessage: error instanceof Error ? error.message : String(error),
640 completedAt: new Date(),
641 })
642 .where(eq(backfillProgress.id, backfillId));
643 } catch (updateError) {
644 this.logger.error("backfill.failed_status_update_error", {
645 event: "backfill.failed_status_update_error",
646 backfillId: backfillId.toString(),
647 error: updateError instanceof Error ? updateError.message : String(updateError),
648 });
649 }
650 }
651
652 this.logger.error("backfill.failed", {
653 event: "backfill.failed",
654 backfillId: backfillId !== undefined ? backfillId.toString() : "not_created",
655 error: error instanceof Error ? error.message : String(error),
656 });
657 throw error;
658 } finally {
659 this.isRunning = false;
660 }
661 }
662}
663