A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz
at main 547 lines 16 kB view raw
import { AtpAgent } from "@atproto/api";
import { Record } from "@atproto/api/dist/client/types/com/atproto/repo/listRecords";
import { consola } from "consola";
import { ctx } from "context";
import extractPdsFromDid from "lib/extractPdsFromDid";
import chalk from "chalk";
import * as Song from "lexicon/types/app/rocksky/song";
import * as Artist from "lexicon/types/app/rocksky/artist";
import * as Album from "lexicon/types/app/rocksky/album";
import * as Scrobble from "lexicon/types/app/rocksky/scrobble";
import schema from "schema";
import { and, eq, or } from "drizzle-orm";
import { createHash } from "node:crypto";
import { publishScrobble } from "nowplaying/nowplaying.service";

/*
 * CLI script: pulls a user's app.rocksky.{scrobble,song,artist,album} records
 * from their PDS and mirrors them into the local database.
 *
 * Usage: npm run collections -- <handle|did>
 */

const args = process.argv.slice(2);

if (args.length === 0) {
  consola.error("Please provide user identifier (handle or DID).");
  console.log(`Usage: ${chalk.cyan("npm run collections -- <handle|did>")}`);
  process.exit(1);
}

let did: string = args[0];

if (!did.startsWith("did:")) {
  // The argument is a handle; turn it into a DID before touching the database.
  const resolved = await ctx.baseIdResolver.handle.resolve(did);
  if (!resolved) {
    // Fail here with a clear message instead of querying the users table
    // with an undefined DID further down.
    consola.error(`Could not resolve handle ${chalk.cyan(did)} to a DID.`);
    process.exit(1);
  }
  did = resolved;
}

const [user] = await ctx.db
  .select()
  .from(schema.users)
  .where(eq(schema.users.did, did))
  .execute();
if (!user) {
  consola.error(`User with DID ${chalk.cyan(did)} not found in database.`);
  process.exit(1);
}

/** Returns the hex-encoded SHA-256 digest of `input`. */
function sha256Hex(input: string): string {
  return createHash("sha256").update(input).digest("hex");
}

/**
 * Builds an unauthenticated agent pointed at the service endpoint that
 * `extractPdsFromDid` returns for the given DID.
 *
 * @param did - DID whose PDS should be contacted.
 * @returns An `AtpAgent` bound to that service endpoint.
 */
async function getAtpAgent(did: string): Promise<AtpAgent> {
  const serviceEndpoint = await extractPdsFromDid(did);

  consola.info(`Using service endpoint: ${chalk.cyan(serviceEndpoint)}`);

  return new AtpAgent({ service: serviceEndpoint });
}

/**
 * Pages through `com.atproto.repo.listRecords` for one collection, 100 records
 * per request, until the response carries no cursor.
 *
 * @param agent - Agent used to issue the requests.
 * @param did - Repository (DID) to read from.
 * @param collection - NSID of the collection to list.
 * @param label - Word used in the progress log line (e.g. "song").
 * @returns Every record returned across all pages, in response order.
 */
async function listAllRecords(
  agent: AtpAgent,
  did: string,
  collection: string,
  label: string,
): Promise<Record[]> {
  const records: Record[] = [];
  let cursor: string | undefined = undefined;

  do {
    const res = await agent.com.atproto.repo.listRecords({
      repo: did,
      collection,
      limit: 100,
      cursor,
    });
    records.push(...res.data.records);
    cursor = res.data.cursor;
    consola.info(
      `Fetched ${chalk.greenBright(records.length)} ${label} records so far...`,
    );
  } while (cursor);

  return records;
}

/** Fetches every `app.rocksky.scrobble` record in the repository `did`. */
async function getScrobbleRecords(agent: AtpAgent, did: string) {
  return listAllRecords(agent, did, "app.rocksky.scrobble", "scrobble");
}

/** Fetches every `app.rocksky.song` record in the repository `did`. */
async function getSongRecords(agent: AtpAgent, did: string) {
  return listAllRecords(agent, did, "app.rocksky.song", "song");
}

/** Fetches every `app.rocksky.artist` record in the repository `did`. */
async function getArtistRecords(agent: AtpAgent, did: string) {
  return listAllRecords(agent, did, "app.rocksky.artist", "artist");
}

/** Fetches every `app.rocksky.album` record in the repository `did`. */
async function getAlbumRecords(agent: AtpAgent, did: string) {
  return listAllRecords(agent, did, "app.rocksky.album", "album");
}

