A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at setup-tracing 276 lines 9.7 kB view raw
import chalk from "chalk";
import { ctx } from "context";
import { createHash } from "crypto";
import { and, eq } from "drizzle-orm";
import type { Context } from "hono";
import jwt from "jsonwebtoken";
import { env } from "lib/env";
import lovedTracks from "schema/loved-tracks";
import tracks from "schema/tracks";
import users from "schema/users";
import { v4 as uuidv4 } from "uuid";
import z from "zod";

// Schema for a control message relayed from one client to the user's devices.
const ControlMessageSchema = z.object({
  type: z.string(),
  target: z.string().optional(),
  action: z.string(),
  args: z.any().optional(),
  token: z.string(),
});

type ControlMessage = z.infer<typeof ControlMessageSchema>;

// Schema for the message a client sends to register itself as a device.
const RegisterDeviceSchema = z.object({
  type: z.literal("register"),
  clientName: z.string(),
  token: z.string(),
});

type RegisterDeviceMessage = z.infer<typeof RegisterDeviceSchema>;

// Schema for a device payload (track or status) broadcast to the user's devices.
const MessageSchema = z.object({
  type: z.literal("message"),
  data: z.any(),
  device_id: z.string(),
  token: z.string(),
});

type Message = z.infer<typeof MessageSchema>;

// In-memory registries, keyed by the device id generated at registration.
const devices: Record<string, WebSocket> = {};
const deviceNames: Record<string, string> = {};
// Device ids grouped by the `did` claim of the registering token.
const userDevices: Record<string, string[]> = {};

/**
 * Verifies a JWT against `env.JWT_SECRET` and returns its `did` claim.
 *
 * NOTE(review): `ignoreExpiration: true` means expired tokens are accepted;
 * this mirrors the existing behaviour — confirm it is intentional.
 *
 * @param token - JWT supplied by the client in the message body.
 * @returns The `did` claim of the verified token.
 * @throws If verification fails or the payload carries no string `did`.
 */
function verifyDid(token: string): string {
  const payload = jwt.verify(token, env.JWT_SECRET, {
    ignoreExpiration: true,
  });
  // Without this check a token lacking `did` would be filed under the key
  // "undefined" and share a device list with every other such token.
  if (
    typeof payload !== "object" ||
    payload === null ||
    typeof payload.did !== "string"
  ) {
    throw new Error("Invalid token payload: missing did");
  }
  return payload.did;
}

/**
 * Enriches a `track` payload in place with the user's like state and the
 * stored track metadata, and refreshes the related Redis entries.
 *
 * Sets `data.liked` always; sets `data.album_art`, `data.song_uri`,
 * `data.album_uri` and `data.artist_uri` and writes `nowplaying:<did>` only
 * when the track is found in the cache or the database.
 *
 * @param data - Track payload received from the device; mutated in place.
 * @param did - `did` claim of the sender's token.
 */
async function enrichTrack(
  data: Record<string, unknown>,
  did: string,
): Promise<void> {
  const sha256 = createHash("sha256")
    .update(`${data.title} - ${data.artist} - ${data.album}`.toLowerCase())
    .digest("hex");
  const [cachedTrack, cachedLikes] = await Promise.all([
    ctx.redis.get(`track:${sha256}`),
    ctx.redis.get(`likes:${did}:${sha256}`),
  ]);

  if (cachedLikes) {
    const cachedData = JSON.parse(cachedLikes);
    data.liked = cachedData.liked;
  } else {
    const [likes] = await ctx.db
      .select()
      .from(lovedTracks)
      .leftJoin(tracks, eq(lovedTracks.trackId, tracks.id))
      .leftJoin(users, eq(lovedTracks.userId, users.id))
      .where(and(eq(users.did, did), eq(tracks.sha256, sha256)))
      .execute();
    data.liked = Boolean(likes);
    // Cache the like state for 2 seconds.
    await ctx.redis.setEx(
      `likes:${did}:${sha256}`,
      2,
      JSON.stringify({ liked: data.liked }),
    );
  }

  // Check if the track is cached; if not, fetch it from the database
  // and cache it for 10 seconds.
  if (cachedTrack) {
    const cachedData = JSON.parse(cachedTrack);
    data.album_art = cachedData.albumArt;
    data.song_uri = cachedData.uri;
    data.album_uri = cachedData.albumUri;
    data.artist_uri = cachedData.artistUri;
    await ctx.redis.setEx(
      `nowplaying:${did}`,
      3,
      JSON.stringify({
        ...data,
        sha256,
        liked: data.liked,
      }),
    );
    return;
  }

  const [track] = await ctx.db
    .select()
    .from(tracks)
    .where(eq(tracks.sha256, sha256))
    .execute();
  if (!track) {
    return;
  }

  data.album_art = track.albumArt;
  data.song_uri = track.uri;
  data.album_uri = track.albumUri;
  data.artist_uri = track.artistUri;
  await Promise.all([
    ctx.redis.setEx(
      `track:${sha256}`,
      10,
      JSON.stringify({
        albumArt: track.albumArt,
        uri: track.uri,
        albumUri: track.albumUri,
        artistUri: track.artistUri,
        liked: data.liked,
      }),
    ),
    ctx.redis.setEx(
      `nowplaying:${did}`,
      3,
      JSON.stringify({
        ...data,
        sha256,
        liked: data.liked,
      }),
    ),
  ]);
}

