A decentralized music tracking and discovery platform built on AT Protocol 🎵
listenbrainz spotify atproto lastfm musicbrainz scrobbling
at main 5.3 kB view raw
import chalk from "chalk";
import { consola } from "consola";
import { ctx } from "context";
import { desc, eq, or } from "drizzle-orm";
import { createHash } from "node:crypto";
import { publishScrobble } from "nowplaying/nowplaying.service";
import albums from "../schema/albums";
import artists from "../schema/artists";
import scrobbles from "../schema/scrobbles";
import tracks from "../schema/tracks";
import users from "../schema/users";

const args = process.argv.slice(2);

/** Number of recent scrobbles processed when SYNC_SIZE is unset or unusable. */
const DEFAULT_SYNC_SIZE = 20;

/** Number of recent scrobbles republished per message in `--background` mode. */
const BACKGROUND_SYNC_SIZE = 5;

/** Pause before handling a background message, so the scrobble is fully created. */
const BACKGROUND_SYNC_DELAY_MS = 15000;

/**
 * Reads the SYNC_SIZE environment variable as a row limit.
 *
 * @returns The parsed value, or {@link DEFAULT_SYNC_SIZE} when the variable is
 *   unset, empty, not a number, or negative (so `NaN` or a negative number is
 *   never passed to `.limit()`).
 */
function resolveSyncSize(): number {
  const parsed = Number.parseInt(process.env.SYNC_SIZE ?? "", 10);
  return Number.isNaN(parsed) || parsed < 0 ? DEFAULT_SYNC_SIZE : parsed;
}

/** Returns the hex-encoded SHA-256 digest of `input`. */
function sha256Hex(input: string): string {
  return createHash("sha256").update(input).digest("hex");
}

/** Fetches the first `albums` row whose `sha256` equals `hash`, if any. */
async function findAlbumByHash(hash: string) {
  const rows = await ctx.db
    .select()
    .from(albums)
    .where(eq(albums.sha256, hash))
    .limit(1);
  return rows[0];
}

/** Fetches the first `artists` row whose `sha256` equals `hash`, if any. */
async function findArtistByHash(hash: string) {
  const rows = await ctx.db
    .select()
    .from(artists)
    .where(eq(artists.sha256, hash))
    .limit(1);
  return rows[0];
}

/**
 * Fills in missing album/artist URIs for the tracks behind a user's most
 * recent scrobbles.
 *
 * For each of the latest scrobbles (at most SYNC_SIZE, default 20) the track
 * row is looked up again by the SHA-256 of "title - artist - album"
 * (lower-cased). When that row exists:
 *  - a missing `tracks.albumUri` is set from the album whose hash is the
 *    SHA-256 of "album - albumArtist" (lower-cased);
 *  - a missing `tracks.artistUri` is set from the artist whose hash is the
 *    SHA-256 of the lower-cased album artist;
 *  - a missing `albums.artistUri` on that album is set from the same artist.
 *
 * Rows whose album or artist cannot be found are left unchanged.
 *
 * @param did - Value matched against `users.did` or `users.handle`.
 */
export async function updateUris(did: string): Promise<void> {
  // Get scrobbles with track and user data
  const records = await ctx.db
    .select({
      track: tracks,
      user: users,
    })
    .from(scrobbles)
    .innerJoin(tracks, eq(scrobbles.trackId, tracks.id))
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, did), eq(users.handle, did)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(resolveSyncSize());

  for (const { track } of records) {
    const trackHash = sha256Hex(
      `${track.title} - ${track.artist} - ${track.album}`.toLowerCase(),
    );

    const trackRows = await ctx.db
      .select()
      .from(tracks)
      .where(eq(tracks.sha256, trackHash))
      .limit(1);
    const existingTrack = trackRows[0];

    // Every update below requires a matching track row; skip the album and
    // artist lookups entirely when there is none.
    if (!existingTrack) {
      continue;
    }

    // The album is needed both for the track's albumUri and for the album's
    // own artistUri, so fetch it once.
    const album = await findAlbumByHash(
      sha256Hex(`${track.album} - ${track.albumArtist}`.toLowerCase()),
    );

    // The artist is shared by the track and album updates; fetch it at most
    // once, and only when one of them is actually missing its artist URI.
    const trackNeedsArtist = !existingTrack.artistUri;
    const albumNeedsArtist = Boolean(album) && !album.artistUri;
    const artist =
      trackNeedsArtist || albumNeedsArtist
        ? await findArtistByHash(sha256Hex(track.albumArtist.toLowerCase()))
        : undefined;

    if (!existingTrack.albumUri) {
      consola.info(`Updating album uri for ${chalk.cyan(track.id)} ...`);

      if (album) {
        await ctx.db
          .update(tracks)
          .set({ albumUri: album.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (trackNeedsArtist) {
      consola.info(`Updating artist uri for ${chalk.cyan(track.id)} ...`);

      if (artist) {
        await ctx.db
          .update(tracks)
          .set({ artistUri: artist.uri })
          .where(eq(tracks.id, existingTrack.id));
      }
    }

    if (album && albumNeedsArtist) {
      consola.info(`Updating artist uri for ${chalk.cyan(album.id)} ...`);

      if (artist) {
        await ctx.db
          .update(albums)
          .set({ artistUri: artist.uri })
          .where(eq(albums.id, album.id));
      }
    }
  }
}

/**
 * Republishes a user's most recent scrobbles via `publishScrobble`.
 *
 * A failure on one scrobble is logged and does not stop the remaining ones.
 *
 * @param identifier - Value matched against `users.did` or `users.handle`.
 * @param limit - Maximum number of scrobbles to fetch, newest first.
 * @returns The number of scrobbles fetched (published or attempted).
 */
async function syncRecentScrobbles(
  identifier: string,
  limit: number,
): Promise<number> {
  const records = await ctx.db
    .select({
      scrobble: scrobbles,
    })
    .from(scrobbles)
    .innerJoin(users, eq(scrobbles.userId, users.id))
    .where(or(eq(users.did, identifier), eq(users.handle, identifier)))
    .orderBy(desc(scrobbles.createdAt))
    .limit(limit);

  for (const { scrobble } of records) {
    consola.info(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`);
    try {
      await publishScrobble(ctx, scrobble.id);
    } catch (err) {
      consola.error(`Failed to sync scrobble ${chalk.cyan(scrobble.id)}:`, err);
    }
  }

  return records.length;
}

// Background mode: handle each message received on the
// "rocksky.user.scrobble.sync" subject; the payload is decoded as text and
// used as the user's DID/handle.
if (args.includes("--background")) {
  consola.info("Wait for new scrobbles to sync ...");
  const sub = ctx.nc.subscribe("rocksky.user.scrobble.sync");
  for await (const m of sub) {
    const did = new TextDecoder().decode(m.data);
    // wait for 15 seconds to ensure the scrobble is fully created
    await new Promise((resolve) =>
      setTimeout(resolve, BACKGROUND_SYNC_DELAY_MS),
    );
    consola.info(`Syncing scrobbles ${chalk.magenta(did)} ...`);
    await updateUris(did);
    await syncRecentScrobbles(did, BACKGROUND_SYNC_SIZE);
  }
  process.exit(0);
}

// CLI mode: every command-line argument is treated as a DID or handle.
for (const arg of args) {
  consola.info(`Syncing scrobbles ${chalk.magenta(arg)} ...`);
  await updateUris(arg);

  const synced = await syncRecentScrobbles(arg, resolveSyncSize());
  consola.info(`Synced ${chalk.greenBright(synced)} scrobbles`);
}

process.exit(0);