/**
 * Inserts scrobble rows for the given records and publishes each one via
 * `publishScrobble`.
 *
 * Each record is handled independently: a record whose track, album or artist
 * row cannot be found is skipped with a warning, and any error raised while
 * processing a record is logged without aborting the rest of the batch.
 *
 * @param scrobbles - `app.rocksky.scrobble` records to import.
 */
async function insertScrobbles(scrobbles: Record[]) {
  await Promise.all(
    scrobbles.map(async (scrobble) => {
      const value: Scrobble.Record = scrobble.value as Scrobble.Record;
      try {
        consola.info(
          `Inserting scrobble: ${chalk.greenBright(value.title)} ${chalk.cyan(scrobble.uri)}`,
        );
        const trackSha256 = sha256Hex(
          `${value.title} - ${value.artist} - ${value.album}`.toLowerCase(),
        );
        const albumSha256 = sha256Hex(
          `${value.album} - ${value.albumArtist}`.toLowerCase(),
        );
        const artistSha256 = sha256Hex(value.albumArtist.toLowerCase());

        const [[track], [album], [artist]] = await Promise.all([
          ctx.db
            .select()
            .from(schema.tracks)
            .where(
              value.mbid
                ? or(
                    eq(schema.tracks.mbId, value.mbid),
                    eq(schema.tracks.sha256, trackSha256),
                  )
                : eq(schema.tracks.sha256, trackSha256),
            )
            .limit(1)
            .execute(),
          ctx.db
            .select()
            .from(schema.albums)
            .where(eq(schema.albums.sha256, albumSha256))
            .limit(1)
            .execute(),
          ctx.db
            .select()
            .from(schema.artists)
            .where(eq(schema.artists.sha256, artistSha256))
            .limit(1)
            .execute(),
        ]);

        // Any of the three lookups can come back empty (e.g. the matching
        // song/album/artist record was skipped earlier). Dereferencing `.id`
        // on a missing row would throw, so skip the scrobble instead.
        if (!track || !album || !artist) {
          consola.warn(
            `Missing track, album or artist for scrobble ${chalk.cyan(value.title)} ${chalk.yellow(
              scrobble.uri,
            )} — skipping`,
          );
          return;
        }

        const playedAt = new Date(value.createdAt);
        const row = {
          albumId: album.id,
          trackId: track.id,
          artistId: artist.id,
          uri: scrobble.uri,
          userId: user.id,
          timestamp: playedAt,
          createdAt: playedAt,
        };

        let [newScrobble] = await ctx.db
          .insert(schema.scrobbles)
          .values(row)
          .onConflictDoNothing()
          .returning()
          .execute();

        if (!newScrobble) {
          // Nothing was returned, so the insert hit a conflict; look up the
          // row that is already there.
          [newScrobble] = await ctx.db
            .select()
            .from(schema.scrobbles)
            .where(
              or(
                and(
                  eq(schema.scrobbles.userId, user.id),
                  eq(schema.scrobbles.trackId, track.id),
                  eq(schema.scrobbles.artistId, artist.id),
                  eq(schema.scrobbles.timestamp, playedAt),
                ),
                eq(schema.scrobbles.uri, scrobble.uri),
              ),
            )
            .limit(1)
            .execute();
        }
        if (!newScrobble) {
          consola.warn(
            `Scrobble not found after conflict for ${chalk.cyan(value.title)} ${chalk.yellow(
              scrobble.uri,
            )} — skipping publish`,
          );
          // NOTE(review): retried without conflict handling, so a remaining
          // constraint violation is raised and logged by the catch below —
          // presumably intentional to surface the conflicting constraint;
          // confirm.
          await ctx.db
            .insert(schema.scrobbles)
            .values(row)
            .returning()
            .execute();
          return;
        }
        await publishScrobble(ctx, newScrobble.id);
      } catch (err) {
        consola.error(`Failed to sync scrobble:`, err);
      }
    }),
  );
}

/**
 * Inserts track rows for the given song records and links each track to the
 * user, its album and its album artist.
 *
 * Songs whose album artist or album row is not found by name/title are skipped
 * with a warning. Errors for a single song are logged and do not abort the
 * batch.
 *
 * @param songs - `app.rocksky.song` records to import.
 */
