Sifa professional network API (Fastify, AT Protocol, Jetstream) sifa.id/
at main 88 lines 3.2 kB view raw
1import type { JetstreamEvent } from './types.js'; 2import { logger } from '../logger.js'; 3import type { Database } from '../db/index.js'; 4import { profiles } from '../db/schema/index.js'; 5 6export interface IndexerMap { 7 profileIndexer?: (event: JetstreamEvent) => Promise<void>; 8 positionIndexer?: (event: JetstreamEvent) => Promise<void>; 9 educationIndexer?: (event: JetstreamEvent) => Promise<void>; 10 skillIndexer?: (event: JetstreamEvent) => Promise<void>; 11 certificationIndexer?: (event: JetstreamEvent) => Promise<void>; 12 projectIndexer?: (event: JetstreamEvent) => Promise<void>; 13 volunteeringIndexer?: (event: JetstreamEvent) => Promise<void>; 14 publicationIndexer?: (event: JetstreamEvent) => Promise<void>; 15 courseIndexer?: (event: JetstreamEvent) => Promise<void>; 16 honorIndexer?: (event: JetstreamEvent) => Promise<void>; 17 languageIndexer?: (event: JetstreamEvent) => Promise<void>; 18 followIndexer?: (event: JetstreamEvent) => Promise<void>; 19 externalAccountIndexer?: (event: JetstreamEvent) => Promise<void>; 20} 21 22const COLLECTION_MAP: Record<string, keyof IndexerMap> = { 23 'id.sifa.profile.self': 'profileIndexer', 24 'id.sifa.profile.position': 'positionIndexer', 25 'id.sifa.profile.education': 'educationIndexer', 26 'id.sifa.profile.skill': 'skillIndexer', 27 'id.sifa.profile.certification': 'certificationIndexer', 28 'id.sifa.profile.project': 'projectIndexer', 29 'id.sifa.profile.volunteering': 'volunteeringIndexer', 30 'id.sifa.profile.publication': 'publicationIndexer', 31 'id.sifa.profile.course': 'courseIndexer', 32 'id.sifa.profile.honor': 'honorIndexer', 33 'id.sifa.profile.language': 'languageIndexer', 34 'id.sifa.graph.follow': 'followIndexer', 35 'id.sifa.profile.externalAccount': 'externalAccountIndexer', 36}; 37 38export function createEventRouter(db: Database, indexers: IndexerMap) { 39 return async (event: JetstreamEvent) => { 40 if (event.kind === 'identity' && event.identity?.handle) { 41 await db 42 .insert(profiles) 43 .values({ 44 did: event.identity.did, 45 handle: event.identity.handle, 46 createdAt: new Date(), 47 indexedAt: new Date(), 48 updatedAt: new Date(), 49 }) 50 .onConflictDoUpdate({ 51 target: profiles.did, 52 set: { 53 handle: event.identity.handle, 54 updatedAt: new Date(), 55 }, 56 }); 57 return; 58 } 59 60 if (event.kind !== 'commit' || !event.commit) return; 61 62 const indexerKey = COLLECTION_MAP[event.commit.collection]; 63 if (!indexerKey) return; 64 65 const indexer = indexers[indexerKey]; 66 if (!indexer) { 67 logger.warn({ collection: event.commit.collection }, 'No indexer registered'); 68 return; 69 } 70 71 // Ensure profile row exists before child record indexers run (FK constraint). 72 // Skip for profile indexer itself and for delete operations. 73 if (indexerKey !== 'profileIndexer' && event.commit.operation !== 'delete') { 74 await db 75 .insert(profiles) 76 .values({ 77 did: event.did, 78 handle: '', 79 createdAt: new Date(), 80 indexedAt: new Date(), 81 updatedAt: new Date(), 82 }) 83 .onConflictDoNothing(); 84 } 85 86 await indexer(event); 87 }; 88}