Barazo AppView backend
barazo.forum
1import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest'
2import { eq } from 'drizzle-orm'
3
4import { createDb } from '../../../src/db/index.js'
5import type { Database } from '../../../src/db/index.js'
6import { topics } from '../../../src/db/schema/topics.js'
7import { replies } from '../../../src/db/schema/replies.js'
8import { reactions } from '../../../src/db/schema/reactions.js'
9import { votes } from '../../../src/db/schema/votes.js'
10import { users } from '../../../src/db/schema/users.js'
11import { TopicIndexer } from '../../../src/firehose/indexers/topic.js'
12import { ReplyIndexer } from '../../../src/firehose/indexers/reply.js'
13import { ReactionIndexer } from '../../../src/firehose/indexers/reaction.js'
14import { VoteIndexer } from '../../../src/firehose/indexers/vote.js'
15import { RecordHandler } from '../../../src/firehose/handlers/record.js'
16import type { RecordEvent } from '../../../src/firehose/types.js'
17import type { AccountAgeService } from '../../../src/services/account-age.js'
18import type postgres from 'postgres'
19
20/** Stub that skips PLC resolution and always returns 'trusted'. */
21function createStubAccountAgeService(): AccountAgeService {
22 return {
23 // eslint-disable-next-line @typescript-eslint/require-await
24 resolveCreationDate: async () => null,
25 determineTrustStatus: () => 'trusted',
26 }
27}
28
29const DATABASE_URL =
30 process.env['DATABASE_URL'] ?? 'postgresql://barazo:barazo_dev@localhost:5432/barazo'
31
32function createLogger() {
33 return {
34 info: () => undefined,
35 error: () => undefined,
36 warn: () => undefined,
37 debug: () => undefined,
38 }
39}
40
41/** Asserts a single-row query result and returns the row. */
42function one<T>(rows: T[]): T {
43 expect(rows).toHaveLength(1)
44 return rows[0] as T
45}
46
47describe('firehose record processing (integration)', () => {
48 let db: Database
49 let client: postgres.Sql
50 let handler: RecordHandler
51
52 beforeAll(() => {
53 const conn = createDb(DATABASE_URL)
54 db = conn.db
55 client = conn.client
56
57 const logger = createLogger()
58 const topicIndexer = new TopicIndexer(db, logger as never)
59 const replyIndexer = new ReplyIndexer(db, logger as never)
60 const reactionIndexer = new ReactionIndexer(db, logger as never)
61 const voteIndexer = new VoteIndexer(db, logger as never)
62
63 handler = new RecordHandler(
64 { topic: topicIndexer, reply: replyIndexer, reaction: reactionIndexer, vote: voteIndexer },
65 db,
66 logger as never,
67 createStubAccountAgeService()
68 )
69 })
70
71 afterAll(async () => {
72 await client.end()
73 })
74
75 beforeEach(async () => {
76 // Clean tables in correct FK-safe order
77 await db.delete(votes)
78 await db.delete(reactions)
79 await db.delete(replies)
80 await db.delete(topics)
81 await db.delete(users)
82 })
83
84 describe('topic lifecycle', () => {
85 const topicEvent: RecordEvent = {
86 id: 1,
87 action: 'create',
88 did: 'did:plc:integ-user1',
89 rev: 'rev1',
90 collection: 'forum.barazo.topic.post',
91 rkey: 'topic1',
92 record: {
93 title: 'Integration Test Topic',
94 content: {
95 $type: 'forum.barazo.richtext#markdown' as const,
96 value: 'This is a test topic for integration testing.',
97 },
98 community: 'did:plc:community',
99 category: 'general',
100 publishedAt: '2026-01-15T10:00:00.000Z',
101 },
102 cid: 'bafytopic1',
103 live: true,
104 }
105
106 it('creates a topic and upserts user stub', async () => {
107 await handler.handle(topicEvent)
108
109 const topic = one(
110 await db
111 .select()
112 .from(topics)
113 .where(eq(topics.uri, 'at://did:plc:integ-user1/forum.barazo.topic.post/topic1'))
114 )
115
116 expect(topic.title).toBe('Integration Test Topic')
117 expect(topic.authorDid).toBe('did:plc:integ-user1')
118 expect(topic.category).toBe('general')
119 expect(topic.communityDid).toBe('did:plc:community')
120 expect(topic.replyCount).toBe(0)
121 expect(topic.reactionCount).toBe(0)
122
123 // Verify user stub was created
124 const user = one(await db.select().from(users).where(eq(users.did, 'did:plc:integ-user1')))
125
126 expect(user.handle).toBe('did:plc:integ-user1') // Stub uses DID as handle
127 })
128
129 it('updates a topic', async () => {
130 await handler.handle(topicEvent)
131
132 const updateEvent: RecordEvent = {
133 id: 2,
134 action: 'update',
135 did: 'did:plc:integ-user1',
136 rev: 'rev2',
137 collection: 'forum.barazo.topic.post',
138 rkey: 'topic1',
139 record: {
140 title: 'Updated Topic Title',
141 content: {
142 $type: 'forum.barazo.richtext#markdown' as const,
143 value: 'Updated content for the topic.',
144 },
145 community: 'did:plc:community',
146 category: 'discussion',
147 publishedAt: '2026-01-15T10:00:00.000Z',
148 },
149 cid: 'bafytopic1v2',
150 live: true,
151 }
152
153 await handler.handle(updateEvent)
154
155 const topic = one(
156 await db
157 .select()
158 .from(topics)
159 .where(eq(topics.uri, 'at://did:plc:integ-user1/forum.barazo.topic.post/topic1'))
160 )
161
162 expect(topic.title).toBe('Updated Topic Title')
163 expect(topic.content).toBe('Updated content for the topic.')
164 expect(topic.category).toBe('discussion')
165 expect(topic.cid).toBe('bafytopic1v2')
166 })
167
168 it('soft-deletes a topic', async () => {
169 await handler.handle(topicEvent)
170
171 const deleteEvent: RecordEvent = {
172 id: 3,
173 action: 'delete',
174 did: 'did:plc:integ-user1',
175 rev: 'rev3',
176 collection: 'forum.barazo.topic.post',
177 rkey: 'topic1',
178 live: true,
179 }
180
181 await handler.handle(deleteEvent)
182
183 const topic = one(
184 await db
185 .select()
186 .from(topics)
187 .where(eq(topics.uri, 'at://did:plc:integ-user1/forum.barazo.topic.post/topic1'))
188 )
189
190 expect(topic.isAuthorDeleted).toBe(true)
191 })
192 })
193
194 describe('reply with count updates', () => {
195 const topicUri = 'at://did:plc:integ-user1/forum.barazo.topic.post/topic1'
196
197 beforeEach(async () => {
198 // Create a topic first for replies to attach to
199 await handler.handle({
200 id: 10,
201 action: 'create',
202 did: 'did:plc:integ-user1',
203 rev: 'rev1',
204 collection: 'forum.barazo.topic.post',
205 rkey: 'topic1',
206 record: {
207 title: 'Parent Topic',
208 content: {
209 $type: 'forum.barazo.richtext#markdown' as const,
210 value: 'Topic for reply tests',
211 },
212 community: 'did:plc:community',
213 category: 'general',
214 publishedAt: '2026-01-15T10:00:00.000Z',
215 },
216 cid: 'bafytopic1',
217 live: true,
218 })
219 })
220
221 it('creates a reply and increments reply count', async () => {
222 await handler.handle({
223 id: 11,
224 action: 'create',
225 did: 'did:plc:integ-user2',
226 rev: 'rev1',
227 collection: 'forum.barazo.topic.reply',
228 rkey: 'reply1',
229 record: {
230 content: { $type: 'forum.barazo.richtext#markdown' as const, value: 'This is a reply' },
231 root: { uri: topicUri, cid: 'bafytopic1' },
232 parent: { uri: topicUri, cid: 'bafytopic1' },
233 community: 'did:plc:community',
234 createdAt: '2026-01-15T11:00:00.000Z',
235 },
236 cid: 'bafyreply1',
237 live: true,
238 })
239
240 // Verify reply exists
241 const reply = one(
242 await db
243 .select()
244 .from(replies)
245 .where(eq(replies.uri, 'at://did:plc:integ-user2/forum.barazo.topic.reply/reply1'))
246 )
247
248 expect(reply.content).toBe('This is a reply')
249 expect(reply.rootUri).toBe(topicUri)
250
251 // Verify reply count incremented
252 const topic = one(await db.select().from(topics).where(eq(topics.uri, topicUri)))
253
254 expect(topic.replyCount).toBe(1)
255 })
256
257 it('handles multiple replies and correct count', async () => {
258 // Add two replies
259 for (let i = 1; i <= 2; i++) {
260 await handler.handle({
261 id: 20 + i,
262 action: 'create',
263 did: `did:plc:integ-user${String(i + 1)}`,
264 rev: 'rev1',
265 collection: 'forum.barazo.topic.reply',
266 rkey: `reply${String(i)}`,
267 record: {
268 content: {
269 $type: 'forum.barazo.richtext#markdown' as const,
270 value: `Reply ${String(i)}`,
271 },
272 root: { uri: topicUri, cid: 'bafytopic1' },
273 parent: { uri: topicUri, cid: 'bafytopic1' },
274 community: 'did:plc:community',
275 createdAt: `2026-01-15T1${String(i)}:00:00.000Z`,
276 },
277 cid: `bafyreply${String(i)}`,
278 live: true,
279 })
280 }
281
282 const topic = one(await db.select().from(topics).where(eq(topics.uri, topicUri)))
283
284 expect(topic.replyCount).toBe(2)
285 })
286 })
287
288 describe('reaction with count updates', () => {
289 const topicUri = 'at://did:plc:integ-user1/forum.barazo.topic.post/topic1'
290
291 beforeEach(async () => {
292 await handler.handle({
293 id: 30,
294 action: 'create',
295 did: 'did:plc:integ-user1',
296 rev: 'rev1',
297 collection: 'forum.barazo.topic.post',
298 rkey: 'topic1',
299 record: {
300 title: 'Reactable Topic',
301 content: {
302 $type: 'forum.barazo.richtext#markdown' as const,
303 value: 'Topic for reaction tests',
304 },
305 community: 'did:plc:community',
306 category: 'general',
307 publishedAt: '2026-01-15T10:00:00.000Z',
308 },
309 cid: 'bafytopic1',
310 live: true,
311 })
312 })
313
314 it('creates a reaction and increments reaction count on topic', async () => {
315 await handler.handle({
316 id: 31,
317 action: 'create',
318 did: 'did:plc:integ-user2',
319 rev: 'rev1',
320 collection: 'forum.barazo.interaction.reaction',
321 rkey: 'react1',
322 record: {
323 subject: { uri: topicUri, cid: 'bafytopic1' },
324 type: 'like',
325 community: 'did:plc:community',
326 createdAt: '2026-01-15T12:00:00.000Z',
327 },
328 cid: 'bafyreact1',
329 live: true,
330 })
331
332 // Verify reaction exists
333 const reaction = one(
334 await db
335 .select()
336 .from(reactions)
337 .where(
338 eq(reactions.uri, 'at://did:plc:integ-user2/forum.barazo.interaction.reaction/react1')
339 )
340 )
341
342 expect(reaction.type).toBe('like')
343 expect(reaction.subjectUri).toBe(topicUri)
344
345 // Verify reaction count incremented on topic
346 const topic = one(await db.select().from(topics).where(eq(topics.uri, topicUri)))
347
348 expect(topic.reactionCount).toBe(1)
349 })
350 })
351
352 describe('idempotent replay', () => {
353 it('replaying a topic create is idempotent (upsert)', async () => {
354 const event: RecordEvent = {
355 id: 40,
356 action: 'create',
357 did: 'did:plc:integ-user1',
358 rev: 'rev1',
359 collection: 'forum.barazo.topic.post',
360 rkey: 'idem-topic1',
361 record: {
362 title: 'Idempotent Topic',
363 content: { $type: 'forum.barazo.richtext#markdown' as const, value: 'Original content' },
364 community: 'did:plc:community',
365 category: 'general',
366 publishedAt: '2026-01-15T10:00:00.000Z',
367 },
368 cid: 'bafyidem1',
369 live: false,
370 }
371
372 // Process same event twice
373 await handler.handle(event)
374 await handler.handle(event)
375
376 const result = await db
377 .select()
378 .from(topics)
379 .where(eq(topics.uri, 'at://did:plc:integ-user1/forum.barazo.topic.post/idem-topic1'))
380
381 // Should still be exactly one row
382 expect(result).toHaveLength(1)
383 const topic = one(result)
384 expect(topic.title).toBe('Idempotent Topic')
385 })
386
387 it('replaying a reply create does not duplicate rows', async () => {
388 const topicUri = 'at://did:plc:integ-user1/forum.barazo.topic.post/idem-topic2'
389
390 // Create topic
391 await handler.handle({
392 id: 50,
393 action: 'create',
394 did: 'did:plc:integ-user1',
395 rev: 'rev1',
396 collection: 'forum.barazo.topic.post',
397 rkey: 'idem-topic2',
398 record: {
399 title: 'Topic for replay test',
400 content: { $type: 'forum.barazo.richtext#markdown' as const, value: 'Content' },
401 community: 'did:plc:community',
402 category: 'general',
403 publishedAt: '2026-01-15T10:00:00.000Z',
404 },
405 cid: 'bafyidem2',
406 live: false,
407 })
408
409 const replyEvent: RecordEvent = {
410 id: 51,
411 action: 'create',
412 did: 'did:plc:integ-user2',
413 rev: 'rev1',
414 collection: 'forum.barazo.topic.reply',
415 rkey: 'idem-reply1',
416 record: {
417 content: { $type: 'forum.barazo.richtext#markdown' as const, value: 'Replay test reply' },
418 root: { uri: topicUri, cid: 'bafyidem2' },
419 parent: { uri: topicUri, cid: 'bafyidem2' },
420 community: 'did:plc:community',
421 createdAt: '2026-01-15T11:00:00.000Z',
422 },
423 cid: 'bafyidemreply1',
424 live: false,
425 }
426
427 // Reply uses onConflictDoNothing, so second insert is a no-op for the row.
428 // In practice, Tap handles replay deduplication.
429 await handler.handle(replyEvent)
430 await handler.handle(replyEvent)
431
432 const replyRows = await db
433 .select()
434 .from(replies)
435 .where(eq(replies.uri, 'at://did:plc:integ-user2/forum.barazo.topic.reply/idem-reply1'))
436
437 // Exactly one reply row (onConflictDoNothing)
438 expect(replyRows).toHaveLength(1)
439 })
440 })
441
442 describe('tombstone: edit-then-delete preserves reply strongRefs', () => {
443 const topicUri = 'at://did:plc:integ-user1/forum.barazo.topic.post/edit-del-topic'
444 const originalCid = 'bafyoriginalcid'
445 const updatedCid = 'bafyupdatedcid'
446
447 it('reply retains original CID reference after topic edit and delete', async () => {
448 // Step 1: Create topic with original CID
449 await handler.handle({
450 id: 70,
451 action: 'create',
452 did: 'did:plc:integ-user1',
453 rev: 'rev1',
454 collection: 'forum.barazo.topic.post',
455 rkey: 'edit-del-topic',
456 record: {
457 title: 'Original Title',
458 content: { $type: 'forum.barazo.richtext#markdown' as const, value: 'Original content' },
459 community: 'did:plc:community',
460 category: 'general',
461 publishedAt: '2026-01-15T10:00:00.000Z',
462 },
463 cid: originalCid,
464 live: true,
465 })
466
467 // Step 2: Another user replies, referencing the original CID
468 await handler.handle({
469 id: 71,
470 action: 'create',
471 did: 'did:plc:integ-user2',
472 rev: 'rev1',
473 collection: 'forum.barazo.topic.reply',
474 rkey: 'edit-del-reply',
475 record: {
476 content: {
477 $type: 'forum.barazo.richtext#markdown' as const,
478 value: 'Reply referencing original CID',
479 },
480 root: { uri: topicUri, cid: originalCid },
481 parent: { uri: topicUri, cid: originalCid },
482 community: 'did:plc:community',
483 createdAt: '2026-01-15T11:00:00.000Z',
484 },
485 cid: 'bafyreplyeditdel',
486 live: true,
487 })
488
489 // Step 3: Author edits the topic (CID changes)
490 await handler.handle({
491 id: 72,
492 action: 'update',
493 did: 'did:plc:integ-user1',
494 rev: 'rev2',
495 collection: 'forum.barazo.topic.post',
496 rkey: 'edit-del-topic',
497 record: {
498 title: 'Updated Title',
499 content: { $type: 'forum.barazo.richtext#markdown' as const, value: 'Updated content' },
500 community: 'did:plc:community',
501 category: 'general',
502 publishedAt: '2026-01-15T10:00:00.000Z',
503 },
504 cid: updatedCid,
505 live: true,
506 })
507
508 // Step 4: Author deletes the topic
509 await handler.handle({
510 id: 73,
511 action: 'delete',
512 did: 'did:plc:integ-user1',
513 rev: 'rev3',
514 collection: 'forum.barazo.topic.post',
515 rkey: 'edit-del-topic',
516 live: true,
517 })
518
519 // Verify: topic is soft-deleted, not hard-deleted
520 const topic = one(await db.select().from(topics).where(eq(topics.uri, topicUri)))
521 expect(topic.isAuthorDeleted).toBe(true)
522 expect(topic.cid).toBe(updatedCid)
523
524 // Verify: reply still exists with original CID references intact
525 const replyUri = 'at://did:plc:integ-user2/forum.barazo.topic.reply/edit-del-reply'
526 const reply = one(await db.select().from(replies).where(eq(replies.uri, replyUri)))
527 expect(reply.isAuthorDeleted).toBe(false)
528 expect(reply.rootUri).toBe(topicUri)
529 expect(reply.rootCid).toBe(originalCid)
530 expect(reply.parentCid).toBe(originalCid)
531 })
532 })
533
534 describe('tombstone: rapid create-then-delete', () => {
535 it('topic ends up soft-deleted after create followed by immediate delete', async () => {
536 const topicUri = 'at://did:plc:integ-user1/forum.barazo.topic.post/rapid-topic'
537
538 // Create then immediately delete (simulates rapid firehose events)
539 await handler.handle({
540 id: 80,
541 action: 'create',
542 did: 'did:plc:integ-user1',
543 rev: 'rev1',
544 collection: 'forum.barazo.topic.post',
545 rkey: 'rapid-topic',
546 record: {
547 title: 'Ephemeral Topic',
548 content: {
549 $type: 'forum.barazo.richtext#markdown' as const,
550 value: 'Gone before you know it',
551 },
552 community: 'did:plc:community',
553 category: 'general',
554 publishedAt: '2026-01-15T10:00:00.000Z',
555 },
556 cid: 'bafyrapid1',
557 live: true,
558 })
559
560 await handler.handle({
561 id: 81,
562 action: 'delete',
563 did: 'did:plc:integ-user1',
564 rev: 'rev2',
565 collection: 'forum.barazo.topic.post',
566 rkey: 'rapid-topic',
567 live: true,
568 })
569
570 // Topic row should exist but be soft-deleted
571 const topic = one(await db.select().from(topics).where(eq(topics.uri, topicUri)))
572 expect(topic.isAuthorDeleted).toBe(true)
573 })
574
575 it('replies survive when topic is rapidly created and deleted', async () => {
576 const topicUri = 'at://did:plc:integ-user1/forum.barazo.topic.post/rapid-topic2'
577
578 // Create topic
579 await handler.handle({
580 id: 82,
581 action: 'create',
582 did: 'did:plc:integ-user1',
583 rev: 'rev1',
584 collection: 'forum.barazo.topic.post',
585 rkey: 'rapid-topic2',
586 record: {
587 title: 'Another Ephemeral Topic',
588 content: {
589 $type: 'forum.barazo.richtext#markdown' as const,
590 value: 'Will be deleted quickly',
591 },
592 community: 'did:plc:community',
593 category: 'general',
594 publishedAt: '2026-01-15T10:00:00.000Z',
595 },
596 cid: 'bafyrapid2',
597 live: true,
598 })
599
600 // Another user replies before deletion
601 await handler.handle({
602 id: 83,
603 action: 'create',
604 did: 'did:plc:integ-user2',
605 rev: 'rev1',
606 collection: 'forum.barazo.topic.reply',
607 rkey: 'rapid-reply1',
608 record: {
609 content: {
610 $type: 'forum.barazo.richtext#markdown' as const,
611 value: 'Quick reply before deletion',
612 },
613 root: { uri: topicUri, cid: 'bafyrapid2' },
614 parent: { uri: topicUri, cid: 'bafyrapid2' },
615 community: 'did:plc:community',
616 createdAt: '2026-01-15T10:00:01.000Z',
617 },
618 cid: 'bafyrapidreply1',
619 live: true,
620 })
621
622 // Rapid delete of the topic
623 await handler.handle({
624 id: 84,
625 action: 'delete',
626 did: 'did:plc:integ-user1',
627 rev: 'rev2',
628 collection: 'forum.barazo.topic.post',
629 rkey: 'rapid-topic2',
630 live: true,
631 })
632
633 // Topic is soft-deleted
634 const topic = one(await db.select().from(topics).where(eq(topics.uri, topicUri)))
635 expect(topic.isAuthorDeleted).toBe(true)
636
637 // Reply is preserved (belongs to another user)
638 const replyUri = 'at://did:plc:integ-user2/forum.barazo.topic.reply/rapid-reply1'
639 const reply = one(await db.select().from(replies).where(eq(replies.uri, replyUri)))
640 expect(reply.isAuthorDeleted).toBe(false)
641 expect(reply.content).toBe('Quick reply before deletion')
642 })
643
644 it('delete before create is handled gracefully (out-of-order events)', async () => {
645 // Firehose can deliver events out of order; delete arriving before create
646 // should not throw
647 await handler.handle({
648 id: 85,
649 action: 'delete',
650 did: 'did:plc:integ-user1',
651 rev: 'rev2',
652 collection: 'forum.barazo.topic.post',
653 rkey: 'ooo-topic',
654 live: true,
655 })
656
657 // Topic doesn't exist, so the update should be a no-op (0 rows affected)
658 const topicUri = 'at://did:plc:integ-user1/forum.barazo.topic.post/ooo-topic'
659 const result = await db.select().from(topics).where(eq(topics.uri, topicUri))
660 expect(result).toHaveLength(0)
661 })
662 })
663
664 describe('unsupported and invalid records', () => {
665 it('skips unsupported collections', async () => {
666 const event: RecordEvent = {
667 id: 60,
668 action: 'create',
669 did: 'did:plc:integ-user1',
670 rev: 'rev1',
671 collection: 'app.bsky.feed.post',
672 rkey: 'post1',
673 record: { text: 'Hello world' },
674 cid: 'bafypost1',
675 live: true,
676 }
677
678 // Should not throw
679 await handler.handle(event)
680
681 // No topic should be created
682 const result = await db.select().from(topics)
683 expect(result).toHaveLength(0)
684 })
685
686 it('skips invalid record data', async () => {
687 const event: RecordEvent = {
688 id: 61,
689 action: 'create',
690 did: 'did:plc:integ-user1',
691 rev: 'rev1',
692 collection: 'forum.barazo.topic.post',
693 rkey: 'bad1',
694 record: { invalid: 'data' },
695 cid: 'bafybad1',
696 live: true,
697 }
698
699 await handler.handle(event)
700
701 const result = await db.select().from(topics)
702 expect(result).toHaveLength(0)
703 })
704 })
705})