A decentralized music tracking and discovery platform built on AT Protocol 🎵
rocksky.app
spotify
atproto
lastfm
musicbrainz
scrobbling
listenbrainz
1import chalk from "chalk";
2import { ctx } from "context";
3import { createHash } from "crypto";
4import { and, eq } from "drizzle-orm";
5import type { Context } from "hono";
6import jwt from "jsonwebtoken";
7import { env } from "lib/env";
8import lovedTracks from "schema/loved-tracks";
9import tracks from "schema/tracks";
10import users from "schema/users";
11import { v4 as uuidv4 } from "uuid";
12import z from "zod";
13
/**
 * Fields of a remote-control command sent by a client.
 *
 * - `type` / `action` / `args` are forwarded verbatim to the receiving device.
 * - `target` optionally names the device id that should receive the command.
 * - `token` is the sender's JWT; the handler reads the `did` claim from it.
 */
const controlMessageShape = {
  type: z.string(),
  target: z.string().optional(),
  action: z.string(),
  args: z.any().optional(),
  token: z.string(),
};

/** Validator for incoming control messages. */
const ControlMessageSchema = z.object(controlMessageShape);

/** Parsed form of a control message. */
type ControlMessage = z.infer<typeof ControlMessageSchema>;
24
/**
 * Fields of a device registration request.
 *
 * - `type` must be the literal `"register"`.
 * - `clientName` is the display name stored for the new device.
 * - `token` is the registering user's JWT.
 */
const registerDeviceShape = {
  type: z.literal("register"),
  clientName: z.string(),
  token: z.string(),
};

/** Validator for incoming registration requests. */
const RegisterDeviceSchema = z.object(registerDeviceShape);

/** Parsed form of a registration request. */
type RegisterDeviceMessage = z.infer<typeof RegisterDeviceSchema>;
32
/**
 * Fields of a device-originated message that is relayed to the sender's
 * registered devices.
 *
 * - `type` must be the literal `"message"`.
 * - `data` is an unvalidated payload; the handler treats `data.type === "track"`
 *   as a now-playing track and anything else as a status update.
 * - `device_id` is echoed back unchanged in the relayed message.
 * - `token` is the sender's JWT.
 */
const deviceMessageShape = {
  type: z.literal("message"),
  data: z.any(),
  device_id: z.string(),
  token: z.string(),
};

/** Validator for incoming device messages. */
const MessageSchema = z.object(deviceMessageShape);

/** Parsed form of a device message. */
type Message = z.infer<typeof MessageSchema>;
41
/** Open sockets, keyed by the device id assigned at registration. */
const devices: { [deviceId: string]: WebSocket } = {};

/** Client-supplied display names, keyed by device id. */
const deviceNames: { [deviceId: string]: string } = {};

/** Device ids grouped under the DID of the user who registered them. */
const userDevices: { [did: string]: string[] } = {};
45
/** Lifetime, in seconds, of the cached per-user "liked" flag for a track. */
const LIKES_CACHE_TTL_SECONDS = 2;
/** Lifetime, in seconds, of the cached track metadata. */
const TRACK_CACHE_TTL_SECONDS = 10;
/** Lifetime, in seconds, of the per-user now-playing entries. */
const NOW_PLAYING_TTL_SECONDS = 3;

/**
 * A registered socket: the socket type held by the `devices` registry plus the
 * bookkeeping fields attached to it at registration time.
 */
type DeviceSocket = WebSocket & { deviceId?: string; did?: string };

/**
 * Verifies `token` against `env.JWT_SECRET` and returns its `did` claim.
 *
 * NOTE(review): `ignoreExpiration: true` means expired tokens are accepted.
 * This is kept as-is; confirm it is intentional.
 *
 * @throws if the signature is invalid or the payload has no non-empty string
 *   `did`. Without this check every such token would be filed under the
 *   registry key "undefined" and share devices with each other.
 */
function didFromToken(token: string): string {
  const payload = jwt.verify(token, env.JWT_SECRET, {
    ignoreExpiration: true,
  });
  if (
    typeof payload === "string" ||
    typeof payload.did !== "string" ||
    payload.did.length === 0
  ) {
    throw new Error("Token payload does not contain a did");
  }
  return payload.did;
}

/**
 * Adds like state and catalogue metadata to a "track" payload (mutating
 * `data`) and refreshes the user's now-playing cache entry.
 *
 * The track is identified by the SHA-256 of the lower-cased
 * "title - artist - album" string. When the track is neither cached nor in
 * the database, only `data.liked` is set and no now-playing entry is written.
 */
