forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 🎵
1import { MatchTrackResult } from "lib/matchTrack";
2import { logger } from "logger";
3import dayjs from "dayjs";
4import { createAgent } from "lib/agent";
5import { getDidAndHandle } from "lib/getDidAndHandle";
6import { ctx } from "context";
7import schema from "schema";
8import { and, eq, gte, lte, or, sql } from "drizzle-orm";
9import os from "node:os";
10import path from "node:path";
11import fs from "node:fs";
12import chalk from "chalk";
13import * as Album from "lexicon/types/app/rocksky/album";
14import * as Artist from "lexicon/types/app/rocksky/artist";
15import * as Scrobble from "lexicon/types/app/rocksky/scrobble";
16import * as Song from "lexicon/types/app/rocksky/song";
17import { TID } from "@atproto/common";
18import { Agent } from "@atproto/api";
19import { createUser, subscribeToJetstream, sync } from "cmd/sync";
20import _ from "lodash";
21
/**
 * Publishes a scrobble for `track` to the user's AT Protocol repository.
 *
 * Steps, in order:
 *  1. Resolve the DID/handle, create an agent, look up a recent matching
 *     scrobble, create the user, and subscribe to Jetstream.
 *  2. Abort (returning `false`) when the per-DID lock file exists in the OS
 *     temp directory.
 *  3. Skip (returning `true`) when a matching scrobble already exists within
 *     the window checked by `getRecentScrobble`.
 *  4. Sync local scrobbles first when `shouldSync` reports they are stale.
 *  5. On a dry run, stop here and return `true` without writing anything.
 *  6. Create song/artist/album records that are not yet in the local
 *     database, then create the scrobble record.
 *  7. Poll the local database until the new scrobble's URI shows up, or the
 *     attempt limit is reached.
 *
 * @param track - Matched track metadata to scrobble.
 * @param timestamp - Optional scrobble time as a Unix timestamp in seconds;
 *   the current time is used when omitted (or falsy).
 * @param dryRun - When truthy, perform all checks but publish nothing.
 * @returns `true` when the scrobble was published, skipped as a duplicate, or
 *   the call was a dry run; `false` when the lock file is present or the
 *   scrobble record could not be created.
 */
