WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
at main 1246 lines 40 kB view raw
1import type { 2 CommitCreateEvent, 3 CommitDeleteEvent, 4 CommitUpdateEvent, 5} from "@skyware/jetstream"; 6import type { Database, DbOrTransaction } from "@atbb/db"; 7import type { Logger } from "@atbb/logger"; 8import { 9 posts, 10 forums, 11 categories, 12 boards, 13 users, 14 memberships, 15 modActions, 16 roles, 17 rolePermissions, 18 themes, 19 themePolicies, 20 themePolicyAvailableThemes, 21} from "@atbb/db"; 22import { eq, and } from "drizzle-orm"; 23import { parseAtUri } from "./at-uri.js"; 24import { BanEnforcer } from "./ban-enforcer.js"; 25import { 26 SpaceAtbbPost as Post, 27 SpaceAtbbForumForum as Forum, 28 SpaceAtbbForumCategory as Category, 29 SpaceAtbbForumBoard as Board, 30 SpaceAtbbMembership as Membership, 31 SpaceAtbbModAction as ModAction, 32 SpaceAtbbForumRole as Role, 33 SpaceAtbbForumTheme as Theme, 34 SpaceAtbbForumThemePolicy as ThemePolicy, 35} from "@atbb/lexicon"; 36 37// ── Collection Config Types ───────────────────────────── 38 39/** 40 * Configuration for a data-driven collection handler. 41 * Encodes the per-collection logic that differs across the 5 indexed types, 42 * while the generic handler methods supply the shared try/catch/log/throw scaffolding. 43 */ 44interface CollectionConfig<TRecord> { 45 /** Human-readable name for logging (e.g. "Post", "Forum") */ 46 name: string; 47 /** Drizzle table reference */ 48 table: any; 49 /** "hard" = DELETE FROM (all non-post collections) */ 50 deleteStrategy: "hard"; 51 /** Call ensureUser(event.did) before insert? (user-owned records) */ 52 ensureUserOnCreate?: boolean; 53 /** 54 * Transform event+record into DB insert values. 55 * Return null to skip the insert (e.g. when a required foreign key is missing). 56 */ 57 toInsertValues: ( 58 event: any, 59 record: TRecord, 60 tx: DbOrTransaction 61 ) => Promise<Record<string, any> | null>; 62 /** 63 * Transform event+record into DB update set values. 64 * Runs inside a transaction. Return null to skip the update. 65 */ 66 toUpdateValues: ( 67 event: any, 68 record: TRecord, 69 tx: DbOrTransaction 70 ) => Promise<Record<string, any> | null>; 71 /** 72 * Optional hook called after a row is inserted or updated, within the same 73 * transaction. Receives the row's numeric id (bigint) so callers can write 74 * to child tables (e.g. role_permissions). 75 */ 76 afterUpsert?: ( 77 event: any, 78 record: TRecord, 79 rowId: bigint, 80 tx: DbOrTransaction 81 ) => Promise<void>; 82} 83 84 85/** 86 * Indexer class for processing AT Proto firehose events 87 * Converts events into database records for the atBB AppView 88 */ 89export class Indexer { 90 private banEnforcer: BanEnforcer; 91 92 constructor(private db: Database, private logger: Logger) { 93 this.banEnforcer = new BanEnforcer(db, logger); 94 } 95 96 // ── Collection Configs ────────────────────────────────── 97 98 private postConfig: CollectionConfig<Post.Record> = { 99 name: "Post", 100 table: posts, 101 deleteStrategy: "hard", 102 ensureUserOnCreate: true, 103 toInsertValues: async (event, record, tx) => { 104 // Look up parent/root for replies 105 let rootId: bigint | null = null; 106 let parentId: bigint | null = null; 107 108 if (Post.isReplyRef(record.reply)) { 109 rootId = await this.getPostIdByUri(record.reply.root.uri, tx); 110 parentId = await this.getPostIdByUri(record.reply.parent.uri, tx); 111 } else if (record.reply) { 112 // reply ref present but $type omitted — rootPostId/parentPostId will be null, 113 // making this reply unreachable in thread navigation (data corruption). 114 this.logger.error("Post reply ref missing $type — rootPostId/parentPostId not resolved", { 115 operation: "Post CREATE", 116 postDid: event.did, 117 postRkey: event.commit.rkey, 118 errorId: "POST_REPLY_REF_MISSING_TYPE", 119 }); 120 } 121 122 // Look up board ID if board reference exists 123 let boardId: bigint | null = null; 124 if (record.board?.board.uri) { 125 boardId = await this.getBoardIdByUri(record.board.board.uri, tx); 126 if (!boardId) { 127 this.logger.error("Failed to index post: board not found", { 128 operation: "Post CREATE", 129 postDid: event.did, 130 postRkey: event.commit.rkey, 131 boardUri: record.board.board.uri, 132 errorId: "POST_BOARD_MISSING", 133 }); 134 throw new Error(`Board not found: ${record.board.board.uri}`); 135 } 136 } 137 138 return { 139 did: event.did, 140 rkey: event.commit.rkey, 141 cid: event.commit.cid, 142 title: record.reply ? null : (record.title ?? null), 143 text: record.text, 144 forumUri: record.forum?.forum.uri ?? null, 145 boardUri: record.board?.board.uri ?? null, 146 boardId, 147 rootPostId: rootId, 148 rootUri: record.reply?.root.uri ?? null, 149 parentPostId: parentId, 150 parentUri: record.reply?.parent.uri ?? null, 151 createdAt: new Date(record.createdAt), 152 indexedAt: new Date(), 153 }; 154 }, 155 toUpdateValues: async (event, record, tx) => { 156 // Look up board ID if board reference exists 157 let boardId: bigint | null = null; 158 if (record.board?.board.uri) { 159 boardId = await this.getBoardIdByUri(record.board.board.uri, tx); 160 if (!boardId) { 161 this.logger.error("Failed to index post: board not found", { 162 operation: "Post UPDATE", 163 postDid: event.did, 164 postRkey: event.commit.rkey, 165 boardUri: record.board.board.uri, 166 errorId: "POST_BOARD_MISSING", 167 }); 168 throw new Error(`Board not found: ${record.board.board.uri}`); 169 } 170 } 171 172 return { 173 cid: event.commit.cid, 174 title: record.reply ? null : (record.title ?? null), 175 text: record.text, 176 forumUri: record.forum?.forum.uri ?? null, 177 boardUri: record.board?.board.uri ?? null, 178 boardId, 179 indexedAt: new Date(), 180 }; 181 }, 182 }; 183 184 private forumConfig: CollectionConfig<Forum.Record> = { 185 name: "Forum", 186 table: forums, 187 deleteStrategy: "hard", 188 ensureUserOnCreate: true, 189 toInsertValues: async (event, record) => ({ 190 did: event.did, 191 rkey: event.commit.rkey, 192 cid: event.commit.cid, 193 name: record.name, 194 description: record.description ?? null, 195 indexedAt: new Date(), 196 }), 197 toUpdateValues: async (event, record) => ({ 198 cid: event.commit.cid, 199 name: record.name, 200 description: record.description ?? null, 201 indexedAt: new Date(), 202 }), 203 }; 204 205 private categoryConfig: CollectionConfig<Category.Record> = { 206 name: "Category", 207 table: categories, 208 deleteStrategy: "hard", 209 toInsertValues: async (event, record, tx) => { 210 // Categories are owned by the Forum DID, so event.did IS the forum DID 211 const forumId = await this.getForumIdByDid(event.did, tx); 212 213 if (!forumId) { 214 this.logger.warn("Category: Forum not found for DID", { 215 operation: "Category CREATE", 216 did: event.did, 217 }); 218 return null; 219 } 220 221 return { 222 did: event.did, 223 rkey: event.commit.rkey, 224 cid: event.commit.cid, 225 forumId, 226 name: record.name, 227 description: record.description ?? null, 228 slug: record.slug ?? null, 229 sortOrder: record.sortOrder ?? 0, 230 createdAt: new Date(record.createdAt), 231 indexedAt: new Date(), 232 }; 233 }, 234 toUpdateValues: async (event, record, tx) => { 235 // Categories are owned by the Forum DID, so event.did IS the forum DID 236 const forumId = await this.getForumIdByDid(event.did, tx); 237 238 if (!forumId) { 239 this.logger.warn("Category: Forum not found for DID", { 240 operation: "Category UPDATE", 241 did: event.did, 242 }); 243 return null; 244 } 245 246 return { 247 cid: event.commit.cid, 248 forumId, 249 name: record.name, 250 description: record.description ?? null, 251 slug: record.slug ?? null, 252 sortOrder: record.sortOrder ?? 0, 253 indexedAt: new Date(), 254 }; 255 }, 256 }; 257 258 private boardConfig: CollectionConfig<Board.Record> = { 259 name: "Board", 260 table: boards, 261 deleteStrategy: "hard", 262 toInsertValues: async (event, record, tx) => { 263 // Boards are owned by Forum DID 264 const categoryId = await this.getCategoryIdByUri( 265 record.category.category.uri, 266 tx 267 ); 268 269 if (!categoryId) { 270 this.logger.error("Failed to index board: category not found", { 271 operation: "Board CREATE", 272 boardDid: event.did, 273 boardRkey: event.commit.rkey, 274 categoryUri: record.category.category.uri, 275 errorId: "BOARD_CATEGORY_MISSING", 276 }); 277 throw new Error(`Category not found: ${record.category.category.uri}`); 278 } 279 280 return { 281 did: event.did, 282 rkey: event.commit.rkey, 283 cid: event.commit.cid, 284 name: record.name, 285 description: record.description ?? null, 286 slug: record.slug ?? null, 287 sortOrder: record.sortOrder ?? null, 288 categoryId, 289 categoryUri: record.category.category.uri, 290 createdAt: new Date(record.createdAt), 291 indexedAt: new Date(), 292 }; 293 }, 294 toUpdateValues: async (event, record, tx) => { 295 const categoryId = await this.getCategoryIdByUri( 296 record.category.category.uri, 297 tx 298 ); 299 300 if (!categoryId) { 301 this.logger.error("Failed to index board: category not found", { 302 operation: "Board UPDATE", 303 boardDid: event.did, 304 boardRkey: event.commit.rkey, 305 categoryUri: record.category.category.uri, 306 errorId: "BOARD_CATEGORY_MISSING", 307 }); 308 throw new Error(`Category not found: ${record.category.category.uri}`); 309 } 310 311 return { 312 cid: event.commit.cid, 313 name: record.name, 314 description: record.description ?? null, 315 slug: record.slug ?? null, 316 sortOrder: record.sortOrder ?? null, 317 categoryId, 318 categoryUri: record.category.category.uri, 319 indexedAt: new Date(), 320 }; 321 }, 322 }; 323 324 private roleConfig: CollectionConfig<Role.Record> = { 325 name: "Role", 326 table: roles, 327 deleteStrategy: "hard", 328 toInsertValues: async (event, record) => ({ 329 did: event.did, 330 rkey: event.commit.rkey, 331 cid: event.commit.cid, 332 name: record.name, 333 description: record.description ?? null, 334 priority: record.priority, 335 createdAt: new Date(record.createdAt), 336 indexedAt: new Date(), 337 }), 338 toUpdateValues: async (event, record) => ({ 339 cid: event.commit.cid, 340 name: record.name, 341 description: record.description ?? null, 342 priority: record.priority, 343 indexedAt: new Date(), 344 }), 345 afterUpsert: async (event, record, roleId, tx) => { 346 // Replace all permissions for this role atomically 347 await tx 348 .delete(rolePermissions) 349 .where(eq(rolePermissions.roleId, roleId)); 350 351 if (record.permissions && record.permissions.length > 0) { 352 await tx.insert(rolePermissions).values( 353 record.permissions.map((permission: string) => ({ 354 roleId, 355 permission, 356 })) 357 ); 358 } 359 }, 360 }; 361 362 private themeConfig: CollectionConfig<Theme.Record> = { 363 name: "Theme", 364 table: themes, 365 deleteStrategy: "hard", 366 toInsertValues: async (event, record) => ({ 367 did: event.did, 368 rkey: event.commit.rkey, 369 cid: event.commit.cid, 370 name: record.name, 371 colorScheme: record.colorScheme as string, 372 tokens: record.tokens, 373 cssOverrides: (record.cssOverrides as string | undefined) ?? null, 374 fontUrls: (record.fontUrls as string[] | undefined) ?? null, 375 createdAt: new Date(record.createdAt as string), 376 indexedAt: new Date(), 377 }), 378 toUpdateValues: async (event, record) => ({ 379 cid: event.commit.cid, 380 name: record.name, 381 colorScheme: record.colorScheme as string, 382 tokens: record.tokens, 383 cssOverrides: (record.cssOverrides as string | undefined) ?? null, 384 fontUrls: (record.fontUrls as string[] | undefined) ?? null, 385 indexedAt: new Date(), 386 }), 387 }; 388 389 private themePolicyConfig: CollectionConfig<ThemePolicy.Record> = { 390 name: "ThemePolicy", 391 table: themePolicies, 392 deleteStrategy: "hard", 393 toInsertValues: async (event, record) => { 394 if (!record.defaultLightTheme?.uri || !record.defaultDarkTheme?.uri) { 395 this.logger.warn("ThemePolicy record missing required theme refs — skipping", { 396 did: event.did, 397 rkey: event.commit.rkey, 398 }); 399 return null; 400 } 401 return { 402 did: event.did, 403 rkey: event.commit.rkey, 404 cid: event.commit.cid, 405 defaultLightThemeUri: record.defaultLightTheme.uri, 406 defaultDarkThemeUri: record.defaultDarkTheme.uri, 407 allowUserChoice: record.allowUserChoice, 408 indexedAt: new Date(), 409 }; 410 }, 411 toUpdateValues: async (event, record) => { 412 if (!record.defaultLightTheme?.uri || !record.defaultDarkTheme?.uri) { 413 this.logger.warn("ThemePolicy record missing required theme refs — skipping update", { 414 did: event.did, 415 rkey: event.commit.rkey, 416 }); 417 return null; 418 } 419 return { 420 cid: event.commit.cid, 421 defaultLightThemeUri: record.defaultLightTheme.uri, 422 defaultDarkThemeUri: record.defaultDarkTheme.uri, 423 allowUserChoice: record.allowUserChoice, 424 indexedAt: new Date(), 425 }; 426 }, 427 afterUpsert: async (_event, record, policyId, tx) => { 428 // Atomically replace all available-theme rows for this policy 429 await tx 430 .delete(themePolicyAvailableThemes) 431 .where(eq(themePolicyAvailableThemes.policyId, policyId)); 432 433 const available = record.availableThemes ?? []; 434 if (available.length > 0) { 435 await tx.insert(themePolicyAvailableThemes).values( 436 available.map((themeRef) => ({ 437 policyId, 438 themeUri: themeRef.uri, 439 themeCid: themeRef.cid ?? null, 440 })) 441 ); 442 } 443 }, 444 }; 445 446 private membershipConfig: CollectionConfig<Membership.Record> = { 447 name: "Membership", 448 table: memberships, 449 deleteStrategy: "hard", 450 ensureUserOnCreate: true, 451 toInsertValues: async (event, record, tx) => { 452 // Look up forum by URI (inside transaction) 453 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx); 454 455 if (!forumId) { 456 this.logger.warn("Membership: Forum not found", { 457 operation: "Membership CREATE", 458 forumUri: record.forum.forum.uri, 459 }); 460 return null; 461 } 462 463 return { 464 did: event.did, 465 rkey: event.commit.rkey, 466 cid: event.commit.cid, 467 forumId, 468 forumUri: record.forum.forum.uri, 469 role: null, // TODO: Extract role name from roleUri or lexicon 470 roleUri: record.role?.role.uri ?? null, 471 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, 472 createdAt: new Date(record.createdAt), 473 indexedAt: new Date(), 474 }; 475 }, 476 toUpdateValues: async (event, record, tx) => { 477 // Look up forum by URI (may have changed) 478 const forumId = await this.getForumIdByUri(record.forum.forum.uri, tx); 479 480 if (!forumId) { 481 this.logger.warn("Membership: Forum not found", { 482 operation: "Membership UPDATE", 483 forumUri: record.forum.forum.uri, 484 }); 485 return null; 486 } 487 488 return { 489 cid: event.commit.cid, 490 forumId, 491 forumUri: record.forum.forum.uri, 492 role: null, // TODO: Extract role name from roleUri or lexicon 493 roleUri: record.role?.role.uri ?? null, 494 joinedAt: record.joinedAt ? new Date(record.joinedAt) : null, 495 indexedAt: new Date(), 496 }; 497 }, 498 }; 499 500 private modActionConfig: CollectionConfig<ModAction.Record> = { 501 name: "ModAction", 502 table: modActions, 503 deleteStrategy: "hard", 504 toInsertValues: async (event, record, tx) => { 505 // ModActions are owned by the Forum DID, so event.did IS the forum DID 506 const forumId = await this.getForumIdByDid(event.did, tx); 507 508 if (!forumId) { 509 this.logger.warn("ModAction: Forum not found for DID", { 510 operation: "ModAction CREATE", 511 did: event.did, 512 }); 513 return null; 514 } 515 516 // Ensure moderator exists 517 await this.ensureUser(record.createdBy, tx); 518 519 // Determine subject type (post or user) 520 let subjectPostUri: string | null = null; 521 let subjectDid: string | null = null; 522 523 if (record.subject.post) { 524 subjectPostUri = record.subject.post.uri; 525 } 526 if (record.subject.did) { 527 subjectDid = record.subject.did; 528 } 529 530 return { 531 did: event.did, 532 rkey: event.commit.rkey, 533 cid: event.commit.cid, 534 forumId, 535 action: record.action, 536 subjectPostUri, 537 subjectDid, 538 reason: record.reason ?? null, 539 createdBy: record.createdBy, 540 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 541 createdAt: new Date(record.createdAt), 542 indexedAt: new Date(), 543 }; 544 }, 545 toUpdateValues: async (event, record, tx) => { 546 // ModActions are owned by the Forum DID, so event.did IS the forum DID 547 const forumId = await this.getForumIdByDid(event.did, tx); 548 549 if (!forumId) { 550 this.logger.warn("ModAction: Forum not found for DID", { 551 operation: "ModAction UPDATE", 552 did: event.did, 553 }); 554 return null; 555 } 556 557 // Determine subject type (post or user) 558 let subjectPostUri: string | null = null; 559 let subjectDid: string | null = null; 560 561 if (record.subject.post) { 562 subjectPostUri = record.subject.post.uri; 563 } 564 if (record.subject.did) { 565 subjectDid = record.subject.did; 566 } 567 568 return { 569 cid: event.commit.cid, 570 forumId, 571 action: record.action, 572 subjectPostUri, 573 subjectDid, 574 reason: record.reason ?? null, 575 createdBy: record.createdBy, 576 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 577 indexedAt: new Date(), 578 }; 579 }, 580 }; 581 582 // ── Generic Handler Methods ───────────────────────────── 583 584 /** 585 * Generic create handler. Wraps the insert in a transaction, 586 * optionally ensures the user exists, and delegates to the 587 * config's toInsertValues callback for collection-specific logic. 588 */ 589 private async genericCreate<TRecord>( 590 config: CollectionConfig<TRecord>, 591 event: any 592 ): Promise<boolean> { 593 try { 594 const record = event.commit.record as unknown as TRecord; 595 let skipped = false; 596 597 await this.db.transaction(async (tx) => { 598 if (config.ensureUserOnCreate) { 599 await this.ensureUser(event.did, tx); 600 } 601 602 const values = await config.toInsertValues(event, record, tx); 603 if (!values) { 604 skipped = true; 605 return; // Skip insert (e.g. foreign key not found) 606 } 607 608 if (config.afterUpsert) { 609 const [inserted] = await tx 610 .insert(config.table) 611 .values(values) 612 .returning({ id: config.table.id }); 613 await config.afterUpsert(event, record, inserted.id, tx); 614 } else { 615 await tx.insert(config.table).values(values); 616 } 617 }); 618 619 // Only log success if insert actually happened 620 if (!skipped) { 621 this.logger.info(`${config.name} created`, { 622 did: event.did, 623 rkey: event.commit.rkey, 624 }); 625 } 626 return !skipped; 627 } catch (error) { 628 this.logger.error(`Failed to index ${config.name.toLowerCase()} create`, { 629 did: event.did, 630 rkey: event.commit.rkey, 631 error: error instanceof Error ? error.message : String(error), 632 }); 633 throw error; 634 } 635 } 636 637 /** 638 * Generic update handler. Wraps the update in a transaction 639 * and delegates to the config's toUpdateValues callback for 640 * collection-specific logic. 641 */ 642 private async genericUpdate<TRecord>( 643 config: CollectionConfig<TRecord>, 644 event: any 645 ) { 646 try { 647 const record = event.commit.record as unknown as TRecord; 648 let skipped = false; 649 650 await this.db.transaction(async (tx) => { 651 const values = await config.toUpdateValues(event, record, tx); 652 if (!values) { 653 skipped = true; 654 return; // Skip update (e.g. foreign key not found) 655 } 656 657 if (config.afterUpsert) { 658 const [updated] = await tx 659 .update(config.table) 660 .set(values) 661 .where( 662 and( 663 eq(config.table.did, event.did), 664 eq(config.table.rkey, event.commit.rkey) 665 ) 666 ) 667 .returning({ id: config.table.id }); 668 if (!updated) return; // Out-of-order UPDATE before CREATE: no row to update yet 669 await config.afterUpsert(event, record, updated.id, tx); 670 } else { 671 await tx 672 .update(config.table) 673 .set(values) 674 .where( 675 and( 676 eq(config.table.did, event.did), 677 eq(config.table.rkey, event.commit.rkey) 678 ) 679 ); 680 } 681 }); 682 683 // Only log success if update actually happened 684 if (!skipped) { 685 this.logger.info(`${config.name} updated`, { 686 did: event.did, 687 rkey: event.commit.rkey, 688 }); 689 } 690 } catch (error) { 691 this.logger.error(`Failed to update ${config.name.toLowerCase()}`, { 692 did: event.did, 693 rkey: event.commit.rkey, 694 error: error instanceof Error ? error.message : String(error), 695 }); 696 throw error; 697 } 698 } 699 700 /** 701 * Generic delete handler. Hard-deletes a record (DELETE FROM). 702 * Posts use handlePostDelete instead (always tombstone). 703 */ 704 private async genericDelete(config: CollectionConfig<any>, event: any) { 705 try { 706 await this.db 707 .delete(config.table) 708 .where( 709 and( 710 eq(config.table.did, event.did), 711 eq(config.table.rkey, event.commit.rkey) 712 ) 713 ); 714 715 this.logger.info(`${config.name} deleted`, { 716 did: event.did, 717 rkey: event.commit.rkey, 718 }); 719 } catch (error) { 720 this.logger.error(`Failed to delete ${config.name.toLowerCase()}`, { 721 did: event.did, 722 rkey: event.commit.rkey, 723 error: error instanceof Error ? error.message : String(error), 724 }); 725 throw error; 726 } 727 } 728 729 // ── Post Handlers ─────────────────────────────────────── 730 731 async handlePostCreate(event: CommitCreateEvent<"space.atbb.post">) { 732 const banned = await this.banEnforcer.isBanned(event.did); 733 if (banned) { 734 this.logger.info("Skipping post from banned user", { 735 did: event.did, 736 rkey: event.commit.rkey, 737 }); 738 return; 739 } 740 await this.genericCreate(this.postConfig, event); 741 } 742 743 async handlePostUpdate(event: CommitUpdateEvent<"space.atbb.post">) { 744 await this.genericUpdate(this.postConfig, event); 745 } 746 747 /** 748 * Handles a user-initiated post delete from the PDS. 749 * Always tombstones: replaces personal content with a placeholder and marks 750 * deletedByUser=true. The row is kept so threads referencing this post as 751 * their root or parent remain intact. Personal content is gone; structure is preserved. 752 */ 753 async handlePostDelete(event: CommitDeleteEvent<"space.atbb.post">) { 754 const { did, commit: { rkey } } = event; 755 try { 756 await this.db 757 .update(posts) 758 .set({ text: "[user deleted this post]", deletedByUser: true }) 759 .where(and(eq(posts.did, did), eq(posts.rkey, rkey))); 760 this.logger.info("Post tombstoned: content replaced, structure preserved", { did, rkey }); 761 } catch (error) { 762 this.logger.error("Failed to tombstone post", { 763 did, 764 rkey, 765 error: error instanceof Error ? error.message : String(error), 766 }); 767 throw error; 768 } 769 } 770 771 // ── Forum Handlers ────────────────────────────────────── 772 773 async handleForumCreate(event: CommitCreateEvent<"space.atbb.forum.forum">) { 774 await this.genericCreate(this.forumConfig, event); 775 } 776 777 async handleForumUpdate(event: CommitUpdateEvent<"space.atbb.forum.forum">) { 778 await this.genericUpdate(this.forumConfig, event); 779 } 780 781 async handleForumDelete(event: CommitDeleteEvent<"space.atbb.forum.forum">) { 782 await this.genericDelete(this.forumConfig, event); 783 } 784 785 // ── Category Handlers ─────────────────────────────────── 786 787 async handleCategoryCreate( 788 event: CommitCreateEvent<"space.atbb.forum.category"> 789 ) { 790 await this.genericCreate(this.categoryConfig, event); 791 } 792 793 async handleCategoryUpdate( 794 event: CommitUpdateEvent<"space.atbb.forum.category"> 795 ) { 796 await this.genericUpdate(this.categoryConfig, event); 797 } 798 799 async handleCategoryDelete( 800 event: CommitDeleteEvent<"space.atbb.forum.category"> 801 ) { 802 await this.genericDelete(this.categoryConfig, event); 803 } 804 805 // ── Board Handlers ────────────────────────────────────── 806 807 async handleBoardCreate(event: CommitCreateEvent<"space.atbb.forum.board">) { 808 await this.genericCreate(this.boardConfig, event); 809 } 810 811 async handleBoardUpdate(event: CommitUpdateEvent<"space.atbb.forum.board">) { 812 await this.genericUpdate(this.boardConfig, event); 813 } 814 815 async handleBoardDelete(event: CommitDeleteEvent<"space.atbb.forum.board">) { 816 await this.genericDelete(this.boardConfig, event); 817 } 818 819 // ── Role Handlers ─────────────────────────────────────── 820 821 async handleRoleCreate(event: CommitCreateEvent<"space.atbb.forum.role">) { 822 await this.genericCreate(this.roleConfig, event); 823 } 824 825 async handleRoleUpdate(event: CommitUpdateEvent<"space.atbb.forum.role">) { 826 await this.genericUpdate(this.roleConfig, event); 827 } 828 829 async handleRoleDelete(event: CommitDeleteEvent<"space.atbb.forum.role">) { 830 await this.genericDelete(this.roleConfig, event); 831 } 832 833 // ── Theme Handlers ────────────────────────────────────── 834 835 async handleThemeCreate(event: CommitCreateEvent<"space.atbb.forum.theme">) { 836 await this.genericCreate(this.themeConfig, event); 837 } 838 839 async handleThemeUpdate(event: CommitUpdateEvent<"space.atbb.forum.theme">) { 840 await this.genericUpdate(this.themeConfig, event); 841 } 842 843 async handleThemeDelete(event: CommitDeleteEvent<"space.atbb.forum.theme">) { 844 await this.genericDelete(this.themeConfig, event); 845 } 846 847 // ── ThemePolicy Handlers ───────────────────────────────── 848 849 async handleThemePolicyCreate(event: CommitCreateEvent<"space.atbb.forum.themePolicy">) { 850 await this.genericCreate(this.themePolicyConfig, event); 851 } 852 853 async handleThemePolicyUpdate(event: CommitUpdateEvent<"space.atbb.forum.themePolicy">) { 854 await this.genericUpdate(this.themePolicyConfig, event); 855 } 856 857 async handleThemePolicyDelete(event: CommitDeleteEvent<"space.atbb.forum.themePolicy">) { 858 await this.genericDelete(this.themePolicyConfig, event); 859 } 860 861 // ── Membership Handlers ───────────────────────────────── 862 863 async handleMembershipCreate( 864 event: CommitCreateEvent<"space.atbb.membership"> 865 ) { 866 await this.genericCreate(this.membershipConfig, event); 867 } 868 869 async handleMembershipUpdate( 870 event: CommitUpdateEvent<"space.atbb.membership"> 871 ) { 872 await this.genericUpdate(this.membershipConfig, event); 873 } 874 875 async handleMembershipDelete( 876 event: CommitDeleteEvent<"space.atbb.membership"> 877 ) { 878 await this.genericDelete(this.membershipConfig, event); 879 } 880 881 // ── ModAction Handlers ────────────────────────────────── 882 883 async handleModActionCreate( 884 event: CommitCreateEvent<"space.atbb.modAction"> 885 ) { 886 const record = event.commit.record as unknown as ModAction.Record; 887 const isBan = 888 record.action === "space.atbb.modAction.ban" && record.subject.did; 889 const isUnban = 890 record.action === "space.atbb.modAction.unban" && record.subject.did; 891 892 try { 893 if (isBan) { 894 // Custom atomic path: insert ban record + applyBan in one transaction 895 let skipped = false; 896 await this.db.transaction(async (tx) => { 897 const forumId = await this.getForumIdByDid(event.did, tx); 898 if (!forumId) { 899 this.logger.warn("ModAction (ban): Forum not found for DID", { 900 operation: "ModAction CREATE", 901 did: event.did, 902 }); 903 skipped = true; 904 return; 905 } 906 await this.ensureUser(record.createdBy, tx); 907 await tx.insert(modActions).values({ 908 did: event.did, 909 rkey: event.commit.rkey, 910 cid: event.commit.cid, 911 forumId, 912 action: record.action, 913 subjectPostUri: null, 914 subjectDid: record.subject.did ?? null, 915 reason: record.reason ?? null, 916 createdBy: record.createdBy, 917 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 918 createdAt: new Date(record.createdAt), 919 indexedAt: new Date(), 920 }); 921 await this.banEnforcer.applyBan(record.subject.did!, tx); 922 }); 923 if (!skipped) { 924 this.logger.info("ModAction (ban) created", { 925 did: event.did, 926 rkey: event.commit.rkey, 927 }); 928 } 929 } else if (isUnban) { 930 // Custom atomic path: insert unban record + liftBan in one transaction 931 let skipped = false; 932 await this.db.transaction(async (tx) => { 933 const forumId = await this.getForumIdByDid(event.did, tx); 934 if (!forumId) { 935 this.logger.warn("ModAction (unban): Forum not found for DID", { 936 operation: "ModAction CREATE", 937 did: event.did, 938 }); 939 skipped = true; 940 return; 941 } 942 await this.ensureUser(record.createdBy, tx); 943 await tx.insert(modActions).values({ 944 did: event.did, 945 rkey: event.commit.rkey, 946 cid: event.commit.cid, 947 forumId, 948 action: record.action, 949 subjectPostUri: null, 950 subjectDid: record.subject.did ?? null, 951 reason: record.reason ?? null, 952 createdBy: record.createdBy, 953 expiresAt: record.expiresAt ? new Date(record.expiresAt) : null, 954 createdAt: new Date(record.createdAt), 955 indexedAt: new Date(), 956 }); 957 await this.banEnforcer.liftBan(record.subject.did!, tx); 958 }); 959 if (!skipped) { 960 this.logger.info("ModAction (unban) created", { 961 did: event.did, 962 rkey: event.commit.rkey, 963 }); 964 } 965 } else { 966 // Generic path for all other mod actions (mute, pin, lock, delete, etc.) 967 await this.genericCreate(this.modActionConfig, event); 968 969 // Ban/unban without subject.did — shouldn't happen but log if it does 970 if ( 971 record.action === "space.atbb.modAction.ban" || 972 record.action === "space.atbb.modAction.unban" 973 ) { 974 this.logger.warn("ModAction: ban/unban action missing subject.did, skipping enforcement", { 975 did: event.did, 976 rkey: event.commit.rkey, 977 action: record.action, 978 }); 979 } 980 } 981 } catch (error) { 982 this.logger.error("Failed to index ModAction create", { 983 did: event.did, 984 rkey: event.commit.rkey, 985 error: error instanceof Error ? error.message : String(error), 986 }); 987 throw error; 988 } 989 } 990 991 async handleModActionUpdate( 992 event: CommitUpdateEvent<"space.atbb.modAction"> 993 ) { 994 await this.genericUpdate(this.modActionConfig, event); 995 } 996 997 async handleModActionDelete( 998 event: CommitDeleteEvent<"space.atbb.modAction"> 999 ) { 1000 try { 1001 await this.db.transaction(async (tx) => { 1002 // 1. Read before delete to capture action type and subject 1003 const [existing] = await tx 1004 .select({ 1005 action: modActions.action, 1006 subjectDid: modActions.subjectDid, 1007 }) 1008 .from(modActions) 1009 .where( 1010 and( 1011 eq(modActions.did, event.did), 1012 eq(modActions.rkey, event.commit.rkey) 1013 ) 1014 ) 1015 .limit(1); 1016 1017 // 2. Hard delete the record 1018 await tx 1019 .delete(modActions) 1020 .where( 1021 and( 1022 eq(modActions.did, event.did), 1023 eq(modActions.rkey, event.commit.rkey) 1024 ) 1025 ); 1026 1027 // 3. Restore posts if the deleted record was a ban 1028 if ( 1029 existing?.action === "space.atbb.modAction.ban" && 1030 existing?.subjectDid 1031 ) { 1032 await this.banEnforcer.liftBan(existing.subjectDid, tx); 1033 } 1034 }); 1035 1036 this.logger.info("ModAction deleted", { 1037 did: event.did, 1038 rkey: event.commit.rkey, 1039 }); 1040 } catch (error) { 1041 this.logger.error("Failed to delete modAction", { 1042 did: event.did, 1043 rkey: event.commit.rkey, 1044 error: error instanceof Error ? error.message : String(error), 1045 }); 1046 throw error; 1047 } 1048 } 1049 1050 // ── Reaction Handlers (Stub) ──────────────────────────── 1051 1052 async handleReactionCreate( 1053 event: CommitCreateEvent<"space.atbb.reaction"> 1054 ) { 1055 this.logger.warn("Reaction created (not implemented)", { did: event.did, rkey: event.commit.rkey }); 1056 // TODO: Add reactions table to schema 1057 } 1058 1059 async handleReactionUpdate( 1060 event: CommitUpdateEvent<"space.atbb.reaction"> 1061 ) { 1062 this.logger.warn("Reaction updated (not implemented)", { did: event.did, rkey: event.commit.rkey }); 1063 // TODO: Add reactions table to schema 1064 } 1065 1066 async handleReactionDelete( 1067 event: CommitDeleteEvent<"space.atbb.reaction"> 1068 ) { 1069 this.logger.warn("Reaction deleted (not implemented)", { did: event.did, rkey: event.commit.rkey }); 1070 // TODO: Add reactions table to schema 1071 } 1072 1073 // ── Helper Methods ────────────────────────────────────── 1074 1075 /** 1076 * Ensure a user exists in the database. Creates if not exists. 1077 * @param dbOrTx - Database instance or transaction 1078 */ 1079 private async ensureUser(did: string, dbOrTx: DbOrTransaction = this.db) { 1080 try { 1081 const existing = await dbOrTx.select().from(users).where(eq(users.did, did)).limit(1); 1082 1083 if (existing.length === 0) { 1084 await dbOrTx.insert(users).values({ 1085 did, 1086 handle: null, // Will be updated by identity events 1087 indexedAt: new Date(), 1088 }); 1089 this.logger.info("Created user", { did }); 1090 } 1091 } catch (error) { 1092 this.logger.error("Failed to ensure user exists", { 1093 did, 1094 error: error instanceof Error ? error.message : String(error), 1095 }); 1096 throw error; 1097 } 1098 } 1099 1100 /** 1101 * Look up a forum ID by its AT URI 1102 * @param dbOrTx - Database instance or transaction 1103 */ 1104 private async getForumIdByUri( 1105 forumUri: string, 1106 dbOrTx: DbOrTransaction = this.db 1107 ): Promise<bigint | null> { 1108 const parsed = parseAtUri(forumUri); 1109 if (!parsed) return null; 1110 1111 try { 1112 const result = await dbOrTx 1113 .select({ id: forums.id }) 1114 .from(forums) 1115 .where(and(eq(forums.did, parsed.did), eq(forums.rkey, parsed.rkey))) 1116 .limit(1); 1117 1118 return result.length > 0 ? result[0].id : null; 1119 } catch (error) { 1120 this.logger.error("Database error in getForumIdByUri", { 1121 operation: "getForumIdByUri", 1122 forumUri, 1123 error: error instanceof Error ? error.message : String(error), 1124 }); 1125 throw error; 1126 } 1127 } 1128 1129 /** 1130 * Look up a forum ID by the forum's DID 1131 * Used for records owned by the forum (categories, modActions) 1132 * @param dbOrTx - Database instance or transaction 1133 */ 1134 private async getForumIdByDid( 1135 forumDid: string, 1136 dbOrTx: DbOrTransaction = this.db 1137 ): Promise<bigint | null> { 1138 try { 1139 const result = await dbOrTx 1140 .select({ id: forums.id }) 1141 .from(forums) 1142 .where(eq(forums.did, forumDid)) 1143 .limit(1); 1144 1145 return result.length > 0 ? result[0].id : null; 1146 } catch (error) { 1147 this.logger.error("Database error in getForumIdByDid", { 1148 operation: "getForumIdByDid", 1149 forumDid, 1150 error: error instanceof Error ? error.message : String(error), 1151 }); 1152 throw error; 1153 } 1154 } 1155 1156 /** 1157 * Look up a post ID by its AT URI 1158 * @param dbOrTx - Database instance or transaction 1159 */ 1160 private async getPostIdByUri( 1161 postUri: string, 1162 dbOrTx: DbOrTransaction = this.db 1163 ): Promise<bigint | null> { 1164 const parsed = parseAtUri(postUri); 1165 if (!parsed) return null; 1166 1167 try { 1168 const result = await dbOrTx 1169 .select({ id: posts.id }) 1170 .from(posts) 1171 .where(and(eq(posts.did, parsed.did), eq(posts.rkey, parsed.rkey))) 1172 .limit(1); 1173 1174 return result.length > 0 ? result[0].id : null; 1175 } catch (error) { 1176 this.logger.error("Database error in getPostIdByUri", { 1177 operation: "getPostIdByUri", 1178 postUri, 1179 error: error instanceof Error ? error.message : String(error), 1180 }); 1181 throw error; 1182 } 1183 } 1184 1185 /** 1186 * Look up board ID by AT URI (at://did/collection/rkey) 1187 * @param uri - AT URI of the board 1188 * @param dbOrTx - Database instance or transaction 1189 */ 1190 private async getBoardIdByUri( 1191 uri: string, 1192 dbOrTx: DbOrTransaction = this.db 1193 ): Promise<bigint | null> { 1194 const parsed = parseAtUri(uri); 1195 if (!parsed) return null; 1196 1197 try { 1198 const [result] = await dbOrTx 1199 .select({ id: boards.id }) 1200 .from(boards) 1201 .where(and(eq(boards.did, parsed.did), eq(boards.rkey, parsed.rkey))) 1202 .limit(1); 1203 return result?.id ?? null; 1204 } catch (error) { 1205 this.logger.error("Database error in getBoardIdByUri", { 1206 operation: "getBoardIdByUri", 1207 uri, 1208 did: parsed.did, 1209 rkey: parsed.rkey, 1210 error: error instanceof Error ? error.message : String(error), 1211 }); 1212 throw error; 1213 } 1214 } 1215 1216 /** 1217 * Look up category ID by AT URI (at://did/collection/rkey) 1218 * @param uri - AT URI of the category 1219 * @param dbOrTx - Database instance or transaction 1220 */ 1221 private async getCategoryIdByUri( 1222 uri: string, 1223 dbOrTx: DbOrTransaction = this.db 1224 ): Promise<bigint | null> { 1225 const parsed = parseAtUri(uri); 1226 if (!parsed) return null; 1227 1228 try { 1229 const [result] = await dbOrTx 1230 .select({ id: categories.id }) 1231 .from(categories) 1232 .where(and(eq(categories.did, parsed.did), eq(categories.rkey, parsed.rkey))) 1233 .limit(1); 1234 return result?.id ?? null; 1235 } catch (error) { 1236 this.logger.error("Database error in getCategoryIdByUri", { 1237 operation: "getCategoryIdByUri", 1238 uri, 1239 did: parsed.did, 1240 rkey: parsed.rkey, 1241 error: error instanceof Error ? error.message : String(error), 1242 }); 1243 throw error; 1244 } 1245 } 1246}