async function enrichTrack(did: string, data: Message["data"]): Promise<void> {
  const sha256 = createHash("sha256")
    .update(`${data.title} - ${data.artist} - ${data.album}`.toLowerCase())
    .digest("hex");
  const trackKey = `track:${sha256}`;
  const likesKey = `likes:${did}:${sha256}`;

  const [cachedTrack, cachedLikes] = await Promise.all([
    ctx.redis.get(trackKey),
    ctx.redis.get(likesKey),
  ]);

  if (cachedLikes) {
    data.liked = JSON.parse(cachedLikes).liked;
  } else {
    const [likes] = await ctx.db
      .select()
      .from(lovedTracks)
      .leftJoin(tracks, eq(lovedTracks.trackId, tracks.id))
      .leftJoin(users, eq(lovedTracks.userId, users.id))
      .where(and(eq(users.did, did), eq(tracks.sha256, sha256)))
      .execute();
    data.liked = Boolean(likes);
    await ctx.redis.setEx(
      likesKey,
      LIKES_CACHE_TTL_SECONDS,
      JSON.stringify({ liked: data.liked }),
    );
  }

  const cachedMeta = cachedTrack ? JSON.parse(cachedTrack) : undefined;
  let meta = cachedMeta;
  if (!meta) {
    // Cache miss: fall back to the database.
    const [track] = await ctx.db
      .select()
      .from(tracks)
      .where(eq(tracks.sha256, sha256))
      .execute();
    if (!track) {
      return;
    }
    meta = {
      albumArt: track.albumArt,
      uri: track.uri,
      albumUri: track.albumUri,
      artistUri: track.artistUri,
    };
  }

  data.album_art = meta.albumArt;
  data.song_uri = meta.uri;
  data.album_uri = meta.albumUri;
  data.artist_uri = meta.artistUri;

  const writes = [
    ctx.redis.setEx(
      `nowplaying:${did}`,
      NOW_PLAYING_TTL_SECONDS,
      JSON.stringify({ ...data, sha256, liked: data.liked }),
    ),
  ];
  if (!cachedMeta) {
    // Only (re)populate the track cache when it was actually missed.
    writes.push(
      ctx.redis.setEx(
        trackKey,
        TRACK_CACHE_TTL_SECONDS,
        JSON.stringify({ ...meta, liked: data.liked }),
      ),
    );
  }
  await Promise.all(writes);
}

/**
 * Relays a device message to every connected device of the sending user.
 *
 * The payload is enriched (track) or recorded (status) once, before the
 * fan-out, because that work does not depend on the receiving device. All
 * asynchronous work is awaited so that failures reach the caller's
 * `try/catch` instead of becoming unhandled promise rejections.
 */
async function relayDeviceMessage({
  data,
  device_id,
  token,
}: Message): Promise<void> {
  const did = didFromToken(token);
  // Snapshot the targets up front; a user with no registered devices simply
  // has nobody to relay to.
  const targets = (userDevices[did] ?? [])
    .map((id) => devices[id])
    .filter((socket) => socket !== undefined);
  if (targets.length === 0) {
    return;
  }

  // A payload is either a track or, otherwise, a status.
  if (data.type === "track") {
    await enrichTrack(did, data);
  } else {
    await ctx.redis.setEx(
      `nowplaying:${did}:status`,
      NOW_PLAYING_TTL_SECONDS,
      `${data.status}`,
    );
  }

  const outgoing = JSON.stringify({ type: "message", data, device_id });
  for (const socket of targets) {
    try {
      socket.send(outgoing);
    } catch (e) {
      // One failing socket must not stop delivery to the remaining devices.
      console.error("Error sending message to device:", e);
    }
  }
}

/**
 * Forwards a control command to the targeted device, or to every device of
 * the sender when `target` is absent or is not one of the sender's devices.
 */
function dispatchControlMessage({
  type,
  target,
  action,
  args,
  token,
}: ControlMessage): void {
  const did = didFromToken(token);
  console.log(
    `Control message: ${chalk.greenBright(type)}, ${chalk.greenBright(target)}, ${chalk.greenBright(action)}, ${chalk.greenBright(args)}, ${chalk.greenBright("***")}`,
  );

  const outgoing = JSON.stringify({ type, action, args });
  const ownedIds = userDevices[did] ?? [];

  if (target !== undefined && ownedIds.includes(target)) {
    const targetDevice = devices[target];
    if (targetDevice) {
      targetDevice.send(outgoing);
      console.log(
        `Control message sent to device: ${chalk.greenBright(target)}, ${chalk.greenBright(target)}`,
      );
      return;
    }
    console.error(`Device not found: ${target}`);
    return;
  }

  for (const id of ownedIds) {
    const targetDevice = devices[id];
    if (targetDevice) {
      targetDevice.send(outgoing);
      console.log(
        `Control message sent to all devices: ${chalk.greenBright(id)}, ${chalk.greenBright(target)}`,
      );
    }
  }

  // An omitted target is an intentional broadcast, not an error.
  if (target !== undefined) {
    console.error(`Device ID not found for target: ${target}`);
  }
}

