// Sifa professional network API (Fastify, AT Protocol, Jetstream) - sifa.id
// Snapshot of the `main` branch as shown by the repository viewer (91 lines, 2.4 kB).
import WebSocket from 'ws';
import { logger } from '../logger.js';
import type { JetstreamEvent } from './types.js';

/** Delay before the first reconnect attempt; the backoff resets to this value on a successful open. */
const INITIAL_RECONNECT_DELAY_MS = 1000;

/** Upper bound for the doubling reconnect backoff. */
const MAX_RECONNECT_DELAY_MS = 30000;

/**
 * Collection identifiers sent to Jetstream as repeated `wantedCollections`
 * query parameters (see {@link buildJetstreamUrl}).
 */
export const WANTED_COLLECTIONS = [
  'id.sifa.profile.self',
  'id.sifa.profile.position',
  'id.sifa.profile.education',
  'id.sifa.profile.skill',
  'id.sifa.profile.certification',
  'id.sifa.profile.project',
  'id.sifa.profile.volunteering',
  'id.sifa.profile.publication',
  'id.sifa.profile.course',
  'id.sifa.profile.honor',
  'id.sifa.profile.language',
  'id.sifa.endorsement',
  'id.sifa.endorsement.confirmation',
  'id.sifa.graph.follow',
  'id.sifa.profile.externalAccount',
] as const;

/**
 * Builds the Jetstream subscription URL.
 *
 * @param baseUrl - Absolute URL of the Jetstream endpoint; existing query
 *   parameters are preserved.
 * @param cursor - Optional cursor; when provided it is written (as a decimal
 *   string) to the `cursor` query parameter, replacing any existing value.
 * @returns The serialized URL with one `wantedCollections` parameter per entry
 *   of {@link WANTED_COLLECTIONS}.
 * @throws TypeError if `baseUrl` is not a valid absolute URL.
 */
export function buildJetstreamUrl(baseUrl: string, cursor?: bigint): string {
  const url = new URL(baseUrl);
  for (const collection of WANTED_COLLECTIONS) {
    url.searchParams.append('wantedCollections', collection);
  }
  if (cursor !== undefined) {
    url.searchParams.set('cursor', cursor.toString());
  }
  return url.toString();
}

/** Configuration for {@link createJetstreamClient}. */
export interface JetstreamClientOptions {
  /** Base endpoint URL, passed to {@link buildJetstreamUrl}. */
  url: string;
  /**
   * Called with every message after it has been JSON-parsed. A rejection (or a
   * parse failure) is logged and does not close the connection.
   */
  onEvent: (event: JetstreamEvent) => Promise<void>;
  /** Called for WebSocket `error` events and for failed reconnect attempts. */
  onError?: (error: Error) => void;
  /**
   * Called before every connection attempt (initial and reconnect) to obtain
   * the cursor to resume from; `undefined` omits the `cursor` parameter.
   */
  getCursor: () => Promise<bigint | undefined>;
}

/**
 * Creates a Jetstream WebSocket client that reconnects automatically with
 * exponential backoff (1 s doubling up to 30 s, reset on a successful open).
 *
 * @param opts - Endpoint, event/error callbacks and cursor provider.
 * @returns `connect()` starts the client and resolves once the socket object
 *   has been created (it rejects if `getCursor` rejects or the URL is
 *   invalid); `disconnect()` stops the client, cancels any pending reconnect
 *   and closes the current socket.
 */
export function createJetstreamClient(opts: JetstreamClientOptions) {
  let ws: WebSocket | null = null;
  let reconnectDelay = INITIAL_RECONNECT_DELAY_MS;
  let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  let running = false;

  /** Cancels a pending reconnect attempt, if any. */
  function cancelReconnect(): void {
    if (reconnectTimer !== null) {
      clearTimeout(reconnectTimer);
      reconnectTimer = null;
    }
  }

  /** Schedules one reconnect attempt after the current backoff delay, then doubles the delay. */
  function scheduleReconnect(): void {
    // Never resurrect a stopped client, and never stack timers.
    if (!running || reconnectTimer !== null) {
      return;
    }
    reconnectTimer = setTimeout(() => {
      reconnectTimer = null;
      openSocket().catch((err: unknown) => {
        // A failed attempt (e.g. getCursor rejected) must not end the
        // reconnect loop: log it, queue the next attempt, then notify.
        logger.error({ err }, 'Jetstream reconnect attempt failed');
        scheduleReconnect();
        opts.onError?.(err instanceof Error ? err : new Error(String(err)));
      });
    }, reconnectDelay);
    reconnectDelay = Math.min(reconnectDelay * 2, MAX_RECONNECT_DELAY_MS);
  }

  /** Fetches the cursor, opens a socket and wires its event handlers. */
  async function openSocket(): Promise<void> {
    const cursor = await opts.getCursor();
    // disconnect() may have been called while the cursor was being fetched.
    if (!running) {
      return;
    }

    const socket = new WebSocket(buildJetstreamUrl(opts.url, cursor));
    ws = socket;

    socket.on('open', () => {
      logger.info('Jetstream connected');
      reconnectDelay = INITIAL_RECONNECT_DELAY_MS;
    });

    socket.on('message', (data: Buffer) => {
      // Fire-and-forget: failures are logged so one bad event cannot take
      // down the stream.
      void (async () => {
        try {
          const event = JSON.parse(data.toString()) as JetstreamEvent;
          await opts.onEvent(event);
        } catch (err) {
          logger.error({ err }, 'Failed to process Jetstream event');
        }
      })();
    });

    socket.on('close', () => {
      // Ignore sockets that were already replaced or explicitly disconnected.
      if (ws !== socket) {
        return;
      }
      ws = null;
      if (running) {
        logger.warn({ reconnectDelay }, 'Jetstream disconnected, reconnecting');
        scheduleReconnect();
      }
    });

    socket.on('error', (err) => {
      logger.error({ err }, 'Jetstream WebSocket error');
      opts.onError?.(err);
    });
  }

  async function connect(): Promise<void> {
    running = true;
    // An explicit connect supersedes any reconnect that is still pending.
    cancelReconnect();
    await openSocket();
  }

  function disconnect(): void {
    running = false;
    cancelReconnect();
    // Detach first so the socket's own 'close' handler treats it as stale.
    const socket = ws;
    ws = null;
    socket?.close();
  }

  return { connect, disconnect };
}