export async function publishScrobble(
  track: MatchTrackResult,
  timestamp?: number,
  dryRun?: boolean,
): Promise<boolean> {
  const [did, handle] = await getDidAndHandle();
  const agent: Agent = await createAgent(did, handle);
  const recentScrobble = await getRecentScrobble(did, track, timestamp);
  const user = await createUser(agent, did, handle);
  await subscribeToJetstream(user);

  const lockFilePath = path.join(os.tmpdir(), `rocksky-${did}.lock`);

  if (fs.existsSync(lockFilePath)) {
    logger.error(
      `${chalk.greenBright(handle)} Scrobble publishing failed: lock file exists, maybe rocksky-cli is still syncing?\nPlease wait for rocksky to finish syncing before publishing scrobbles or delete the lock file manually ${chalk.greenBright(lockFilePath)}`,
    );
    return false;
  }

  // Evaluated at each log call so that, without an explicit timestamp, every
  // message shows the time at which it was emitted.
  const formatScrobbleTime = (): string =>
    (timestamp ? dayjs.unix(timestamp) : dayjs()).format(
      "YYYY-MM-DD HH:mm:ss",
    );

  if (recentScrobble) {
    logger.info`${handle} Skipping scrobble for ${track.title} by ${track.artist} at ${formatScrobbleTime()} (already scrobbled)`;
    return true;
  }

  const totalScrobbles = await countScrobbles(did);
  if (totalScrobbles === 0) {
    logger.warn`${handle} No scrobbles found for this user. Are you sure you have successfully synced your scrobbles locally?\nIf not, please run ${"rocksky sync"} to sync your scrobbles before publishing scrobbles.`;
  }

  logger.info`${handle} Publishing scrobble for ${track.title} by ${track.artist} at ${formatScrobbleTime()}`;

  if (await shouldSync(agent)) {
    logger.info`${handle} Syncing scrobbles before publishing`;
    await sync();
  } else {
    logger.info`${handle} Local scrobbles are up-to-date, skipping sync`;
  }

  if (dryRun) {
    logger.info`${handle} Dry run: Skipping publishing scrobble for ${track.title} by ${track.artist} at ${formatScrobbleTime()}`;
    return true;
  }

  // A track matches on title plus any of: artist, album artist equal to the
  // track artist, or album artist equal to the track's album artist.
  const existingTrack = await ctx.db
    .select()
    .from(schema.tracks)
    .where(
      or(
        and(
          sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`,
          sql`LOWER(${schema.tracks.artist}) = LOWER(${track.artist})`,
        ),
        and(
          sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`,
          sql`LOWER(${schema.tracks.albumArtist}) = LOWER(${track.artist})`,
        ),
        and(
          sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`,
          sql`LOWER(${schema.tracks.albumArtist}) = LOWER(${track.albumArtist})`,
        ),
      ),
    )
    .limit(1)
    .execute()
    .then((rows) => rows[0]);

  if (!existingTrack) {
    await putSongRecord(agent, track);
  }

  const existingArtist = await ctx.db
    .select()
    .from(schema.artists)
    .where(
      or(
        sql`LOWER(${schema.artists.name}) = LOWER(${track.artist})`,
        sql`LOWER(${schema.artists.name}) = LOWER(${track.albumArtist})`,
      ),
    )
    .limit(1)
    .execute()
    .then((rows) => rows[0]);

  if (!existingArtist) {
    await putArtistRecord(agent, track);
  }

  const existingAlbum = await ctx.db
    .select()
    .from(schema.albums)
    .where(
      and(
        sql`LOWER(${schema.albums.title}) = LOWER(${track.album})`,
        sql`LOWER(${schema.albums.artist}) = LOWER(${track.albumArtist})`,
      ),
    )
    .limit(1)
    .execute()
    .then((rows) => rows[0]);

  if (!existingAlbum) {
    await putAlbumRecord(agent, track);
  }

  const scrobbleUri = await putScrobbleRecord(agent, track, timestamp);

  if (!scrobbleUri) {
    // putScrobbleRecord already logged the error; report the failure to the
    // caller instead of claiming success.
    return false;
  }

  // Wait for the scrobble to be published, i.e. for a row with this URI to
  // appear in the local scrobbles table.
  const MAX_ATTEMPTS = 40;
  const POLL_INTERVAL_MS = 600;

  for (let attempts = 0; ; attempts += 1) {
    const rows = await ctx.db
      .select({
        count: sql`COUNT(*)`,
      })
      .from(schema.scrobbles)
      .where(eq(schema.scrobbles.uri, scrobbleUri))
      .execute();
    // Normalise through Number() so a driver that returns the aggregate as a
    // string or bigint still compares numerically.
    const count = Number(_.get(rows, "[0].count", 0));

    if (count > 0) {
      break;
    }

    if (attempts >= MAX_ATTEMPTS) {
      // Only reached when the final check still found nothing.
      logger.error`Failed to detect published scrobble after ${MAX_ATTEMPTS} attempts`;
      break;
    }

    await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
  }

  return true;
}
157
/**
 * Looks up a scrobble by the user `did` for the same track (case-insensitive
 * title and artist match) whose timestamp lies within 60 seconds either side
 * of the requested scrobble time.
 *
 * @param did - DID of the user whose scrobbles are searched.
 * @param track - Track whose title and artist are matched.
 * @param timestamp - Scrobble time as a Unix timestamp in seconds; the
 *   current time is used when omitted (or falsy).
 * @returns The first matching `{ scrobble, user, track }` row, or `undefined`
 *   when there is none.
 */
async function getRecentScrobble(
  did: string,
  track: MatchTrackResult,
  timestamp?: number,
) {
  const center = dayjs.unix(timestamp || dayjs().unix());
  const windowStart = center.subtract(60, "seconds").toDate();
  const windowEnd = center.add(60, "seconds").toDate();

  const ownedByUser = eq(schema.users.did, did);
  const sameTitle = sql`LOWER(${schema.tracks.title}) = LOWER(${track.title})`;
  const sameArtist = sql`LOWER(${schema.tracks.artist}) = LOWER(${track.artist})`;
  const notBeforeWindow = gte(schema.scrobbles.timestamp, windowStart);
  const notAfterWindow = lte(schema.scrobbles.timestamp, windowEnd);

  const matches = await ctx.db
    .select({
      scrobble: schema.scrobbles,
      user: schema.users,
      track: schema.tracks,
    })
    .from(schema.scrobbles)
    .innerJoin(schema.users, eq(schema.scrobbles.userId, schema.users.id))
    .innerJoin(schema.tracks, eq(schema.scrobbles.trackId, schema.tracks.id))
    .where(
      and(ownedByUser, sameTitle, sameArtist, notBeforeWindow, notAfterWindow),
    )
    .limit(1);

  return matches[0];
}
191
/**
 * Counts the locally stored scrobbles that belong to the user with the
 * given DID.
 *
 * @param did - DID of the user whose scrobbles are counted.
 * @returns The `count(*)` value of the first result row.
 */
async function countScrobbles(did: string): Promise<number> {
  const belongsToUser = eq(schema.users.did, did);

  const result = await ctx.db
    .select({ count: sql<number>`count(*)` })
    .from(schema.scrobbles)
    .innerJoin(schema.users, eq(schema.scrobbles.userId, schema.users.id))
    .where(belongsToUser);

  return result[0].count;
}
200
/**
 * Builds an `app.rocksky.song` record from `track`, validates it, and writes
 * it to the agent's repository under a freshly generated TID record key.
 *
 * Null or empty optional metadata is sent as `undefined`, and a disc number
 * of `0` is normalised to `1`.
 *
 * @param agent - Authenticated agent whose repository receives the record.
 * @param track - Matched track metadata used to populate the record.
 * @returns The URI of the created record, or `null` when the write failed
 *   (the error is logged).
 * @throws Error when the record fails `Song.validateRecord`.
 */
async function putSongRecord(agent: Agent, track: MatchTrackResult) {
  const rkey = TID.nextStr();

  const record: Song.Record = {
    $type: "app.rocksky.song",
    title: track.title,
    artist: track.artist,
    artists: track.mbArtists === null ? undefined : track.mbArtists,
    album: track.album,
    albumArtist: track.albumArtist,
    duration: track.duration,
    releaseDate: track.releaseDate
      ? new Date(track.releaseDate).toISOString()
      : undefined,
    year: track.year === null ? undefined : track.year,
    albumArtUrl: track.albumArt,
    composer: track.composer ? track.composer : undefined,
    lyrics: track.lyrics ? track.lyrics : undefined,
    trackNumber: track.trackNumber,
    discNumber: track.discNumber === 0 ? 1 : track.discNumber,
    copyrightMessage: track.copyrightMessage
      ? track.copyrightMessage
      : undefined,
    createdAt: new Date().toISOString(),
    spotifyLink: track.spotifyLink ? track.spotifyLink : undefined,
    tags: track.genres || [],
    mbid: track.mbId,
  };

  // Validate once and reuse the result for both the check and the log line.
  const validation = Song.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Song record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.song",
      rkey,
      record,
      // The record was already validated locally above.
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Song record created at ${uri}`;
    return uri;
  } catch (e) {
    // Best effort: a failed write is logged and reported as null.
    logger.error`Error creating song record: ${e}`;
    return null;
  }
}
252
/**
 * Builds an `app.rocksky.artist` record from the track's album artist,
 * validates it, and writes it to the agent's repository under a freshly
 * generated TID record key.
 *
 * @param agent - Authenticated agent whose repository receives the record.
 * @param track - Matched track metadata; `albumArtist`, `artistPicture` and
 *   `genres` populate the record.
 * @returns The URI of the created record, or `null` when the write failed
 *   (the error is logged).
 * @throws Error when the record fails `Artist.validateRecord`.
 */
async function putArtistRecord(agent: Agent, track: MatchTrackResult) {
  const rkey = TID.nextStr();
  const record: Artist.Record = {
    $type: "app.rocksky.artist",
    name: track.albumArtist,
    createdAt: new Date().toISOString(),
    pictureUrl: track.artistPicture || undefined,
    tags: track.genres || [],
  };

  // Validate once and reuse the result for both the check and the log line.
  const validation = Artist.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Artist record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.artist",
      rkey,
      record,
      // The record was already validated locally above.
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Artist record created at ${uri}`;
    return uri;
  } catch (e) {
    // Best effort: a failed write is logged and reported as null.
    logger.error`Error creating artist record: ${e}`;
    return null;
  }
}
285
/**
 * Builds an `app.rocksky.album` record from the track's album metadata,
 * validates it, and writes it to the agent's repository under a freshly
 * generated TID record key.
 *
 * @param agent - Authenticated agent whose repository receives the record.
 * @param track - Matched track metadata; `album`, `albumArtist`, `year`,
 *   `releaseDate` and `albumArt` populate the record.
 * @returns The URI of the created record, or `null` when the write failed
 *   (the error is logged).
 * @throws Error when the record fails `Album.validateRecord`.
 */
async function putAlbumRecord(agent: Agent, track: MatchTrackResult) {
  const rkey = TID.nextStr();

  const record = {
    $type: "app.rocksky.album",
    title: track.album,
    artist: track.albumArtist,
    year: track.year === null ? undefined : track.year,
    releaseDate: track.releaseDate
      ? new Date(track.releaseDate).toISOString()
      : undefined,
    createdAt: new Date().toISOString(),
    albumArtUrl: track.albumArt,
  };

  // Validate once and reuse the result for both the check and the log line.
  const validation = Album.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Album record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.album",
      rkey,
      record,
      // The record was already validated locally above.
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Album record created at ${uri}`;
    return uri;
  } catch (e) {
    // Best effort: a failed write is logged and reported as null.
    logger.error`Error creating album record: ${e}`;
    return null;
  }
}
323
/**
 * Builds an `app.rocksky.scrobble` record from `track`, validates it, and
 * writes it to the agent's repository under a freshly generated TID record
 * key.
 *
 * Null or empty optional metadata is sent as `undefined`, and a disc number
 * of `0` is normalised to `1`.
 *
 * @param agent - Authenticated agent whose repository receives the record.
 * @param track - Matched track metadata used to populate the record.
 * @param timestamp - Scrobble time as a Unix timestamp in seconds, used for
 *   `createdAt`; the current time is used when omitted (or falsy).
 * @returns The URI of the created record, or `null` when the write failed
 *   (the error is logged).
 * @throws Error when the record fails `Scrobble.validateRecord`.
 */
async function putScrobbleRecord(
  agent: Agent,
  track: MatchTrackResult,
  timestamp?: number,
) {
  const rkey = TID.nextStr();

  const record: Scrobble.Record = {
    $type: "app.rocksky.scrobble",
    title: track.title,
    albumArtist: track.albumArtist,
    albumArtUrl: track.albumArt,
    artist: track.artist,
    artists: track.mbArtists === null ? undefined : track.mbArtists,
    album: track.album,
    duration: track.duration,
    trackNumber: track.trackNumber,
    discNumber: track.discNumber === 0 ? 1 : track.discNumber,
    releaseDate: track.releaseDate
      ? new Date(track.releaseDate).toISOString()
      : undefined,
    year: track.year === null ? undefined : track.year,
    composer: track.composer ? track.composer : undefined,
    lyrics: track.lyrics ? track.lyrics : undefined,
    copyrightMessage: track.copyrightMessage
      ? track.copyrightMessage
      : undefined,
    createdAt: timestamp
      ? dayjs.unix(timestamp).toISOString()
      : new Date().toISOString(),
    spotifyLink: track.spotifyLink ? track.spotifyLink : undefined,
    tags: track.genres || [],
    mbid: track.mbId,
  };

  // Validate once and reuse the result for both the check and the log line.
  const validation = Scrobble.validateRecord(record);
  if (!validation.success) {
    logger.info`${validation}`;
    logger.info`${record}`;
    throw new Error("Invalid Scrobble record");
  }

  try {
    const res = await agent.com.atproto.repo.putRecord({
      repo: agent.assertDid,
      collection: "app.rocksky.scrobble",
      rkey,
      record,
      // The record was already validated locally above.
      validate: false,
    });
    const uri = res.data.uri;
    logger.info`Scrobble record created at ${uri}`;
    return uri;
  } catch (e) {
    // Best effort: a failed write is logged and reported as null.
    logger.error`Error creating scrobble record: ${e}`;
    return null;
  }
}
381
/**
 * Decides whether the local scrobble table needs syncing before publishing.
 *
 * Fetches a single `app.rocksky.scrobble` record from the agent's repository
 * and checks whether a local scrobble with the same CID exists.
 *
 * @param agent - Authenticated agent whose repository is inspected.
 * @returns `true` when the repository has no scrobble records, or when the
 *   fetched record's CID is not present locally; `false` otherwise.
 */
async function shouldSync(agent: Agent): Promise<boolean> {
  const listing = await agent.com.atproto.repo.listRecords({
    repo: agent.assertDid,
    collection: "app.rocksky.scrobble",
    limit: 1,
  });

  const remoteScrobbles = listing.data.records as Array<{
    uri: string;
    cid: string;
    value: Scrobble.Record;
  }>;

  if (remoteScrobbles.length === 0) {
    logger.info`No scrobble records found`;
    return true;
  }

  const remoteCid = remoteScrobbles[0].cid;
  const rows = await ctx.db
    .select({
      count: sql<number>`count(*)`,
    })
    .from(schema.scrobbles)
    .where(eq(schema.scrobbles.cid, remoteCid))
    .execute();

  const localMatches = rows[0].count;
  return localMatches === 0;
}