atmosphere explorer
pds.ls
tool
typescript
atproto
1import { Firehose } from "@skyware/firehose";
2import { Title } from "@solidjs/meta";
3import { A, useLocation, useSearchParams } from "@solidjs/router";
4import { createSignal, For, onCleanup, onMount, Show } from "solid-js";
5import { Button } from "../components/button";
6import { JSONValue } from "../components/json";
7import { StickyOverlay } from "../components/sticky";
8import { TextInput } from "../components/text-input";
9
/** Maximum number of stream events kept in the on-screen list. */
const LIMIT = 25;

/** Endpoint pre-filled in the form, and used when the field is left blank, per stream type. */
const DEFAULT_INSTANCE = {
  jetstream: "wss://jetstream1.us-east.bsky.network/subscribe",
  firehose: "wss://bsky.network",
} as const;

type StreamType = keyof typeof DEFAULT_INSTANCE;

/** Names of the form fields that are mirrored into the page's search params. */
const FIELD_NAMES = ["instance", "collections", "dids", "cursor", "allEvents"] as const;

/** One row of the "connected" summary: a label and the value submitted for it. */
type Parameter = { name: string; param: string | string[] | undefined };

/** Splits a comma-separated field into trimmed, non-empty entries. */
const splitList = (value: string | undefined): string[] =>
  (value ?? "")
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);

/**
 * Builds the Jetstream subscription URL for `instance`.
 *
 * Each collection / DID becomes its own `wantedCollections` / `wantedDids`
 * parameter and a non-empty cursor is added as `cursor`. Values are encoded by
 * `URLSearchParams`, and the query is only appended when it is non-empty.
 */
const buildJetstreamUrl = (
  instance: string,
  filters: {
    collections: string | undefined;
    dids: string | undefined;
    cursor: string | undefined;
  },
): string => {
  const query = new URLSearchParams();
  for (const collection of splitList(filters.collections)) {
    query.append("wantedCollections", collection);
  }
  for (const did of splitList(filters.dids)) {
    query.append("wantedDids", did);
  }
  if (filters.cursor?.length) query.append("cursor", filters.cursor);

  const serialized = query.toString();
  if (!serialized) return instance;
  // Respect a query string the user may already have typed into the instance field.
  return instance + (instance.includes("?") ? "&" : "?") + serialized;
};

/**
 * Live viewer for either a Jetstream WebSocket or a relay firehose.
 *
 * The stream type is chosen from the current pathname (`/firehose` selects the
 * firehose, anything else selects Jetstream). Submitted form values are
 * mirrored into the search params, and on mount a connection is opened
 * automatically when an `instance` search param is present. Only the most
 * recent `LIMIT` events are kept, and they are rendered newest first.
 */
