open source is social v-it.org
at main 159 lines 4.9 kB view raw
// SPDX-License-Identifier: MIT
// Copyright (c) 2026 sol pbc

import { loadConfig } from '../lib/config.js';
import { CAP_COLLECTION, DEFAULT_JETSTREAM_URL } from '../lib/constants.js';
import { resolveRef } from '../lib/cap-ref.js';
import { brand } from '../lib/brand.js';

// Module-level connection state shared by connect() and the signal handlers.
let ws = null; // most recently created WebSocket, or null before the first connect
let shuttingDown = false; // set by the signal handlers so onclose does not reconnect
let backoff = 1000; // next reconnect delay in ms; doubled per close, capped at 30000

/**
 * Build the WebSocket URL for a subscription.
 *
 * @param {string} baseUrl - Endpoint URL; must be parseable by `new URL`.
 * @param {string} collection - Value for the `wantedCollections` query parameter.
 * @param {string} [did] - Value for `wantedDids`; omitted from the URL when falsy.
 * @param {string|null} [cursor] - Value for `cursor`; omitted from the URL when falsy.
 * @returns {string} The serialized URL including the query parameters.
 * @throws {TypeError} If `baseUrl` is not a valid URL.
 */
function buildUrl(baseUrl, collection, did, cursor) {
  const url = new URL(baseUrl);
  url.searchParams.set('wantedCollections', collection);
  if (did) url.searchParams.set('wantedDids', did);
  if (cursor) url.searchParams.set('cursor', cursor);
  return url.toString();
}

/**
 * Render an event timestamp as a locale time string.
 *
 * @param {number} timeUs - Timestamp treated as microseconds: it is divided by
 *   1000 to obtain the milliseconds that `Date` expects.
 * @returns {string} Result of `Date#toLocaleTimeString()`.
 */
function formatTime(timeUs) {
  return new Date(timeUs / 1000).toLocaleTimeString();
}

/**
 * Produce the one-line, human-readable summary printed for an event in
 * non-verbose mode.
 *
 * @param {object} event - Parsed message; reads `time_us`, `did`, `kind`, and,
 *   depending on `kind`, the `commit` or `account` sub-objects.
 * @returns {string} A single formatted line.
 */
