A decentralized music tracking and discovery platform built on AT Protocol 馃幍
listenbrainz
spotify
atproto
lastfm
musicbrainz
scrobbling
1import chalk from "chalk";
2import { consola } from "consola";
3import { ctx } from "context";
4import { desc, eq, or } from "drizzle-orm";
5import { createHash } from "node:crypto";
6import { publishScrobble } from "nowplaying/nowplaying.service";
7import albums from "../schema/albums";
8import artists from "../schema/artists";
9import scrobbles from "../schema/scrobbles";
10import tracks from "../schema/tracks";
11import users from "../schema/users";
12
13const args = process.argv.slice(2);
14
15export async function updateUris(did: string) {
16 // Get scrobbles with track and user data
17 const records = await ctx.db
18 .select({
19 track: tracks,
20 user: users,
21 })
22 .from(scrobbles)
23 .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
24 .innerJoin(users, eq(scrobbles.userId, users.id))
25 .where(or(eq(users.did, did), eq(users.handle, did)))
26 .orderBy(desc(scrobbles.createdAt))
27 .limit(process.env.SYNC_SIZE ? parseInt(process.env.SYNC_SIZE, 10) : 20);
28
29 for (const { track } of records) {
30 const trackHash = createHash("sha256")
31 .update(`${track.title} - ${track.artist} - ${track.album}`.toLowerCase())
32 .digest("hex");
33
34 const existingTrack = await ctx.db
35 .select()
36 .from(tracks)
37 .where(eq(tracks.sha256, trackHash))
38 .limit(1)
39 .then((rows) => rows[0]);
40
41 if (existingTrack && !existingTrack.albumUri) {
42 consola.info(`Updating album uri for ${chalk.cyan(track.id)} ...`);
43
44 const albumHash = createHash("sha256")
45 .update(`${track.album} - ${track.albumArtist}`.toLowerCase())
46 .digest("hex");
47
48 const album = await ctx.db
49 .select()
50 .from(albums)
51 .where(eq(albums.sha256, albumHash))
52 .limit(1)
53 .then((rows) => rows[0]);
54
55 if (album) {
56 await ctx.db
57 .update(tracks)
58 .set({ albumUri: album.uri })
59 .where(eq(tracks.id, existingTrack.id));
60 }
61 }
62
63 if (existingTrack && !existingTrack.artistUri) {
64 consola.info(`Updating artist uri for ${chalk.cyan(track.id)} ...`);
65
66 const artistHash = createHash("sha256")
67 .update(track.albumArtist.toLowerCase())
68 .digest("hex");
69
70 const artist = await ctx.db
71 .select()
72 .from(artists)
73 .where(eq(artists.sha256, artistHash))
74 .limit(1)
75 .then((rows) => rows[0]);
76
77 if (artist) {
78 await ctx.db
79 .update(tracks)
80 .set({ artistUri: artist.uri })
81 .where(eq(tracks.id, existingTrack.id));
82 }
83 }
84
85 const albumHash = createHash("sha256")
86 .update(`${track.album} - ${track.albumArtist}`.toLowerCase())
87 .digest("hex");
88
89 const album = await ctx.db
90 .select()
91 .from(albums)
92 .where(eq(albums.sha256, albumHash))
93 .limit(1)
94 .then((rows) => rows[0]);
95
96 if (existingTrack && album && !album.artistUri) {
97 consola.info(`Updating artist uri for ${chalk.cyan(album.id)} ...`);
98
99 const artistHash = createHash("sha256")
100 .update(track.albumArtist.toLowerCase())
101 .digest("hex");
102
103 const artist = await ctx.db
104 .select()
105 .from(artists)
106 .where(eq(artists.sha256, artistHash))
107 .limit(1)
108 .then((rows) => rows[0]);
109
110 if (artist) {
111 await ctx.db
112 .update(albums)
113 .set({ artistUri: artist.uri })
114 .where(eq(albums.id, album.id));
115 }
116 }
117 }
118}
119
120if (args.includes("--background")) {
121 consola.info("Wait for new scrobbles to sync ...");
122 const sub = ctx.nc.subscribe("rocksky.user.scrobble.sync");
123 for await (const m of sub) {
124 const did = new TextDecoder().decode(m.data);
125 // wait for 15 seconds to ensure the scrobble is fully created
126 await new Promise((resolve) => setTimeout(resolve, 15000));
127 consola.info(`Syncing scrobbles ${chalk.magenta(did)} ...`);
128 await updateUris(did);
129
130 const records = await ctx.db
131 .select({
132 scrobble: scrobbles,
133 })
134 .from(scrobbles)
135 .innerJoin(users, eq(scrobbles.userId, users.id))
136 .where(or(eq(users.did, did), eq(users.handle, did)))
137 .orderBy(desc(scrobbles.createdAt))
138 .limit(5);
139
140 for (const { scrobble } of records) {
141 consola.info(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`);
142 try {
143 await publishScrobble(ctx, scrobble.id);
144 } catch (err) {
145 consola.error(
146 `Failed to sync scrobble ${chalk.cyan(scrobble.id)}:`,
147 err,
148 );
149 }
150 }
151 }
152 process.exit(0);
153}
154
155for (const arg of args) {
156 consola.info(`Syncing scrobbles ${chalk.magenta(arg)} ...`);
157 await updateUris(arg);
158
159 const records = await ctx.db
160 .select({
161 scrobble: scrobbles,
162 })
163 .from(scrobbles)
164 .innerJoin(users, eq(scrobbles.userId, users.id))
165 .where(or(eq(users.did, arg), eq(users.handle, arg)))
166 .orderBy(desc(scrobbles.createdAt))
167 .limit(process.env.SYNC_SIZE ? parseInt(process.env.SYNC_SIZE, 10) : 20);
168
169 for (const { scrobble } of records) {
170 consola.info(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`);
171 try {
172 await publishScrobble(ctx, scrobble.id);
173 } catch (err) {
174 consola.error(`Failed to sync scrobble ${chalk.cyan(scrobble.id)}:`, err);
175 }
176 }
177 consola.info(`Synced ${chalk.greenBright(records.length)} scrobbles`);
178}
179
180process.exit(0);