const StreamView = () => {
  const [searchParams, setSearchParams] = useSearchParams();
  const [parameters, setParameters] = createSignal<Parameter[]>([]);
  const streamType: StreamType =
    useLocation().pathname === "/firehose" ? "firehose" : "jetstream";
  // Events are arbitrary JSON-shaped payloads handed straight to <JSONValue>.
  const [records, setRecords] = createSignal<Array<any>>([]);
  const [connected, setConnected] = createSignal(false);
  const [notice, setNotice] = createSignal("");
  let socket: WebSocket | undefined;
  let firehose: Firehose | undefined;
  let formRef!: HTMLFormElement;
  let pendingRecords: any[] = [];
  let rafId: number | null = null;

  /** Queues an event; queued events are flushed into `records` once per animation frame. */
  const addRecord = (record: any) => {
    pendingRecords.push(record);
    if (rafId === null) {
      rafId = requestAnimationFrame(() => {
        setRecords(records().concat(pendingRecords).slice(-LIMIT));
        pendingRecords = [];
        rafId = null;
      });
    }
  };

  /** Closes whichever stream is open and drops any events that were not flushed yet. */
  const closeStream = () => {
    socket?.close();
    socket = undefined;
    firehose?.close();
    firehose = undefined;
    if (rafId !== null) {
      cancelAnimationFrame(rafId);
      rafId = null;
    }
    pendingRecords = [];
  };

  /** Closes the stream and switches the UI back to the connection form. */
  const disconnect = () => {
    closeStream();
    setConnected(false);
  };

  /**
   * Opens the Jetstream WebSocket at `url`.
   *
   * Account and identity events are only shown when `showAllEvents` is set.
   * May throw if `url` is rejected by the `WebSocket` constructor.
   */
  const openJetstream = (url: string, showAllEvents: boolean) => {
    const ws = new WebSocket(url);
    socket = ws;
    ws.addEventListener("message", (event) => {
      let rec: any;
      try {
        rec = JSON.parse(event.data);
      } catch (err) {
        // A frame that is not valid JSON must not break the listener.
        console.error(err);
        return;
      }
      if (rec === null || typeof rec !== "object") return;
      if (showAllEvents || (rec.kind !== "account" && rec.kind !== "identity")) addRecord(rec);
    });
    ws.addEventListener("error", () => {
      // Ignore events from a socket that has already been replaced or torn down.
      if (socket !== ws) return;
      setNotice("Connection error");
      setConnected(false);
    });
    ws.addEventListener("close", () => {
      // The remote end closed the connection: show the form again.
      if (socket !== ws) return;
      socket = undefined;
      setConnected(false);
    });
  };

  /** Subscribes to the firehose of `relay`, starting from `cursor` when one is given. */
  const openFirehose = (relay: string, cursor: string | undefined) => {
    const stream = new Firehose({
      relay: relay,
      cursor: cursor,
      autoReconnect: false,
    });
    firehose = stream;
    stream.on("error", (err) => {
      console.error(err);
    });
    stream.on("commit", (commit) => {
      // Emit one row per operation, each carrying the commit's metadata.
      for (const op of commit.ops) {
        const record = {
          $type: commit.$type,
          repo: commit.repo,
          seq: commit.seq,
          time: commit.time,
          rev: commit.rev,
          since: commit.since,
          op: op,
        };
        addRecord(record);
      }
    });
    stream.on("identity", (identity) => {
      addRecord(identity);
    });
    stream.on("account", (account) => {
      addRecord(account);
    });
    stream.on("sync", (sync) => {
      const event = {
        $type: sync.$type,
        did: sync.did,
        rev: sync.rev,
        seq: sync.seq,
        time: sync.time,
      };
      addRecord(event);
    });
    stream.start();
  };

  /**
   * Connects using the submitted form values, or disconnects when a stream is
   * already open.
   *
   * The raw form values are written to the search params and to the summary
   * shown while connected. A blank instance falls back to the default endpoint
   * for the current stream type.
   */
  const connectSocket = (formData: FormData) => {
    setNotice("");
    if (connected()) {
      disconnect();
      return;
    }
    setRecords([]);

    const field = (name: (typeof FIELD_NAMES)[number]) => formData.get(name)?.toString();
    const instance = field("instance");
    const collections = field("collections");
    const dids = field("dids");
    const cursor = field("cursor");
    const allEvents = field("allEvents");

    const target = instance?.trim() || DEFAULT_INSTANCE[streamType];

    let url: string;
    if (streamType === "jetstream") {
      // Collection / DID filters are query parameters of the Jetstream endpoint only.
      url = buildJetstreamUrl(target, { collections, dids, cursor });
    } else {
      url = target.replace("/xrpc/com.atproto.sync.subscribeRepos", "");
      if (!(url.startsWith("wss://") || url.startsWith("ws://"))) url = "wss://" + url;
    }

    setSearchParams({ instance, collections, dids, cursor, allEvents });

    setParameters([
      { name: "Instance", param: instance?.trim() ? instance : target },
      { name: "Collections", param: collections },
      { name: "DIDs", param: dids },
      { name: "Cursor", param: cursor },
      { name: "All Events", param: allEvents },
    ]);

    try {
      if (streamType === "jetstream") openJetstream(url, allEvents === "on");
      else openFirehose(url, cursor);
    } catch (err) {
      // e.g. the WebSocket constructor rejecting a malformed URL.
      console.error(err);
      closeStream();
      setNotice("Connection error");
      return;
    }
    setConnected(true);
  };

  onMount(() => {
    // Auto-connect only when the URL names an instance to connect to.
    if (!searchParams.instance) return;
    const formData = new FormData();
    for (const name of FIELD_NAMES) {
      const value = searchParams[name];
      if (value) formData.append(name, value.toString());
    }
    connectSocket(formData);
  });

  // Close both stream kinds on unmount, not just the Jetstream socket.
  onCleanup(closeStream);

  return (
    <>
      <Title>{streamType === "firehose" ? "Firehose" : "Jetstream"} - PDSls</Title>
      <div class="flex w-full flex-col items-center">
        <div class="mb-1 flex gap-4 font-medium">
          <A
            class="flex items-center gap-1 border-b-2"
            inactiveClass="border-transparent text-neutral-600 dark:text-neutral-400 hover:border-neutral-400 dark:hover:border-neutral-600"
            href="/jetstream"
          >
            Jetstream
          </A>
          <A
            class="flex items-center gap-1 border-b-2"
            inactiveClass="border-transparent text-neutral-600 dark:text-neutral-400 hover:border-neutral-400 dark:hover:border-neutral-600"
            href="/firehose"
          >
            Firehose
          </A>
        </div>
        <StickyOverlay>
          <form ref={formRef} class="flex w-full flex-col gap-1.5 text-sm">
            <Show when={!connected()}>
              <label class="flex items-center justify-end gap-x-1">
                <span class="min-w-20">Instance</span>
                <TextInput
                  name="instance"
                  value={searchParams.instance ?? DEFAULT_INSTANCE[streamType]}
                  class="grow"
                />
              </label>
              <Show when={streamType === "jetstream"}>
                <label class="flex items-center justify-end gap-x-1">
                  <span class="min-w-20">Collections</span>
                  <textarea
                    name="collections"
                    spellcheck={false}
                    placeholder="Comma-separated list of collections"
                    value={searchParams.collections ?? ""}
                    class="dark:bg-dark-100 grow rounded-lg bg-white px-2 py-1 outline-1 outline-neutral-200 focus:outline-[1.5px] focus:outline-neutral-600 dark:outline-neutral-600 dark:focus:outline-neutral-400"
                  />
                </label>
                <label class="flex items-center justify-end gap-x-1">
                  <span class="min-w-20">DIDs</span>
                  <textarea
                    name="dids"
                    spellcheck={false}
                    placeholder="Comma-separated list of DIDs"
                    value={searchParams.dids ?? ""}
                    class="dark:bg-dark-100 grow rounded-lg bg-white px-2 py-1 outline-1 outline-neutral-200 focus:outline-[1.5px] focus:outline-neutral-600 dark:outline-neutral-600 dark:focus:outline-neutral-400"
                  />
                </label>
              </Show>
              <label class="flex items-center justify-end gap-x-1">
                <span class="min-w-20">Cursor</span>
                <TextInput
                  name="cursor"
                  placeholder="Leave empty for live-tail"
                  value={searchParams.cursor ?? ""}
                  class="grow"
                />
              </label>
              <Show when={streamType === "jetstream"}>
                <div class="flex items-center justify-end gap-x-1">
                  <input
                    type="checkbox"
                    name="allEvents"
                    id="allEvents"
                    checked={searchParams.allEvents === "on"}
                  />
                  <label for="allEvents" class="select-none">
                    Show account and identity events
                  </label>
                </div>
              </Show>
            </Show>
            <Show when={connected()}>
              <div class="flex flex-col gap-1 wrap-anywhere">
                <For each={parameters()}>
                  {(param) => (
                    <Show when={param.param}>
                      <div class="flex">
                        <div class="min-w-24 font-semibold">{param.name}</div>
                        {param.param}
                      </div>
                    </Show>
                  )}
                </For>
              </div>
            </Show>
            <div class="flex justify-end">
              <Show when={connected()}>
                <button
                  type="button"
                  onmousedown={(e) => {
                    e.preventDefault();
                    disconnect();
                  }}
                  ontouchstart={(e) => {
                    e.preventDefault();
                    disconnect();
                  }}
                  class="dark:hover:bg-dark-200 dark:shadow-dark-700 dark:active:bg-dark-100 box-border flex h-7 items-center gap-1 rounded-lg border-[0.5px] border-neutral-300 bg-neutral-50 px-2 py-1.5 text-xs shadow-xs select-none hover:bg-neutral-100 active:bg-neutral-200 dark:border-neutral-700 dark:bg-neutral-800"
                >
                  Disconnect
                </button>
              </Show>
              <Show when={!connected()}>
                <Button onClick={() => connectSocket(new FormData(formRef))}>Connect</Button>
              </Show>
            </div>
          </form>
        </StickyOverlay>
        <Show when={notice().length}>
          <div class="text-red-500 dark:text-red-400">{notice()}</div>
        </Show>
        <div class="flex w-full flex-col gap-2 divide-y-[0.5px] divide-neutral-500 font-mono text-sm wrap-anywhere whitespace-pre-wrap md:w-3xl">
          <For each={records().toReversed()}>
            {(rec) => (
              <div class="pb-2">
                <JSONValue data={rec} repo={rec.did ?? rec.repo} />
              </div>
            )}
          </For>
        </div>
      </div>
    </>
  );
};
308
309export { StreamView };