Relay firehose browser tools: https://compare.hose.cam
1import { useCallback, useEffect, useState } from 'react';
2import { Firehose } from '@skyware/firehose';
3import * as TID from '@atcute/tid';
4import { BarChart } from '@mui/x-charts/BarChart';
5import './Relay.css';
6
7type HoseState = 'connecting' | 'connected' | 'errored' | 'closed';
8
9const TIME_SIGNAL_NSID = 'app.bsky.feed.like';
10const BUCKET_WIDTH = 32; // ms
11const BUCKETS = 32;
12const MAX_BUCKET = BUCKET_WIDTH * BUCKETS;
13
14function Relay({ url, desc, includeEvents, onRecieveEvent }: {
15 url: string,
16 desc: string,
17 includeEvents: Set,
18 onRecieveEvent: (type: string, event: any) => void,
19}) {
20 const [state, setState] = useState('connecting' as HoseState);
21 const [commits, setCommits] = useState(0);
22 const [reconnects, setReconnects] = useState(0);
23 const [buckets, setBuckets] = useState({
24 idx: Array.from({ length: BUCKETS + 2 }).map(() => 0),
25 recv: Array.from({ length: BUCKETS + 2 }).map(() => 0),
26 });
27
28 useEffect(() => {
29 const sendIt = (type: string, event: any) => {
30 if (!includeEvents.has(type)) return;
31 onRecieveEvent(type, event);
32 setCommits(n => n + 1);
33 if (type === 'commit' && event.ops.length === 1) {
34 const op = event.ops[0];
35 try {
36 const [nsid, rkey] = op.path.split('/');
37 if (nsid === TIME_SIGNAL_NSID) {
38 const posted = TID.parse(rkey).timestamp / 1000;
39 const indexed = Date.parse(event.time)
40 const indexed_dt = indexed - posted;
41 const received_dt = +new Date() - indexed;
42
43 let idx_bucket, recv_bucket;
44
45 if (indexed_dt < 0) {
46 idx_bucket = -1;
47 } else if (indexed_dt >= MAX_BUCKET) {
48 idx_bucket = BUCKETS;
49 } else {
50 idx_bucket = Math.min(Math.floor(indexed_dt / BUCKET_WIDTH), MAX_BUCKET)
51 }
52 if (received_dt < 0) {
53 recv_bucket = -1;
54 } else if (received_dt >= MAX_BUCKET) {
55 recv_bucket = BUCKETS
56 } else {
57 recv_bucket = Math.min(Math.floor(received_dt / BUCKET_WIDTH), MAX_BUCKET)
58 }
59
60 setBuckets(({ idx, recv }) => {
61 idx = idx.slice();
62 recv = recv.slice();
63 idx[idx_bucket + 1] += 1;
64 recv[recv_bucket + 1] += 1;
65 return { idx, recv };
66 });
67 }
68 } catch (e) {}
69 }
70 };
71 const firehose = new Firehose({ relay: url });
72 firehose.on('open', () => setState('connected'));
73 firehose.on('close', () => setState('closed'));
74 firehose.on('reconnect', () => setReconnects(n => n + 1));
75 firehose.on('error', e => {
76 console.error('oops', e);
77 setState('errored');
78 });
79 firehose.on('websocketError', () => setState('errored'));
80 firehose.on('commit', (ev) => sendIt('commit', ev));
81 firehose.on('sync', (ev) => sendIt('sync', ev));
82 firehose.on('account', (ev) => sendIt('account', ev));
83 firehose.on('identity', (ev) => sendIt('identity', ev));
84 firehose.on('info', (...args) => console.info('info event', ...args));
85 firehose.on('unknown', e => console.warn(`unknown event from ${url}`, e));
86 firehose.start();
87
88 return () => {
89 firehose.close();
90 };
91 }, [url, includeEvents]);
92
93 return (
94 <div className="relay">
95 <h2>{ desc }</h2>
96 <p><code>{ url }</code></p>
97 <p>[<code>{ state }</code>] (<code>{ commits.toLocaleString() }</code> events)</p>
98 {(reconnects > 0) && (
99 <p>reconnects: <code>{reconnects}</code></p>
100 )}
101 <BarChart
102 height={180}
103 width={420}
104 yAxis={[{
105 label: 'events',
106 scaleType: 'symlog',
107 }]}
108 skipAnimation={true}
109 xAxis={[{
110 data: [-1]
111 .concat(Array.from({ length: BUCKETS }).map((_, i) => i * BUCKET_WIDTH))
112 .concat(['+']),
113 label: 'index latency (ms)',
114 }]}
115 series={[{
116 data: buckets.idx,
117 }]}
118 />
119 <BarChart
120 height={180}
121 width={420}
122 yAxis={[{
123 label: 'events',
124 scaleType: 'symlog',
125 }]}
126 skipAnimation={true}
127 xAxis={[{
128 data: [-1]
129 .concat(Array.from({ length: BUCKETS }).map((_, i) => i * BUCKET_WIDTH))
130 .concat(['+']),
131 label: 'receive latency (ms)',
132 }]}
133 series={[{
134 data: buckets.recv,
135 }]}
136 />
137 </div>
138 );
139}
140
141export default Relay;