A decentralized music tracking and discovery platform built on AT Protocol 🎵
at main 351 lines 11 kB view raw
1import { ctx } from "./context.ts"; 2import logger from "./logger.ts"; 3import schema from "./schema/mod.ts"; 4import { asc, inArray } from "drizzle-orm"; 5import type { SelectEvent } from "./schema/event.ts"; 6import { assureAdminAuth, parseTapEvent } from "@atproto/tap"; 7import { addToBatch, flushBatch } from "./batch.ts"; 8 9const PAGE_SIZE = 100; 10const BATCH_SEND_SIZE = 50; 11const ADMIN_PASSWORD = Deno.env.get("TAP_ADMIN_PASSWORD")!; 12 13interface ClientState { 14 socket: WebSocket; 15 isPaginating: boolean; 16 queue: SelectEvent[]; 17 dids?: string[]; 18} 19 20const connectedClients = new Map<WebSocket, ClientState>(); 21 22function safeSend( 23 socket: WebSocket, 24 message: string, 25 eventCount?: number, 26): boolean { 27 try { 28 if (socket.readyState === WebSocket.OPEN) { 29 socket.send(message); 30 if (eventCount !== undefined && eventCount % 50 === 0) { 31 logger.info`📤 Sent ${eventCount} events, readyState: ${socket.readyState}`; 32 } 33 return true; 34 } else { 35 logger.error`❌ Cannot send - socket readyState: ${socket.readyState}`; 36 } 37 } catch (error) { 38 logger.error`Failed to send message: ${error}`; 39 logger.error`Socket readyState: ${socket.readyState}`; 40 } 41 return false; 42} 43 44function formatEvent(evt: SelectEvent): string { 45 const { createdAt: _createdAt, record, ...rest } = evt; 46 if (record) { 47 return JSON.stringify({ ...rest, record: JSON.parse(record) }); 48 } 49 return JSON.stringify(rest); 50} 51 52export function broadcastEvent(evt: SelectEvent) { 53 const message = formatEvent(evt); 54 55 for (const [socket, state] of connectedClients.entries()) { 56 if (socket.readyState === WebSocket.OPEN) { 57 if ( 58 state.dids && 59 state.dids.length > 0 && 60 !state.dids.includes(evt.did) 61 ) { 62 continue; // Skip events not matching the DID filter 63 } 64 65 if (state.isPaginating) { 66 state.queue.push(evt); 67 } else { 68 safeSend(socket, message); 69 } 70 } 71 } 72} 73 74Deno.serve( 75 { port: parseInt(Deno.env.get("WS_PORT") || "2481") }, 76 async (req) => { 77 if (req.method === "POST") { 78 try { 79 assureAdminAuth(ADMIN_PASSWORD, req.headers.get("authorization")!); 80 } catch { 81 logger.warn`Unauthorized access attempt ${req.headers.get("authorization")}`; 82 return new Response(null, { status: 401 }); 83 } 84 const evt = parseTapEvent(await req.json()); 85 switch (evt.type) { 86 case "identity": { 87 addToBatch({ 88 id: evt.id, 89 type: evt.type, 90 did: evt.did, 91 handle: evt.handle, 92 status: evt.status, 93 isActive: evt.isActive, 94 action: null, 95 rev: null, 96 collection: null, 97 rkey: null, 98 record: null, 99 cid: null, 100 live: null, 101 }); 102 logger.info`New identity: ${evt.did} ${evt.handle} ${evt.status}`; 103 break; 104 } 105 case "record": { 106 addToBatch({ 107 id: evt.id, 108 type: evt.type, 109 action: evt.action, 110 did: evt.did, 111 rev: evt.rev, 112 collection: evt.collection, 113 rkey: evt.rkey, 114 record: JSON.stringify(evt.record), 115 cid: evt.cid, 116 live: evt.live, 117 handle: null, 118 status: null, 119 isActive: null, 120 }); 121 const uri = `at://${evt.did}/${evt.collection}/${evt.rkey}`; 122 logger.info`New record: ${uri}`; 123 break; 124 } 125 } 126 127 return new Response(null, { status: 200 }); 128 } 129 130 if (req.headers.get("upgrade") != "websocket") { 131 return new Response(null, { status: 426 }); 132 } 133 134 const { socket, response } = Deno.upgradeWebSocket(req); 135 136 const url = new URL(req.url); 137 const didsParam = url.searchParams.get("dids"); 138 const dids = didsParam 139 ? didsParam 140 .split(",") 141 .map((d) => d.trim()) 142 .filter((d) => d.length > 0) 143 : undefined; 144 145 socket.addEventListener("open", () => { 146 logger.info`✅ Client connected! Socket state: ${socket.readyState}`; 147 if (dids && dids.length > 0) { 148 logger.info`🔍 Filtering by DIDs: ${dids.join(", ")}`; 149 } 150 151 connectedClients.set(socket, { 152 socket, 153 isPaginating: true, 154 queue: [], 155 dids, 156 }); 157 158 safeSend( 159 socket, 160 JSON.stringify({ 161 type: "connected", 162 message: "Ready to stream events", 163 }), 164 ); 165 logger.info`📤 Sent connection confirmation`; 166 167 (async () => { 168 try { 169 let page = 0; 170 let hasMore = true; 171 let totalEvents = 0; 172 173 logger.info`📖 Starting pagination...`; 174 175 try { 176 const testQuery = await ctx.db 177 .select() 178 .from(schema.events) 179 .limit(1) 180 .execute(); 181 logger.info`✅ Database test query successful, found ${testQuery.length} sample event(s)`; 182 } catch (dbError) { 183 logger.error`❌ Database test query failed: ${dbError}`; 184 throw dbError; 185 } 186 187 while (hasMore && socket.readyState === WebSocket.OPEN) { 188 let query = ctx.db.select().from(schema.events).$dynamic(); 189 190 // Apply DID filter if specified 191 if (dids && dids.length > 0) { 192 query = query.where(inArray(schema.events.did, dids)); 193 } 194 195 const events = await query 196 .orderBy(asc(schema.events.createdAt)) 197 .offset(page * PAGE_SIZE) 198 .limit(PAGE_SIZE) 199 .execute(); 200 201 if (page % 10 === 0) { 202 logger.info`📄 Fetching page ${page}... (${totalEvents} events sent so far)`; 203 } 204 205 // Batch send events for better performance 206 const batchMessages: string[] = []; 207 for (let i = 0; i < events.length; i++) { 208 const evt = events[i]; 209 210 if (socket.readyState !== WebSocket.OPEN) { 211 logger.info`⚠️ Socket closed during pagination at event ${totalEvents}`; 212 return; 213 } 214 215 batchMessages.push(formatEvent(evt)); 216 217 // Send batch when full or at end of page 218 if ( 219 batchMessages.length >= BATCH_SEND_SIZE || 220 i === events.length - 1 221 ) { 222 const batchMessage = `[${batchMessages.join(",")}]`; 223 const success = safeSend(socket, batchMessage, totalEvents); 224 225 if (success) { 226 totalEvents += batchMessages.length; 227 batchMessages.length = 0; // Clear batch 228 } else { 229 logger.error`❌ Failed to send batch at ${totalEvents}, stopping pagination`; 230 return; 231 } 232 } 233 } 234 235 hasMore = events.length === PAGE_SIZE; 236 page++; 237 238 if (hasMore && page % 5 === 0) { 239 await new Promise((resolve) => setTimeout(resolve, 20)); 240 } 241 } 242 243 logger.info`📤 Sent all historical events: ${totalEvents} total (${page} pages)`; 244 245 const clientState = connectedClients.get(socket); 246 if (clientState && socket.readyState === WebSocket.OPEN) { 247 const queuedCount = clientState.queue.length; 248 249 if (queuedCount > 0) { 250 logger.info`📦 Sending ${queuedCount} queued events...`; 251 252 // Batch send queued events 253 const queueMessages: string[] = []; 254 for (const evt of clientState.queue) { 255 if (socket.readyState !== WebSocket.OPEN) break; 256 257 queueMessages.push(formatEvent(evt)); 258 259 if (queueMessages.length >= BATCH_SEND_SIZE) { 260 safeSend(socket, `[${queueMessages.join(",")}]`); 261 queueMessages.length = 0; 262 } 263 } 264 265 if (queueMessages.length > 0) { 266 safeSend(socket, `[${queueMessages.join(",")}]`); 267 } 268 269 clientState.queue = []; 270 } 271 272 clientState.isPaginating = false; 273 logger.info`🔄 Now streaming real-time events...`; 274 } 275 } catch (error) { 276 logger.error`Pagination error: ${error}`; 277 logger.error`Stack: ${error instanceof Error ? error.stack : ""}`; 278 279 safeSend( 280 socket, 281 JSON.stringify({ 282 type: "error", 283 message: "Failed to load historical events", 284 }), 285 ); 286 287 const clientState = connectedClients.get(socket); 288 if (clientState) { 289 clientState.isPaginating = false; 290 } 291 } 292 })().catch((err) => { 293 logger.error`Unhandled error in pagination loop: ${err}`; 294 logger.error`Stack: ${err instanceof Error ? err.stack : ""}`; 295 }); 296 }); 297 298 socket.addEventListener("message", (event) => { 299 try { 300 if (event.data === "ping") { 301 safeSend(socket, "pong"); 302 } 303 } catch (error) { 304 logger.error`Error handling message: ${error}`; 305 } 306 }); 307 308 socket.addEventListener("close", (event) => { 309 const clientState = connectedClients.get(socket); 310 connectedClients.delete(socket); 311 312 logger.info`❌ Client disconnected. Code: ${event.code}, Reason: ${event.reason || "none"}, Clean: ${event.wasClean}`; 313 logger.info` Active clients: ${connectedClients.size}`; 314 315 if (clientState) { 316 logger.info` Was paginating: ${clientState.isPaginating}`; 317 logger.info` Queued events: ${clientState.queue.length}`; 318 } 319 320 if (event.code === 1006) { 321 logger.error`⚠️ Abnormal closure (1006) detected - connection dropped unexpectedly`; 322 logger.error` Possible causes:`; 323 logger.error` - Client overwhelmed with messages (try reducing PAGE_SIZE)`; 324 logger.error` - Network timeout or interruption`; 325 logger.error` - Server sent messages too fast`; 326 logger.error` - Uncaught exception in message handling`; 327 } 328 }); 329 330 socket.addEventListener("error", (error) => { 331 logger.error`❌ WebSocket error occurred`; 332 logger.error` Error: ${error}`; 333 logger.error` ReadyState: ${socket.readyState}`; 334 const clientState = connectedClients.get(socket); 335 if (clientState) { 336 logger.error` Was paginating: ${clientState.isPaginating}`; 337 logger.error` Queued events: ${clientState.queue.length}`; 338 } 339 connectedClients.delete(socket); 340 }); 341 342 return response; 343 }, 344); 345 346globalThis.addEventListener("beforeunload", () => { 347 flushBatch(); 348}); 349 350const url = `ws://localhost:${Deno.env.get("WS_PORT") || 2481}`; 351logger.info`🚀 Tap WebSocket server is running! ${url}`;