/**
 * Removes every registry entry that belongs to `ws`: its socket, its display
 * name and its id in the owner's device list.
 */
function unregisterDevice(ws: DeviceSocket): void {
  const deviceId = ws.deviceId;
  const did = ws.did;
  if (deviceId && devices[deviceId]) {
    delete devices[deviceId];
    console.log(`Device removed: ${chalk.redBright(deviceId)}`);
  }
  if (did && userDevices[did]) {
    userDevices[did] = userDevices[did].filter((id) => id !== deviceId);
    if (userDevices[did].length === 0) {
      delete userDevices[did];
    }
  }
  // Compare against undefined so a device registered with an empty name is
  // still cleaned up.
  if (deviceId && deviceNames[deviceId] !== undefined) {
    const clientName = deviceNames[deviceId];
    delete deviceNames[deviceId];
    console.log(
      `Device name removed: ${chalk.redBright(deviceId)}, ${chalk.redBright(clientName)}`,
    );
  }
}

/**
 * Registers `ws` as a new device of the token's user, announces it to the
 * user's other devices and acknowledges the registration to the caller.
 */
function registerDevice(
  { type, clientName, token }: RegisterDeviceMessage,
  ws: DeviceSocket,
): void {
  console.log(
    `Register message: ${chalk.greenBright(type)}, ${chalk.greenBright(clientName)}, ${chalk.greenBright("****")}`,
  );
  const did = didFromToken(token);

  // A socket that registers again would otherwise orphan its previous id in
  // the registries, since closing only cleans up the most recent one.
  if (ws.deviceId) {
    unregisterDevice(ws);
  }

  const deviceId = uuidv4();
  ws.deviceId = deviceId;
  ws.did = did;
  devices[deviceId] = ws;
  deviceNames[deviceId] = clientName;
  userDevices[did] = [...(userDevices[did] || []), deviceId];
  console.log(
    `Device registered: ${chalk.greenBright(deviceId)}, ${chalk.greenBright(clientName)}`,
  );

  // Tell the user's other devices about the newcomer.
  const announcement = JSON.stringify({
    type: "device_registered",
    deviceId,
    clientName,
  });
  for (const id of userDevices[did]) {
    if (id === deviceId) {
      continue;
    }
    const targetDevice = devices[id];
    if (targetDevice) {
      targetDevice.send(announcement);
    }
  }

  ws.send(JSON.stringify({ status: "registered", deviceId }));
}

/**
 * Builds the WebSocket event handlers for the device relay endpoint.
 *
 * `onMessage` answers the literal text `"ping"` with `"pong"`; every other
 * frame is parsed as JSON and handled as the first kind that validates:
 * device message, control message, then registration. Any error raised while
 * handling a frame is logged and the frame is dropped.
 *
 * `onClose` removes the closing socket from the registries.
 *
 * @param c - Request context; not used by the handlers.
 * @returns An object with `onMessage` and `onClose` callbacks.
 */
function handleWebsocket(c: Context) {
  return {
    async onMessage(event, ws) {
      try {
        if (event.data === "ping") {
          ws.send("pong");
          return;
        }
        const message = JSON.parse(event.data);

        const deviceMessage = MessageSchema.safeParse(message);
        if (deviceMessage.success) {
          await relayDeviceMessage(deviceMessage.data);
          return;
        }

        const controlMessage = ControlMessageSchema.safeParse(message);
        if (controlMessage.success) {
          dispatchControlMessage(controlMessage.data);
          return;
        }

        const registerMessage = RegisterDeviceSchema.safeParse(message);
        if (registerMessage.success) {
          registerDevice(registerMessage.data, ws);
          return;
        }
      } catch (e) {
        console.error("Error parsing message:", e);
      }
    },
    onClose: (_, ws) => {
      console.log("Connection closed");
      unregisterDevice(ws);
    },
  };
}
275
276export default handleWebsocket;