Relay firehose browser tools: https://compare.hose.cam
at main 4.5 kB view raw
1import { useCallback, useEffect, useState } from 'react'; 2import { Firehose } from '@skyware/firehose'; 3import * as TID from '@atcute/tid'; 4import { BarChart } from '@mui/x-charts/BarChart'; 5import './Relay.css'; 6 7type HoseState = 'connecting' | 'connected' | 'errored' | 'closed'; 8 9const TIME_SIGNAL_NSID = 'app.bsky.feed.like'; 10const BUCKET_WIDTH = 32; // ms 11const BUCKETS = 32; 12const MAX_BUCKET = BUCKET_WIDTH * BUCKETS; 13 14function Relay({ url, desc, includeEvents, onRecieveEvent }: { 15 url: string, 16 desc: string, 17 includeEvents: Set, 18 onRecieveEvent: (type: string, event: any) => void, 19}) { 20 const [state, setState] = useState('connecting' as HoseState); 21 const [commits, setCommits] = useState(0); 22 const [reconnects, setReconnects] = useState(0); 23 const [buckets, setBuckets] = useState({ 24 idx: Array.from({ length: BUCKETS + 2 }).map(() => 0), 25 recv: Array.from({ length: BUCKETS + 2 }).map(() => 0), 26 }); 27 28 useEffect(() => { 29 const sendIt = (type: string, event: any) => { 30 if (!includeEvents.has(type)) return; 31 onRecieveEvent(type, event); 32 setCommits(n => n + 1); 33 if (type === 'commit' && event.ops.length === 1) { 34 const op = event.ops[0]; 35 try { 36 const [nsid, rkey] = op.path.split('/'); 37 if (nsid === TIME_SIGNAL_NSID) { 38 const posted = TID.parse(rkey).timestamp / 1000; 39 const indexed = Date.parse(event.time) 40 const indexed_dt = indexed - posted; 41 const received_dt = +new Date() - indexed; 42 43 let idx_bucket, recv_bucket; 44 45 if (indexed_dt < 0) { 46 idx_bucket = -1; 47 } else if (indexed_dt >= MAX_BUCKET) { 48 idx_bucket = BUCKETS; 49 } else { 50 idx_bucket = Math.min(Math.floor(indexed_dt / BUCKET_WIDTH), MAX_BUCKET) 51 } 52 if (received_dt < 0) { 53 recv_bucket = -1; 54 } else if (received_dt >= MAX_BUCKET) { 55 recv_bucket = BUCKETS 56 } else { 57 recv_bucket = Math.min(Math.floor(received_dt / BUCKET_WIDTH), MAX_BUCKET) 58 } 59 60 setBuckets(({ idx, recv }) => { 61 idx = idx.slice(); 62 recv = recv.slice(); 63 idx[idx_bucket + 1] += 1; 64 recv[recv_bucket + 1] += 1; 65 return { idx, recv }; 66 }); 67 } 68 } catch (e) {} 69 } 70 }; 71 const firehose = new Firehose({ relay: url }); 72 firehose.on('open', () => setState('connected')); 73 firehose.on('close', () => setState('closed')); 74 firehose.on('reconnect', () => setReconnects(n => n + 1)); 75 firehose.on('error', e => { 76 console.error('oops', e); 77 setState('errored'); 78 }); 79 firehose.on('websocketError', () => setState('errored')); 80 firehose.on('commit', (ev) => sendIt('commit', ev)); 81 firehose.on('sync', (ev) => sendIt('sync', ev)); 82 firehose.on('account', (ev) => sendIt('account', ev)); 83 firehose.on('identity', (ev) => sendIt('identity', ev)); 84 firehose.on('info', (...args) => console.info('info event', ...args)); 85 firehose.on('unknown', e => console.warn(`unknown event from ${url}`, e)); 86 firehose.start(); 87 88 return () => { 89 firehose.close(); 90 }; 91 }, [url, includeEvents]); 92 93 return ( 94 <div className="relay"> 95 <h2>{ desc }</h2> 96 <p><code>{ url }</code></p> 97 <p>[<code>{ state }</code>] (<code>{ commits.toLocaleString() }</code> events)</p> 98 {(reconnects > 0) && ( 99 <p>reconnects: <code>{reconnects}</code></p> 100 )} 101 <BarChart 102 height={180} 103 width={420} 104 yAxis={[{ 105 label: 'events', 106 scaleType: 'symlog', 107 }]} 108 skipAnimation={true} 109 xAxis={[{ 110 data: [-1] 111 .concat(Array.from({ length: BUCKETS }).map((_, i) => i * BUCKET_WIDTH)) 112 .concat(['+']), 113 label: 'index latency (ms)', 114 }]} 115 series={[{ 116 data: buckets.idx, 117 }]} 118 /> 119 <BarChart 120 height={180} 121 width={420} 122 yAxis={[{ 123 label: 'events', 124 scaleType: 'symlog', 125 }]} 126 skipAnimation={true} 127 xAxis={[{ 128 data: [-1] 129 .concat(Array.from({ length: BUCKETS }).map((_, i) => i * BUCKET_WIDTH)) 130 .concat(['+']), 131 label: 'receive latency (ms)', 132 }]} 133 series={[{ 134 data: buckets.recv, 135 }]} 136 /> 137 </div> 138 ); 139} 140 141export default Relay;