Barazo AppView backend barazo.forum
at main 487 lines 16 kB view raw
import { eq, and, desc, sql } from 'drizzle-orm'
import { requireCommunityDid } from '../middleware/community-resolver.js'
import type { FastifyPluginCallback } from 'fastify'
import { notFound, badRequest, conflict, errorResponseSchema } from '../lib/api-errors.js'
import { wordFilterSchema, queueActionSchema, queueQuerySchema } from '../validation/anti-spam.js'
import { moderationQueue } from '../db/schema/moderation-queue.js'
import { accountTrust } from '../db/schema/account-trust.js'
import { topics } from '../db/schema/topics.js'
import { replies } from '../db/schema/replies.js'
import { communitySettings } from '../db/schema/community-settings.js'
import { createRequireModerator } from '../auth/require-moderator.js'

// ---------------------------------------------------------------------------
// OpenAPI JSON Schema definitions
// ---------------------------------------------------------------------------

/** Response schema for a single serialized moderation queue item. */
const queueItemJsonSchema = {
  type: 'object' as const,
  properties: {
    id: { type: 'number' as const },
    contentUri: { type: 'string' as const },
    contentType: { type: 'string' as const },
    authorDid: { type: 'string' as const },
    queueReason: { type: 'string' as const },
    matchedWords: {
      type: ['array', 'null'] as const,
      items: { type: 'string' as const },
    },
    status: { type: 'string' as const },
    reviewedBy: { type: ['string', 'null'] as const },
    createdAt: { type: 'string' as const, format: 'date-time' as const },
    reviewedAt: { type: ['string', 'null'] as const },
  },
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Converts a moderation queue row into its API representation: dates become
 * ISO-8601 strings and absent optional columns become explicit `null`s.
 */
function serializeQueueItem(row: typeof moderationQueue.$inferSelect) {
  return {
    id: row.id,
    contentUri: row.contentUri,
    contentType: row.contentType,
    authorDid: row.authorDid,
    queueReason: row.queueReason,
    matchedWords: row.matchedWords ?? null,
    status: row.status,
    reviewedBy: row.reviewedBy ?? null,
    createdAt: row.createdAt.toISOString(),
    reviewedAt: row.reviewedAt?.toISOString() ?? null,
  }
}

/** Encodes a `(createdAt, id)` pagination position as base64-encoded JSON. */
function encodeCursor(createdAt: string, id: number): string {
  return Buffer.from(JSON.stringify({ createdAt, id })).toString('base64')
}

/**
 * Decodes a cursor produced by {@link encodeCursor}.
 *
 * The cursor is client-supplied, so it is validated strictly: `createdAt` must
 * be a parseable date and `id` a safe integer. The timestamp is re-emitted in
 * ISO-8601 form so that only a well-formed value ever reaches the
 * `::timestamptz` cast in the list query.
 *
 * @returns The decoded position, or `null` if the cursor is malformed.
 */
function decodeCursor(cursor: string): { createdAt: string; id: number } | null {
  try {
    const decoded: unknown = JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8'))
    if (typeof decoded !== 'object' || decoded === null) {
      return null
    }

    const { createdAt, id } = decoded as Record<string, unknown>
    if (typeof createdAt !== 'string' || typeof id !== 'number' || !Number.isSafeInteger(id)) {
      return null
    }

    const timestamp = Date.parse(createdAt)
    if (Number.isNaN(timestamp)) {
      return null
    }

    return { createdAt: new Date(timestamp).toISOString(), id }
  } catch {
    // Invalid base64/JSON, or a date outside the range toISOString() can format.
    return null
  }
}

/**
 * Parses the `:id` route parameter.
 *
 * Only plain decimal digits are accepted; `Number()` alone would also let
 * through values such as `"1.5"`, `"1e3"`, `"0x10"` or `"Infinity"`.
 *
 * @returns The numeric ID, or `null` if the parameter is not a valid ID.
 */
function parseQueueId(raw: string): number | null {
  if (!/^\d+$/.test(raw)) {
    return null
  }
  const value = Number(raw)
  return Number.isSafeInteger(value) ? value : null
}

// ---------------------------------------------------------------------------
// Routes
// ---------------------------------------------------------------------------

/**
 * Builds the Fastify plugin that registers the moderation queue endpoints
 * (list and review, moderator+) and the word filter endpoints (admin only).
 */
export function moderationQueueRoutes(): FastifyPluginCallback {
  return (app, _opts, done) => {
    const { db, authMiddleware } = app
    const requireModerator = createRequireModerator(db, authMiddleware, app.log)
    const requireAdmin = app.requireAdmin

    // -------------------------------------------------------------------
    // GET /api/moderation/queue (moderator+)
    // -------------------------------------------------------------------

    app.get(
      '/api/moderation/queue',
      {
        preHandler: [requireModerator],
        schema: {
          tags: ['Moderation'],
          summary: 'List moderation queue items (paginated)',
          security: [{ bearerAuth: [] }],
          querystring: {
            type: 'object',
            properties: {
              status: {
                type: 'string',
                enum: ['pending', 'approved', 'rejected'],
              },
              queueReason: {
                type: 'string',
                enum: ['word_filter', 'first_post', 'link_hold', 'burst', 'topic_delay'],
              },
              cursor: { type: 'string' },
              limit: { type: 'string' },
            },
          },
          response: {
            200: {
              type: 'object',
              properties: {
                items: { type: 'array', items: queueItemJsonSchema },
                cursor: { type: ['string', 'null'] },
              },
            },
            400: errorResponseSchema,
          },
        },
      },
      async (request, reply) => {
        const communityDid = requireCommunityDid(request)
        const parsed = queueQuerySchema.safeParse(request.query)
        if (!parsed.success) {
          throw badRequest('Invalid query parameters')
        }

        const { status, queueReason, cursor, limit } = parsed.data
        const conditions = [eq(moderationQueue.communityDid, communityDid)]

        conditions.push(eq(moderationQueue.status, status))

        if (queueReason) {
          conditions.push(eq(moderationQueue.queueReason, queueReason))
        }

        if (cursor) {
          // A malformed cursor is ignored, which restarts from the first page.
          const decoded = decodeCursor(cursor)
          if (decoded) {
            conditions.push(
              sql`(${moderationQueue.createdAt}, ${moderationQueue.id}) < (${decoded.createdAt}::timestamptz, ${decoded.id})`
            )
          }
        }

        const whereClause = and(...conditions)
        // Fetch one extra row to learn whether another page exists.
        const fetchLimit = limit + 1

        const rows = await db
          .select()
          .from(moderationQueue)
          .where(whereClause)
          // The sort key must match the (createdAt, id) cursor comparison above;
          // without the id tiebreaker, rows sharing a timestamp could be skipped
          // or repeated across pages.
          .orderBy(desc(moderationQueue.createdAt), desc(moderationQueue.id))
          .limit(fetchLimit)

        const hasMore = rows.length > limit
        const resultRows = hasMore ? rows.slice(0, limit) : rows

        let nextCursor: string | null = null
        if (hasMore) {
          const lastRow = resultRows[resultRows.length - 1]
          if (lastRow) {
            nextCursor = encodeCursor(lastRow.createdAt.toISOString(), lastRow.id)
          }
        }

        return reply.status(200).send({
          items: resultRows.map(serializeQueueItem),
          cursor: nextCursor,
        })
      }
    )

    // -------------------------------------------------------------------
    // PUT /api/moderation/queue/:id (moderator+)
    // -------------------------------------------------------------------

    app.put(
      '/api/moderation/queue/:id',
      {
        preHandler: [requireModerator],
        schema: {
          tags: ['Moderation'],
          summary: 'Approve or reject a queued item',
          security: [{ bearerAuth: [] }],
          params: {
            type: 'object',
            required: ['id'],
            properties: { id: { type: 'string' } },
          },
          body: {
            type: 'object',
            required: ['action'],
            properties: {
              action: { type: 'string', enum: ['approve', 'reject'] },
            },
          },
          response: {
            200: queueItemJsonSchema,
            400: errorResponseSchema,
            401: errorResponseSchema,
            403: errorResponseSchema,
            404: errorResponseSchema,
            409: errorResponseSchema,
          },
        },
      },
      async (request, reply) => {
        const communityDid = requireCommunityDid(request)
        const user = request.user
        if (!user) {
          return reply.status(401).send({ error: 'Authentication required' })
        }

        const { id } = request.params as { id: string }
        const queueId = parseQueueId(id)
        if (queueId === null) {
          throw badRequest('Invalid queue item ID')
        }

        const parsed = queueActionSchema.safeParse(request.body)
        if (!parsed.success) {
          throw badRequest('Invalid action')
        }

        const { action } = parsed.data

        // Fetch the queue item (scoped to the caller's community)
        const existing = await db
          .select()
          .from(moderationQueue)
          .where(
            and(eq(moderationQueue.id, queueId), eq(moderationQueue.communityDid, communityDid))
          )

        const item = existing[0]
        if (!item) {
          throw notFound('Queue item not found')
        }

        // Fast path; the authoritative check is the conditional update below.
        if (item.status !== 'pending') {
          throw conflict('Queue item already reviewed')
        }

        const newStatus = action === 'approve' ? 'approved' : 'rejected'
        const contentStatus = action === 'approve' ? 'approved' : 'rejected'

        const updatedItem = await db.transaction(async (tx) => {
          const reviewedAt = new Date()

          // Claim the item only if it is still pending. Two concurrent reviews
          // can both pass the check above; the loser matches no row here and
          // its transaction is rolled back, so content status and account
          // trust are never applied twice.
          const claimed = await tx
            .update(moderationQueue)
            .set({
              status: newStatus,
              reviewedBy: user.did,
              reviewedAt,
            })
            .where(
              and(
                eq(moderationQueue.id, queueId),
                eq(moderationQueue.communityDid, communityDid),
                eq(moderationQueue.status, 'pending')
              )
            )
            .returning()

          const claimedItem = claimed[0]
          if (!claimedItem) {
            throw conflict('Queue item already reviewed')
          }

          // Update content moderation status
          if (item.contentType === 'topic') {
            await tx
              .update(topics)
              .set({ moderationStatus: contentStatus })
              .where(eq(topics.uri, item.contentUri))
          } else {
            await tx
              .update(replies)
              .set({ moderationStatus: contentStatus })
              .where(eq(replies.uri, item.contentUri))
          }

          // NOTE(review): on reject, other pending queue entries for the same
          // content are left pending — confirm this is intended.

          // On approve: resolve sibling entries and increment account trust
          if (action === 'approve') {
            // The same content can be queued under several reasons. Approve the
            // remaining pending entries for it (the claimed one is no longer
            // pending) so they cannot be approved again later, which keeps the
            // trust increment to once per content item.
            await tx
              .update(moderationQueue)
              .set({
                status: 'approved',
                reviewedBy: user.did,
                reviewedAt,
              })
              .where(
                and(
                  eq(moderationQueue.contentUri, item.contentUri),
                  eq(moderationQueue.communityDid, communityDid),
                  eq(moderationQueue.status, 'pending')
                )
              )

            // Upsert account trust
            const existingTrust = await tx
              .select()
              .from(accountTrust)
              .where(
                and(
                  eq(accountTrust.did, item.authorDid),
                  eq(accountTrust.communityDid, communityDid)
                )
              )

            // Load thresholds for trust check
            const settingsRows = await tx
              .select({
                moderationThresholds: communitySettings.moderationThresholds,
              })
              .from(communitySettings)
              .where(eq(communitySettings.communityDid, communityDid))
            const trustedPostThreshold =
              settingsRows[0]?.moderationThresholds.trustedPostThreshold ?? 10

            if (existingTrust.length > 0) {
              const newCount = (existingTrust[0]?.approvedPostCount ?? 0) + 1
              const nowTrusted = newCount >= trustedPostThreshold

              await tx
                .update(accountTrust)
                .set({
                  approvedPostCount: newCount,
                  isTrusted: nowTrusted,
                  // Stamp trustedAt only on the untrusted -> trusted transition.
                  ...(nowTrusted && !existingTrust[0]?.isTrusted ? { trustedAt: new Date() } : {}),
                })
                .where(
                  and(
                    eq(accountTrust.did, item.authorDid),
                    eq(accountTrust.communityDid, communityDid)
                  )
                )
            } else {
              const nowTrusted = 1 >= trustedPostThreshold
              await tx.insert(accountTrust).values({
                did: item.authorDid,
                communityDid,
                approvedPostCount: 1,
                isTrusted: nowTrusted,
                ...(nowTrusted ? { trustedAt: new Date() } : {}),
              })
            }
          }

          return claimedItem
        })

        app.log.info(
          {
            queueId,
            action,
            contentUri: item.contentUri,
            reviewedBy: user.did,
          },
          `Queue item ${newStatus}`
        )

        return reply.status(200).send(serializeQueueItem(updatedItem))
      }
    )

    // -------------------------------------------------------------------
    // GET /api/admin/moderation/word-filter (admin only)
    // -------------------------------------------------------------------

    app.get(
      '/api/admin/moderation/word-filter',
      {
        preHandler: [requireAdmin],
        schema: {
          tags: ['Admin'],
          summary: 'Get word filter list',
          security: [{ bearerAuth: [] }],
          response: {
            200: {
              type: 'object',
              properties: {
                words: {
                  type: 'array',
                  items: { type: 'string' },
                },
              },
            },
          },
        },
      },
      async (request, reply) => {
        const communityDid = requireCommunityDid(request)
        const rows = await db
          .select({ wordFilter: communitySettings.wordFilter })
          .from(communitySettings)
          .where(eq(communitySettings.communityDid, communityDid))

        // No settings row (or no filter configured) yields an empty list.
        const words = rows[0]?.wordFilter ?? []

        return reply.status(200).send({ words })
      }
    )

    // -------------------------------------------------------------------
    // PUT /api/admin/moderation/word-filter (admin only)
    // -------------------------------------------------------------------

    app.put(
      '/api/admin/moderation/word-filter',
      {
        preHandler: [requireAdmin],
        schema: {
          tags: ['Admin'],
          summary: 'Update word filter list',
          security: [{ bearerAuth: [] }],
          body: {
            type: 'object',
            required: ['words'],
            properties: {
              words: {
                type: 'array',
                items: { type: 'string', minLength: 1, maxLength: 100 },
                maxItems: 500,
              },
            },
          },
          response: {
            200: {
              type: 'object',
              properties: {
                words: {
                  type: 'array',
                  items: { type: 'string' },
                },
              },
            },
            400: errorResponseSchema,
          },
        },
      },
      async (request, reply) => {
        const communityDid = requireCommunityDid(request)
        const parsed = wordFilterSchema.safeParse(request.body)
        if (!parsed.success) {
          throw badRequest('Invalid word filter data')
        }

        // Deduplicate and normalize to lowercase
        const words = [...new Set(parsed.data.words.map((w) => w.toLowerCase()))]

        await db
          .update(communitySettings)
          .set({ wordFilter: words })
          .where(eq(communitySettings.communityDid, communityDid))

        // Invalidate cached anti-spam settings. Best-effort: a failure must not
        // fail the request, but it is logged so it does not go unnoticed.
        try {
          await app.cache.del(`antispam:settings:${communityDid}`)
        } catch (err) {
          app.log.warn({ err, communityDid }, 'Failed to invalidate anti-spam settings cache')
        }

        app.log.info({ wordCount: words.length }, 'Word filter updated')

        return reply.status(200).send({ words })
      }
    )

    done()
  }
}