Barazo AppView backend barazo.forum
at main 1194 lines 39 kB view raw
1import { eq, and, desc, sql, count } from 'drizzle-orm' 2import type { FastifyPluginCallback } from 'fastify' 3import { notFound, badRequest, tooManyRequests, errorResponseSchema } from '../lib/api-errors.js' 4import { 5 trustSeedCreateSchema, 6 trustSeedQuerySchema, 7 clusterQuerySchema, 8 clusterStatusUpdateSchema, 9 pdsTrustUpdateSchema, 10 pdsTrustQuerySchema, 11 behavioralFlagUpdateSchema, 12 behavioralFlagQuerySchema, 13} from '../validation/sybil.js' 14import { trustSeeds } from '../db/schema/trust-seeds.js' 15import { sybilClusters } from '../db/schema/sybil-clusters.js' 16import { sybilClusterMembers } from '../db/schema/sybil-cluster-members.js' 17import { users } from '../db/schema/users.js' 18import { trustScores } from '../db/schema/trust-scores.js' 19import { interactionGraph } from '../db/schema/interaction-graph.js' 20import { behavioralFlags } from '../db/schema/behavioral-flags.js' 21import { pdsTrustFactors } from '../db/schema/pds-trust-factors.js' 22 23// --------------------------------------------------------------------------- 24// OpenAPI JSON Schema definitions 25// --------------------------------------------------------------------------- 26 27const trustSeedJsonSchema = { 28 type: 'object' as const, 29 properties: { 30 id: { type: 'number' as const }, 31 did: { type: 'string' as const }, 32 handle: { type: ['string', 'null'] as const }, 33 displayName: { type: ['string', 'null'] as const }, 34 communityId: { type: ['string', 'null'] as const }, 35 addedBy: { type: 'string' as const }, 36 reason: { type: ['string', 'null'] as const }, 37 implicit: { type: 'boolean' as const }, 38 createdAt: { type: 'string' as const, format: 'date-time' as const }, 39 }, 40} 41 42const sybilClusterJsonSchema = { 43 type: 'object' as const, 44 properties: { 45 id: { type: 'number' as const }, 46 clusterHash: { type: 'string' as const }, 47 internalEdgeCount: { type: 'number' as const }, 48 externalEdgeCount: { type: 'number' as const }, 49 memberCount: { type: 'number' as const }, 50 suspicionRatio: { type: 'number' as const }, 51 status: { type: 'string' as const }, 52 reviewedBy: { type: ['string', 'null'] as const }, 53 reviewedAt: { type: ['string', 'null'] as const }, 54 detectedAt: { type: 'string' as const, format: 'date-time' as const }, 55 updatedAt: { type: 'string' as const, format: 'date-time' as const }, 56 }, 57} 58 59const pdsTrustJsonSchema = { 60 type: 'object' as const, 61 properties: { 62 id: { type: 'number' as const }, 63 pdsHost: { type: 'string' as const }, 64 trustFactor: { type: 'number' as const }, 65 isDefault: { type: 'boolean' as const }, 66 updatedAt: { type: 'string' as const, format: 'date-time' as const }, 67 }, 68} 69 70const behavioralFlagJsonSchema = { 71 type: 'object' as const, 72 properties: { 73 id: { type: 'number' as const }, 74 flagType: { type: 'string' as const }, 75 affectedDids: { type: 'array' as const, items: { type: 'string' as const } }, 76 details: { type: 'string' as const }, 77 status: { type: 'string' as const }, 78 detectedAt: { type: 'string' as const, format: 'date-time' as const }, 79 }, 80} 81 82const clusterMemberJsonSchema = { 83 type: 'object' as const, 84 properties: { 85 did: { type: 'string' as const }, 86 handle: { type: ['string', 'null'] as const }, 87 displayName: { type: ['string', 'null'] as const }, 88 trustScore: { type: ['number', 'null'] as const }, 89 reputationScore: { type: 'number' as const }, 90 accountAge: { type: ['string', 'null'] as const }, 91 roleInCluster: { type: 'string' as const }, 92 joinedAt: { type: 'string' as const, format: 'date-time' as const }, 93 }, 94} 95 96// --------------------------------------------------------------------------- 97// Helpers 98// --------------------------------------------------------------------------- 99 100function encodeCursor(createdAt: string, id: number): string { 101 return Buffer.from(JSON.stringify({ createdAt, id })).toString('base64') 102} 103 104function decodeCursor(cursor: string): { createdAt: string; id: number } | null { 105 try { 106 const decoded = JSON.parse(Buffer.from(cursor, 'base64').toString('utf-8')) as Record< 107 string, 108 unknown 109 > 110 if (typeof decoded.createdAt === 'string' && typeof decoded.id === 'number') { 111 return { createdAt: decoded.createdAt, id: decoded.id } 112 } 113 return null 114 } catch { 115 return null 116 } 117} 118 119interface TrustSeedWithUser { 120 seed: typeof trustSeeds.$inferSelect 121 handle: string | null 122 displayName: string | null 123} 124 125function serializeTrustSeed(row: TrustSeedWithUser, implicit: boolean) { 126 return { 127 id: row.seed.id, 128 did: row.seed.did, 129 handle: row.handle, 130 displayName: row.displayName, 131 communityId: row.seed.communityId || null, // Convert "" sentinel back to null for API 132 addedBy: row.seed.addedBy, 133 reason: row.seed.reason, 134 implicit, 135 createdAt: row.seed.createdAt.toISOString(), 136 } 137} 138 139function computeSuspicionRatio(internalEdgeCount: number, externalEdgeCount: number): number { 140 const total = internalEdgeCount + externalEdgeCount 141 return total > 0 ? internalEdgeCount / total : 0 142} 143 144function serializeCluster(row: typeof sybilClusters.$inferSelect) { 145 return { 146 id: row.id, 147 clusterHash: row.clusterHash, 148 internalEdgeCount: row.internalEdgeCount, 149 externalEdgeCount: row.externalEdgeCount, 150 memberCount: row.memberCount, 151 suspicionRatio: computeSuspicionRatio(row.internalEdgeCount, row.externalEdgeCount), 152 status: row.status, 153 reviewedBy: row.reviewedBy, 154 reviewedAt: row.reviewedAt?.toISOString() ?? null, 155 detectedAt: row.detectedAt.toISOString(), 156 updatedAt: row.updatedAt.toISOString(), 157 } 158} 159 160function serializePdsTrust(row: typeof pdsTrustFactors.$inferSelect) { 161 return { 162 id: row.id, 163 pdsHost: row.pdsHost, 164 trustFactor: row.trustFactor, 165 isDefault: row.isDefault, 166 updatedAt: row.updatedAt.toISOString(), 167 } 168} 169 170function serializeBehavioralFlag(row: typeof behavioralFlags.$inferSelect) { 171 return { 172 id: row.id, 173 flagType: row.flagType, 174 affectedDids: row.affectedDids, 175 details: row.details, 176 status: row.status, 177 detectedAt: row.detectedAt.toISOString(), 178 } 179} 180 181// Rate limit key for trust graph recompute 182const RECOMPUTE_CACHE_KEY = 'trust-graph:last-recompute' 183const RECOMPUTE_COOLDOWN_MS = 60 * 60 * 1000 // 1 hour 184 185/** Fire-and-forget trust graph recomputation. */ 186function triggerRecompute(app: { 187 trustGraphService: { computeTrustScores(communityId: string | null): Promise<unknown> } 188 log: { warn(obj: unknown, msg: string): void; info(msg: string): void } 189}): void { 190 app.log.info('Triggering fire-and-forget trust graph recompute') 191 app.trustGraphService.computeTrustScores(null).catch((err: unknown) => { 192 app.log.warn({ err }, 'Trust graph recompute failed') 193 }) 194} 195 196// --------------------------------------------------------------------------- 197// Admin sybil routes plugin 198// --------------------------------------------------------------------------- 199 200export function adminSybilRoutes(): FastifyPluginCallback { 201 return (app, _opts, done) => { 202 const { db, cache } = app 203 const requireAdmin = app.requireAdmin 204 205 // ======================================================================= 206 // TRUST SEED ROUTES 207 // ======================================================================= 208 209 // ------------------------------------------------------------------- 210 // GET /api/admin/trust-seeds 211 // ------------------------------------------------------------------- 212 213 app.get( 214 '/api/admin/trust-seeds', 215 { 216 preHandler: [requireAdmin], 217 schema: { 218 tags: ['Admin - Sybil'], 219 summary: 'List trust seeds (including implicit seeds from mods/admins)', 220 security: [{ bearerAuth: [] }], 221 querystring: { 222 type: 'object', 223 properties: { 224 cursor: { type: 'string' }, 225 limit: { type: 'string' }, 226 }, 227 }, 228 response: { 229 200: { 230 type: 'object', 231 properties: { 232 seeds: { type: 'array', items: trustSeedJsonSchema }, 233 cursor: { type: ['string', 'null'] }, 234 }, 235 }, 236 400: errorResponseSchema, 237 }, 238 }, 239 }, 240 async (request, reply) => { 241 const parsed = trustSeedQuerySchema.safeParse(request.query) 242 if (!parsed.success) { 243 throw badRequest('Invalid query parameters') 244 } 245 246 const { cursor, limit } = parsed.data 247 248 // Fetch explicit trust seeds joined with users for handle/displayName 249 const conditions = [] 250 if (cursor) { 251 const decoded = decodeCursor(cursor) 252 if (decoded) { 253 conditions.push( 254 sql`(${trustSeeds.createdAt}, ${trustSeeds.id}) < (${decoded.createdAt}::timestamptz, ${decoded.id})` 255 ) 256 } 257 } 258 259 const whereClause = conditions.length > 0 ? and(...conditions) : undefined 260 const fetchLimit = limit + 1 261 262 const explicitRows = await db 263 .select({ 264 seed: trustSeeds, 265 handle: users.handle, 266 displayName: users.displayName, 267 }) 268 .from(trustSeeds) 269 .leftJoin(users, eq(trustSeeds.did, users.did)) 270 .where(whereClause) 271 .orderBy(desc(trustSeeds.createdAt)) 272 .limit(fetchLimit) 273 274 const hasMore = explicitRows.length > limit 275 const resultRows = hasMore ? explicitRows.slice(0, limit) : explicitRows 276 277 // Fetch implicit seeds (admins and moderators) 278 const implicitUsers = await db 279 .select({ 280 did: users.did, 281 handle: users.handle, 282 displayName: users.displayName, 283 role: users.role, 284 firstSeenAt: users.firstSeenAt, 285 }) 286 .from(users) 287 .where(sql`${users.role} IN ('admin', 'moderator')`) 288 289 // Merge explicit seeds with implicit ones 290 const explicitDids = new Set(resultRows.map((r) => r.seed.did)) 291 const implicitSeeds = implicitUsers 292 .filter((u) => !explicitDids.has(u.did)) 293 .map((u) => ({ 294 id: 0, 295 did: u.did, 296 handle: u.handle, 297 displayName: u.displayName, 298 communityId: null, 299 addedBy: 'system', 300 reason: `Implicit trust seed (${u.role})`, 301 implicit: true, 302 createdAt: u.firstSeenAt.toISOString(), 303 })) 304 305 let nextCursor: string | null = null 306 if (hasMore) { 307 const lastRow = resultRows[resultRows.length - 1] 308 if (lastRow) { 309 nextCursor = encodeCursor(lastRow.seed.createdAt.toISOString(), lastRow.seed.id) 310 } 311 } 312 313 return reply.status(200).send({ 314 seeds: [...resultRows.map((r) => serializeTrustSeed(r, false)), ...implicitSeeds], 315 cursor: nextCursor, 316 }) 317 } 318 ) 319 320 // ------------------------------------------------------------------- 321 // POST /api/admin/trust-seeds 322 // ------------------------------------------------------------------- 323 324 app.post( 325 '/api/admin/trust-seeds', 326 { 327 preHandler: [requireAdmin], 328 schema: { 329 tags: ['Admin - Sybil'], 330 summary: 'Add a trust seed (triggers trust graph recompute)', 331 security: [{ bearerAuth: [] }], 332 body: { 333 type: 'object', 334 required: ['did'], 335 properties: { 336 did: { type: 'string', minLength: 1 }, 337 communityId: { type: 'string' }, 338 reason: { type: 'string', maxLength: 500 }, 339 }, 340 }, 341 response: { 342 201: trustSeedJsonSchema, 343 400: errorResponseSchema, 344 401: errorResponseSchema, 345 403: errorResponseSchema, 346 404: errorResponseSchema, 347 }, 348 }, 349 }, 350 async (request, reply) => { 351 const admin = request.user 352 if (!admin) { 353 return reply.status(401).send({ error: 'Authentication required' }) 354 } 355 356 const parsed = trustSeedCreateSchema.safeParse(request.body) 357 if (!parsed.success) { 358 throw badRequest('Invalid trust seed data') 359 } 360 361 const { did, communityId, reason } = parsed.data 362 363 // Validate DID exists in users table and fetch handle/displayName 364 const userRows = await db 365 .select({ did: users.did, handle: users.handle, displayName: users.displayName }) 366 .from(users) 367 .where(eq(users.did, did)) 368 369 if (userRows.length === 0) { 370 throw notFound('User not found') 371 } 372 373 const inserted = await db 374 .insert(trustSeeds) 375 .values({ 376 did, 377 communityId: communityId ?? '', 378 addedBy: admin.did, 379 reason: reason ?? null, 380 }) 381 .returning() 382 383 const seed = inserted[0] 384 if (!seed) { 385 throw badRequest('Failed to create trust seed') 386 } 387 388 app.log.info({ seedId: seed.id, did, addedBy: admin.did }, 'Trust seed added') 389 390 // Fire-and-forget trust graph recomputation 391 triggerRecompute(app) 392 393 const user = userRows[0] 394 return reply 395 .status(201) 396 .send( 397 serializeTrustSeed( 398 { seed, handle: user?.handle ?? null, displayName: user?.displayName ?? null }, 399 false 400 ) 401 ) 402 } 403 ) 404 405 // ------------------------------------------------------------------- 406 // DELETE /api/admin/trust-seeds/:id 407 // ------------------------------------------------------------------- 408 409 app.delete( 410 '/api/admin/trust-seeds/:id', 411 { 412 preHandler: [requireAdmin], 413 schema: { 414 tags: ['Admin - Sybil'], 415 summary: 'Remove a trust seed (triggers recompute)', 416 security: [{ bearerAuth: [] }], 417 params: { 418 type: 'object', 419 required: ['id'], 420 properties: { id: { type: 'string' } }, 421 }, 422 response: { 423 204: { type: 'null' as const }, 424 400: errorResponseSchema, 425 404: errorResponseSchema, 426 }, 427 }, 428 }, 429 async (request, reply) => { 430 const { id } = request.params as { id: string } 431 const seedId = Number(id) 432 if (Number.isNaN(seedId)) { 433 throw badRequest('Invalid seed ID') 434 } 435 436 const existing = await db 437 .select({ id: trustSeeds.id }) 438 .from(trustSeeds) 439 .where(eq(trustSeeds.id, seedId)) 440 441 if (existing.length === 0) { 442 throw notFound('Trust seed not found') 443 } 444 445 await db.delete(trustSeeds).where(eq(trustSeeds.id, seedId)) 446 447 app.log.info({ seedId }, 'Trust seed removed') 448 449 // Fire-and-forget trust graph recomputation 450 triggerRecompute(app) 451 452 return reply.status(204).send() 453 } 454 ) 455 456 // ======================================================================= 457 // SYBIL CLUSTER ROUTES 458 // ======================================================================= 459 460 // ------------------------------------------------------------------- 461 // GET /api/admin/sybil-clusters 462 // ------------------------------------------------------------------- 463 464 app.get( 465 '/api/admin/sybil-clusters', 466 { 467 preHandler: [requireAdmin], 468 schema: { 469 tags: ['Admin - Sybil'], 470 summary: 'List sybil clusters (paginated, filterable)', 471 security: [{ bearerAuth: [] }], 472 querystring: { 473 type: 'object', 474 properties: { 475 status: { type: 'string', enum: ['flagged', 'dismissed', 'monitoring', 'banned'] }, 476 cursor: { type: 'string' }, 477 limit: { type: 'string' }, 478 sort: { type: 'string', enum: ['detected_at', 'member_count', 'confidence'] }, 479 }, 480 }, 481 response: { 482 200: { 483 type: 'object', 484 properties: { 485 clusters: { type: 'array', items: sybilClusterJsonSchema }, 486 cursor: { type: ['string', 'null'] }, 487 }, 488 }, 489 400: errorResponseSchema, 490 }, 491 }, 492 }, 493 async (request, reply) => { 494 const parsed = clusterQuerySchema.safeParse(request.query) 495 if (!parsed.success) { 496 throw badRequest('Invalid query parameters') 497 } 498 499 const { status, cursor, limit, sort } = parsed.data 500 const conditions = [] 501 502 if (status) { 503 conditions.push(eq(sybilClusters.status, status)) 504 } 505 506 if (cursor) { 507 const decoded = decodeCursor(cursor) 508 if (decoded) { 509 conditions.push( 510 sql`(${sybilClusters.detectedAt}, ${sybilClusters.id}) < (${decoded.createdAt}::timestamptz, ${decoded.id})` 511 ) 512 } 513 } 514 515 const whereClause = conditions.length > 0 ? and(...conditions) : undefined 516 const fetchLimit = limit + 1 517 518 // Determine sort order 519 let orderByCol 520 switch (sort) { 521 case 'member_count': 522 orderByCol = desc(sybilClusters.memberCount) 523 break 524 case 'confidence': 525 // L5: Sort by suspicion ratio (internal / (internal + external)) 526 orderByCol = desc( 527 sql`CASE WHEN (${sybilClusters.internalEdgeCount} + ${sybilClusters.externalEdgeCount}) > 0 528 THEN ${sybilClusters.internalEdgeCount}::real / (${sybilClusters.internalEdgeCount} + ${sybilClusters.externalEdgeCount})::real 529 ELSE 0 END` 530 ) 531 break 532 default: 533 orderByCol = desc(sybilClusters.detectedAt) 534 } 535 536 const rows = await db 537 .select() 538 .from(sybilClusters) 539 .where(whereClause) 540 .orderBy(orderByCol) 541 .limit(fetchLimit) 542 543 const hasMore = rows.length > limit 544 const resultRows = hasMore ? rows.slice(0, limit) : rows 545 546 let nextCursor: string | null = null 547 if (hasMore) { 548 const lastRow = resultRows[resultRows.length - 1] 549 if (lastRow) { 550 nextCursor = encodeCursor(lastRow.detectedAt.toISOString(), lastRow.id) 551 } 552 } 553 554 return reply.status(200).send({ 555 clusters: resultRows.map(serializeCluster), 556 cursor: nextCursor, 557 }) 558 } 559 ) 560 561 // ------------------------------------------------------------------- 562 // GET /api/admin/sybil-clusters/:id 563 // ------------------------------------------------------------------- 564 565 app.get( 566 '/api/admin/sybil-clusters/:id', 567 { 568 preHandler: [requireAdmin], 569 schema: { 570 tags: ['Admin - Sybil'], 571 summary: 'Get sybil cluster detail with enriched member list', 572 security: [{ bearerAuth: [] }], 573 params: { 574 type: 'object', 575 required: ['id'], 576 properties: { id: { type: 'string' } }, 577 }, 578 response: { 579 200: { 580 type: 'object', 581 properties: { 582 ...sybilClusterJsonSchema.properties, 583 members: { 584 type: 'array', 585 items: clusterMemberJsonSchema, 586 }, 587 }, 588 }, 589 400: errorResponseSchema, 590 404: errorResponseSchema, 591 }, 592 }, 593 }, 594 async (request, reply) => { 595 const { id } = request.params as { id: string } 596 const clusterId = Number(id) 597 if (Number.isNaN(clusterId)) { 598 throw badRequest('Invalid cluster ID') 599 } 600 601 const clusterRows = await db 602 .select() 603 .from(sybilClusters) 604 .where(eq(sybilClusters.id, clusterId)) 605 606 const cluster = clusterRows[0] 607 if (!cluster) { 608 throw notFound('Sybil cluster not found') 609 } 610 611 // M5: Enriched member list with user data and trust scores 612 const members = await db 613 .select({ 614 did: sybilClusterMembers.did, 615 roleInCluster: sybilClusterMembers.roleInCluster, 616 joinedAt: sybilClusterMembers.joinedAt, 617 handle: users.handle, 618 displayName: users.displayName, 619 reputationScore: users.reputationScore, 620 accountCreatedAt: users.accountCreatedAt, 621 trustScore: trustScores.score, 622 }) 623 .from(sybilClusterMembers) 624 .leftJoin(users, eq(sybilClusterMembers.did, users.did)) 625 .leftJoin(trustScores, eq(sybilClusterMembers.did, trustScores.did)) 626 .where(eq(sybilClusterMembers.clusterId, clusterId)) 627 628 return reply.status(200).send({ 629 ...serializeCluster(cluster), 630 members: members.map((m) => ({ 631 did: m.did, 632 handle: m.handle ?? null, 633 displayName: m.displayName ?? null, 634 trustScore: m.trustScore ?? null, 635 reputationScore: m.reputationScore ?? 0, 636 accountAge: m.accountCreatedAt?.toISOString() ?? null, 637 roleInCluster: m.roleInCluster, 638 joinedAt: m.joinedAt.toISOString(), 639 })), 640 }) 641 } 642 ) 643 644 // ------------------------------------------------------------------- 645 // PUT /api/admin/sybil-clusters/:id 646 // ------------------------------------------------------------------- 647 648 app.put( 649 '/api/admin/sybil-clusters/:id', 650 { 651 preHandler: [requireAdmin], 652 schema: { 653 tags: ['Admin - Sybil'], 654 summary: 'Update sybil cluster status (handles ban propagation)', 655 security: [{ bearerAuth: [] }], 656 params: { 657 type: 'object', 658 required: ['id'], 659 properties: { id: { type: 'string' } }, 660 }, 661 body: { 662 type: 'object', 663 required: ['status'], 664 properties: { 665 status: { type: 'string', enum: ['dismissed', 'monitoring', 'banned'] }, 666 }, 667 }, 668 response: { 669 200: sybilClusterJsonSchema, 670 400: errorResponseSchema, 671 401: errorResponseSchema, 672 403: errorResponseSchema, 673 404: errorResponseSchema, 674 }, 675 }, 676 }, 677 async (request, reply) => { 678 const admin = request.user 679 if (!admin) { 680 return reply.status(401).send({ error: 'Authentication required' }) 681 } 682 683 const { id } = request.params as { id: string } 684 const clusterId = Number(id) 685 if (Number.isNaN(clusterId)) { 686 throw badRequest('Invalid cluster ID') 687 } 688 689 const parsed = clusterStatusUpdateSchema.safeParse(request.body) 690 if (!parsed.success) { 691 throw badRequest('Invalid status update') 692 } 693 694 const clusterRows = await db 695 .select() 696 .from(sybilClusters) 697 .where(eq(sybilClusters.id, clusterId)) 698 699 const cluster = clusterRows[0] 700 if (!cluster) { 701 throw notFound('Sybil cluster not found') 702 } 703 704 const now = new Date() 705 const updated = await db 706 .update(sybilClusters) 707 .set({ 708 status: parsed.data.status, 709 reviewedBy: admin.did, 710 reviewedAt: now, 711 updatedAt: now, 712 }) 713 .where(eq(sybilClusters.id, clusterId)) 714 .returning() 715 716 const updatedCluster = updated[0] 717 if (!updatedCluster) { 718 throw notFound('Cluster not found after update') 719 } 720 721 // If status is 'banned', propagate ban to all cluster members 722 if (parsed.data.status === 'banned') { 723 const members = await db 724 .select({ did: sybilClusterMembers.did }) 725 .from(sybilClusterMembers) 726 .where(eq(sybilClusterMembers.clusterId, clusterId)) 727 728 for (const member of members) { 729 await db.update(users).set({ isBanned: true }).where(eq(users.did, member.did)) 730 } 731 732 app.log.warn( 733 { 734 clusterId, 735 bannedDids: members.map((m) => m.did), 736 adminDid: admin.did, 737 }, 738 'Sybil cluster banned, propagated to all members' 739 ) 740 } else { 741 app.log.info( 742 { clusterId, status: parsed.data.status, adminDid: admin.did }, 743 'Sybil cluster status updated' 744 ) 745 } 746 747 return reply.status(200).send(serializeCluster(updatedCluster)) 748 } 749 ) 750 751 // ======================================================================= 752 // PDS TRUST FACTOR ROUTES 753 // ======================================================================= 754 755 // ------------------------------------------------------------------- 756 // GET /api/admin/pds-trust 757 // ------------------------------------------------------------------- 758 759 app.get( 760 '/api/admin/pds-trust', 761 { 762 preHandler: [requireAdmin], 763 schema: { 764 tags: ['Admin - Sybil'], 765 summary: 'List PDS trust factors (with defaults)', 766 security: [{ bearerAuth: [] }], 767 querystring: { 768 type: 'object', 769 properties: { 770 cursor: { type: 'string' }, 771 limit: { type: 'string' }, 772 }, 773 }, 774 response: { 775 200: { 776 type: 'object', 777 properties: { 778 factors: { type: 'array', items: pdsTrustJsonSchema }, 779 cursor: { type: ['string', 'null'] }, 780 }, 781 }, 782 400: errorResponseSchema, 783 }, 784 }, 785 }, 786 async (request, reply) => { 787 const parsed = pdsTrustQuerySchema.safeParse(request.query) 788 if (!parsed.success) { 789 throw badRequest('Invalid query parameters') 790 } 791 792 const { cursor, limit } = parsed.data 793 const conditions = [] 794 795 if (cursor) { 796 const decoded = decodeCursor(cursor) 797 if (decoded) { 798 conditions.push( 799 sql`(${pdsTrustFactors.updatedAt}, ${pdsTrustFactors.id}) < (${decoded.createdAt}::timestamptz, ${decoded.id})` 800 ) 801 } 802 } 803 804 const whereClause = conditions.length > 0 ? and(...conditions) : undefined 805 const fetchLimit = limit + 1 806 807 const rows = await db 808 .select() 809 .from(pdsTrustFactors) 810 .where(whereClause) 811 .orderBy(desc(pdsTrustFactors.updatedAt)) 812 .limit(fetchLimit) 813 814 const hasMore = rows.length > limit 815 const resultRows = hasMore ? rows.slice(0, limit) : rows 816 817 let nextCursor: string | null = null 818 if (hasMore) { 819 const lastRow = resultRows[resultRows.length - 1] 820 if (lastRow) { 821 nextCursor = encodeCursor(lastRow.updatedAt.toISOString(), lastRow.id) 822 } 823 } 824 825 return reply.status(200).send({ 826 factors: resultRows.map(serializePdsTrust), 827 cursor: nextCursor, 828 }) 829 } 830 ) 831 832 // ------------------------------------------------------------------- 833 // PUT /api/admin/pds-trust 834 // ------------------------------------------------------------------- 835 836 app.put( 837 '/api/admin/pds-trust', 838 { 839 preHandler: [requireAdmin], 840 schema: { 841 tags: ['Admin - Sybil'], 842 summary: 'Create or update PDS trust factor override', 843 security: [{ bearerAuth: [] }], 844 body: { 845 type: 'object', 846 required: ['pdsHost', 'trustFactor'], 847 properties: { 848 pdsHost: { type: 'string', minLength: 1 }, 849 trustFactor: { type: 'number', minimum: 0, maximum: 1 }, 850 }, 851 }, 852 response: { 853 200: pdsTrustJsonSchema, 854 400: errorResponseSchema, 855 }, 856 }, 857 }, 858 async (request, reply) => { 859 const parsed = pdsTrustUpdateSchema.safeParse(request.body) 860 if (!parsed.success) { 861 throw badRequest('Invalid PDS trust data') 862 } 863 864 const { pdsHost, trustFactor } = parsed.data 865 const now = new Date() 866 867 const upserted = await db 868 .insert(pdsTrustFactors) 869 .values({ 870 pdsHost, 871 trustFactor, 872 isDefault: false, 873 updatedAt: now, 874 }) 875 .onConflictDoUpdate({ 876 target: [pdsTrustFactors.pdsHost], 877 set: { 878 trustFactor, 879 isDefault: false, 880 updatedAt: now, 881 }, 882 }) 883 .returning() 884 885 const row = upserted[0] 886 if (!row) { 887 throw badRequest('Failed to upsert PDS trust factor') 888 } 889 890 app.log.info({ pdsHost, trustFactor }, 'PDS trust factor updated') 891 892 return reply.status(200).send(serializePdsTrust(row)) 893 } 894 ) 895 896 // ======================================================================= 897 // TRUST GRAPH ADMIN ROUTES 898 // ======================================================================= 899 900 // ------------------------------------------------------------------- 901 // POST /api/admin/trust-graph/recompute 902 // ------------------------------------------------------------------- 903 904 app.post( 905 '/api/admin/trust-graph/recompute', 906 { 907 preHandler: [requireAdmin], 908 schema: { 909 tags: ['Admin - Sybil'], 910 summary: 'Trigger trust graph recomputation (rate limited: 1/hour)', 911 security: [{ bearerAuth: [] }], 912 response: { 913 202: { 914 type: 'object', 915 properties: { 916 message: { type: 'string' }, 917 startedAt: { type: 'string', format: 'date-time' }, 918 }, 919 }, 920 429: errorResponseSchema, 921 }, 922 }, 923 }, 924 async (_request, reply) => { 925 // Rate limit: 1 recompute per hour 926 try { 927 const lastRecompute = await cache.get(RECOMPUTE_CACHE_KEY) 928 if (lastRecompute) { 929 const lastTime = Number(lastRecompute) 930 if (Date.now() - lastTime < RECOMPUTE_COOLDOWN_MS) { 931 throw tooManyRequests('Trust graph recompute is rate limited to once per hour') 932 } 933 } 934 } catch (err) { 935 if (err instanceof Error && err.message.includes('rate limited')) { 936 throw err 937 } 938 // Cache errors are non-critical, proceed 939 } 940 941 const now = new Date() 942 943 // Mark recompute as started in cache 944 try { 945 await cache.set(RECOMPUTE_CACHE_KEY, String(now.getTime()), 'EX', 3600) 946 } catch { 947 // Non-critical 948 } 949 950 // H5: Trigger actual trust graph recomputation (fire-and-forget) 951 triggerRecompute(app) 952 953 return reply.status(202).send({ 954 message: 'Trust graph recomputation started', 955 startedAt: now.toISOString(), 956 }) 957 } 958 ) 959 960 // ------------------------------------------------------------------- 961 // GET /api/admin/trust-graph/status 962 // ------------------------------------------------------------------- 963 964 app.get( 965 '/api/admin/trust-graph/status', 966 { 967 preHandler: [requireAdmin], 968 schema: { 969 tags: ['Admin - Sybil'], 970 summary: 'Get trust graph computation stats', 971 security: [{ bearerAuth: [] }], 972 response: { 973 200: { 974 type: 'object', 975 properties: { 976 lastComputedAt: { type: ['string', 'null'] }, 977 totalNodes: { type: 'number' }, 978 totalEdges: { type: 'number' }, 979 computationDurationMs: { type: ['number', 'null'] }, 980 clustersFlagged: { type: 'number' }, 981 nextScheduledAt: { type: ['string', 'null'] }, 982 }, 983 }, 984 }, 985 }, 986 }, 987 async (_request, reply) => { 988 // Get last recompute time from cache 989 let lastComputedAt: string | null = null 990 let computationDurationMs: number | null = null 991 let nextScheduledAt: string | null = null 992 try { 993 const cached = await cache.get(RECOMPUTE_CACHE_KEY) 994 if (cached) { 995 const lastTime = Number(cached) 996 lastComputedAt = new Date(lastTime).toISOString() 997 // Next scheduled: 1 hour after last computation 998 nextScheduledAt = new Date(lastTime + RECOMPUTE_COOLDOWN_MS).toISOString() 999 } 1000 1001 // Check for stored duration 1002 const durationCached = await cache.get('trust-graph:last-duration-ms') 1003 if (durationCached) { 1004 computationDurationMs = Number(durationCached) 1005 } 1006 } catch { 1007 // Non-critical 1008 } 1009 1010 // C2: Get counts from database using Drizzle ORM (no raw SQL) 1011 const [nodeRows, edgeRows, flaggedRows] = await Promise.all([ 1012 db.select({ nodeCount: count() }).from(trustScores), 1013 db.select({ edgeCount: count() }).from(interactionGraph), 1014 db 1015 .select({ flaggedCount: count() }) 1016 .from(sybilClusters) 1017 .where(eq(sybilClusters.status, 'flagged')), 1018 ]) 1019 1020 return reply.status(200).send({ 1021 lastComputedAt, 1022 totalNodes: nodeRows[0]?.nodeCount ?? 0, 1023 totalEdges: edgeRows[0]?.edgeCount ?? 0, 1024 computationDurationMs, 1025 clustersFlagged: flaggedRows[0]?.flaggedCount ?? 0, 1026 nextScheduledAt, 1027 }) 1028 } 1029 ) 1030 1031 // ======================================================================= 1032 // BEHAVIORAL FLAGS ROUTES 1033 // ======================================================================= 1034 1035 // ------------------------------------------------------------------- 1036 // GET /api/admin/behavioral-flags 1037 // ------------------------------------------------------------------- 1038 1039 app.get( 1040 '/api/admin/behavioral-flags', 1041 { 1042 preHandler: [requireAdmin], 1043 schema: { 1044 tags: ['Admin - Sybil'], 1045 summary: 'List behavioral flags (paginated)', 1046 security: [{ bearerAuth: [] }], 1047 querystring: { 1048 type: 'object', 1049 properties: { 1050 flagType: { 1051 type: 'string', 1052 enum: ['burst_voting', 'content_similarity', 'low_diversity'], 1053 }, 1054 status: { type: 'string', enum: ['pending', 'dismissed', 'action_taken'] }, 1055 cursor: { type: 'string' }, 1056 limit: { type: 'string' }, 1057 }, 1058 }, 1059 response: { 1060 200: { 1061 type: 'object', 1062 properties: { 1063 flags: { type: 'array', items: behavioralFlagJsonSchema }, 1064 cursor: { type: ['string', 'null'] }, 1065 }, 1066 }, 1067 400: errorResponseSchema, 1068 }, 1069 }, 1070 }, 1071 async (request, reply) => { 1072 const parsed = behavioralFlagQuerySchema.safeParse(request.query) 1073 if (!parsed.success) { 1074 throw badRequest('Invalid query parameters') 1075 } 1076 1077 const { flagType, status, cursor, limit } = parsed.data 1078 const conditions = [] 1079 1080 if (flagType) { 1081 conditions.push(eq(behavioralFlags.flagType, flagType)) 1082 } 1083 if (status) { 1084 conditions.push(eq(behavioralFlags.status, status)) 1085 } 1086 if (cursor) { 1087 const decoded = decodeCursor(cursor) 1088 if (decoded) { 1089 conditions.push( 1090 sql`(${behavioralFlags.detectedAt}, ${behavioralFlags.id}) < (${decoded.createdAt}::timestamptz, ${decoded.id})` 1091 ) 1092 } 1093 } 1094 1095 const whereClause = conditions.length > 0 ? and(...conditions) : undefined 1096 const fetchLimit = limit + 1 1097 1098 const rows = await db 1099 .select() 1100 .from(behavioralFlags) 1101 .where(whereClause) 1102 .orderBy(desc(behavioralFlags.detectedAt)) 1103 .limit(fetchLimit) 1104 1105 const hasMore = rows.length > limit 1106 const resultRows = hasMore ? rows.slice(0, limit) : rows 1107 1108 let nextCursor: string | null = null 1109 if (hasMore) { 1110 const lastRow = resultRows[resultRows.length - 1] 1111 if (lastRow) { 1112 nextCursor = encodeCursor(lastRow.detectedAt.toISOString(), lastRow.id) 1113 } 1114 } 1115 1116 return reply.status(200).send({ 1117 flags: resultRows.map(serializeBehavioralFlag), 1118 cursor: nextCursor, 1119 }) 1120 } 1121 ) 1122 1123 // ------------------------------------------------------------------- 1124 // PUT /api/admin/behavioral-flags/:id 1125 // ------------------------------------------------------------------- 1126 1127 app.put( 1128 '/api/admin/behavioral-flags/:id', 1129 { 1130 preHandler: [requireAdmin], 1131 schema: { 1132 tags: ['Admin - Sybil'], 1133 summary: 'Update behavioral flag status', 1134 security: [{ bearerAuth: [] }], 1135 params: { 1136 type: 'object', 1137 required: ['id'], 1138 properties: { id: { type: 'string' } }, 1139 }, 1140 body: { 1141 type: 'object', 1142 required: ['status'], 1143 properties: { 1144 status: { type: 'string', enum: ['dismissed', 'action_taken'] }, 1145 }, 1146 }, 1147 response: { 1148 200: behavioralFlagJsonSchema, 1149 400: errorResponseSchema, 1150 404: errorResponseSchema, 1151 }, 1152 }, 1153 }, 1154 async (request, reply) => { 1155 const { id } = request.params as { id: string } 1156 const flagId = Number(id) 1157 if (Number.isNaN(flagId)) { 1158 throw badRequest('Invalid flag ID') 1159 } 1160 1161 const parsed = behavioralFlagUpdateSchema.safeParse(request.body) 1162 if (!parsed.success) { 1163 throw badRequest('Invalid status update') 1164 } 1165 1166 const existing = await db 1167 .select() 1168 .from(behavioralFlags) 1169 .where(eq(behavioralFlags.id, flagId)) 1170 1171 if (existing.length === 0) { 1172 throw notFound('Behavioral flag not found') 1173 } 1174 1175 const updated = await db 1176 .update(behavioralFlags) 1177 .set({ status: parsed.data.status }) 1178 .where(eq(behavioralFlags.id, flagId)) 1179 .returning() 1180 1181 const updatedFlag = updated[0] 1182 if (!updatedFlag) { 1183 throw notFound('Flag not found after update') 1184 } 1185 1186 app.log.info({ flagId, status: parsed.data.status }, 'Behavioral flag status updated') 1187 1188 return reply.status(200).send(serializeBehavioralFlag(updatedFlag)) 1189 } 1190 ) 1191 1192 done() 1193 } 1194}