async function insertSongs(songs: Record[]) {
  await Promise.all(
    songs.map(async (song) => {
      const value: Song.Record = song.value as Song.Record;
      try {
        consola.info(
          `Inserting song: ${chalk.greenBright(value.title)} ${chalk.cyan(song.uri)}`,
        );

        const [[artist], [album]] = await Promise.all([
          ctx.db
            .select()
            .from(schema.artists)
            .where(eq(schema.artists.name, value.albumArtist))
            .limit(1)
            .execute(),
          ctx.db
            .select()
            .from(schema.albums)
            .where(
              and(
                eq(schema.albums.title, value.album),
                eq(schema.albums.artist, value.albumArtist),
              ),
            )
            .limit(1)
            .execute(),
        ]);

        if (!artist) {
          consola.warn(
            `Artist not found for song ${chalk.cyan(value.title)}: ${chalk.yellow(value.albumArtist)} — skipping`,
          );
          return;
        }
        if (!album) {
          consola.warn(
            `Album not found for song ${chalk.cyan(value.title)}: ${chalk.yellow(value.album)} — skipping`,
          );
          return;
        }

        const trackSha256 = sha256Hex(
          `${value.title} - ${value.artist} - ${value.album}`.toLowerCase(),
        );

        let [newTrack] = await ctx.db
          .insert(schema.tracks)
          .values({
            title: value.title,
            artist: value.artist,
            albumArtist: value.albumArtist,
            album: value.album,
            uri: song.uri,
            albumArt: value.albumArtUrl,
            artistUri: artist.uri,
            albumUri: album.uri,
            sha256: trackSha256,
            duration: value.duration,
            mbId: value.mbid,
            trackNumber: value.trackNumber,
            discNumber: value.discNumber,
            composer: value.composer,
            label: value.label,
            lyrics: value.lyrics,
            genre: value.genre,
            copyrightMessage: value.copyrightMessage,
            spotifyLink: value.spotifyLink,
            appleMusicLink: value.appleMusicLink,
            tidalLink: value.tidalLink,
            createdAt: new Date(value.createdAt),
          })
          .onConflictDoNothing()
          .returning()
          .execute();

        if (!newTrack) {
          // Insert conflicted; fall back to the existing row, matched by
          // MusicBrainz id (when the record has one) or by content hash.
          const matchesTrack = value.mbid
            ? or(
                eq(schema.tracks.mbId, value.mbid),
                eq(schema.tracks.sha256, trackSha256),
              )
            : eq(schema.tracks.sha256, trackSha256);
          const [existingTrack] = await ctx.db
            .select()
            .from(schema.tracks)
            .where(matchesTrack)
            .limit(1)
            .execute();
          if (!existingTrack) {
            consola.warn(
              `Track not found after conflict for song ${chalk.cyan(value.title)} ${value.mbid} — skipping`,
            );
            return;
          }
          newTrack = existingTrack;
        }

        await Promise.all([
          ctx.db
            .insert(schema.userTracks)
            .values({
              userId: user.id,
              trackId: newTrack.id,
              uri: song.uri,
              scrobbles: 1,
            })
            .onConflictDoUpdate({
              target: [schema.userTracks.userId, schema.userTracks.trackId],
              set: {
                scrobbles: 1,
              },
            })
            .returning()
            .execute(),
          ctx.db
            .insert(schema.albumTracks)
            .values({
              albumId: album.id,
              trackId: newTrack.id,
            })
            .onConflictDoNothing()
            .execute(),
          ctx.db
            .insert(schema.artistTracks)
            .values({
              artistId: artist.id,
              trackId: newTrack.id,
            })
            .onConflictDoNothing()
            .execute(),
        ]);
      } catch (error) {
        const metadata = `${value.title} - ${value.artist} - ${value.album}`;
        consola.error(
          `Failed to insert song record with URI ${chalk.cyan(metadata)} ${song.uri} ${sha256Hex(
            metadata.toLowerCase(),
          )}`,
          error,
        );
        consola.info(JSON.stringify(value, null, 2));
      }
    }),
  );
}

/**
 * Inserts artist rows for the given records and links each artist to the user.
 *
 * When the insert conflicts, the existing row is looked up by the SHA-256 of
 * the lowercased name; if that lookup also finds nothing the artist is skipped
 * with a warning. Database errors are not caught here and reject the returned
 * promise.
 *
 * @param artists - `app.rocksky.artist` records to import.
 */
