A decentralized music tracking and discovery platform built on AT Protocol 馃幍
listenbrainz
spotify
atproto
lastfm
musicbrainz
scrobbling
1import chalk from "chalk";
2import { consola } from "consola";
3import { ctx } from "context";
4import { and, eq } from "drizzle-orm";
5import type { Context } from "hono";
6import jwt from "jsonwebtoken";
7import { env } from "lib/env";
8import { createHash } from "node:crypto";
9import lovedTracks from "schema/loved-tracks";
10import tracks from "schema/tracks";
11import users from "schema/users";
12import { v4 as uuidv4 } from "uuid";
13import z from "zod";
14
15// Define the schema for the incoming message
16const ControlMessageSchema = z.object({
17 type: z.string(),
18 target: z.string().optional(),
19 action: z.string(),
20 args: z.any().optional(),
21 token: z.string(),
22});
23
24type ControlMessage = z.infer<typeof ControlMessageSchema>;
25
26const RegisterDeviceSchema = z.object({
27 type: z.literal("register"),
28 clientName: z.string(),
29 token: z.string(),
30});
31
32type RegisterDeviceMessage = z.infer<typeof RegisterDeviceSchema>;
33
34const MessageSchema = z.object({
35 type: z.literal("message"),
36 data: z.any(),
37 device_id: z.string(),
38 token: z.string(),
39});
40
41type Message = z.infer<typeof MessageSchema>;
42
43const devices: Record<string, WebSocket> = {};
44const deviceNames: Record<string, string> = {};
45const userDevices: Record<string, string[]> = {};
46
47function handleWebsocket(c: Context) {
48 return {
49 async onMessage(event, ws) {
50 try {
51 if (event.data === "ping") {
52 ws.send("pong");
53 return;
54 }
55 const message = JSON.parse(event.data);
56 const controlMessage = ControlMessageSchema.safeParse(message);
57 const registerMessage = RegisterDeviceSchema.safeParse(message);
58 const deviceMessage = MessageSchema.safeParse(message);
59
60 if (deviceMessage.success) {
61 const { data, device_id, token } = deviceMessage.data;
62 const { did } = jwt.verify(token, env.JWT_SECRET, {
63 ignoreExpiration: true,
64 });
65 // broadcast to all devices
66 userDevices[did].forEach(async (id) => {
67 const targetDevice = devices[id];
68 if (targetDevice) {
69 // check if message is a track or a status
70 // otherwise, it's a status
71 if (data.type === "track") {
72 const sha256 = createHash("sha256")
73 .update(
74 `${data.title} - ${data.artist} - ${data.album}`.toLowerCase(),
75 )
76 .digest("hex");
77 const [cachedTrack, cachedLikes] = await Promise.all([
78 ctx.redis.get(`track:${sha256}`),
79 ctx.redis.get(`likes:${did}:${sha256}`),
80 ]);
81
82 if (cachedLikes) {
83 const cachedData = JSON.parse(cachedLikes);
84 data.liked = cachedData.liked;
85 } else {
86 const [likes] = await ctx.db
87 .select()
88 .from(lovedTracks)
89 .leftJoin(tracks, eq(lovedTracks.trackId, tracks.id))
90 .leftJoin(users, eq(lovedTracks.userId, users.id))
91 .where(and(eq(users.did, did), eq(tracks.sha256, sha256)))
92 .execute();
93 data.liked = likes ? true : false;
94 await ctx.redis.setEx(
95 `likes:${did}:${sha256}`,
96 2,
97 JSON.stringify({ liked: data.liked }),
98 );
99 }
100
101 // Check if the track is cached,
102 // if not, fetch it from the database
103 // and cache it for 10 seconds
104 if (cachedTrack) {
105 const cachedData = JSON.parse(cachedTrack);
106 data.album_art = cachedData.albumArt;
107 data.song_uri = cachedData.uri;
108 data.album_uri = cachedData.albumUri;
109 data.artist_uri = cachedData.artistUri;
110 await ctx.redis.setEx(
111 `nowplaying:${did}`,
112 3,
113 JSON.stringify({
114 ...data,
115 sha256,
116 liked: data.liked,
117 }),
118 );
119 } else {
120 const [track] = await ctx.db
121 .select()
122 .from(tracks)
123 .where(eq(tracks.sha256, sha256))
124 .execute();
125 if (track) {
126 data.album_art = track.albumArt;
127 data.song_uri = track.uri;
128 data.album_uri = track.albumUri;
129 data.artist_uri = track.artistUri;
130 await Promise.all([
131 ctx.redis.setEx(
132 `track:${sha256}`,
133 10,
134 JSON.stringify({
135 albumArt: track.albumArt,
136 uri: track.uri,
137 albumUri: track.albumUri,
138 artistUri: track.artistUri,
139 liked: data.liked,
140 }),
141 ),
142 ctx.redis.setEx(
143 `nowplaying:${did}`,
144 3,
145 JSON.stringify({
146 ...data,
147 sha256,
148 liked: data.liked,
149 }),
150 ),
151 ]);
152 }
153 }
154 } else {
155 await ctx.redis.setEx(
156 `nowplaying:${did}:status`,
157 3,
158 `${data.status}`,
159 );
160 }
161
162 targetDevice.send(
163 JSON.stringify({
164 type: "message",
165 data,
166 device_id,
167 }),
168 );
169 }
170 });
171 }
172
173 if (controlMessage.success) {
174 const { type, target, action, args, token } = controlMessage.data;
175 const { did } = jwt.verify(token, env.JWT_SECRET, {
176 ignoreExpiration: true,
177 });
178 consola.info(
179 `Control message: ${chalk.greenBright(type)}, ${chalk.greenBright(target)}, ${chalk.greenBright(action)}, ${chalk.greenBright(args)}, ${chalk.greenBright("***")}`,
180 );
181 // Handle control message
182 const deviceId = userDevices[did]?.find((id) => id === target);
183 if (deviceId) {
184 const targetDevice = devices[deviceId];
185 if (targetDevice) {
186 targetDevice.send(JSON.stringify({ type, action, args }));
187 consola.info(
188 `Control message sent to device: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(target)}`,
189 );
190 return;
191 }
192 consola.error(`Device not found: ${target}`);
193 return;
194 }
195 userDevices[did]?.forEach((id) => {
196 const targetDevice = devices[id];
197 if (targetDevice) {
198 targetDevice.send(JSON.stringify({ type, action, args }));
199 consola.info(
200 `Control message sent to all devices: ${chalk.greenBright(id)}, ${chalk.greenBright(target)}`,
201 );
202 }
203 });
204
205 consola.error(`Device ID not found for target: ${target}`);
206 return;
207 }
208
209 if (registerMessage.success) {
210 const { type, clientName, token } = registerMessage.data;
211 consola.info(
212 `Register message: ${chalk.greenBright(type)}, ${chalk.greenBright(clientName)}, ${chalk.greenBright("****")}`,
213 );
214 // Handle register Message
215 const { did } = jwt.verify(token, env.JWT_SECRET, {
216 ignoreExpiration: true,
217 });
218 const deviceId = uuidv4();
219 ws.deviceId = deviceId;
220 ws.did = did;
221 devices[deviceId] = ws;
222 deviceNames[deviceId] = clientName;
223 userDevices[did] = [...(userDevices[did] || []), deviceId];
224 consola.info(
225 `Device registered: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(clientName)}`,
226 );
227
228 // broadcast to all devices
229 userDevices[did]
230 .filter((id) => id !== deviceId)
231 .forEach((id) => {
232 const targetDevice = devices[id];
233 if (targetDevice) {
234 targetDevice.send(
235 JSON.stringify({
236 type: "device_registered",
237 deviceId,
238 clientName,
239 }),
240 );
241 }
242 });
243
244 ws.send(JSON.stringify({ status: "registered", deviceId }));
245 return;
246 }
247 } catch (e) {
248 consola.error("Error parsing message:", e);
249 }
250 },
251 onClose: (_, ws) => {
252 consola.info("Connection closed");
253 // remove device from devices
254 const deviceId = ws.deviceId;
255 const did = ws.did;
256 if (deviceId && devices[deviceId]) {
257 delete devices[deviceId];
258 consola.info(`Device removed: ${chalk.redBright(deviceId)}`);
259 }
260 if (did && userDevices[did]) {
261 userDevices[did] = userDevices[did].filter((id) => id !== deviceId);
262 if (userDevices[did].length === 0) {
263 delete userDevices[did];
264 }
265 }
266 if (deviceId && deviceNames[deviceId]) {
267 const clientName = deviceNames[deviceId];
268 delete deviceNames[deviceId];
269 consola.info(
270 `Device name removed: ${chalk.redBright(deviceId)}, ${chalk.redBright(clientName)}`,
271 );
272 }
273 },
274 };
275}
276
277export default handleWebsocket;