WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import type {
2 CommitCreateEvent,
3 CommitDeleteEvent,
4 CommitUpdateEvent,
5} from "@skyware/jetstream";
6import type { Database, DbOrTransaction } from "@atbb/db";
7import type { Logger } from "@atbb/logger";
8import {
9 posts,
10 forums,
11 categories,
12 boards,
13 users,
14 memberships,
15 modActions,
16 roles,
17 rolePermissions,
18 themes,
19 themePolicies,
20 themePolicyAvailableThemes,
21} from "@atbb/db";
22import { eq, and } from "drizzle-orm";
23import { parseAtUri } from "./at-uri.js";
24import { BanEnforcer } from "./ban-enforcer.js";
25import {
26 SpaceAtbbPost as Post,
27 SpaceAtbbForumForum as Forum,
28 SpaceAtbbForumCategory as Category,
29 SpaceAtbbForumBoard as Board,
30 SpaceAtbbMembership as Membership,
31 SpaceAtbbModAction as ModAction,
32 SpaceAtbbForumRole as Role,
33 SpaceAtbbForumTheme as Theme,
34 SpaceAtbbForumThemePolicy as ThemePolicy,
35} from "@atbb/lexicon";
36
37// ── Collection Config Types ─────────────────────────────
38
39/**
40 * Configuration for a data-driven collection handler.
41 * Encodes the per-collection logic that differs across the 5 indexed types,
42 * while the generic handler methods supply the shared try/catch/log/throw scaffolding.
43 */
44interface CollectionConfig<TRecord> {
45 /** Human-readable name for logging (e.g. "Post", "Forum") */
46 name: string;
47 /** Drizzle table reference */
48 table: any;
49 /** "hard" = DELETE FROM (all non-post collections) */
50 deleteStrategy: "hard";
51 /** Call ensureUser(event.did) before insert? (user-owned records) */
52 ensureUserOnCreate?: boolean;
53 /**
54 * Transform event+record into DB insert values.
55 * Return null to skip the insert (e.g. when a required foreign key is missing).
56 */
57 toInsertValues: (
58 event: any,
59 record: TRecord,
60 tx: DbOrTransaction
61 ) => Promise<Record<string, any> | null>;
62 /**
63 * Transform event+record into DB update set values.
64 * Runs inside a transaction. Return null to skip the update.
65 */
66 toUpdateValues: (
67 event: any,
68 record: TRecord,
69 tx: DbOrTransaction
70 ) => Promise<Record<string, any> | null>;
71 /**
72 * Optional hook called after a row is inserted or updated, within the same
73 * transaction. Receives the row's numeric id (bigint) so callers can write
74 * to child tables (e.g. role_permissions).
75 */
76 afterUpsert?: (
77 event: any,
78 record: TRecord,
79 rowId: bigint,
80 tx: DbOrTransaction
81 ) => Promise<void>;
82}
83
84
85/**
86 * Indexer class for processing AT Proto firehose events
87 * Converts events into database records for the atBB AppView
88 */
89export class Indexer {
90 private banEnforcer: BanEnforcer;
91
92 constructor(private db: Database, private logger: Logger) {
93 this.banEnforcer = new BanEnforcer(db, logger);
94 }
95
96 // ── Collection Configs ──────────────────────────────────
97
98 private postConfig: CollectionConfig<Post.Record> = {
99 name: "Post",
100 table: posts,
101 deleteStrategy: "hard",
102 ensureUserOnCreate: true,
103 toInsertValues: async (event, record, tx) => {
104 // Look up parent/root for replies
105 let rootId: bigint | null = null;
106 let parentId: bigint | null = null;
107
108 if (Post.isReplyRef(record.reply)) {
109 rootId = await this.getPostIdByUri(record.reply.root.uri, tx);
110 parentId = await this.getPostIdByUri(record.reply.parent.uri, tx);
111 } else if (record.reply) {
112 // reply ref present but $type omitted — rootPostId/parentPostId will be null,
113 // making this reply unreachable in thread navigation (data corruption).
114 this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", {
115 operation: "Post CREATE",
116 postDid: event.did,
117 postRkey: event.commit.rkey,
118 errorId: "POST_REPLY_REF_MISSING_TYPE",
119 });
120 }
121
122 // Look up board ID if board reference exists
123 let boardId: bigint | null = null;
124 if (record.board?.board.uri) {
125 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
126 if (!boardId) {
127 this.logger.error("Failed to index post: board not found", {
128 operation: "Post CREATE",
129 postDid: event.did,
130 postRkey: event.commit.rkey,
131 boardUri: record.board.board.uri,
132 errorId: "POST_BOARD_MISSING",
133 });
134 throw new Error(`Board not found: ${record.board.board.uri}`);
135 }
136 }
137
138 return {
139 did: event.did,
140 rkey: event.commit.rkey,
141 cid: event.commit.cid,
142 title: record.reply ? null : (record.title ?? null),
143 text: record.text,
144 forumUri: record.forum?.forum.uri ?? null,
145 boardUri: record.board?.board.uri ?? null,
146 boardId,
147 rootPostId: rootId,
148 rootUri: record.reply?.root.uri ?? null,
149 parentPostId: parentId,
150 parentUri: record.reply?.parent.uri ?? null,
151 createdAt: new Date(record.createdAt),
152 indexedAt: new Date(),
153 };
154 },
155 toUpdateValues: async (event, record, tx) => {
156 // Look up board ID if board reference exists
157 let boardId: bigint | null = null;
158 if (record.board?.board.uri) {
159 boardId = await this.getBoardIdByUri(record.board.board.uri, tx);
160 if (!boardId) {
161 this.logger.error("Failed to index post: board not found", {
162 operation: "Post UPDATE",
163 postDid: event.did,
164 postRkey: event.commit.rkey,
165 boardUri: record.board.board.uri,
166 errorId: "POST_BOARD_MISSING",
167 });
168 throw new Error(`Board not found: ${record.board.board.uri}`);
169 }
170 }
171
172 return {
173 cid: event.commit.cid,
174 title: record.reply ? null : (record.title ?? null),
175 text: record.text,
176 forumUri: record.forum?.forum.uri ?? null,
177 boardUri: record.board?.board.uri ?? null,
178 boardId,
179 indexedAt: new Date(),
180 };
181 },
182 };
183
184 private forumConfig: CollectionConfig<Forum.Record> = {
185 name: "Forum",
186 table: forums,
187 deleteStrategy: "hard",
188 ensureUserOnCreate: true,
189 toInsertValues: async (event, record) => ({
190 did: event.did,
191 rkey: event.commit.rkey,
192 cid: event.commit.cid,
193 name: record.name,
194 description: record.description ?? null,
195 indexedAt: new Date(),
196 }),
197 toUpdateValues: async (event, record) => ({
198 cid: event.commit.cid,
199 name: record.name,
200 description: record.description ?? null,
201 indexedAt: new Date(),
202 }),
203 };
204
205 private categoryConfig: CollectionConfig<Category.Record> = {
206 name: "Category",
207 table: categories,
208 deleteStrategy: "hard",
209 toInsertValues: async (event, record, tx) => {
210 // Categories are owned by the Forum DID, so event.did IS the forum DID
211 const forumId = await this.getForumIdByDid(event.did, tx);
212
213 if (!forumId) {
214 this.logger.warn("Category: Forum not found for DID", {
215 operation: "Category CREATE",
216 did: event.did,
217 });
218 return null;
219 }
220
221 return {
222 did: event.did,
223 rkey: event.commit.rkey,
224 cid: event.commit.cid,
225 forumId,
226 name: record.name,
227 description: record.description ?? null,
228 slug: record.slug ?? null,
229 sortOrder: record.sortOrder ?? 0,
230 createdAt: new Date(record.createdAt),
231 indexedAt: new Date(),
232 };
233 },
234 toUpdateValues: async (event, record, tx) => {
235 // Categories are owned by the Forum DID, so event.did IS the forum DID
236 const forumId = await this.getForumIdByDid(event.did, tx);
237
238 if (!forumId) {
239 this.logger.warn("Category: Forum not found for DID", {
240 operation: "Category UPDATE",
241 did: event.did,
242 });
243 return null;
244 }
245
246 return {
247 cid: event.commit.cid,
248 forumId,
249 name: record.name,
250 description: record.description ?? null,
251 slug: record.slug ?? null,
252 sortOrder: record.sortOrder ?? 0,
253 indexedAt: new Date(),
254 };
255 },
256 };
257
258 private boardConfig: CollectionConfig<Board.Record> = {
259 name: "Board",
260 table: boards,
261 deleteStrategy: "hard",
262 toInsertValues: async (event, record, tx) => {
263 // Boards are owned by Forum DID
264 const categoryId = await this.getCategoryIdByUri(
265 record.category.category.uri,
266 tx
267 );
268
269 if (!categoryId) {
270 this.logger.error("Failed to index board: category not found", {
271 operation: "Board CREATE",
272 boardDid: event.did,
273 boardRkey: event.commit.rkey,
274 categoryUri: record.category.category.uri,
275 errorId: "BOARD_CATEGORY_MISSING",
276 });
277 throw new Error(`Category not found: ${record.category.category.uri}`);
278 }
279
280 return {
281 did: event.did,
282 rkey: event.commit.rkey,
283 cid: event.commit.cid,
284 name: record.name,
285 description: record.description ?? null,
286 slug: record.slug ?? null,
287 sortOrder: record.sortOrder ?? null,
288 categoryId,
289 categoryUri: record.category.category.uri,
290 createdAt: new Date(record.createdAt),
291 indexedAt: new Date(),
292 };
293 },
294 toUpdateValues: async (event, record, tx) => {
295 const categoryId = await this.getCategoryIdByUri(
296 record.category.category.uri,
297 tx
298 );
299
300 if (!categoryId) {
301 this.logger.error("Failed to index board: category not found", {
302 operation: "Board UPDATE",
303 boardDid: event.did,
304 boardRkey: event.commit.rkey,
305 categoryUri: record.category.category.uri,
306 errorId: "BOARD_CATEGORY_MISSING",
307 });
308 throw new Error(`Category not found: ${record.category.category.uri}`);
309 }
310
311 return {
312 cid: event.commit.cid,
313 name: record.name,
314 description: record.description ?? null,
315 slug: record.slug ?? null,
316 sortOrder: record.sortOrder ?? null,
317 categoryId,
318 categoryUri: record.category.category.uri,
319 indexedAt: new Date(),
320 };
321 },
322 };
323
324 private roleConfig: CollectionConfig<Role.Record> = {
325 name: "Role",
326 table: roles,
327 deleteStrategy: "hard",
328 toInsertValues: async (event, record) => ({
329 did: event.did,
330 rkey: event.commit.rkey,
331 cid: event.commit.cid,
332 name: record.name,
333 description: record.description ?? null,
334 priority: record.priority,
335 createdAt: new Date(record.createdAt),
336 indexedAt: new Date(),
337 }),
338 toUpdateValues: async (event, record) => ({
339 cid: event.commit.cid,
340 name: record.name,
341 description: record.description ?? null,
342 priority: record.priority,
343 indexedAt: new Date(),
344 }),
345 afterUpsert: async (event, record, roleId, tx) => {
346 // Replace all permissions for this role atomically
347 await tx
348 .delete(rolePermissions)
349 .where(eq(rolePermissions.roleId, roleId));
350
351 if (record.permissions && record.permissions.length > 0) {
352 await tx.insert(rolePermissions).values(
353 record.permissions.map((permission: string) => ({
354 roleId,
355 permission,
356 }))
357 );
358 }
359 },
360 };
361
362 private themeConfig: CollectionConfig<Theme.Record> = {
363 name: "Theme",
364 table: themes,
365 deleteStrategy: "hard",
366 toInsertValues: async (event, record) => ({
367 did: event.did,
368 rkey: event.commit.rkey,
369 cid: event.commit.cid,
370 name: record.name,
371 colorScheme: record.colorScheme as string,
372 tokens: record.tokens,
373 cssOverrides: (record.cssOverrides as string | undefined) ?? null,
374 fontUrls: (record.fontUrls as string[] | undefined) ?? null,
375 createdAt: new Date(record.createdAt as string),
376 indexedAt: new Date(),
377 }),
378 toUpdateValues: async (event, record) => ({
379 cid: event.commit.cid,
380 name: record.name,
381 colorScheme: record.colorScheme as string,
382 tokens: record.tokens,
383 cssOverrides: (record.cssOverrides as string | undefined) ?? null,
384 fontUrls: (record.fontUrls as string[] | undefined) ?? null,
385 indexedAt: new Date(),
386 }),
387 };
388
389 private themePolicyConfig: CollectionConfig<ThemePolicy.Record> = {
390 name: "ThemePolicy",
391 table: themePolicies,
392 deleteStrategy: "hard",
393 toInsertValues: async (event, record) => {
394 if (!record.defaultLightTheme?.uri || !record.defaultDarkTheme?.uri) {
395 this.logger.warn("ThemePolicy record missing required theme refs — skipping", {
396 did: event.did,
397 rkey: event.commit.rkey,
398 });
399 return null;
400 }
401 return {
402 did: event.did,
403 rkey: event.commit.rkey,
404 cid: event.commit.cid,
405 defaultLightThemeUri: record.defaultLightTheme.uri,
406 defaultDarkThemeUri: record.defaultDarkTheme.uri,
407 allowUserChoice: record.allowUserChoice,
408 indexedAt: new Date(),
409 };
410 },
411 toUpdateValues: async (event, record) => {
412 if (!record.defaultLightTheme?.uri || !record.defaultDarkTheme?.uri) {
413 this.logger.warn("ThemePolicy record missing required theme refs — skipping update", {
414 did: event.did,
415 rkey: event.commit.rkey,
416 });
417 return null;
418 }
419 return {
420 cid: event.commit.cid,
421 defaultLightThemeUri: record.defaultLightTheme.uri,
422 defaultDarkThemeUri: record.defaultDarkTheme.uri,
423 allowUserChoice: record.allowUserChoice,
424 indexedAt: new Date(),
425 };
426 },
427 afterUpsert: async (_event, record, policyId, tx) => {
428 // Atomically replace all available-theme rows for this policy
429 await tx
430 .delete(themePolicyAvailableThemes)
431 .where(eq(themePolicyAvailableThemes.policyId, policyId));
432
433 const available = record.availableThemes ?? [];
434 if (available.length > 0) {
435 await tx.insert(themePolicyAvailableThemes).values(
436 available.map((themeRef) => ({
437 policyId,
438 themeUri: themeRef.uri,
439 themeCid: themeRef.cid ?? null,
440 }))
441 );
442 }
443 },
444 };
445
446 private membershipConfig: CollectionConfig<Membership.Record> = {
447 name: "Membership",
448 table: memberships,
449 deleteStrategy: "hard",
450 ensureUserOnCreate: true,
451 toInsertValues: async (event, record, tx) => {
452 // Look up forum by URI (inside transaction)
453 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
454
455 if (!forumId) {
456 this.logger.warn("Membership: Forum not found", {
457 operation: "Membership CREATE",
458 forumUri: record.forum.forum.uri,
459 });
460 return null;
461 }
462
463 return {
464 did: event.did,
465 rkey: event.commit.rkey,
466 cid: event.commit.cid,
467 forumId,
468 forumUri: record.forum.forum.uri,
469 role: null, // TODO: Extract role name from roleUri or lexicon
470 roleUri: record.role?.role.uri ?? null,
471 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
472 createdAt: new Date(record.createdAt),
473 indexedAt: new Date(),
474 };
475 },
476 toUpdateValues: async (event, record, tx) => {
477 // Look up forum by URI (may have changed)
478 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx);
479
480 if (!forumId) {
481 this.logger.warn("Membership: Forum not found", {
482 operation: "Membership UPDATE",
483 forumUri: record.forum.forum.uri,
484 });
485 return null;
486 }
487
488 return {
489 cid: event.commit.cid,
490 forumId,
491 forumUri: record.forum.forum.uri,
492 role: null, // TODO: Extract role name from roleUri or lexicon
493 roleUri: record.role?.role.uri ?? null,
494 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null,
495 indexedAt: new Date(),
496 };
497 },
498 };
499
500 private modActionConfig: CollectionConfig<ModAction.Record> = {
501 name: "ModAction",
502 table: modActions,
503 deleteStrategy: "hard",
504 toInsertValues: async (event, record, tx) => {
505 // ModActions are owned by the Forum DID, so event.did IS the forum DID
506 const forumId = await this.getForumIdByDid(event.did, tx);
507
508 if (!forumId) {
509 this.logger.warn("ModAction: Forum not found for DID", {
510 operation: "ModAction CREATE",
511 did: event.did,
512 });
513 return null;
514 }
515
516 // Ensure moderator exists
517 await this.ensureUser(record.createdBy, tx);
518
519 // Determine subject type (post or user)
520 let subjectPostUri: string | null = null;
521 let subjectDid: string | null = null;
522
523 if (record.subject.post) {
524 subjectPostUri = record.subject.post.uri;
525 }
526 if (record.subject.did) {
527 subjectDid = record.subject.did;
528 }
529
530 return {
531 did: event.did,
532 rkey: event.commit.rkey,
533 cid: event.commit.cid,
534 forumId,
535 action: record.action,
536 subjectPostUri,
537 subjectDid,
538 reason: record.reason ?? null,
539 createdBy: record.createdBy,
540 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
541 createdAt: new Date(record.createdAt),
542 indexedAt: new Date(),
543 };
544 },
545 toUpdateValues: async (event, record, tx) => {
546 // ModActions are owned by the Forum DID, so event.did IS the forum DID
547 const forumId = await this.getForumIdByDid(event.did, tx);
548
549 if (!forumId) {
550 this.logger.warn("ModAction: Forum not found for DID", {
551 operation: "ModAction UPDATE",
552 did: event.did,
553 });
554 return null;
555 }
556
557 // Determine subject type (post or user)
558 let subjectPostUri: string | null = null;
559 let subjectDid: string | null = null;
560
561 if (record.subject.post) {
562 subjectPostUri = record.subject.post.uri;
563 }
564 if (record.subject.did) {
565 subjectDid = record.subject.did;
566 }
567
568 return {
569 cid: event.commit.cid,
570 forumId,
571 action: record.action,
572 subjectPostUri,
573 subjectDid,
574 reason: record.reason ?? null,
575 createdBy: record.createdBy,
576 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
577 indexedAt: new Date(),
578 };
579 },
580 };
581
582 // ── Generic Handler Methods ─────────────────────────────
583
584 /**
585 * Generic create handler. Wraps the insert in a transaction,
586 * optionally ensures the user exists, and delegates to the
587 * config's toInsertValues callback for collection-specific logic.
588 */
589 private async genericCreate<TRecord>(
590 config: CollectionConfig<TRecord>,
591 event: any
592 ): Promise<boolean> {
593 try {
594 const record = event.commit.record as unknown as TRecord;
595 let skipped = false;
596
597 await this.db.transaction(async (tx) => {
598 if (config.ensureUserOnCreate) {
599 await this.ensureUser(event.did, tx);
600 }
601
602 const values = await config.toInsertValues(event, record, tx);
603 if (!values) {
604 skipped = true;
605 return; // Skip insert (e.g. foreign key not found)
606 }
607
608 if (config.afterUpsert) {
609 const [inserted] = await tx
610 .insert(config.table)
611 .values(values)
612 .returning({ id: config.table.id });
613 await config.afterUpsert(event, record, inserted.id, tx);
614 } else {
615 await tx.insert(config.table).values(values);
616 }
617 });
618
619 // Only log success if insert actually happened
620 if (!skipped) {
621 this.logger.info(`${config.name} created`, {
622 did: event.did,
623 rkey: event.commit.rkey,
624 });
625 }
626 return !skipped;
627 } catch (error) {
628 this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, {
629 did: event.did,
630 rkey: event.commit.rkey,
631 error: error instanceof Error ? error.message : String(error),
632 });
633 throw error;
634 }
635 }
636
637 /**
638 * Generic update handler. Wraps the update in a transaction
639 * and delegates to the config's toUpdateValues callback for
640 * collection-specific logic.
641 */
642 private async genericUpdate<TRecord>(
643 config: CollectionConfig<TRecord>,
644 event: any
645 ) {
646 try {
647 const record = event.commit.record as unknown as TRecord;
648 let skipped = false;
649
650 await this.db.transaction(async (tx) => {
651 const values = await config.toUpdateValues(event, record, tx);
652 if (!values) {
653 skipped = true;
654 return; // Skip update (e.g. foreign key not found)
655 }
656
657 if (config.afterUpsert) {
658 const [updated] = await tx
659 .update(config.table)
660 .set(values)
661 .where(
662 and(
663 eq(config.table.did, event.did),
664 eq(config.table.rkey, event.commit.rkey)
665 )
666 )
667 .returning({ id: config.table.id });
668 if (!updated) return; // Out-of-order UPDATE before CREATE: no row to update yet
669 await config.afterUpsert(event, record, updated.id, tx);
670 } else {
671 await tx
672 .update(config.table)
673 .set(values)
674 .where(
675 and(
676 eq(config.table.did, event.did),
677 eq(config.table.rkey, event.commit.rkey)
678 )
679 );
680 }
681 });
682
683 // Only log success if update actually happened
684 if (!skipped) {
685 this.logger.info(`${config.name} updated`, {
686 did: event.did,
687 rkey: event.commit.rkey,
688 });
689 }
690 } catch (error) {
691 this.logger.error(`Failed to update ${config.name.toLowerCase()}`, {
692 did: event.did,
693 rkey: event.commit.rkey,
694 error: error instanceof Error ? error.message : String(error),
695 });
696 throw error;
697 }
698 }
699
700 /**
701 * Generic delete handler. Hard-deletes a record (DELETE FROM).
702 * Posts use handlePostDelete instead (always tombstone).
703 */
704 private async genericDelete(config: CollectionConfig<any>, event: any) {
705 try {
706 await this.db
707 .delete(config.table)
708 .where(
709 and(
710 eq(config.table.did, event.did),
711 eq(config.table.rkey, event.commit.rkey)
712 )
713 );
714
715 this.logger.info(`${config.name} deleted`, {
716 did: event.did,
717 rkey: event.commit.rkey,
718 });
719 } catch (error) {
720 this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, {
721 did: event.did,
722 rkey: event.commit.rkey,
723 error: error instanceof Error ? error.message : String(error),
724 });
725 throw error;
726 }
727 }
728
729 // ── Post Handlers ───────────────────────────────────────
730
731 async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">) {
732 const banned = await this.banEnforcer.isBanned(event.did);
733 if (banned) {
734 this.logger.info("Skipping post from banned user", {
735 did: event.did,
736 rkey: event.commit.rkey,
737 });
738 return;
739 }
740 await this.genericCreate(this.postConfig, event);
741 }
742
743 async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">) {
744 await this.genericUpdate(this.postConfig, event);
745 }
746
747 /**
748 * Handles a user-initiated post delete from the PDS.
749 * Always tombstones: replaces personal content with a placeholder and marks
750 * deletedByUser=true. The row is kept so threads referencing this post as
751 * their root or parent remain intact. Personal content is gone; structure is preserved.
752 */
753 async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">) {
754 const { did, commit: { rkey } } = event;
755 try {
756 await this.db
757 .update(posts)
758 .set({ text: "[user deleted this post]", deletedByUser: true })
759 .where(and(eq(posts.did, did), eq(posts.rkey, rkey)));
760 this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey });
761 } catch (error) {
762 this.logger.error("Failed to tombstone post", {
763 did,
764 rkey,
765 error: error instanceof Error ? error.message : String(error),
766 });
767 throw error;
768 }
769 }
770
771 // ── Forum Handlers ──────────────────────────────────────
772
773 async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">) {
774 await this.genericCreate(this.forumConfig, event);
775 }
776
777 async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">) {
778 await this.genericUpdate(this.forumConfig, event);
779 }
780
781 async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">) {
782 await this.genericDelete(this.forumConfig, event);
783 }
784
785 // ── Category Handlers ───────────────────────────────────
786
787 async handleCategoryCreate(
788 event: CommitCreateEvent<"space.atbb.forum.category">
789 ) {
790 await this.genericCreate(this.categoryConfig, event);
791 }
792
793 async handleCategoryUpdate(
794 event: CommitUpdateEvent<"space.atbb.forum.category">
795 ) {
796 await this.genericUpdate(this.categoryConfig, event);
797 }
798
799 async handleCategoryDelete(
800 event: CommitDeleteEvent<"space.atbb.forum.category">
801 ) {
802 await this.genericDelete(this.categoryConfig, event);
803 }
804
805 // ── Board Handlers ──────────────────────────────────────
806
807 async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">) {
808 await this.genericCreate(this.boardConfig, event);
809 }
810
811 async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">) {
812 await this.genericUpdate(this.boardConfig, event);
813 }
814
815 async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">) {
816 await this.genericDelete(this.boardConfig, event);
817 }
818
819 // ── Role Handlers ───────────────────────────────────────
820
821 async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">) {
822 await this.genericCreate(this.roleConfig, event);
823 }
824
825 async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">) {
826 await this.genericUpdate(this.roleConfig, event);
827 }
828
829 async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">) {
830 await this.genericDelete(this.roleConfig, event);
831 }
832
833 // ── Theme Handlers ──────────────────────────────────────
834
835 async handleThemeCreate(event: CommitCreateEvent<"space.atbb.forum.theme">) {
836 await this.genericCreate(this.themeConfig, event);
837 }
838
839 async handleThemeUpdate(event: CommitUpdateEvent<"space.atbb.forum.theme">) {
840 await this.genericUpdate(this.themeConfig, event);
841 }
842
843 async handleThemeDelete(event: CommitDeleteEvent<"space.atbb.forum.theme">) {
844 await this.genericDelete(this.themeConfig, event);
845 }
846
847 // ── ThemePolicy Handlers ─────────────────────────────────
848
849 async handleThemePolicyCreate(event: CommitCreateEvent<"space.atbb.forum.themePolicy">) {
850 await this.genericCreate(this.themePolicyConfig, event);
851 }
852
853 async handleThemePolicyUpdate(event: CommitUpdateEvent<"space.atbb.forum.themePolicy">) {
854 await this.genericUpdate(this.themePolicyConfig, event);
855 }
856
857 async handleThemePolicyDelete(event: CommitDeleteEvent<"space.atbb.forum.themePolicy">) {
858 await this.genericDelete(this.themePolicyConfig, event);
859 }
860
861 // ── Membership Handlers ─────────────────────────────────
862
863 async handleMembershipCreate(
864 event: CommitCreateEvent<"space.atbb.membership">
865 ) {
866 await this.genericCreate(this.membershipConfig, event);
867 }
868
869 async handleMembershipUpdate(
870 event: CommitUpdateEvent<"space.atbb.membership">
871 ) {
872 await this.genericUpdate(this.membershipConfig, event);
873 }
874
875 async handleMembershipDelete(
876 event: CommitDeleteEvent<"space.atbb.membership">
877 ) {
878 await this.genericDelete(this.membershipConfig, event);
879 }
880
881 // ── ModAction Handlers ──────────────────────────────────
882
883 async handleModActionCreate(
884 event: CommitCreateEvent<"space.atbb.modAction">
885 ) {
886 const record = event.commit.record as unknown as ModAction.Record;
887 const isBan =
888 record.action === "space.atbb.modAction.ban" && record.subject.did;
889 const isUnban =
890 record.action === "space.atbb.modAction.unban" && record.subject.did;
891
892 try {
893 if (isBan) {
894 // Custom atomic path: insert ban record + applyBan in one transaction
895 let skipped = false;
896 await this.db.transaction(async (tx) => {
897 const forumId = await this.getForumIdByDid(event.did, tx);
898 if (!forumId) {
899 this.logger.warn("ModAction (ban): Forum not found for DID", {
900 operation: "ModAction CREATE",
901 did: event.did,
902 });
903 skipped = true;
904 return;
905 }
906 await this.ensureUser(record.createdBy, tx);
907 await tx.insert(modActions).values({
908 did: event.did,
909 rkey: event.commit.rkey,
910 cid: event.commit.cid,
911 forumId,
912 action: record.action,
913 subjectPostUri: null,
914 subjectDid: record.subject.did ?? null,
915 reason: record.reason ?? null,
916 createdBy: record.createdBy,
917 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
918 createdAt: new Date(record.createdAt),
919 indexedAt: new Date(),
920 });
921 await this.banEnforcer.applyBan(record.subject.did!, tx);
922 });
923 if (!skipped) {
924 this.logger.info("ModAction (ban) created", {
925 did: event.did,
926 rkey: event.commit.rkey,
927 });
928 }
929 } else if (isUnban) {
930 // Custom atomic path: insert unban record + liftBan in one transaction
931 let skipped = false;
932 await this.db.transaction(async (tx) => {
933 const forumId = await this.getForumIdByDid(event.did, tx);
934 if (!forumId) {
935 this.logger.warn("ModAction (unban): Forum not found for DID", {
936 operation: "ModAction CREATE",
937 did: event.did,
938 });
939 skipped = true;
940 return;
941 }
942 await this.ensureUser(record.createdBy, tx);
943 await tx.insert(modActions).values({
944 did: event.did,
945 rkey: event.commit.rkey,
946 cid: event.commit.cid,
947 forumId,
948 action: record.action,
949 subjectPostUri: null,
950 subjectDid: record.subject.did ?? null,
951 reason: record.reason ?? null,
952 createdBy: record.createdBy,
953 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null,
954 createdAt: new Date(record.createdAt),
955 indexedAt: new Date(),
956 });
957 await this.banEnforcer.liftBan(record.subject.did!, tx);
958 });
959 if (!skipped) {
960 this.logger.info("ModAction (unban) created", {
961 did: event.did,
962 rkey: event.commit.rkey,
963 });
964 }
965 } else {
966 // Generic path for all other mod actions (mute, pin, lock, delete, etc.)
967 await this.genericCreate(this.modActionConfig, event);
968
969 // Ban/unban without subject.did — shouldn't happen but log if it does
970 if (
971 record.action === "space.atbb.modAction.ban" ||
972 record.action === "space.atbb.modAction.unban"
973 ) {
974 this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", {
975 did: event.did,
976 rkey: event.commit.rkey,
977 action: record.action,
978 });
979 }
980 }
981 } catch (error) {
982 this.logger.error("Failed to index ModAction create", {
983 did: event.did,
984 rkey: event.commit.rkey,
985 error: error instanceof Error ? error.message : String(error),
986 });
987 throw error;
988 }
989 }
990
991 async handleModActionUpdate(
992 event: CommitUpdateEvent<"space.atbb.modAction">
993 ) {
994 await this.genericUpdate(this.modActionConfig, event);
995 }
996
997 async handleModActionDelete(
998 event: CommitDeleteEvent<"space.atbb.modAction">
999 ) {
1000 try {
1001 await this.db.transaction(async (tx) => {
1002 // 1. Read before delete to capture action type and subject
1003 const [existing] = await tx
1004 .select({
1005 action: modActions.action,
1006 subjectDid: modActions.subjectDid,
1007 })
1008 .from(modActions)
1009 .where(
1010 and(
1011 eq(modActions.did, event.did),
1012 eq(modActions.rkey, event.commit.rkey)
1013 )
1014 )
1015 .limit(1);
1016
1017 // 2. Hard delete the record
1018 await tx
1019 .delete(modActions)
1020 .where(
1021 and(
1022 eq(modActions.did, event.did),
1023 eq(modActions.rkey, event.commit.rkey)
1024 )
1025 );
1026
1027 // 3. Restore posts if the deleted record was a ban
1028 if (
1029 existing?.action === "space.atbb.modAction.ban" &&
1030 existing?.subjectDid
1031 ) {
1032 await this.banEnforcer.liftBan(existing.subjectDid, tx);
1033 }
1034 });
1035
1036 this.logger.info("ModAction deleted", {
1037 did: event.did,
1038 rkey: event.commit.rkey,
1039 });
1040 } catch (error) {
1041 this.logger.error("Failed to delete modAction", {
1042 did: event.did,
1043 rkey: event.commit.rkey,
1044 error: error instanceof Error ? error.message : String(error),
1045 });
1046 throw error;
1047 }
1048 }
1049
1050 // ── Reaction Handlers (Stub) ────────────────────────────
1051
1052 async handleReactionCreate(
1053 event: CommitCreateEvent<"space.atbb.reaction">
1054 ) {
1055 this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey });
1056 // TODO: Add reactions table to schema
1057 }
1058
1059 async handleReactionUpdate(
1060 event: CommitUpdateEvent<"space.atbb.reaction">
1061 ) {
1062 this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey });
1063 // TODO: Add reactions table to schema
1064 }
1065
1066 async handleReactionDelete(
1067 event: CommitDeleteEvent<"space.atbb.reaction">
1068 ) {
1069 this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey });
1070 // TODO: Add reactions table to schema
1071 }
1072
1073 // ── Helper Methods ──────────────────────────────────────
1074
1075 /**
1076 * Ensure a user exists in the database. Creates if not exists.
1077 * @param dbOrTx - Database instance or transaction
1078 */
1079 private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db) {
1080 try {
1081 const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1);
1082
1083 if (existing.length === 0) {
1084 await dbOrTx.insert(users).values({
1085 did,
1086 handle: null, // Will be updated by identity events
1087 indexedAt: new Date(),
1088 });
1089 this.logger.info("Created user", { did });
1090 }
1091 } catch (error) {
1092 this.logger.error("Failed to ensure user exists", {
1093 did,
1094 error: error instanceof Error ? error.message : String(error),
1095 });
1096 throw error;
1097 }
1098 }
1099
1100 /**
1101 * Look up a forum ID by its AT URI
1102 * @param dbOrTx - Database instance or transaction
1103 */
1104 private async getForumIdByUri(
1105 forumUri: string,
1106 dbOrTx: DbOrTransaction = this.db
1107 ): Promise<bigint | null> {
1108 const parsed = parseAtUri(forumUri);
1109 if (!parsed) return null;
1110
1111 try {
1112 const result = await dbOrTx
1113 .select({ id: forums.id })
1114 .from(forums)
1115 .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey)))
1116 .limit(1);
1117
1118 return result.length > 0 ? result[0].id : null;
1119 } catch (error) {
1120 this.logger.error("Database error in getForumIdByUri", {
1121 operation: "getForumIdByUri",
1122 forumUri,
1123 error: error instanceof Error ? error.message : String(error),
1124 });
1125 throw error;
1126 }
1127 }
1128
1129 /**
1130 * Look up a forum ID by the forum's DID
1131 * Used for records owned by the forum (categories, modActions)
1132 * @param dbOrTx - Database instance or transaction
1133 */
1134 private async getForumIdByDid(
1135 forumDid: string,
1136 dbOrTx: DbOrTransaction = this.db
1137 ): Promise<bigint | null> {
1138 try {
1139 const result = await dbOrTx
1140 .select({ id: forums.id })
1141 .from(forums)
1142 .where(eq(forums.did, forumDid))
1143 .limit(1);
1144
1145 return result.length > 0 ? result[0].id : null;
1146 } catch (error) {
1147 this.logger.error("Database error in getForumIdByDid", {
1148 operation: "getForumIdByDid",
1149 forumDid,
1150 error: error instanceof Error ? error.message : String(error),
1151 });
1152 throw error;
1153 }
1154 }
1155
1156 /**
1157 * Look up a post ID by its AT URI
1158 * @param dbOrTx - Database instance or transaction
1159 */
1160 private async getPostIdByUri(
1161 postUri: string,
1162 dbOrTx: DbOrTransaction = this.db
1163 ): Promise<bigint | null> {
1164 const parsed = parseAtUri(postUri);
1165 if (!parsed) return null;
1166
1167 try {
1168 const result = await dbOrTx
1169 .select({ id: posts.id })
1170 .from(posts)
1171 .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey)))
1172 .limit(1);
1173
1174 return result.length > 0 ? result[0].id : null;
1175 } catch (error) {
1176 this.logger.error("Database error in getPostIdByUri", {
1177 operation: "getPostIdByUri",
1178 postUri,
1179 error: error instanceof Error ? error.message : String(error),
1180 });
1181 throw error;
1182 }
1183 }
1184
1185 /**
1186 * Look up board ID by AT URI (at://did/collection/rkey)
1187 * @param uri - AT URI of the board
1188 * @param dbOrTx - Database instance or transaction
1189 */
1190 private async getBoardIdByUri(
1191 uri: string,
1192 dbOrTx: DbOrTransaction = this.db
1193 ): Promise<bigint | null> {
1194 const parsed = parseAtUri(uri);
1195 if (!parsed) return null;
1196
1197 try {
1198 const [result] = await dbOrTx
1199 .select({ id: boards.id })
1200 .from(boards)
1201 .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey)))
1202 .limit(1);
1203 return result?.id ?? null;
1204 } catch (error) {
1205 this.logger.error("Database error in getBoardIdByUri", {
1206 operation: "getBoardIdByUri",
1207 uri,
1208 did: parsed.did,
1209 rkey: parsed.rkey,
1210 error: error instanceof Error ? error.message : String(error),
1211 });
1212 throw error;
1213 }
1214 }
1215
1216 /**
1217 * Look up category ID by AT URI (at://did/collection/rkey)
1218 * @param uri - AT URI of the category
1219 * @param dbOrTx - Database instance or transaction
1220 */
1221 private async getCategoryIdByUri(
1222 uri: string,
1223 dbOrTx: DbOrTransaction = this.db
1224 ): Promise<bigint | null> {
1225 const parsed = parseAtUri(uri);
1226 if (!parsed) return null;
1227
1228 try {
1229 const [result] = await dbOrTx
1230 .select({ id: categories.id })
1231 .from(categories)
1232 .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey)))
1233 .limit(1);
1234 return result?.id ?? null;
1235 } catch (error) {
1236 this.logger.error("Database error in getCategoryIdByUri", {
1237 operation: "getCategoryIdByUri",
1238 uri,
1239 did: parsed.did,
1240 rkey: parsed.rkey,
1241 error: error instanceof Error ? error.message : String(error),
1242 });
1243 throw error;
1244 }
1245 }
1246}