forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import { ctx } from "./context.ts";
2import logger from "./logger.ts";
3import schema from "./schema/mod.ts";
4import { asc, inArray } from "drizzle-orm";
5import type { SelectEvent } from "./schema/event.ts";
6import { assureAdminAuth, parseTapEvent } from "@atproto/tap";
7import { addToBatch, flushBatch } from "./batch.ts";
8
/** Number of rows requested per database query while replaying stored events. */
const PAGE_SIZE = 100;

/** Upper bound on how many events are packed into one WebSocket message. */
const BATCH_SEND_SIZE = 50;

/**
 * Password handed to `assureAdminAuth` for incoming POST requests.
 * The non-null assertion means an unset `TAP_ADMIN_PASSWORD` is not detected here.
 */
const ADMIN_PASSWORD = Deno.env.get("TAP_ADMIN_PASSWORD")!;
12
/** Bookkeeping kept for each connected WebSocket subscriber. */
interface ClientState {
  /** The subscriber's connection. */
  socket: WebSocket;
  /** True while stored events are being replayed; live events go to `queue` meanwhile. */
  isPaginating: boolean;
  /** Live events buffered during replay, sent once the replay has finished. */
  queue: SelectEvent[];
  /** Optional DID filter; when absent or empty, no filtering is applied. */
  dids?: string[];
}
19
/** Registry of the currently connected subscribers, keyed by their socket. */
const connectedClients: Map<WebSocket, ClientState> = new Map();
21
/**
 * Sends `message` on `socket` without ever throwing.
 *
 * @param socket - Connection to write to; it must be in the OPEN state.
 * @param message - Text frame to send.
 * @param eventCount - Optional running counter; a progress line is logged
 *   whenever it is a multiple of 50.
 * @returns `true` when the message was handed to the socket, `false` when the
 *   socket was not open or the send (or its logging) threw.
 */
function safeSend(
  socket: WebSocket,
  message: string,
  eventCount?: number,
): boolean {
  try {
    // Guard first: a socket that is not open is reported and skipped.
    if (socket.readyState !== WebSocket.OPEN) {
      logger.error`❌ Cannot send - socket readyState: ${socket.readyState}`;
      return false;
    }

    socket.send(message);

    const shouldLogProgress = eventCount !== undefined && eventCount % 50 === 0;
    if (shouldLogProgress) {
      logger.info`📤 Sent ${eventCount} events, readyState: ${socket.readyState}`;
    }
    return true;
  } catch (error) {
    logger.error`Failed to send message: ${error}`;
    logger.error`Socket readyState: ${socket.readyState}`;
    return false;
  }
}
43
/**
 * Serializes a stored event into the JSON text sent to subscribers.
 *
 * `createdAt` is left out of the output. When `record` is a non-empty string
 * it is parsed and embedded as a nested JSON value; otherwise the `record`
 * key is omitted altogether.
 *
 * @param evt - Event row to serialize.
 * @returns The JSON text for the event.
 * @throws SyntaxError if `record` is present but is not valid JSON.
 */
function formatEvent(evt: SelectEvent): string {
  const { createdAt: _createdAt, record, ...rest } = evt;
  const payload = record ? { ...rest, record: JSON.parse(record) } : rest;
  return JSON.stringify(payload);
}
51
/**
 * Delivers a live event to every connected subscriber whose socket is open.
 *
 * Subscribers with a non-empty DID filter only receive events whose `did` is
 * in that filter. Subscribers still replaying stored events get the event
 * appended to their queue instead of receiving it immediately.
 *
 * @param evt - Event to deliver.
 */
