Barazo AppView backend
barazo.forum
1import { eq, and, desc, sql } from 'drizzle-orm'
2import { requireCommunityDid } from '../middleware/community-resolver.js'
3import type { FastifyPluginCallback } from 'fastify'
4import { notFound, badRequest, conflict, errorResponseSchema } from '../lib/api-errors.js'
5import { wordFilterSchema, queueActionSchema, queueQuerySchema } from '../validation/anti-spam.js'
6import { moderationQueue } from '../db/schema/moderation-queue.js'
7import { accountTrust } from '../db/schema/account-trust.js'
8import { topics } from '../db/schema/topics.js'
9import { replies } from '../db/schema/replies.js'
10import { communitySettings } from '../db/schema/community-settings.js'
11import { createRequireModerator } from '../auth/require-moderator.js'
12
13// ---------------------------------------------------------------------------
14// OpenAPI JSON Schema definitions
15// ---------------------------------------------------------------------------
16
17const queueItemJsonSchema = {
18 type: 'object' as const,
19 properties: {
20 id: { type: 'number' as const },
21 contentUri: { type: 'string' as const },
22 contentType: { type: 'string' as const },
23 authorDid: { type: 'string' as const },
24 queueReason: { type: 'string' as const },
25 matchedWords: {
26 type: ['array', 'null'] as const,
27 items: { type: 'string' as const },
28 },
29 status: { type: 'string' as const },
30 reviewedBy: { type: ['string', 'null'] as const },
31 createdAt: { type: 'string' as const, format: 'date-time' as const },
32 reviewedAt: { type: ['string', 'null'] as const },
33 },
34}
35
36// ---------------------------------------------------------------------------
37// Helpers
38// ---------------------------------------------------------------------------
39
40function serializeQueueItem(row: typeof moderationQueue.$inferSelect) {
41 return {
42 id: row.id,
43 contentUri: row.contentUri,
44 contentType: row.contentType,
45 authorDid: row.authorDid,
46 queueReason: row.queueReason,
47 matchedWords: row.matchedWords ?? null,
48 status: row.status,
49 reviewedBy: row.reviewedBy ?? null,
50 createdAt: row.createdAt.toISOString(),
51 reviewedAt: row.reviewedAt?.toISOString() ?? null,
52 }
53}
54
55function encodeCursor(createdAt: string, id: number): string {
56 return Buffer.from(JSON.stringify({ createdAt, id })).toString('base64')
57}
58
59function decodeCursor(cursor: string): { createdAt: string; id: number } | null {
60 try {
61 const decoded = JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8')) as Record<
62 string,
63 unknown
64 >
65 if (typeof decoded.createdAt === 'string' && typeof decoded.id === 'number') {
66 return { createdAt: decoded.createdAt, id: decoded.id }
67 }
68 return null
69 } catch {
70 return null
71 }
72}
73
74// ---------------------------------------------------------------------------
75// Routes
76// ---------------------------------------------------------------------------
77
78export function moderationQueueRoutes(): FastifyPluginCallback {
79 return (app, _opts, done) => {
80 const { db, authMiddleware } = app
81 const requireModerator = createRequireModerator(db, authMiddleware, app.log)
82 const requireAdmin = app.requireAdmin
83
84 // -------------------------------------------------------------------
85 // GET /api/moderation/queue (moderator+)
86 // -------------------------------------------------------------------
87
88 app.get(
89 '/api/moderation/queue',
90 {
91 preHandler: [requireModerator],
92 schema: {
93 tags: ['Moderation'],
94 summary: 'List moderation queue items (paginated)',
95 security: [{ bearerAuth: [] }],
96 querystring: {
97 type: 'object',
98 properties: {
99 status: {
100 type: 'string',
101 enum: ['pending', 'approved', 'rejected'],
102 },
103 queueReason: {
104 type: 'string',
105 enum: ['word_filter', 'first_post', 'link_hold', 'burst', 'topic_delay'],
106 },
107 cursor: { type: 'string' },
108 limit: { type: 'string' },
109 },
110 },
111 response: {
112 200: {
113 type: 'object',
114 properties: {
115 items: { type: 'array', items: queueItemJsonSchema },
116 cursor: { type: ['string', 'null'] },
117 },
118 },
119 400: errorResponseSchema,
120 },
121 },
122 },
123 async (request, reply) => {
124 const communityDid = requireCommunityDid(request)
125 const parsed = queueQuerySchema.safeParse(request.query)
126 if (!parsed.success) {
127 throw badRequest('Invalid query parameters')
128 }
129
130 const { status, queueReason, cursor, limit } = parsed.data
131 const conditions = [eq(moderationQueue.communityDid, communityDid)]
132
133 conditions.push(eq(moderationQueue.status, status))
134
135 if (queueReason) {
136 conditions.push(eq(moderationQueue.queueReason, queueReason))
137 }
138
139 if (cursor) {
140 const decoded = decodeCursor(cursor)
141 if (decoded) {
142 conditions.push(
143 sql`(${moderationQueue.createdAt}, ${moderationQueue.id}) < (${decoded.createdAt}::timestamptz, ${decoded.id})`
144 )
145 }
146 }
147
148 const whereClause = and(...conditions)
149 const fetchLimit = limit + 1
150
151 const rows = await db
152 .select()
153 .from(moderationQueue)
154 .where(whereClause)
155 .orderBy(desc(moderationQueue.createdAt))
156 .limit(fetchLimit)
157
158 const hasMore = rows.length > limit
159 const resultRows = hasMore ? rows.slice(0, limit) : rows
160
161 let nextCursor: string | null = null
162 if (hasMore) {
163 const lastRow = resultRows[resultRows.length - 1]
164 if (lastRow) {
165 nextCursor = encodeCursor(lastRow.createdAt.toISOString(), lastRow.id)
166 }
167 }
168
169 return reply.status(200).send({
170 items: resultRows.map(serializeQueueItem),
171 cursor: nextCursor,
172 })
173 }
174 )
175
176 // -------------------------------------------------------------------
177 // PUT /api/moderation/queue/:id (moderator+)
178 // -------------------------------------------------------------------
179
180 app.put(
181 '/api/moderation/queue/:id',
182 {
183 preHandler: [requireModerator],
184 schema: {
185 tags: ['Moderation'],
186 summary: 'Approve or reject a queued item',
187 security: [{ bearerAuth: [] }],
188 params: {
189 type: 'object',
190 required: ['id'],
191 properties: { id: { type: 'string' } },
192 },
193 body: {
194 type: 'object',
195 required: ['action'],
196 properties: {
197 action: { type: 'string', enum: ['approve', 'reject'] },
198 },
199 },
200 response: {
201 200: queueItemJsonSchema,
202 400: errorResponseSchema,
203 401: errorResponseSchema,
204 403: errorResponseSchema,
205 404: errorResponseSchema,
206 409: errorResponseSchema,
207 },
208 },
209 },
210 async (request, reply) => {
211 const communityDid = requireCommunityDid(request)
212 const user = request.user
213 if (!user) {
214 return reply.status(401).send({ error: 'Authentication required' })
215 }
216
217 const { id } = request.params as { id: string }
218 const queueId = Number(id)
219 if (Number.isNaN(queueId)) {
220 throw badRequest('Invalid queue item ID')
221 }
222
223 const parsed = queueActionSchema.safeParse(request.body)
224 if (!parsed.success) {
225 throw badRequest('Invalid action')
226 }
227
228 const { action } = parsed.data
229
230 // Fetch the queue item
231 const existing = await db
232 .select()
233 .from(moderationQueue)
234 .where(
235 and(eq(moderationQueue.id, queueId), eq(moderationQueue.communityDid, communityDid))
236 )
237
238 const item = existing[0]
239 if (!item) {
240 throw notFound('Queue item not found')
241 }
242
243 if (item.status !== 'pending') {
244 throw conflict('Queue item already reviewed')
245 }
246
247 const newStatus = action === 'approve' ? 'approved' : 'rejected'
248 const contentStatus = action === 'approve' ? 'approved' : 'rejected'
249
250 await db.transaction(async (tx) => {
251 // Update queue item
252 await tx
253 .update(moderationQueue)
254 .set({
255 status: newStatus,
256 reviewedBy: user.did,
257 reviewedAt: new Date(),
258 })
259 .where(eq(moderationQueue.id, queueId))
260
261 // Update content moderation status
262 if (item.contentType === 'topic') {
263 await tx
264 .update(topics)
265 .set({ moderationStatus: contentStatus })
266 .where(eq(topics.uri, item.contentUri))
267 } else {
268 await tx
269 .update(replies)
270 .set({ moderationStatus: contentStatus })
271 .where(eq(replies.uri, item.contentUri))
272 }
273
274 // On approve: increment account trust
275 if (action === 'approve') {
276 // Check if there are other pending queue items for the same content URI
277 // Only increment trust once per content item (not per queue reason)
278 const otherPending = await tx
279 .select({ id: moderationQueue.id })
280 .from(moderationQueue)
281 .where(
282 and(
283 eq(moderationQueue.contentUri, item.contentUri),
284 eq(moderationQueue.status, 'pending'),
285 sql`${moderationQueue.id} != ${queueId}`
286 )
287 )
288
289 // Also approve any other pending queue items for the same content
290 if (otherPending.length > 0) {
291 await tx
292 .update(moderationQueue)
293 .set({
294 status: 'approved',
295 reviewedBy: user.did,
296 reviewedAt: new Date(),
297 })
298 .where(
299 and(
300 eq(moderationQueue.contentUri, item.contentUri),
301 eq(moderationQueue.status, 'pending')
302 )
303 )
304 }
305
306 // Upsert account trust
307 const existingTrust = await tx
308 .select()
309 .from(accountTrust)
310 .where(
311 and(
312 eq(accountTrust.did, item.authorDid),
313 eq(accountTrust.communityDid, communityDid)
314 )
315 )
316
317 // Load thresholds for trust check
318 const settingsRows = await tx
319 .select({
320 moderationThresholds: communitySettings.moderationThresholds,
321 })
322 .from(communitySettings)
323 .where(eq(communitySettings.communityDid, communityDid))
324 const trustedPostThreshold =
325 settingsRows[0]?.moderationThresholds.trustedPostThreshold ?? 10
326
327 if (existingTrust.length > 0) {
328 const newCount = (existingTrust[0]?.approvedPostCount ?? 0) + 1
329 const nowTrusted = newCount >= trustedPostThreshold
330
331 await tx
332 .update(accountTrust)
333 .set({
334 approvedPostCount: newCount,
335 isTrusted: nowTrusted,
336 ...(nowTrusted && !existingTrust[0]?.isTrusted ? { trustedAt: new Date() } : {}),
337 })
338 .where(
339 and(
340 eq(accountTrust.did, item.authorDid),
341 eq(accountTrust.communityDid, communityDid)
342 )
343 )
344 } else {
345 const nowTrusted = 1 >= trustedPostThreshold
346 await tx.insert(accountTrust).values({
347 did: item.authorDid,
348 communityDid,
349 approvedPostCount: 1,
350 isTrusted: nowTrusted,
351 ...(nowTrusted ? { trustedAt: new Date() } : {}),
352 })
353 }
354 }
355 })
356
357 app.log.info(
358 {
359 queueId,
360 action,
361 contentUri: item.contentUri,
362 reviewedBy: user.did,
363 },
364 `Queue item ${action}d`
365 )
366
367 // Fetch updated item
368 const updated = await db
369 .select()
370 .from(moderationQueue)
371 .where(eq(moderationQueue.id, queueId))
372
373 const updatedItem = updated[0]
374 if (!updatedItem) {
375 throw notFound('Queue item not found after update')
376 }
377
378 return reply.status(200).send(serializeQueueItem(updatedItem))
379 }
380 )
381
382 // -------------------------------------------------------------------
383 // GET /api/admin/moderation/word-filter (admin only)
384 // -------------------------------------------------------------------
385
386 app.get(
387 '/api/admin/moderation/word-filter',
388 {
389 preHandler: [requireAdmin],
390 schema: {
391 tags: ['Admin'],
392 summary: 'Get word filter list',
393 security: [{ bearerAuth: [] }],
394 response: {
395 200: {
396 type: 'object',
397 properties: {
398 words: {
399 type: 'array',
400 items: { type: 'string' },
401 },
402 },
403 },
404 },
405 },
406 },
407 async (request, reply) => {
408 const communityDid = requireCommunityDid(request)
409 const rows = await db
410 .select({ wordFilter: communitySettings.wordFilter })
411 .from(communitySettings)
412 .where(eq(communitySettings.communityDid, communityDid))
413
414 const words = rows[0]?.wordFilter ?? []
415
416 return reply.status(200).send({ words })
417 }
418 )
419
420 // -------------------------------------------------------------------
421 // PUT /api/admin/moderation/word-filter (admin only)
422 // -------------------------------------------------------------------
423
424 app.put(
425 '/api/admin/moderation/word-filter',
426 {
427 preHandler: [requireAdmin],
428 schema: {
429 tags: ['Admin'],
430 summary: 'Update word filter list',
431 security: [{ bearerAuth: [] }],
432 body: {
433 type: 'object',
434 required: ['words'],
435 properties: {
436 words: {
437 type: 'array',
438 items: { type: 'string', minLength: 1, maxLength: 100 },
439 maxItems: 500,
440 },
441 },
442 },
443 response: {
444 200: {
445 type: 'object',
446 properties: {
447 words: {
448 type: 'array',
449 items: { type: 'string' },
450 },
451 },
452 },
453 400: errorResponseSchema,
454 },
455 },
456 },
457 async (request, reply) => {
458 const communityDid = requireCommunityDid(request)
459 const parsed = wordFilterSchema.safeParse(request.body)
460 if (!parsed.success) {
461 throw badRequest('Invalid word filter data')
462 }
463
464 // Deduplicate and normalize to lowercase
465 const words = [...new Set(parsed.data.words.map((w) => w.toLowerCase()))]
466
467 await db
468 .update(communitySettings)
469 .set({ wordFilter: words })
470 .where(eq(communitySettings.communityDid, communityDid))
471
472 // Invalidate cached anti-spam settings
473 try {
474 await app.cache.del(`antispam:settings:${communityDid}`)
475 } catch {
476 // Non-critical
477 }
478
479 app.log.info({ wordCount: words.length }, 'Word filter updated')
480
481 return reply.status(200).send({ words })
482 }
483 )
484
485 done()
486 }
487}