forked from
rocksky.app/rocksky
A decentralized music tracking and discovery platform built on AT Protocol 馃幍
1import { consola } from "consola";
2import type { Context } from "context";
3import { eq, or, desc } from "drizzle-orm";
4import _ from "lodash";
5import { StringCodec } from "nats";
6import { createHash } from "node:crypto";
7import tables from "schema";
8import chalk from "chalk";
9import { publishScrobble } from "nowplaying/nowplaying.service";
10
11export function onNewScrobble(ctx: Context) {
12 const sc = StringCodec();
13 const sub = ctx.nc.subscribe("rocksky.scrobble.new");
14 (async () => {
15 for await (const m of sub) {
16 const scrobbleId = sc.decode(m.data);
17 const result = await ctx.db
18 .select()
19 .from(tables.scrobbles)
20 .where(eq(tables.scrobbles.id, scrobbleId))
21 .leftJoin(tables.users, eq(tables.scrobbles.userId, tables.users.id))
22 .execute()
23 .then((rows) => rows[0]);
24
25 if (!result) {
26 consola.info(`Scrobble with ID ${scrobbleId} not found, skipping`);
27 continue;
28 }
29
30 await updateUris(ctx, result.users.did);
31 await refreshScrobbles(ctx, result.users.did);
32 }
33 })();
34}
35
36async function updateUris(ctx: Context, did: string) {
37 // Get scrobbles with track and user data
38 const records = await ctx.db
39 .select({
40 track: tables.tracks,
41 user: tables.users,
42 })
43 .from(tables.scrobbles)
44 .innerJoin(tables.tracks, eq(tables.scrobbles.trackId, tables.tracks.id))
45 .innerJoin(tables.users, eq(tables.scrobbles.userId, tables.users.id))
46 .where(or(eq(tables.users.did, did), eq(tables.users.handle, did)))
47 .orderBy(desc(tables.scrobbles.createdAt))
48 .limit(5);
49
50 for (const { track } of records) {
51 const trackHash = createHash("sha256")
52 .update(`${track.title} - ${track.artist} - ${track.album}`.toLowerCase())
53 .digest("hex");
54
55 const existingTrack = await ctx.db
56 .select()
57 .from(tables.tracks)
58 .where(eq(tables.tracks.sha256, trackHash))
59 .limit(1)
60 .then((rows) => rows[0]);
61
62 if (existingTrack && !existingTrack.albumUri) {
63 consola.info(`Updating album uri for ${chalk.cyan(track.id)} ...`);
64
65 const albumHash = createHash("sha256")
66 .update(`${track.album} - ${track.albumArtist}`.toLowerCase())
67 .digest("hex");
68
69 const album = await ctx.db
70 .select()
71 .from(tables.albums)
72 .where(eq(tables.albums.sha256, albumHash))
73 .limit(1)
74 .then((rows) => rows[0]);
75
76 if (album) {
77 await ctx.db
78 .update(tables.tracks)
79 .set({ albumUri: album.uri })
80 .where(eq(tables.tracks.id, existingTrack.id));
81 }
82 }
83
84 if (existingTrack && !existingTrack.artistUri) {
85 consola.info(`Updating artist uri for ${chalk.cyan(track.id)} ...`);
86
87 const artistHash = createHash("sha256")
88 .update(track.albumArtist.toLowerCase())
89 .digest("hex");
90
91 const artist = await ctx.db
92 .select()
93 .from(tables.artists)
94 .where(eq(tables.artists.sha256, artistHash))
95 .limit(1)
96 .then((rows) => rows[0]);
97
98 if (artist) {
99 await ctx.db
100 .update(tables.tracks)
101 .set({ artistUri: artist.uri })
102 .where(eq(tables.tracks.id, existingTrack.id));
103 }
104 }
105
106 const albumHash = createHash("sha256")
107 .update(`${track.album} - ${track.albumArtist}`.toLowerCase())
108 .digest("hex");
109
110 const album = await ctx.db
111 .select()
112 .from(tables.albums)
113 .where(eq(tables.albums.sha256, albumHash))
114 .limit(1)
115 .then((rows) => rows[0]);
116
117 if (existingTrack && album && !album.artistUri) {
118 consola.info(`Updating artist uri for ${chalk.cyan(album.id)} ...`);
119
120 const artistHash = createHash("sha256")
121 .update(track.albumArtist.toLowerCase())
122 .digest("hex");
123
124 const artist = await ctx.db
125 .select()
126 .from(tables.artists)
127 .where(eq(tables.artists.sha256, artistHash))
128 .limit(1)
129 .then((rows) => rows[0]);
130
131 if (artist) {
132 await ctx.db
133 .update(tables.albums)
134 .set({ artistUri: artist.uri })
135 .where(eq(tables.albums.id, album.id));
136 }
137 }
138 }
139}
140
141async function refreshScrobbles(ctx: Context, did: string) {
142 const records = await ctx.db
143 .select({
144 scrobble: tables.scrobbles,
145 })
146 .from(tables.scrobbles)
147 .innerJoin(tables.users, eq(tables.scrobbles.userId, tables.users.id))
148 .where(or(eq(tables.users.did, did), eq(tables.users.handle, did)))
149 .orderBy(desc(tables.scrobbles.createdAt))
150 .limit(5);
151
152 for (const { scrobble } of records) {
153 consola.info(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`);
154 try {
155 await publishScrobble(ctx, scrobble.id);
156 } catch (err) {
157 consola.error(`Failed to sync scrobble ${chalk.cyan(scrobble.id)}:`, err);
158 }
159 }
160 consola.info(`Synced ${chalk.greenBright(records.length)} scrobbles`);
161}