A decentralized music tracking and discovery platform built on AT Protocol 馃幍
at main 161 lines 4.9 kB view raw
1import { consola } from "consola"; 2import type { Context } from "context"; 3import { eq, or, desc } from "drizzle-orm"; 4import _ from "lodash"; 5import { StringCodec } from "nats"; 6import { createHash } from "node:crypto"; 7import tables from "schema"; 8import chalk from "chalk"; 9import { publishScrobble } from "nowplaying/nowplaying.service"; 10 11export function onNewScrobble(ctx: Context) { 12 const sc = StringCodec(); 13 const sub = ctx.nc.subscribe("rocksky.scrobble.new"); 14 (async () => { 15 for await (const m of sub) { 16 const scrobbleId = sc.decode(m.data); 17 const result = await ctx.db 18 .select() 19 .from(tables.scrobbles) 20 .where(eq(tables.scrobbles.id, scrobbleId)) 21 .leftJoin(tables.users, eq(tables.scrobbles.userId, tables.users.id)) 22 .execute() 23 .then((rows) => rows[0]); 24 25 if (!result) { 26 consola.info(`Scrobble with ID ${scrobbleId} not found, skipping`); 27 continue; 28 } 29 30 await updateUris(ctx, result.users.did); 31 await refreshScrobbles(ctx, result.users.did); 32 } 33 })(); 34} 35 36async function updateUris(ctx: Context, did: string) { 37 // Get scrobbles with track and user data 38 const records = await ctx.db 39 .select({ 40 track: tables.tracks, 41 user: tables.users, 42 }) 43 .from(tables.scrobbles) 44 .innerJoin(tables.tracks, eq(tables.scrobbles.trackId, tables.tracks.id)) 45 .innerJoin(tables.users, eq(tables.scrobbles.userId, tables.users.id)) 46 .where(or(eq(tables.users.did, did), eq(tables.users.handle, did))) 47 .orderBy(desc(tables.scrobbles.createdAt)) 48 .limit(5); 49 50 for (const { track } of records) { 51 const trackHash = createHash("sha256") 52 .update(`${track.title} - ${track.artist} - ${track.album}`.toLowerCase()) 53 .digest("hex"); 54 55 const existingTrack = await ctx.db 56 .select() 57 .from(tables.tracks) 58 .where(eq(tables.tracks.sha256, trackHash)) 59 .limit(1) 60 .then((rows) => rows[0]); 61 62 if (existingTrack && !existingTrack.albumUri) { 63 consola.info(`Updating album uri for ${chalk.cyan(track.id)} ...`); 64 65 const albumHash = createHash("sha256") 66 .update(`${track.album} - ${track.albumArtist}`.toLowerCase()) 67 .digest("hex"); 68 69 const album = await ctx.db 70 .select() 71 .from(tables.albums) 72 .where(eq(tables.albums.sha256, albumHash)) 73 .limit(1) 74 .then((rows) => rows[0]); 75 76 if (album) { 77 await ctx.db 78 .update(tables.tracks) 79 .set({ albumUri: album.uri }) 80 .where(eq(tables.tracks.id, existingTrack.id)); 81 } 82 } 83 84 if (existingTrack && !existingTrack.artistUri) { 85 consola.info(`Updating artist uri for ${chalk.cyan(track.id)} ...`); 86 87 const artistHash = createHash("sha256") 88 .update(track.albumArtist.toLowerCase()) 89 .digest("hex"); 90 91 const artist = await ctx.db 92 .select() 93 .from(tables.artists) 94 .where(eq(tables.artists.sha256, artistHash)) 95 .limit(1) 96 .then((rows) => rows[0]); 97 98 if (artist) { 99 await ctx.db 100 .update(tables.tracks) 101 .set({ artistUri: artist.uri }) 102 .where(eq(tables.tracks.id, existingTrack.id)); 103 } 104 } 105 106 const albumHash = createHash("sha256") 107 .update(`${track.album} - ${track.albumArtist}`.toLowerCase()) 108 .digest("hex"); 109 110 const album = await ctx.db 111 .select() 112 .from(tables.albums) 113 .where(eq(tables.albums.sha256, albumHash)) 114 .limit(1) 115 .then((rows) => rows[0]); 116 117 if (existingTrack && album && !album.artistUri) { 118 consola.info(`Updating artist uri for ${chalk.cyan(album.id)} ...`); 119 120 const artistHash = createHash("sha256") 121 .update(track.albumArtist.toLowerCase()) 122 .digest("hex"); 123 124 const artist = await ctx.db 125 .select() 126 .from(tables.artists) 127 .where(eq(tables.artists.sha256, artistHash)) 128 .limit(1) 129 .then((rows) => rows[0]); 130 131 if (artist) { 132 await ctx.db 133 .update(tables.albums) 134 .set({ artistUri: artist.uri }) 135 .where(eq(tables.albums.id, album.id)); 136 } 137 } 138 } 139} 140 141async function refreshScrobbles(ctx: Context, did: string) { 142 const records = await ctx.db 143 .select({ 144 scrobble: tables.scrobbles, 145 }) 146 .from(tables.scrobbles) 147 .innerJoin(tables.users, eq(tables.scrobbles.userId, tables.users.id)) 148 .where(or(eq(tables.users.did, did), eq(tables.users.handle, did))) 149 .orderBy(desc(tables.scrobbles.createdAt)) 150 .limit(5); 151 152 for (const { scrobble } of records) { 153 consola.info(`Syncing scrobble ${chalk.cyan(scrobble.id)} ...`); 154 try { 155 await publishScrobble(ctx, scrobble.id); 156 } catch (err) { 157 consola.error(`Failed to sync scrobble ${chalk.cyan(scrobble.id)}:`, err); 158 } 159 } 160 consola.info(`Synced ${chalk.greenBright(records.length)} scrobbles`); 161}