function formatEvent(event) {
  const time = formatTime(event.time_us);
  // Only the last 12 characters of the DID are shown to keep lines short.
  const didShort = typeof event.did === 'string' ? event.did.slice(-12) : 'unknown';

  if (event.kind === 'commit') {
    const operation = event.commit?.operation?.toUpperCase?.() ?? 'UNKNOWN';
    const collection = event.commit?.collection ?? 'unknown';
    const rkey = event.commit?.rkey ?? 'unknown';

    // Deletes are summarized without looking at any record content.
    if (operation === 'DELETE') {
      return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`;
    }

    const message = event.commit?.record?.title || event.commit?.record?.text;
    // NOTE(review): resolveRef is defined in ../lib/cap-ref.js; presumably it
    // derives a short reference from the record and cid — confirm there.
    const ref = event.commit?.cid ? resolveRef(event.commit.record, event.commit.cid) : null;
    if (typeof message === 'string') {
      const refPart = ref ? ` (${ref})` : '';
      return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}${refPart} — "${message}"`;
    }

    return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`;
  }

  if (event.kind === 'identity') {
    return `[${time}] IDENTITY ${didShort}`;
  }

  if (event.kind === 'account') {
    return `[${time}] ACCOUNT ${didShort} status=${event.account?.status}`;
  }

  return `[${time}] ${event.kind} from ${didShort}`;
}

/**
 * Open a WebSocket subscription and print every message received. When the
 * socket closes and no shutdown is in progress, reconnect after the current
 * backoff delay, resuming from the last `time_us` seen on this connection.
 *
 * @param {object} opts - Reads `jetstreamUrl`, `collection`, `did`, `verbose`.
 * @param {string|null} cursor - Cursor to resume from, or null to start live.
 * @returns {void}
 */
function connect(opts, cursor) {
  const url = buildUrl(opts.jetstreamUrl, opts.collection, opts.did, cursor);
  let lastCursor = cursor;

  ws = new WebSocket(url);

  ws.onopen = () => {
    // A successful connection resets the backoff to its initial value.
    backoff = 1000;
    console.log(`Connected to ${url}`);
  };

  ws.onmessage = (event) => {
    let msg;
    try {
      msg = JSON.parse(event.data);
    } catch {
      console.log('Warning: failed to parse message as JSON; skipping');
      return;
    }

    // JSON.parse accepts `null` and bare primitives without throwing. Reading
    // msg.time_us on `null` would throw inside this event handler, so anything
    // that is not an object is skipped explicitly.
    if (msg === null || typeof msg !== 'object') {
      console.log('Warning: received non-object message; skipping');
      return;
    }

    // Remember the newest timestamp so a reconnect can pass it as the cursor.
    if (msg.time_us) {
      lastCursor = String(msg.time_us);
    }

    if (opts.verbose) {
      console.log(JSON.stringify(msg, null, 2));
      return;
    }

    console.log(formatEvent(msg));
  };

  ws.onclose = () => {
    if (shuttingDown) {
      return;
    }

    // Use the current delay for this attempt, then double it (max 30 s) for the next.
    const delay = backoff;
    backoff = Math.min(backoff * 2, 30000);
    console.log(`Connection closed, reconnecting in ${delay}ms...`);
    setTimeout(() => connect(opts, lastCursor), delay);
  };

  ws.onerror = (err) => {
    const message = err?.message ?? 'unknown error';
    console.error(`WebSocket error: ${message}`);
  };
}

/**
 * Register the `firehose` subcommand on the given CLI program.
 *
 * The action resolves the DID filter (explicit `--did`, else the DID from
 * loadConfig(), else none; `--global` disables filtering), picks the endpoint
 * (`--jetstream`, else the VIT_JETSTREAM_URL environment variable, else
 * DEFAULT_JETSTREAM_URL), installs SIGINT/SIGTERM handlers, prints a banner,
 * and starts the connection.
 *
 * @param {object} program - CLI program object exposing the chained
 *   `.command()/.description()/.option()/.action()` API used below.
 * @returns {void}
 */
export default function register(program) {
  program
    .command('firehose')
    .description('Listen to Jetstream for cap events')
    .option('-v, --verbose', 'Show full JSON for each event')
    .option('--did <did>', 'Filter by DID (reads saved DID from config if not provided)')
    .option('--global', 'Show cap events from all DIDs across the network')
    .option('--collection <nsid>', 'Collection NSID to filter', CAP_COLLECTION)
    .option('--jetstream <url>', 'Jetstream WebSocket URL (default: VIT_JETSTREAM_URL env or built-in)')
    .action(async (opts) => {
      try {
        if (opts.global && opts.did) {
          console.error('error: --global and --did are mutually exclusive');
          process.exitCode = 1;
          return;
        }

        if (opts.global) {
          opts.did = undefined;
        } else if (!opts.did) {
          // No explicit DID: fall back to the one stored in the config, if any.
          const config = loadConfig();
          if (config.did) {
            opts.did = config.did;
          }
        }

        const jetstreamUrl = opts.jetstream || process.env.VIT_JETSTREAM_URL || DEFAULT_JETSTREAM_URL;
        opts.jetstreamUrl = jetstreamUrl;

        for (const sig of ['SIGINT', 'SIGTERM']) {
          process.on(sig, () => {
            // Flag first so the onclose handler does not schedule a reconnect.
            shuttingDown = true;
            console.log('\nShutting down...');
            if (ws) ws.close();
            process.exit(0);
          });
        }

        // Built here only for display; connect() builds its own URL.
        const url = buildUrl(jetstreamUrl, opts.collection, opts.did, null);
        console.log(`${brand} firehose`);
        console.log(` Collection: ${opts.collection}`);
        if (opts.did) console.log(` DID filter: ${opts.did}`);
        console.log(` Endpoint: ${url}`);
        console.log(' Ctrl+C to stop\n');

        connect(opts, null);
      } catch (err) {
        console.error(err instanceof Error ? err.message : String(err));
        process.exitCode = 1;
      }
    });
}