elasticsearch-based configurable generic appview for prototyping ideas
1import { AppConfig } from "./config.ts";
2import { indexDocument, deleteDocument, type OnEventCallback } from "./indexer.ts";
3
4interface FirehoseOptions {
5 config: AppConfig;
6 onEvent: OnEventCallback;
7 forcedCursor?: string;
8}
9
10export function startFirehose({ config, onEvent, forcedCursor }: FirehoseOptions) {
11 let lastCursor: number | null = null;
12
13 const connect = () => {
14 const url = new URL(config.jetstream_url);
15 if (lastCursor !== null) {
16 url.searchParams.set("cursor", lastCursor.toString());
17 }
18 if (forcedCursor) {
19 console.log("using forced cursor: ", forcedCursor)
20 url.searchParams.set("cursor", forcedCursor);
21 }
22
23 const ws = new WebSocket(url.toString());
24
25 ws.onopen = () => {
26 console.log("Connected to Jetstream", lastCursor !== null ? `(resuming from cursor ${lastCursor})` : "");
27 };
28
29 ws.onmessage = async (msg) => {
30 const data = msg.data instanceof Blob ? await msg.data.text() : msg.data;
31 const evt = JSON.parse(data);
32
33 if (!evt?.commit) return;
34
35 lastCursor = evt.time_us;
36
37 const opType = evt.commit.operation;
38 const cid = evt.commit.cid;
39 const collection = evt.commit.collection;
40 const rkey = evt.commit.rkey;
41 const aturi = `at://${evt.did}/${collection}/${rkey}`;
42 const record = evt.commit.record;
43 const indexedAt = new Date(evt.time_us / 1000).toISOString();
44
45 if (!config.record_types.includes(collection)) return;
46
47 if (opType === "create" || opType === "update") {
48 await indexDocument(config, onEvent, aturi, record, cid, indexedAt);
49 } else if (opType === "delete") {
50 await deleteDocument(config, onEvent, aturi);
51 }
52 };
53
54 ws.onerror = (e) => {
55 console.error("Jetstream WebSocket error:", e);
56 };
57
58 ws.onclose = (e) => {
59 console.warn("Jetstream disconnected. Attempting to reconnect in 1 second...", e.reason);
60 setTimeout(connect, 1000);
61 };
62 };
63
64 connect();
65}