A decentralized music tracking and discovery platform built on AT Protocol 馃幍
listenbrainz spotify atproto lastfm musicbrainz scrobbling
at main 277 lines 9.8 kB view raw
1import chalk from "chalk"; 2import { consola } from "consola"; 3import { ctx } from "context"; 4import { and, eq } from "drizzle-orm"; 5import type { Context } from "hono"; 6import jwt from "jsonwebtoken"; 7import { env } from "lib/env"; 8import { createHash } from "node:crypto"; 9import lovedTracks from "schema/loved-tracks"; 10import tracks from "schema/tracks"; 11import users from "schema/users"; 12import { v4 as uuidv4 } from "uuid"; 13import z from "zod"; 14 15// Define the schema for the incoming message 16const ControlMessageSchema = z.object({ 17 type: z.string(), 18 target: z.string().optional(), 19 action: z.string(), 20 args: z.any().optional(), 21 token: z.string(), 22}); 23 24type ControlMessage = z.infer<typeof ControlMessageSchema>; 25 26const RegisterDeviceSchema = z.object({ 27 type: z.literal("register"), 28 clientName: z.string(), 29 token: z.string(), 30}); 31 32type RegisterDeviceMessage = z.infer<typeof RegisterDeviceSchema>; 33 34const MessageSchema = z.object({ 35 type: z.literal("message"), 36 data: z.any(), 37 device_id: z.string(), 38 token: z.string(), 39}); 40 41type Message = z.infer<typeof MessageSchema>; 42 43const devices: Record<string, WebSocket> = {}; 44const deviceNames: Record<string, string> = {}; 45const userDevices: Record<string, string[]> = {}; 46 47function handleWebsocket(c: Context) { 48 return { 49 async onMessage(event, ws) { 50 try { 51 if (event.data === "ping") { 52 ws.send("pong"); 53 return; 54 } 55 const message = JSON.parse(event.data); 56 const controlMessage = ControlMessageSchema.safeParse(message); 57 const registerMessage = RegisterDeviceSchema.safeParse(message); 58 const deviceMessage = MessageSchema.safeParse(message); 59 60 if (deviceMessage.success) { 61 const { data, device_id, token } = deviceMessage.data; 62 const { did } = jwt.verify(token, env.JWT_SECRET, { 63 ignoreExpiration: true, 64 }); 65 // broadcast to all devices 66 userDevices[did].forEach(async (id) => { 67 const targetDevice = devices[id]; 68 if (targetDevice) { 69 // check if message is a track or a status 70 // otherwise, it's a status 71 if (data.type === "track") { 72 const sha256 = createHash("sha256") 73 .update( 74 `${data.title} - ${data.artist} - ${data.album}`.toLowerCase(), 75 ) 76 .digest("hex"); 77 const [cachedTrack, cachedLikes] = await Promise.all([ 78 ctx.redis.get(`track:${sha256}`), 79 ctx.redis.get(`likes:${did}:${sha256}`), 80 ]); 81 82 if (cachedLikes) { 83 const cachedData = JSON.parse(cachedLikes); 84 data.liked = cachedData.liked; 85 } else { 86 const [likes] = await ctx.db 87 .select() 88 .from(lovedTracks) 89 .leftJoin(tracks, eq(lovedTracks.trackId, tracks.id)) 90 .leftJoin(users, eq(lovedTracks.userId, users.id)) 91 .where(and(eq(users.did, did), eq(tracks.sha256, sha256))) 92 .execute(); 93 data.liked = likes ? true : false; 94 await ctx.redis.setEx( 95 `likes:${did}:${sha256}`, 96 2, 97 JSON.stringify({ liked: data.liked }), 98 ); 99 } 100 101 // Check if the track is cached, 102 // if not, fetch it from the database 103 // and cache it for 10 seconds 104 if (cachedTrack) { 105 const cachedData = JSON.parse(cachedTrack); 106 data.album_art = cachedData.albumArt; 107 data.song_uri = cachedData.uri; 108 data.album_uri = cachedData.albumUri; 109 data.artist_uri = cachedData.artistUri; 110 await ctx.redis.setEx( 111 `nowplaying:${did}`, 112 3, 113 JSON.stringify({ 114 ...data, 115 sha256, 116 liked: data.liked, 117 }), 118 ); 119 } else { 120 const [track] = await ctx.db 121 .select() 122 .from(tracks) 123 .where(eq(tracks.sha256, sha256)) 124 .execute(); 125 if (track) { 126 data.album_art = track.albumArt; 127 data.song_uri = track.uri; 128 data.album_uri = track.albumUri; 129 data.artist_uri = track.artistUri; 130 await Promise.all([ 131 ctx.redis.setEx( 132 `track:${sha256}`, 133 10, 134 JSON.stringify({ 135 albumArt: track.albumArt, 136 uri: track.uri, 137 albumUri: track.albumUri, 138 artistUri: track.artistUri, 139 liked: data.liked, 140 }), 141 ), 142 ctx.redis.setEx( 143 `nowplaying:${did}`, 144 3, 145 JSON.stringify({ 146 ...data, 147 sha256, 148 liked: data.liked, 149 }), 150 ), 151 ]); 152 } 153 } 154 } else { 155 await ctx.redis.setEx( 156 `nowplaying:${did}:status`, 157 3, 158 `${data.status}`, 159 ); 160 } 161 162 targetDevice.send( 163 JSON.stringify({ 164 type: "message", 165 data, 166 device_id, 167 }), 168 ); 169 } 170 }); 171 } 172 173 if (controlMessage.success) { 174 const { type, target, action, args, token } = controlMessage.data; 175 const { did } = jwt.verify(token, env.JWT_SECRET, { 176 ignoreExpiration: true, 177 }); 178 consola.info( 179 `Control message: ${chalk.greenBright(type)}, ${chalk.greenBright(target)}, ${chalk.greenBright(action)}, ${chalk.greenBright(args)}, ${chalk.greenBright("***")}`, 180 ); 181 // Handle control message 182 const deviceId = userDevices[did]?.find((id) => id === target); 183 if (deviceId) { 184 const targetDevice = devices[deviceId]; 185 if (targetDevice) { 186 targetDevice.send(JSON.stringify({ type, action, args })); 187 consola.info( 188 `Control message sent to device: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(target)}`, 189 ); 190 return; 191 } 192 consola.error(`Device not found: ${target}`); 193 return; 194 } 195 userDevices[did]?.forEach((id) => { 196 const targetDevice = devices[id]; 197 if (targetDevice) { 198 targetDevice.send(JSON.stringify({ type, action, args })); 199 consola.info( 200 `Control message sent to all devices: ${chalk.greenBright(id)}, ${chalk.greenBright(target)}`, 201 ); 202 } 203 }); 204 205 consola.error(`Device ID not found for target: ${target}`); 206 return; 207 } 208 209 if (registerMessage.success) { 210 const { type, clientName, token } = registerMessage.data; 211 consola.info( 212 `Register message: ${chalk.greenBright(type)}, ${chalk.greenBright(clientName)}, ${chalk.greenBright("****")}`, 213 ); 214 // Handle register Message 215 const { did } = jwt.verify(token, env.JWT_SECRET, { 216 ignoreExpiration: true, 217 }); 218 const deviceId = uuidv4(); 219 ws.deviceId = deviceId; 220 ws.did = did; 221 devices[deviceId] = ws; 222 deviceNames[deviceId] = clientName; 223 userDevices[did] = [...(userDevices[did] || []), deviceId]; 224 consola.info( 225 `Device registered: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(clientName)}`, 226 ); 227 228 // broadcast to all devices 229 userDevices[did] 230 .filter((id) => id !== deviceId) 231 .forEach((id) => { 232 const targetDevice = devices[id]; 233 if (targetDevice) { 234 targetDevice.send( 235 JSON.stringify({ 236 type: "device_registered", 237 deviceId, 238 clientName, 239 }), 240 ); 241 } 242 }); 243 244 ws.send(JSON.stringify({ status: "registered", deviceId })); 245 return; 246 } 247 } catch (e) { 248 consola.error("Error parsing message:", e); 249 } 250 }, 251 onClose: (_, ws) => { 252 consola.info("Connection closed"); 253 // remove device from devices 254 const deviceId = ws.deviceId; 255 const did = ws.did; 256 if (deviceId && devices[deviceId]) { 257 delete devices[deviceId]; 258 consola.info(`Device removed: ${chalk.redBright(deviceId)}`); 259 } 260 if (did && userDevices[did]) { 261 userDevices[did] = userDevices[did].filter((id) => id !== deviceId); 262 if (userDevices[did].length === 0) { 263 delete userDevices[did]; 264 } 265 } 266 if (deviceId && deviceNames[deviceId]) { 267 const clientName = deviceNames[deviceId]; 268 delete deviceNames[deviceId]; 269 consola.info( 270 `Device name removed: ${chalk.redBright(deviceId)}, ${chalk.redBright(clientName)}`, 271 ); 272 } 273 }, 274 }; 275} 276 277export default handleWebsocket;