export function broadcastEvent(evt: SelectEvent) {
  // Serialize once; the same text is sent to every eligible subscriber.
  const payload = formatEvent(evt);

  for (const [client, clientState] of connectedClients) {
    if (client.readyState !== WebSocket.OPEN) {
      continue;
    }

    const didFilter = clientState.dids;
    if (didFilter && didFilter.length > 0 && !didFilter.includes(evt.did)) {
      continue; // Skip events not matching the DID filter
    }

    if (clientState.isPaginating) {
      clientState.queue.push(evt);
      continue;
    }

    safeSend(client, payload);
  }
}
73
/**
 * Handles a Tap webhook delivery (HTTP POST): authenticates the caller,
 * parses the event and hands it to the batch writer.
 *
 * @param req - Incoming POST request.
 * @returns 401 when the admin credentials are missing or rejected, 400 when
 *   the body is not a valid Tap event, 200 once the event has been batched.
 */
async function handleTapWebhook(req: Request): Promise<Response> {
  const authorization = req.headers.get("authorization");
  if (authorization === null) {
    logger.warn`Unauthorized access attempt: missing Authorization header`;
    return new Response(null, { status: 401 });
  }

  try {
    assureAdminAuth(ADMIN_PASSWORD, authorization);
  } catch {
    // Deliberately do not log the header value: it carries the credentials
    // that were attempted and must not end up in log storage.
    logger.warn`Unauthorized access attempt: invalid admin credentials`;
    return new Response(null, { status: 401 });
  }

  let evt: ReturnType<typeof parseTapEvent>;
  try {
    evt = parseTapEvent(await req.json());
  } catch (error) {
    // A body that is not JSON / not a Tap event is the sender's fault; answer
    // 400 instead of letting the exception surface as a 500.
    logger.warn`Rejected malformed Tap event: ${error}`;
    return new Response(null, { status: 400 });
  }

  switch (evt.type) {
    case "identity": {
      addToBatch({
        id: evt.id,
        type: evt.type,
        did: evt.did,
        handle: evt.handle,
        status: evt.status,
        isActive: evt.isActive,
        action: null,
        rev: null,
        collection: null,
        rkey: null,
        record: null,
        cid: null,
        live: null,
      });
      logger.info`New identity: ${evt.did} ${evt.handle} ${evt.status}`;
      break;
    }
    case "record": {
      addToBatch({
        id: evt.id,
        type: evt.type,
        action: evt.action,
        did: evt.did,
        rev: evt.rev,
        collection: evt.collection,
        rkey: evt.rkey,
        record: JSON.stringify(evt.record),
        cid: evt.cid,
        live: evt.live,
        handle: null,
        status: null,
        isActive: null,
      });
      const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`;
      logger.info`New record: ${uri}`;
      break;
    }
  }

  return new Response(null, { status: 200 });
}

/**
 * Replays the stored events to `socket`, page by page, as JSON-array batches.
 *
 * @param socket - Subscriber connection.
 * @param dids - Optional DID filter; when non-empty only matching rows are read.
 * @returns `true` when every page was delivered, `false` when the socket
 *   closed or a send failed part-way through.
 * @throws Whatever the database layer or `formatEvent` throws.
 */
async function replayHistory(
  socket: WebSocket,
  dids: string[] | undefined,
): Promise<boolean> {
  let page = 0;
  let hasMore = true;
  let totalEvents = 0;

  logger.info`📖 Starting pagination...`;

  // Cheap probe so a broken database connection is reported with a clear log
  // line before the replay loop starts.
  try {
    const testQuery = await ctx.db
      .select()
      .from(schema.events)
      .limit(1)
      .execute();
    logger.info`✅ Database test query successful, found ${testQuery.length} sample event(s)`;
  } catch (dbError) {
    logger.error`❌ Database test query failed: ${dbError}`;
    throw dbError;
  }

  while (hasMore && socket.readyState === WebSocket.OPEN) {
    let query = ctx.db.select().from(schema.events).$dynamic();

    // Apply DID filter if specified
    if (dids && dids.length > 0) {
      query = query.where(inArray(schema.events.did, dids));
    }

    // OFFSET pagination needs a total order: rows sharing a createdAt value
    // could otherwise move between pages and be skipped or repeated.
    // NOTE(review): assumes the events table exposes the `id` column that is
    // passed to addToBatch — confirm against ./schema/event.ts.
    const events = await query
      .orderBy(asc(schema.events.createdAt), asc(schema.events.id))
      .offset(page * PAGE_SIZE)
      .limit(PAGE_SIZE)
      .execute();

    if (page % 10 === 0) {
      logger.info`📄 Fetching page ${page}... (${totalEvents} events sent so far)`;
    }

    // Batch send events for better performance
    for (let start = 0; start < events.length; start += BATCH_SEND_SIZE) {
      if (socket.readyState !== WebSocket.OPEN) {
        logger.info`⚠️ Socket closed during pagination at event ${totalEvents}`;
        return false;
      }

      const batch = events
        .slice(start, start + BATCH_SEND_SIZE)
        .map((evt) => formatEvent(evt));

      if (!safeSend(socket, `[${batch.join(",")}]`, totalEvents)) {
        logger.error`❌ Failed to send batch at ${totalEvents}, stopping pagination`;
        return false;
      }
      totalEvents += batch.length;
    }

    hasMore = events.length === PAGE_SIZE;
    page++;

    // Brief pause every few pages so the client is not flooded.
    if (hasMore && page % 5 === 0) {
      await new Promise((resolve) => setTimeout(resolve, 20));
    }
  }

  // The loop also ends when the socket closes between pages; do not report
  // that as a completed replay.
  if (socket.readyState !== WebSocket.OPEN) {
    logger.info`⚠️ Socket closed during pagination at event ${totalEvents}`;
    return false;
  }

  logger.info`📤 Sent all historical events: ${totalEvents} total (${page} pages)`;
  return true;
}

/**
 * Sends the live events buffered during the replay, in JSON-array batches.
 * Does not clear the queue; the caller does that.
 */
function flushQueuedEvents(socket: WebSocket, state: ClientState): void {
  const queued = state.queue;
  if (queued.length === 0) return;

  logger.info`📦 Sending ${queued.length} queued events...`;

  for (let start = 0; start < queued.length; start += BATCH_SEND_SIZE) {
    if (socket.readyState !== WebSocket.OPEN) break;
    const batch = queued
      .slice(start, start + BATCH_SEND_SIZE)
      .map((evt) => formatEvent(evt));
    safeSend(socket, `[${batch.join(",")}]`);
  }
}

/**
 * Ends the replay phase for `socket`: delivers buffered live events, releases
 * the buffer and lets `broadcastEvent` send to the subscriber directly.
 */
function switchToLive(socket: WebSocket): void {
  const clientState = connectedClients.get(socket);
  if (!clientState) return; // already removed by the close/error handler

  try {
    if (socket.readyState === WebSocket.OPEN) {
      flushQueuedEvents(socket, clientState);
    }
  } catch (error) {
    logger.error`Failed to flush queued events: ${error}`;
  } finally {
    // Always release the buffer and leave replay mode, so events are neither
    // retained indefinitely nor queued forever.
    clientState.queue = [];
    clientState.isPaginating = false;
  }

  if (socket.readyState === WebSocket.OPEN) {
    logger.info`🔄 Now streaming real-time events...`;
  }
}

/**
 * Runs the replay for a new subscriber and then switches it to live delivery.
 * A replay failure is reported to the client as an `error` message; buffered
 * live events are still delivered afterwards.
 */
async function replayThenGoLive(
  socket: WebSocket,
  dids: string[] | undefined,
): Promise<void> {
  try {
    const completed = await replayHistory(socket, dids);
    if (!completed) return; // socket is gone; the close handler cleans up
  } catch (error) {
    logger.error`Pagination error: ${error}`;
    logger.error`Stack: ${error instanceof Error ? error.stack : ""}`;

    safeSend(
      socket,
      JSON.stringify({
        type: "error",
        message: "Failed to load historical events",
      }),
    );
  }

  switchToLive(socket);
}

// HTTP entry point: POST requests are Tap webhook deliveries, everything else
// must be a WebSocket upgrade from a subscriber. The optional `dids` query
// parameter is a comma-separated DID filter for that subscriber.
Deno.serve(
  { port: parseInt(Deno.env.get("WS_PORT") || "2481", 10) },
  async (req) => {
    if (req.method === "POST") {
      return await handleTapWebhook(req);
    }

    // Upgrade header values are case-insensitive (RFC 6455 §4.2.1).
    if (req.headers.get("upgrade")?.toLowerCase() !== "websocket") {
      return new Response(null, { status: 426 });
    }

    const { socket, response } = Deno.upgradeWebSocket(req);

    const url = new URL(req.url);
    const didsParam = url.searchParams.get("dids");
    const dids = didsParam
      ? didsParam
        .split(",")
        .map((d) => d.trim())
        .filter((d) => d.length > 0)
      : undefined;

    socket.addEventListener("open", () => {
      logger.info`✅ Client connected! Socket state: ${socket.readyState}`;
      if (dids && dids.length > 0) {
        logger.info`🔍 Filtering by DIDs: ${dids.join(", ")}`;
      }

      // Register in replay mode first so live events arriving during the
      // replay are buffered rather than interleaved with stored ones.
      connectedClients.set(socket, {
        socket,
        isPaginating: true,
        queue: [],
        dids,
      });

      safeSend(
        socket,
        JSON.stringify({
          type: "connected",
          message: "Ready to stream events",
        }),
      );
      logger.info`📤 Sent connection confirmation`;

      // Fire-and-forget: the listener itself must not block on the replay.
      replayThenGoLive(socket, dids).catch((err) => {
        logger.error`Unhandled error in pagination loop: ${err}`;
        logger.error`Stack: ${err instanceof Error ? err.stack : ""}`;
      });
    });

    socket.addEventListener("message", (event) => {
      try {
        // Application-level keep-alive.
        if (event.data === "ping") {
          safeSend(socket, "pong");
        }
      } catch (error) {
        logger.error`Error handling message: ${error}`;
      }
    });

    socket.addEventListener("close", (event) => {
      const clientState = connectedClients.get(socket);
      connectedClients.delete(socket);

      logger.info`❌ Client disconnected. Code: ${event.code}, Reason: ${event.reason || "none"}, Clean: ${event.wasClean}`;
      logger.info` Active clients: ${connectedClients.size}`;

      if (clientState) {
        logger.info` Was paginating: ${clientState.isPaginating}`;
        logger.info` Queued events: ${clientState.queue.length}`;
      }

      if (event.code === 1006) {
        logger.error`⚠️ Abnormal closure (1006) detected - connection dropped unexpectedly`;
        logger.error` Possible causes:`;
        logger.error` - Client overwhelmed with messages (try reducing PAGE_SIZE)`;
        logger.error` - Network timeout or interruption`;
        logger.error` - Server sent messages too fast`;
        logger.error` - Uncaught exception in message handling`;
      }
    });

    socket.addEventListener("error", (error) => {
      // Interpolating the event object itself only yields "[object Event]";
      // log the message when the runtime provides one.
      const detail = error instanceof ErrorEvent ? error.message : error.type;
      logger.error`❌ WebSocket error occurred`;
      logger.error` Error: ${detail}`;
      logger.error` ReadyState: ${socket.readyState}`;
      const clientState = connectedClients.get(socket);
      if (clientState) {
        logger.error` Was paginating: ${clientState.isPaginating}`;
        logger.error` Queued events: ${clientState.queue.length}`;
      }
      connectedClients.delete(socket);
    });

    return response;
  },
);
345
/**
 * Calls `flushBatch` when the runtime dispatches `beforeunload`.
 * NOTE(review): presumably this persists events still waiting in the batch —
 * confirm against ./batch.ts.
 */
function flushOnBeforeUnload(): void {
  flushBatch();
}

globalThis.addEventListener("beforeunload", flushOnBeforeUnload);
349
// Startup banner: log the local WebSocket address, using WS_PORT when it is
// set and falling back to 2481 otherwise.
const url = "ws://localhost:" + (Deno.env.get("WS_PORT") || 2481);
logger.info`🚀 Tap WebSocket server is running! ${url}`;