elasticsearch-based configurable generic appview for prototyping ideas
at main 2.0 kB view raw
1import { AppConfig } from "./config.ts"; 2import { indexDocument, deleteDocument, type OnEventCallback } from "./indexer.ts"; 3 4interface FirehoseOptions { 5 config: AppConfig; 6 onEvent: OnEventCallback; 7 forcedCursor?: string; 8} 9 10export function startFirehose({ config, onEvent, forcedCursor }: FirehoseOptions) { 11 let lastCursor: number | null = null; 12 13 const connect = () => { 14 const url = new URL(config.jetstream_url); 15 if (lastCursor !== null) { 16 url.searchParams.set("cursor", lastCursor.toString()); 17 } 18 if (forcedCursor) { 19 console.log("using forced cursor: ", forcedCursor) 20 url.searchParams.set("cursor", forcedCursor); 21 } 22 23 const ws = new WebSocket(url.toString()); 24 25 ws.onopen = () => { 26 console.log("Connected to Jetstream", lastCursor !== null ? `(resuming from cursor ${lastCursor})` : ""); 27 }; 28 29 ws.onmessage = async (msg) => { 30 const data = msg.data instanceof Blob ? await msg.data.text() : msg.data; 31 const evt = JSON.parse(data); 32 33 if (!evt?.commit) return; 34 35 lastCursor = evt.time_us; 36 37 const opType = evt.commit.operation; 38 const cid = evt.commit.cid; 39 const collection = evt.commit.collection; 40 const rkey = evt.commit.rkey; 41 const aturi = `at://${evt.did}/${collection}/${rkey}`; 42 const record = evt.commit.record; 43 const indexedAt = new Date(evt.time_us / 1000).toISOString(); 44 45 if (!config.record_types.includes(collection)) return; 46 47 if (opType === "create" || opType === "update") { 48 await indexDocument(config, onEvent, aturi, record, cid, indexedAt); 49 } else if (opType === "delete") { 50 await deleteDocument(config, onEvent, aturi); 51 } 52 }; 53 54 ws.onerror = (e) => { 55 console.error("Jetstream WebSocket error:", e); 56 }; 57 58 ws.onclose = (e) => { 59 console.warn("Jetstream disconnected. Attempting to reconnect in 1 second...", e.reason); 60 setTimeout(connect, 1000); 61 }; 62 }; 63 64 connect(); 65}