/**
 * Builds the WebSocket event handlers for a connection.
 *
 * `onMessage` answers `"ping"` with `"pong"`, otherwise parses the frame as
 * JSON and handles it, in this order, as:
 *  1. a device message (`type: "message"`) — enriched and broadcast to every
 *     device registered for the sender's `did`;
 *  2. a control message — forwarded to the targeted device, or to all of the
 *     user's devices when the target is not one of them;
 *  3. a register message (`type: "register"`) — the socket is stored under a
 *     fresh device id and the user's other devices are notified.
 *
 * `onClose` removes the socket from all in-memory registries.
 *
 * @param c - Request context (not used by the handlers).
 * @returns An object with `onMessage` and `onClose` handlers.
 */
function handleWebsocket(c: Context) {
  return {
    async onMessage(event, ws) {
      try {
        if (event.data === "ping") {
          ws.send("pong");
          return;
        }
        const message = JSON.parse(event.data);
        const controlMessage = ControlMessageSchema.safeParse(message);
        const registerMessage = RegisterDeviceSchema.safeParse(message);
        const deviceMessage = MessageSchema.safeParse(message);

        if (deviceMessage.success) {
          const { data, device_id, token } = deviceMessage.data;
          const did = verifyDid(token);

          // The user may have no registered devices; treat that as an
          // empty broadcast instead of dereferencing undefined.
          const targets: WebSocket[] = [];
          for (const id of userDevices[did] ?? []) {
            const targetDevice = devices[id];
            if (targetDevice) {
              targets.push(targetDevice);
            }
          }

          if (targets.length > 0) {
            // The enrichment does not depend on the receiving device, so it
            // runs once and is awaited: failures reach the catch below
            // instead of becoming unhandled rejections.
            if (data.type === "track") {
              await enrichTrack(data, did);
            } else {
              // Anything that is not a track is treated as a status.
              await ctx.redis.setEx(
                `nowplaying:${did}:status`,
                3,
                `${data.status}`,
              );
            }

            const payload = JSON.stringify({
              type: "message",
              data,
              device_id,
            });
            // broadcast to all devices
            for (const targetDevice of targets) {
              try {
                targetDevice.send(payload);
              } catch (sendError) {
                // One failing socket must not block the remaining devices.
                console.error("Error sending message to device:", sendError);
              }
            }
          }
        }

        if (controlMessage.success) {
          const { type, target, action, args, token } = controlMessage.data;
          const did = verifyDid(token);
          console.log(
            `Control message: ${chalk.greenBright(type)}, ${chalk.greenBright(target)}, ${chalk.greenBright(action)}, ${chalk.greenBright(args)}, ${chalk.greenBright("***")}`,
          );
          // Handle control message
          const deviceId = userDevices[did]?.find((id) => id === target);
          if (deviceId) {
            const targetDevice = devices[deviceId];
            if (targetDevice) {
              targetDevice.send(JSON.stringify({ type, action, args }));
              console.log(
                `Control message sent to device: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(target)}`,
              );
              return;
            }
            console.error(`Device not found: ${target}`);
            return;
          }
          // Target is not one of the user's devices: fall back to all of them.
          userDevices[did]?.forEach((id) => {
            const targetDevice = devices[id];
            if (targetDevice) {
              targetDevice.send(JSON.stringify({ type, action, args }));
              console.log(
                `Control message sent to all devices: ${chalk.greenBright(id)}, ${chalk.greenBright(target)}`,
              );
            }
          });

          console.error(`Device ID not found for target: ${target}`);
          return;
        }

        if (registerMessage.success) {
          const { type, clientName, token } = registerMessage.data;
          console.log(
            `Register message: ${chalk.greenBright(type)}, ${chalk.greenBright(clientName)}, ${chalk.greenBright("****")}`,
          );
          // Handle register Message
          const did = verifyDid(token);
          const deviceId = uuidv4();
          // Remember the identity on the socket so onClose can clean up.
          ws.deviceId = deviceId;
          ws.did = did;
          devices[deviceId] = ws;
          deviceNames[deviceId] = clientName;
          userDevices[did] = [...(userDevices[did] || []), deviceId];
          console.log(
            `Device registered: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(clientName)}`,
          );

          // broadcast to all devices
          userDevices[did]
            .filter((id) => id !== deviceId)
            .forEach((id) => {
              const targetDevice = devices[id];
              if (targetDevice) {
                targetDevice.send(
                  JSON.stringify({
                    type: "device_registered",
                    deviceId,
                    clientName,
                  }),
                );
              }
            });

          ws.send(JSON.stringify({ status: "registered", deviceId }));
          return;
        }
      } catch (e) {
        console.error("Error parsing message:", e);
      }
    },
    onClose: (_, ws) => {
      console.log("Connection closed");
      // remove device from devices
      const deviceId = ws.deviceId;
      const did = ws.did;
      if (deviceId && devices[deviceId]) {
        delete devices[deviceId];
        console.log(`Device removed: ${chalk.redBright(deviceId)}`);
      }
      if (did && userDevices[did]) {
        userDevices[did] = userDevices[did].filter((id) => id !== deviceId);
        // Drop the user's entry once the last device is gone.
        if (userDevices[did].length === 0) {
          delete userDevices[did];
        }
      }
      if (deviceId && deviceNames[deviceId]) {
        const clientName = deviceNames[deviceId];
        delete deviceNames[deviceId];
        console.log(
          `Device name removed: ${chalk.redBright(deviceId)}, ${chalk.redBright(clientName)}`,
        );
      }
    },
  };
}

export default handleWebsocket;