Sifa professional network API (Fastify, AT Protocol, Jetstream)
sifa.id/
1import WebSocket from 'ws';
2import { logger } from '../logger.js';
3import type { JetstreamEvent } from './types.js';
4
/**
 * Collection names this service asks Jetstream to deliver.
 *
 * `buildJetstreamUrl` appends one `wantedCollections` query parameter per
 * entry, in the order listed here. The array is declared `as const` so each
 * entry keeps its string-literal type.
 */
export const WANTED_COLLECTIONS = [
  // Profile records
  'id.sifa.profile.self',
  'id.sifa.profile.position',
  'id.sifa.profile.education',
  'id.sifa.profile.skill',
  'id.sifa.profile.certification',
  'id.sifa.profile.project',
  'id.sifa.profile.volunteering',
  'id.sifa.profile.publication',
  'id.sifa.profile.course',
  'id.sifa.profile.honor',
  'id.sifa.profile.language',
  // Endorsements and graph records
  'id.sifa.endorsement',
  'id.sifa.endorsement.confirmation',
  'id.sifa.graph.follow',
  'id.sifa.profile.externalAccount',
] as const;
22
/**
 * Builds the Jetstream subscription URL.
 *
 * One `wantedCollections` query parameter is appended per collection, after
 * any query parameters already present on `baseUrl`. When a cursor is given,
 * the `cursor` parameter is set (replacing any existing one) to its decimal
 * string form.
 *
 * @param baseUrl - Absolute URL of the Jetstream endpoint.
 * @param cursor - Cursor value to send; no `cursor` parameter is written when
 *   this is `undefined`.
 * @param collections - Collections to subscribe to. Defaults to
 *   `WANTED_COLLECTIONS`, which preserves the previous two-argument behaviour.
 * @returns The serialized URL.
 * @throws TypeError if `baseUrl` cannot be parsed as an absolute URL.
 */
export function buildJetstreamUrl(
  baseUrl: string,
  cursor?: bigint,
  collections: readonly string[] = WANTED_COLLECTIONS,
): string {
  const url = new URL(baseUrl);
  for (const collection of collections) {
    // `append`, not `set`: the parameter is repeated once per collection.
    url.searchParams.append('wantedCollections', collection);
  }
  if (cursor !== undefined) {
    url.searchParams.set('cursor', cursor.toString());
  }
  return url.toString();
}
33
/** Options accepted by `createJetstreamClient`. */
export interface JetstreamClientOptions {
  /** Base Jetstream endpoint URL; query parameters are added by `buildJetstreamUrl`. */
  url: string;
  /**
   * Called with each message after it has been parsed as JSON.
   * A rejection (or a JSON parse failure) is logged and does not close the socket.
   */
  onEvent: (event: JetstreamEvent) => Promise<void>;
  /** Called with the error whenever the WebSocket emits `error`, after it has been logged. */
  onError?: (error: Error) => void;
  /**
   * Awaited before every connection attempt to obtain the cursor to send.
   * Resolving to `undefined` connects without a `cursor` parameter.
   */
  getCursor: () => Promise<bigint | undefined>;
}
40
/**
 * Creates a Jetstream WebSocket client that reconnects with exponential
 * backoff (1 s, doubling up to 30 s, reset once a connection opens).
 *
 * Each incoming message is parsed as JSON and handed to `opts.onEvent`;
 * failures there are logged and do not affect the connection. Messages are
 * dispatched as they arrive without waiting for the previous `onEvent` call
 * to settle.
 *
 * @param opts - Endpoint URL, event/error callbacks and cursor provider.
 * @returns `connect`, which starts the client and resolves once the socket
 *   has been created (it rejects if `opts.getCursor` rejects or the URL is
 *   invalid), and `disconnect`, which stops the client, cancels any pending
 *   reconnect and closes the current socket.
 */
export function createJetstreamClient(opts: JetstreamClientOptions) {
  const initialReconnectDelayMs = 1000;
  const maxReconnectDelayMs = 30000;

  let ws: WebSocket | null = null;
  let reconnectDelay = initialReconnectDelayMs;
  let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  let running = false;

  /** Schedules a single reconnect attempt and advances the backoff delay. */
  function scheduleReconnect(): void {
    // Never reconnect after disconnect(), and never stack two timers.
    if (!running || reconnectTimer !== null) return;

    logger.warn({ reconnectDelay }, 'Jetstream disconnected, reconnecting');
    reconnectTimer = setTimeout(() => {
      reconnectTimer = null;
      // A failing getCursor() (or URL construction) must not end the retry
      // loop or surface as an unhandled rejection: log it and back off again.
      openSocket().catch((err: unknown) => {
        logger.error({ err }, 'Jetstream reconnect attempt failed');
        scheduleReconnect();
      });
    }, reconnectDelay);
    reconnectDelay = Math.min(reconnectDelay * 2, maxReconnectDelayMs);
  }

  /** Fetches the cursor and opens one socket, unless the client was stopped or already has one. */
  async function openSocket(): Promise<void> {
    const cursor = await opts.getCursor();

    // disconnect() may have run, or another attempt may have installed a
    // socket, while the cursor was being fetched.
    if (!running || ws !== null) return;

    const socket = new WebSocket(buildJetstreamUrl(opts.url, cursor));
    ws = socket;

    socket.on('open', () => {
      logger.info('Jetstream connected');
      reconnectDelay = initialReconnectDelayMs;
    });

    socket.on('message', (data: Buffer) => {
      void (async () => {
        try {
          const event = JSON.parse(data.toString()) as JetstreamEvent;
          await opts.onEvent(event);
        } catch (err) {
          logger.error({ err }, 'Failed to process Jetstream event');
        }
      })();
    });

    socket.on('close', () => {
      // Ignore sockets that disconnect() already detached or that have been
      // superseded; only the current socket may trigger a reconnect.
      if (ws !== socket) return;
      ws = null;
      scheduleReconnect();
    });

    socket.on('error', (err: Error) => {
      logger.error({ err }, 'Jetstream WebSocket error');
      opts.onError?.(err);
    });
  }

  async function connect(): Promise<void> {
    running = true;
    await openSocket();
  }

  function disconnect(): void {
    running = false;
    if (reconnectTimer !== null) {
      clearTimeout(reconnectTimer);
      reconnectTimer = null;
    }
    ws?.close();
    // Detach so the closing socket's 'close' event cannot schedule a reconnect,
    // even if connect() is called again before it fires.
    ws = null;
  }

  return { connect, disconnect };
}