open source is social v-it.org
1// SPDX-License-Identifier: MIT
2// Copyright (c) 2026 sol pbc
3
4import { loadConfig } from '../lib/config.js';
5import { CAP_COLLECTION, DEFAULT_JETSTREAM_URL } from '../lib/constants.js';
6import { resolveRef } from '../lib/cap-ref.js';
7import { brand } from '../lib/brand.js';
8
9let ws = null;
10let shuttingDown = false;
11let backoff = 1000;
12
13function buildUrl(baseUrl, collection, did, cursor) {
14 const url = new URL(baseUrl);
15 url.searchParams.set('wantedCollections', collection);
16 if (did) url.searchParams.set('wantedDids', did);
17 if (cursor) url.searchParams.set('cursor', cursor);
18 return url.toString();
19}
20
21function formatTime(timeUs) {
22 return new Date(timeUs / 1000).toLocaleTimeString();
23}
24
25function formatEvent(event) {
26 const time = formatTime(event.time_us);
27 const didShort = typeof event.did === 'string' ? event.did.slice(-12) : 'unknown';
28
29 if (event.kind === 'commit') {
30 const operation = event.commit?.operation?.toUpperCase?.() ?? 'UNKNOWN';
31 const collection = event.commit?.collection ?? 'unknown';
32 const rkey = event.commit?.rkey ?? 'unknown';
33
34 if (operation === 'DELETE') {
35 return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`;
36 }
37
38 const message = event.commit?.record?.title || event.commit?.record?.text;
39 const ref = event.commit?.cid ? resolveRef(event.commit.record, event.commit.cid) : null;
40 if (typeof message === 'string') {
41 const refPart = ref ? ` (${ref})` : '';
42 return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}${refPart} — "${message}"`;
43 }
44
45 return `[${time}] ${operation} ${collection} from ${didShort} rkey=${rkey}`;
46 }
47
48 if (event.kind === 'identity') {
49 return `[${time}] IDENTITY ${didShort}`;
50 }
51
52 if (event.kind === 'account') {
53 return `[${time}] ACCOUNT ${didShort} status=${event.account?.status}`;
54 }
55
56 return `[${time}] ${event.kind} from ${didShort}`;
57}
58
59function connect(opts, cursor) {
60 const url = buildUrl(opts.jetstreamUrl, opts.collection, opts.did, cursor);
61 let lastCursor = cursor;
62
63 ws = new WebSocket(url);
64
65 ws.onopen = () => {
66 backoff = 1000;
67 console.log(`Connected to ${url}`);
68 };
69
70 ws.onmessage = (event) => {
71 let msg;
72 try {
73 msg = JSON.parse(event.data);
74 } catch {
75 console.log('Warning: failed to parse message as JSON; skipping');
76 return;
77 }
78
79 if (msg.time_us) {
80 lastCursor = String(msg.time_us);
81 }
82
83 if (opts.verbose) {
84 console.log(JSON.stringify(msg, null, 2));
85 return;
86 }
87
88 console.log(formatEvent(msg));
89 };
90
91 ws.onclose = () => {
92 if (shuttingDown) {
93 return;
94 }
95
96 const delay = backoff;
97 backoff = Math.min(backoff * 2, 30000);
98 console.log(`Connection closed, reconnecting in ${delay}ms...`);
99 setTimeout(() => connect(opts, lastCursor), delay);
100 };
101
102 ws.onerror = (err) => {
103 const message = err?.message ?? 'unknown error';
104 console.error(`WebSocket error: ${message}`);
105 };
106}
107
108export default function register(program) {
109 program
110 .command('firehose')
111 .description('Listen to Jetstream for cap events')
112 .option('-v, --verbose', 'Show full JSON for each event')
113 .option('--did <did>', 'Filter by DID (reads saved DID from config if not provided)')
114 .option('--global', 'Show cap events from all DIDs across the network')
115 .option('--collection <nsid>', 'Collection NSID to filter', CAP_COLLECTION)
116 .option('--jetstream <url>', 'Jetstream WebSocket URL (default: VIT_JETSTREAM_URL env or built-in)')
117 .action(async (opts) => {
118 try {
119 if (opts.global && opts.did) {
120 console.error('error: --global and --did are mutually exclusive');
121 process.exitCode = 1;
122 return;
123 }
124
125 if (opts.global) {
126 opts.did = undefined;
127 } else if (!opts.did) {
128 const config = loadConfig();
129 if (config.did) {
130 opts.did = config.did;
131 }
132 }
133
134 const jetstreamUrl = opts.jetstream || process.env.VIT_JETSTREAM_URL || DEFAULT_JETSTREAM_URL;
135 opts.jetstreamUrl = jetstreamUrl;
136
137 for (const sig of ['SIGINT', 'SIGTERM']) {
138 process.on(sig, () => {
139 shuttingDown = true;
140 console.log('\nShutting down...');
141 if (ws) ws.close();
142 process.exit(0);
143 });
144 }
145
146 const url = buildUrl(jetstreamUrl, opts.collection, opts.did, null);
147 console.log(`${brand} firehose`);
148 console.log(` Collection: ${opts.collection}`);
149 if (opts.did) console.log(` DID filter: ${opts.did}`);
150 console.log(` Endpoint: ${url}`);
151 console.log(' Ctrl+C to stop\n');
152
153 connect(opts, null);
154 } catch (err) {
155 console.error(err instanceof Error ? err.message : String(err));
156 process.exitCode = 1;
157 }
158 });
159}