// SPDX-License-Identifier: MIT // Copyright (c) 2026 sol pbc import { loadConfig } from '../lib/config.js'; import { CAP_COLLECTION, DEFAULT_JETSTREAM_URL } from '../lib/constants.js'; import { resolveRef } from '../lib/cap-ref.js'; import { brand } from '../lib/brand.js'; let ws = null; let shuttingDown = false; let backoff = 1000; function buildUrl(baseUrl, collection, did, cursor) { const url = new URL(baseUrl); url.searchParams.set('wantedCollections', collection); if (did) url.searchParams.set('wantedDids', did); if (cursor) url.searchParams.set('cursor', cursor); return url.toString(); } function formatTime(timeUs) { return new Date(timeUs / 1000).toLocaleTimeString(); } function formatEvent(event) { const time = formatTime(event.time_us); const didShort = typeof event.did === 'string' ? event.did.slice(-12) : 'unknown'; if (event.kind === 'commit') { const operation = event.commit?.operation?.toUpperCase?.() ?? 'UNKNOWN'; const collection = event.commit?.collection ?? 'unknown'; const rkey = event.commit?.rkey ?? 'unknown'; if (operation === 'DELETE') { return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`; } const message = event.commit?.record?.title || event.commit?.record?.text; const ref = event.commit?.cid ? resolveRef(event.commit.record, event.commit.cid) : null; if (typeof message === 'string') { const refPart = ref ? ` (${ref})` : ''; return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}${refPart} — "${message}"`; } return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`; } if (event.kind === 'identity') { return `[${time}] IDENTITY ${didShort}`; } if (event.kind === 'account') { return `[${time}] ACCOUNT ${didShort} status=${event.account?.status}`; } return `[${time}] ${event.kind} from ${didShort}`; } function connect(opts, cursor) { const url = buildUrl(opts.jetstreamUrl, opts.collection, opts.did, cursor); let lastCursor = cursor; ws = new WebSocket(url); ws.onopen = () => { backoff = 1000; console.log(`Connected to ${url}`); }; ws.onmessage = (event) => { let msg; try { msg = JSON.parse(event.data); } catch { console.log('Warning: failed to parse message as JSON; skipping'); return; } if (msg.time_us) { lastCursor = String(msg.time_us); } if (opts.verbose) { console.log(JSON.stringify(msg, null, 2)); return; } console.log(formatEvent(msg)); }; ws.onclose = () => { if (shuttingDown) { return; } const delay = backoff; backoff = Math.min(backoff * 2, 30000); console.log(`Connection closed, reconnecting in ${delay}ms...`); setTimeout(() => connect(opts, lastCursor), delay); }; ws.onerror = (err) => { const message = err?.message ?? 'unknown error'; console.error(`WebSocket error: ${message}`); }; } export default function register(program) { program .command('firehose') .description('Listen to Jetstream for cap events') .option('-v, --verbose', 'Show full JSON for each event') .option('--did ', 'Filter by DID (reads saved DID from config if not provided)') .option('--global', 'Show cap events from all DIDs across the network') .option('--collection ', 'Collection NSID to filter', CAP_COLLECTION) .option('--jetstream ', 'Jetstream WebSocket URL (default: VIT_JETSTREAM_URL env or built-in)') .action(async (opts) => { try { if (opts.global && opts.did) { console.error('error: --global and --did are mutually exclusive'); process.exitCode = 1; return; } if (opts.global) { opts.did = undefined; } else if (!opts.did) { const config = loadConfig(); if (config.did) { opts.did = config.did; } } const jetstreamUrl = opts.jetstream || process.env.VIT_JETSTREAM_URL || DEFAULT_JETSTREAM_URL; opts.jetstreamUrl = jetstreamUrl; for (const sig of ['SIGINT', 'SIGTERM']) { process.on(sig, () => { shuttingDown = true; console.log('\nShutting down...'); if (ws) ws.close(); process.exit(0); }); } const url = buildUrl(jetstreamUrl, opts.collection, opts.did, null); console.log(`${brand} firehose`); console.log(` Collection: ${opts.collection}`); if (opts.did) console.log(` DID filter: ${opts.did}`); console.log(` Endpoint: ${url}`); console.log(' Ctrl+C to stop\n'); connect(opts, null); } catch (err) { console.error(err instanceof Error ? err.message : String(err)); process.exitCode = 1; } }); }