async function insertArtists(artists: Record[]) {
  await Promise.all(
    artists.map(async (artist) => {
      const value: Artist.Record = artist.value as Artist.Record;
      consola.info(
        `Inserting artist: ${chalk.greenBright(value.name)} ${chalk.cyan(artist.uri)}`,
      );
      const sha256 = sha256Hex(value.name.toLowerCase());

      let [newArtist] = await ctx.db
        .insert(schema.artists)
        .values({
          uri: artist.uri,
          name: value.name,
          sha256,
          picture: value.pictureUrl,
          genres: value.tags,
          createdAt: new Date(value.createdAt),
        })
        .onConflictDoNothing()
        .returning()
        .execute();

      if (!newArtist) {
        [newArtist] = await ctx.db
          .select()
          .from(schema.artists)
          .where(eq(schema.artists.sha256, sha256))
          .limit(1)
          .execute();
      }

      // The conflict may have been on a column other than sha256, in which
      // case the fallback lookup finds nothing; don't dereference undefined.
      if (!newArtist) {
        consola.warn(
          `Artist not found after conflict for ${chalk.cyan(value.name)} ${chalk.yellow(artist.uri)} — skipping`,
        );
        return;
      }

      await ctx.db
        .insert(schema.userArtists)
        .values({
          userId: user.id,
          artistId: newArtist.id,
          uri: artist.uri,
          scrobbles: 1,
        })
        .onConflictDoUpdate({
          target: [schema.userArtists.userId, schema.userArtists.artistId],
          set: {
            scrobbles: 1,
          },
        })
        .execute();
    }),
  );
}

/**
 * Upserts album rows for the given records and links each album to the user
 * and to its artist.
 *
 * The artist row is looked up by exact name; albums whose artist is not in the
 * database are skipped with a warning. Database errors are not caught here and
 * reject the returned promise.
 *
 * @param albums - `app.rocksky.album` records to import.
 */
async function insertAlbums(albums: Record[]) {
  await Promise.all(
    albums.map(async (album) => {
      const value: Album.Record = album.value as Album.Record;
      consola.info(
        `Inserting album: ${chalk.greenBright(value.title)} ${chalk.cyan(album.uri)}`,
      );

      const sha256 = sha256Hex(`${value.title} - ${value.artist}`.toLowerCase());

      const [artist] = await ctx.db
        .select()
        .from(schema.artists)
        .where(eq(schema.artists.name, value.artist))
        .limit(1)
        .execute();

      // `artist.uri` / `artist.id` are needed below; without a matching
      // artist row there is nothing to link the album to.
      if (!artist) {
        consola.warn(
          `Artist not found for album ${chalk.cyan(value.title)}: ${chalk.yellow(value.artist)} — skipping`,
        );
        return;
      }

      let [newAlbum] = await ctx.db
        .insert(schema.albums)
        .values({
          title: value.title,
          artist: value.artist,
          uri: album.uri,
          albumArt: value.albumArtUrl,
          artistUri: artist.uri,
          sha256,
          year: value.year,
          releaseDate: value.releaseDate,
        })
        .onConflictDoUpdate({
          target: schema.albums.sha256,
          set: {
            albumArt: value.albumArtUrl,
            artistUri: artist.uri,
            year: value.year,
            releaseDate: value.releaseDate,
          },
        })
        .returning()
        .execute();

      if (!newAlbum) {
        [newAlbum] = await ctx.db
          .select()
          .from(schema.albums)
          .where(eq(schema.albums.sha256, sha256))
          .limit(1)
          .execute();
      }

      if (!newAlbum) {
        consola.warn(
          `Album not found after upsert for ${chalk.cyan(value.title)} ${chalk.yellow(album.uri)} — skipping`,
        );
        return;
      }

      await Promise.all([
        ctx.db
          .insert(schema.userAlbums)
          .values({
            userId: user.id,
            albumId: newAlbum.id,
            uri: album.uri,
            scrobbles: 1,
          })
          .onConflictDoUpdate({
            target: [schema.userAlbums.userId, schema.userAlbums.albumId],
            set: {
              scrobbles: 1,
            },
          })
          .execute(),
        ctx.db
          .insert(schema.artistAlbums)
          .values({
            albumId: newAlbum.id,
            artistId: artist.id,
          })
          .onConflictDoNothing()
          .execute(),
      ]);
    }),
  );
}

/**
 * Fetches all four collections for the resolved DID, imports them and exits
 * the process with status 0.
 *
 * Import order matters: albums look up artists, songs look up artists and
 * albums, and scrobbles look up tracks, albums and artists.
 */
async function main() {
  const agent = await getAtpAgent(did);
  const scrobbles = await getScrobbleRecords(agent, did);
  const songs = await getSongRecords(agent, did);
  const artists = await getArtistRecords(agent, did);
  const albums = await getAlbumRecords(agent, did);

  await insertArtists(artists);
  await insertAlbums(albums);
  await insertSongs(songs);
  await insertScrobbles(scrobbles);

  consola.success(`${chalk.cyan(args[0])} Collections fetched successfully!`);

  process.exit(0);
}

await main();