A decentralized music tracking and discovery platform built on AT Protocol 🎵
listenbrainz spotify atproto lastfm musicbrainz scrobbling

feat: Enhance MusicBrainz integration and improve data handling #10

Merged. Opened by tsiry-sandratraina.com, targeting main from feat/musicbrainz.
Labels: none yet
Assignee: none yet
Participants: 1
AT URI: at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/sh.tangled.repo.pull/3lzvdu4sh5l22
+1582 -186
Diff #1
+57 -4
Cargo.lock
··· 3184 3184 "windows-sys 0.59.0", 3185 3185 ] 3186 3186 3187 + [[package]] 3188 + name = "nanoid" 3189 + version = "0.4.0" 3190 + source = "registry+https://github.com/rust-lang/crates.io-index" 3191 + checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" 3192 + dependencies = [ 3193 + "rand 0.8.5", 3194 + ] 3195 + 3187 3196 [[package]] 3188 3197 name = "nkeys" 3189 3198 version = "0.4.4" ··· 4697 4706 4698 4707 [[package]] 4699 4708 name = "regex" 4700 - version = "1.11.1" 4709 + version = "1.11.3" 4701 4710 source = "registry+https://github.com/rust-lang/crates.io-index" 4702 - checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" 4711 + checksum = "8b5288124840bee7b386bc413c487869b360b2b4ec421ea56425128692f2a82c" 4703 4712 dependencies = [ 4704 4713 "aho-corasick", 4705 4714 "memchr", ··· 4709 4718 4710 4719 [[package]] 4711 4720 name = "regex-automata" 4712 - version = "0.4.9" 4721 + version = "0.4.11" 4713 4722 source = "registry+https://github.com/rust-lang/crates.io-index" 4714 - checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" 4723 + checksum = "833eb9ce86d40ef33cb1306d8accf7bc8ec2bfea4355cbdebb3df68b40925cad" 4715 4724 dependencies = [ 4716 4725 "aho-corasick", 4717 4726 "memchr", ··· 4999 5008 "hex", 5000 5009 "jsonwebtoken", 5001 5010 "md5", 5011 + "nanoid", 5002 5012 "owo-colors", 5003 5013 "quick-xml 0.37.5", 5004 5014 "rand 0.9.2", ··· 5006 5016 "reqwest", 5007 5017 "serde", 5008 5018 "serde_json", 5019 + "serial_test", 5009 5020 "sqlx", 5010 5021 "tokio", 5011 5022 "tokio-stream", ··· 5089 5100 "hex", 5090 5101 "jsonwebtoken", 5091 5102 "md5", 5103 + "nanoid", 5092 5104 "owo-colors", 5093 5105 "rand 0.9.2", 5094 5106 "redis 0.29.5", 5095 5107 "reqwest", 5096 5108 "serde", 5097 5109 "serde_json", 5110 + "serial_test", 5098 5111 "sqlx", 5099 5112 "tokio", 5100 5113 "tokio-stream", ··· 5438 5451 "winapi-util", 5439 5452 ] 5440 5453 5454 + [[package]] 5455 + name = "scc" 5456 + version = "2.4.0" 5457 + source = "registry+https://github.com/rust-lang/crates.io-index" 5458 + checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" 5459 + dependencies = [ 5460 + "sdd", 5461 + ] 5462 + 5441 5463 [[package]] 5442 5464 name = "schannel" 5443 5465 version = "0.1.27" ··· 5463 5485 "untrusted", 5464 5486 ] 5465 5487 5488 + [[package]] 5489 + name = "sdd" 5490 + version = "3.0.10" 5491 + source = "registry+https://github.com/rust-lang/crates.io-index" 5492 + checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" 5493 + 5466 5494 [[package]] 5467 5495 name = "seahash" 5468 5496 version = "4.1.0" ··· 5612 5640 "serde", 5613 5641 ] 5614 5642 5643 + [[package]] 5644 + name = "serial_test" 5645 + version = "3.2.0" 5646 + source = "registry+https://github.com/rust-lang/crates.io-index" 5647 + checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9" 5648 + dependencies = [ 5649 + "futures", 5650 + "log", 5651 + "once_cell", 5652 + "parking_lot", 5653 + "scc", 5654 + "serial_test_derive", 5655 + ] 5656 + 5657 + [[package]] 5658 + name = "serial_test_derive" 5659 + version = "3.2.0" 5660 + source = "registry+https://github.com/rust-lang/crates.io-index" 5661 + checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" 5662 + dependencies = [ 5663 + "proc-macro2", 5664 + "quote", 5665 + "syn 2.0.101", 5666 + ] 5667 + 5615 5668 [[package]] 5616 5669 name = "sha1" 5617 5670 version = "0.10.6"
+5 -1
crates/scrobbler/Cargo.toml
··· 22 22 dotenv = "0.15.0" 23 23 anyhow = "1.0.96" 24 24 actix-web = "4.9.0" 25 - redis = "0.29.0" 25 + redis = { version = "0.29.0", features = ["tokio-comp"] } 26 26 hex = "0.4.3" 27 27 jsonwebtoken = "9.3.1" 28 28 md5 = "0.7.0" ··· 41 41 actix-session = "0.10.1" 42 42 tokio-stream = { version = "0.1.17", features = ["full"] } 43 43 tracing = "0.1.41" 44 + nanoid = "0.4.0" 45 + 46 + [dev-dependencies] 47 + serial_test = "3.0.0"
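Note on the new `serial_test` dev-dependency: it is what lets the MusicBrainz integration tests added in this PR run one at a time instead of in parallel, since they share the Redis-backed request queue and the live MusicBrainz API. A minimal sketch of the pattern, assuming it sits in any test module of this crate; the test body here is invented for illustration:

```rust
use serial_test::serial;

// Tests marked #[serial] take a global lock, so only one of them runs at a
// time even though `cargo test` normally runs tests on multiple threads.
#[tokio::test]
#[serial]
async fn hits_a_shared_external_resource() {
    // ...anything that must not run concurrently with the other #[serial]
    // tests, e.g. calls that go through the shared MusicBrainz queue...
}
```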
+15 -5
crates/scrobbler/src/handlers/mod.rs
··· 9 9 use v1::submission::submission; 10 10 11 11 use crate::cache::Cache; 12 + use crate::musicbrainz::client::MusicbrainzClient; 12 13 use crate::BANNER; 13 14 14 15 pub mod scrobble; ··· 43 44 pub async fn handle_submission( 44 45 data: web::Data<Arc<Pool<Postgres>>>, 45 46 cache: web::Data<Cache>, 47 + mb_client: web::Data<Arc<MusicbrainzClient>>, 46 48 form: web::Form<BTreeMap<String, String>>, 47 49 ) -> impl Responder { 48 - submission(form.into_inner(), cache.get_ref(), data.get_ref()) 49 - .await 50 - .map_err(actix_web::error::ErrorInternalServerError) 50 + submission( 51 + form.into_inner(), 52 + cache.get_ref(), 53 + data.get_ref(), 54 + mb_client.get_ref(), 55 + ) 56 + .await 57 + .map_err(actix_web::error::ErrorInternalServerError) 51 58 } 52 59 53 60 #[get("/2.0")] ··· 60 67 data: web::Data<Arc<Pool<Postgres>>>, 61 68 cache: web::Data<Cache>, 62 69 form: web::Form<BTreeMap<String, String>>, 70 + mb_client: web::Data<Arc<MusicbrainzClient>>, 63 71 ) -> impl Responder { 64 72 let conn = data.get_ref(); 65 73 let cache = cache.get_ref(); 74 + let mb_client = mb_client.get_ref(); 66 75 67 76 let method = form.get("method").unwrap_or(&"".to_string()).to_string(); 68 - call_method(&method, conn, cache, form.into_inner()) 77 + call_method(&method, conn, cache, mb_client, form.into_inner()) 69 78 .await 70 79 .map_err(actix_web::error::ErrorInternalServerError) 71 80 } ··· 74 83 method: &str, 75 84 pool: &Arc<Pool<Postgres>>, 76 85 cache: &Cache, 86 + mb_client: &Arc<MusicbrainzClient>, 77 87 form: BTreeMap<String, String>, 78 88 ) -> Result<HttpResponse, Error> { 79 89 match method { 80 - "track.scrobble" => handle_scrobble(form, pool, cache).await, 90 + "track.scrobble" => handle_scrobble(form, pool, cache, mb_client).await, 81 91 _ => Err(Error::msg(format!("Unsupported method: {}", method))), 82 92 } 83 93 }
+4 -3
crates/scrobbler/src/handlers/scrobble.rs
··· 5 5 use std::collections::BTreeMap; 6 6 7 7 use crate::{ 8 - auth::authenticate, cache::Cache, params::validate_scrobble_params, response::build_response, 9 - scrobbler::scrobble, 8 + auth::authenticate, cache::Cache, musicbrainz::client::MusicbrainzClient, 9 + params::validate_scrobble_params, response::build_response, scrobbler::scrobble, 10 10 }; 11 11 12 12 pub async fn handle_scrobble( 13 13 form: BTreeMap<String, String>, 14 14 conn: &Pool<sqlx::Postgres>, 15 15 cache: &Cache, 16 + mb_client: &MusicbrainzClient, 16 17 ) -> Result<HttpResponse, Error> { 17 18 let params = match validate_scrobble_params(&form, &["api_key", "api_sig", "sk", "method"]) { 18 19 Ok(params) => params, ··· 31 32 }))); 32 33 } 33 34 34 - match scrobble(&conn, cache, &form).await { 35 + match scrobble(&conn, cache, mb_client, &form).await { 35 36 Ok(scrobbles) => Ok(HttpResponse::Ok().json(build_response(scrobbles))), 36 37 Err(e) => { 37 38 if e.to_string().contains("Timestamp") {
+4 -2
crates/scrobbler/src/handlers/v1/submission.rs
··· 4 4 use std::{collections::BTreeMap, sync::Arc}; 5 5 6 6 use crate::{ 7 - auth::verify_session_id, cache::Cache, params::validate_required_params, scrobbler::scrobble_v1, 7 + auth::verify_session_id, cache::Cache, musicbrainz::client::MusicbrainzClient, 8 + params::validate_required_params, scrobbler::scrobble_v1, 8 9 }; 9 10 10 11 pub async fn submission( 11 12 form: BTreeMap<String, String>, 12 13 cache: &Cache, 13 14 pool: &Arc<sqlx::Pool<sqlx::Postgres>>, 15 + mb_client: &Arc<MusicbrainzClient>, 14 16 ) -> Result<HttpResponse, Error> { 15 17 match validate_required_params(&form, &["s", "a[0]", "t[0]", "i[0]"]) { 16 18 Ok(_) => { ··· 30 32 let user_id = user_id.unwrap(); 31 33 tracing::info!(artist = %a, track = %t, timestamp = %i, user_id = %user_id, "Submission"); 32 34 33 - match scrobble_v1(pool, cache, &form).await { 35 + match scrobble_v1(pool, cache, mb_client, &form).await { 34 36 Ok(_) => Ok(HttpResponse::Ok().body("OK\n")), 35 37 Err(e) => Ok(HttpResponse::BadRequest().json(json!({ 36 38 "error": 4,
+5 -1
crates/scrobbler/src/lib.rs
··· 27 27 use owo_colors::OwoColorize; 28 28 use sqlx::postgres::PgPoolOptions; 29 29 30 - use crate::cache::Cache; 30 + use crate::{cache::Cache, musicbrainz::client::MusicbrainzClient}; 31 31 32 32 pub const BANNER: &str = r#" 33 33 ___ ___ _____ __ __ __ ··· 71 71 .unwrap(), 72 72 ); 73 73 74 + let mb_client = MusicbrainzClient::new().await?; 75 + let mb_client = Arc::new(mb_client); 76 + 74 77 HttpServer::new(move || { 75 78 App::new() 76 79 .wrap(RateLimiter::default()) 77 80 .app_data(limiter.clone()) 78 81 .app_data(Data::new(conn.clone())) 79 82 .app_data(Data::new(cache.clone())) 83 + .app_data(Data::new(mb_client.clone())) 80 84 .service(handlers::handle_methods) 81 85 .service(handlers::handle_nowplaying) 82 86 .service(handlers::handle_submission)
+3 -1
crates/scrobbler/src/listenbrainz/core/submit.rs
··· 5 5 use std::sync::Arc; 6 6 7 7 use crate::auth::decode_token; 8 + use crate::musicbrainz::client::MusicbrainzClient; 8 9 use crate::repo; 9 10 use crate::{cache::Cache, scrobbler::scrobble_listenbrainz}; 10 11 ··· 14 15 payload: SubmitListensRequest, 15 16 cache: &Cache, 16 17 pool: &Arc<sqlx::Pool<sqlx::Postgres>>, 18 + mb_client: &Arc<MusicbrainzClient>, 17 19 token: &str, 18 20 ) -> Result<HttpResponse, Error> { 19 21 if payload.listen_type != "playing_now" { ··· 29 31 30 32 const RETRIES: usize = 15; 31 33 for attempt in 1..=RETRIES { 32 - match scrobble_listenbrainz(pool, cache, &payload, token).await { 34 + match scrobble_listenbrainz(pool, cache, mb_client, &payload, token).await { 33 35 Ok(_) => { 34 36 return Ok(HttpResponse::Ok().json(json!({ 35 37 "status": "ok",
+4 -1
crates/scrobbler/src/listenbrainz/handlers.rs
··· 16 16 }, 17 17 types::SubmitListensRequest, 18 18 }, 19 + musicbrainz::client::MusicbrainzClient, 19 20 read_payload, repo, 20 21 }; 21 22 use tokio_stream::StreamExt; ··· 39 40 req: HttpRequest, 40 41 data: web::Data<Arc<Pool<Postgres>>>, 41 42 cache: web::Data<Cache>, 43 + mb_client: web::Data<Arc<MusicbrainzClient>>, 42 44 mut payload: web::Payload, 43 45 ) -> impl Responder { 44 46 let token = match req.headers().get("Authorization") { ··· 63 65 }) 64 66 .map_err(actix_web::error::ErrorBadRequest)?; 65 67 66 - submit_listens(req, cache.get_ref(), data.get_ref(), token) 68 + let mb_client = mb_client.get_ref(); 69 + submit_listens(req, cache.get_ref(), data.get_ref(), &mb_client, token) 67 70 .await 68 71 .map_err(actix_web::error::ErrorInternalServerError) 69 72 }
+2 -2
crates/scrobbler/src/musicbrainz/artist.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 2 3 - #[derive(Debug, Deserialize, Clone)] 3 + #[derive(Debug, Deserialize, Clone, Default)] 4 4 pub struct Artist { 5 5 pub name: String, 6 6 #[serde(rename = "sort-name")] ··· 29 29 pub aliases: Option<Vec<Alias>>, 30 30 } 31 31 32 - #[derive(Debug, Deserialize, Clone)] 32 + #[derive(Debug, Deserialize, Clone, Default)] 33 33 pub struct ArtistCredit { 34 34 pub joinphrase: Option<String>, 35 35 pub name: String,
+310 -24
crates/scrobbler/src/musicbrainz/client.rs
··· 1 + use std::env; 2 + 1 3 use super::recording::{Recording, Recordings}; 2 - use anyhow::Error; 4 + use anyhow::{anyhow, Context, Error}; 5 + use redis::aio::MultiplexedConnection; 6 + use redis::AsyncCommands; 7 + use serde::{Deserialize, Serialize}; 8 + use tokio::time::{timeout, Duration, Instant}; 3 9 4 10 pub const BASE_URL: &str = "https://musicbrainz.org/ws/2"; 5 - pub const USER_AGENT: &str = "Rocksky/0.1.0"; 11 + pub const USER_AGENT: &str = "Rocksky/0.1.0 (+https://rocksky.app)"; 12 + 13 + const Q_QUEUE: &str = "mb:queue:v1"; 14 + const CACHE_SEARCH_PREFIX: &str = "mb:cache:search:"; 15 + const CACHE_REC_PREFIX: &str = "mb:cache:rec:"; 16 + const INFLIGHT_PREFIX: &str = "mb:inflight:"; 17 + const RESP_PREFIX: &str = "mb:resp:"; 6 18 7 - pub struct MusicbrainzClient {} 19 + const CACHE_TTL_SECS: u64 = 60 * 60 * 24; 20 + const WAIT_TIMEOUT_SECS: u64 = 20; 21 + const INFLIGHT_TTL_SECS: i64 = 30; // de-dup window while the worker is fetching 22 + 23 + #[derive(Clone)] 24 + pub struct MusicbrainzClient { 25 + http: reqwest::Client, 26 + redis: MultiplexedConnection, 27 + cache_ttl: u64, 28 + } 29 + 30 + #[derive(Debug, Serialize, Deserialize)] 31 + #[serde(tag = "kind", rename_all = "snake_case")] 32 + enum Job { 33 + Search { id: String, query: String }, 34 + GetRecording { id: String, mbid: String }, 35 + } 8 36 9 37 impl MusicbrainzClient { 10 - pub fn new() -> Self { 11 - MusicbrainzClient {} 38 + pub async fn new() -> Result<Self, Error> { 39 + let client = 40 + redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into()))?; 41 + let redis = client.get_multiplexed_tokio_connection().await?; 42 + let http = reqwest::Client::builder() 43 + .user_agent(USER_AGENT) 44 + .build() 45 + .context("build http client")?; 46 + let me = MusicbrainzClient { 47 + http, 48 + redis, 49 + cache_ttl: CACHE_TTL_SECS, 50 + }; 51 + 52 + let mut worker_conn = client.get_multiplexed_async_connection().await?; 53 + 54 + let http = me.http.clone(); 55 + tokio::spawn(async move { worker_loop(http, &mut worker_conn).await }); 56 + 57 + Ok(me) 12 58 } 13 59 14 60 pub async fn search(&self, query: &str) -> Result<Recordings, Error> { 15 - let url = format!("{}/recording", BASE_URL); 16 - let client = reqwest::Client::new(); 17 - let response = client 18 - .get(&url) 19 - .header("Accept", "application/json") 20 - .header("User-Agent", USER_AGENT) 21 - .query(&[("query", query), ("inc", "artist-credits+releases")]) 22 - .send() 61 + if let Some(h) = self.get_cache(&cache_key_search(query)).await? { 62 + return Ok(serde_json::from_str(&h).context("decode cached search")?); 63 + } 64 + let id = nanoid::nanoid!(); 65 + let job = Job::Search { 66 + id: id.clone(), 67 + query: query.to_string(), 68 + }; 69 + self.enqueue_if_needed(&job, &infl_key_search(query)) 23 70 .await?; 24 71 25 - Ok(response.json().await?) 72 + let raw = self.wait_for_response(&id).await?; 73 + let parsed: Recordings = serde_json::from_str(&raw).context("decode search response")?; 74 + 75 + self.set_cache(&cache_key_search(query), &raw).await?; 76 + Ok(parsed) 26 77 } 27 78 28 79 pub async fn get_recording(&self, mbid: &str) -> Result<Recording, Error> { 29 - let url = format!("{}/recording/{}", BASE_URL, mbid); 30 - let client = reqwest::Client::new(); 31 - let response = client 32 - .get(&url) 33 - .header("Accept", "application/json") 34 - .header("User-Agent", USER_AGENT) 35 - .query(&[("inc", "artist-credits+releases")]) 36 - .send() 37 - .await?; 80 + if let Some(h) = self.get_cache(&cache_key_rec(mbid)).await? 
{ 81 + return Ok(serde_json::from_str(&h).context("decode cached recording")?); 82 + } 83 + let id = nanoid::nanoid!(); 84 + let job = Job::GetRecording { 85 + id: id.clone(), 86 + mbid: mbid.to_string(), 87 + }; 88 + self.enqueue_if_needed(&job, &infl_key_rec(mbid)).await?; 89 + let raw = self.wait_for_response(&id).await?; 90 + let parsed: Recording = serde_json::from_str(&raw).context("decode recording response")?; 91 + self.set_cache(&cache_key_rec(mbid), &raw).await?; 92 + Ok(parsed) 93 + } 94 + 95 + // ---------- Redis helpers ---------- 96 + 97 + async fn get_cache(&self, key: &str) -> Result<Option<String>, Error> { 98 + let mut r = self.redis.clone(); 99 + let val: Option<String> = r.get(key).await?; 100 + Ok(val) 101 + } 102 + 103 + async fn set_cache(&self, key: &str, json: &str) -> Result<(), Error> { 104 + let mut r = self.redis.clone(); 105 + let _: () = r 106 + .set_ex(key, json, self.cache_ttl) 107 + .await 108 + .with_context(|| format!("cache set {key}"))?; 109 + Ok(()) 110 + } 111 + 112 + async fn enqueue_if_needed(&self, job: &Job, inflight_key: &str) -> Result<(), Error> { 113 + let mut r = self.redis.clone(); 114 + 115 + // set NX to avoid duplicate work; short TTL 116 + let set: bool = r.set_nx(inflight_key, "1").await.context("set in-flight")?; 117 + if set { 118 + let _: () = r 119 + .expire(inflight_key, INFLIGHT_TTL_SECS) 120 + .await 121 + .context("expire inflight")?; 122 + let payload = serde_json::to_string(job).expect("serialize job"); 123 + let _: () = r.rpush(Q_QUEUE, payload).await.context("enqueue job")?; 124 + } 125 + Ok(()) 126 + } 127 + 128 + async fn wait_for_response(&self, id: &str) -> Result<String, Error> { 129 + let mut r = self.redis.clone(); 130 + let resp_q = resp_key(id); 131 + 132 + let fut = async { 133 + loop { 134 + let popped: Option<(String, String)> = r.brpop(&resp_q, 2.0).await?; 135 + if let Some((_key, json)) = popped { 136 + return Ok::<String, Error>(json); 137 + } 138 + } 139 + }; 140 + 141 + match timeout(Duration::from_secs(WAIT_TIMEOUT_SECS), fut).await { 142 + Ok(res) => res, 143 + Err(_) => Err(anyhow!("timed out waiting for MusicBrainz response")), 144 + } 145 + } 146 + } 147 + 148 + async fn worker_loop( 149 + http: reqwest::Client, 150 + redis: &mut MultiplexedConnection, 151 + ) -> Result<(), Error> { 152 + // pacing ticker: strictly 1 request/second 153 + let mut next_allowed = Instant::now(); 154 + 155 + loop { 156 + tokio::select! { 157 + res = async { 158 + 159 + // finite timeout pop 160 + let v: Option<Vec<String>> = redis.blpop(Q_QUEUE, 2.0).await.ok(); 161 + Ok::<_, Error>(v) 162 + } => { 163 + let Some(mut v) = res? 
else { 164 + continue }; 165 + if v.len() != 2 { 166 + continue; } 167 + let payload = v.pop().unwrap(); 168 + 169 + // 1 rps pacing 170 + let now = Instant::now(); 171 + if now < next_allowed { tokio::time::sleep(next_allowed - now).await; } 172 + next_allowed = Instant::now() + Duration::from_secs(1); 173 + 174 + let payload: Job = match serde_json::from_str(&payload) { 175 + Ok(j) => j, 176 + Err(e) => { 177 + tracing::error!(%e, "invalid job payload"); 178 + continue; 179 + } 180 + }; 181 + if let Err(e) = process_job(&http, redis, payload).await { 182 + tracing::error!(%e, "job failed"); 183 + } 184 + } 185 + } 186 + } 187 + } 188 + 189 + async fn process_job( 190 + http: &reqwest::Client, 191 + redis: &mut MultiplexedConnection, 192 + job: Job, 193 + ) -> Result<(), Error> { 194 + match job { 195 + Job::Search { id, query } => { 196 + let url = format!("{}/recording", BASE_URL); 197 + let resp = http 198 + .get(&url) 199 + .header("Accept", "application/json") 200 + .query(&[ 201 + ("query", query.as_str()), 202 + ("fmt", "json"), 203 + ("inc", "artists+releases+isrcs"), 204 + ]) 205 + .send() 206 + .await 207 + .context("http search")?; 208 + 209 + if !resp.status().is_success() { 210 + // Push an error payload so waiters donโ€™t hang forever 211 + let _ = push_response( 212 + redis, 213 + &id, 214 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 215 + ) 216 + .await; 217 + return Err(anyhow!("musicbrainz search http {}", resp.status())); 218 + } 219 + 220 + let text = resp.text().await.context("read body")?; 221 + push_response(redis, &id, &text).await?; 222 + } 223 + Job::GetRecording { id, mbid } => { 224 + let url = format!("{}/recording/{}", BASE_URL, mbid); 225 + let resp = http 226 + .get(&url) 227 + .header("Accept", "application/json") 228 + .query(&[("fmt", "json"), ("inc", "artists+releases+isrcs")]) 229 + .send() 230 + .await 231 + .context("http get_recording")?; 232 + 233 + if !resp.status().is_success() { 234 + let _ = push_response( 235 + redis, 236 + &id, 237 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 238 + ) 239 + .await; 240 + return Err(anyhow!("musicbrainz get_recording http {}", resp.status())); 241 + } 242 + 243 + let text = resp.text().await.context("read body")?; 244 + push_response(redis, &id, &text).await?; 245 + } 246 + } 247 + Ok(()) 248 + } 249 + 250 + async fn push_response( 251 + redis: &mut MultiplexedConnection, 252 + id: &str, 253 + json: &str, 254 + ) -> Result<(), Error> { 255 + let q = resp_key(id); 256 + // RPUSH then EXPIRE to avoid leaks if a client never BRPOPs 257 + let _: () = redis.rpush(&q, json).await?; 258 + let _: () = redis.expire(&q, WAIT_TIMEOUT_SECS as i64 + 5).await?; 259 + Ok(()) 260 + } 261 + 262 + fn cache_key_search(query: &str) -> String { 263 + format!("{}{}", CACHE_SEARCH_PREFIX, fast_hash(query)) 264 + } 265 + fn cache_key_rec(mbid: &str) -> String { 266 + format!("{}{}", CACHE_REC_PREFIX, mbid) 267 + } 268 + fn infl_key_search(query: &str) -> String { 269 + format!("{}search:{}", INFLIGHT_PREFIX, fast_hash(query)) 270 + } 271 + fn infl_key_rec(mbid: &str) -> String { 272 + format!("{}rec:{}", INFLIGHT_PREFIX, mbid) 273 + } 274 + fn resp_key(id: &str) -> String { 275 + format!("{}{}", RESP_PREFIX, id) 276 + } 277 + 278 + fn fast_hash(s: &str) -> u64 { 279 + use std::hash::{Hash, Hasher}; 280 + let mut h = std::collections::hash_map::DefaultHasher::new(); 281 + s.hash(&mut h); 282 + h.finish() 283 + } 284 + 285 + #[cfg(test)] 286 + mod tests { 287 + use super::*; 288 + use serial_test::serial; 289 + 290 + 
#[test] 291 + fn test_fast_hash() { 292 + let h1 = fast_hash("hello"); 293 + let h2 = fast_hash("hello"); 294 + let h3 = fast_hash("world"); 295 + assert_eq!(h1, h2); 296 + assert_ne!(h1, h3); 297 + } 298 + 299 + #[test] 300 + fn test_cache_keys() { 301 + let q = "test query"; 302 + let mbid = "some-mbid"; 303 + assert!(cache_key_search(q).starts_with(CACHE_SEARCH_PREFIX)); 304 + assert!(cache_key_rec(mbid).starts_with(CACHE_REC_PREFIX)); 305 + assert!(infl_key_search(q).starts_with(INFLIGHT_PREFIX)); 306 + assert!(infl_key_rec(mbid).starts_with(INFLIGHT_PREFIX)); 307 + assert!(resp_key("id").starts_with(RESP_PREFIX)); 308 + } 309 + 310 + #[tokio::test] 311 + #[serial] 312 + async fn test_musicbrainz_client() -> Result<(), Error> { 313 + let client = MusicbrainzClient::new().await?; 314 + let query = format!( 315 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 316 + "Come As You Are", "Nirvana" 317 + ); 318 + let search_res = client.search(&query).await?; 38 319 39 - Ok(response.json().await?) 320 + assert!(!search_res.recordings.is_empty()); 321 + let rec = &search_res.recordings[0]; 322 + let mbid = &rec.id; 323 + let rec_res = client.get_recording(mbid).await?; 324 + assert_eq!(rec_res.id, *mbid); 325 + Ok(()) 40 326 } 41 327 }
+278
crates/scrobbler/src/musicbrainz/mod.rs
··· 1 + use anyhow::Error; 2 + 3 + use crate::musicbrainz::{recording::Recordings, release::Release}; 4 + use std::cmp::Ordering; 5 + 1 6 pub mod artist; 2 7 pub mod client; 3 8 pub mod label; 4 9 pub mod recording; 5 10 pub mod release; 11 + 12 + fn get_best_release(releases: &[Release]) -> Option<Release> { 13 + if releases.is_empty() { 14 + return None; 15 + } 16 + 17 + // Remove the single filtering - this was causing the issue 18 + let mut candidates: Vec<&Release> = releases.iter().collect(); 19 + 20 + if candidates.is_empty() { 21 + return None; 22 + } 23 + 24 + candidates.sort_by(|a, b| cmp_release(a, b)); 25 + candidates.first().cloned().cloned() 26 + } 27 + 28 + pub fn get_best_release_from_recordings(all: &Recordings, artist: &str) -> Option<Release> { 29 + use std::collections::HashSet; 30 + 31 + let mut pool: Vec<Release> = Vec::new(); 32 + let mut seen: HashSet<String> = HashSet::new(); 33 + 34 + let all_recordings: Vec<&recording::Recording> = all 35 + .recordings 36 + .iter() 37 + .filter(|rec| { 38 + if let Some(credits) = &rec.artist_credit { 39 + artist_credit_contains(credits, artist) 40 + } else { 41 + false 42 + } 43 + }) 44 + .collect(); 45 + 46 + for rec in &all_recordings { 47 + if let Some(rels) = &rec.releases { 48 + for r in rels { 49 + if seen.insert(r.id.clone()) { 50 + pool.push(r.clone()); 51 + } 52 + } 53 + } 54 + } 55 + 56 + get_best_release(&pool) 57 + } 58 + 59 + fn cmp_release(a: &Release, b: &Release) -> Ordering { 60 + // First priority: prefer albums over singles 61 + let sa = is_single_release_type(a); 62 + let sb = is_single_release_type(b); 63 + if sa != sb { 64 + return bool_true_last(sa, sb); // Albums (false) come before singles (true) 65 + } 66 + 67 + let ta = release_tier(a.status.as_deref()); 68 + let tb = release_tier(b.status.as_deref()); 69 + if ta != tb { 70 + return ta.cmp(&tb); 71 + } 72 + 73 + let pa = has_preferred_country(a, &["XW", "US"]); 74 + let pb = has_preferred_country(b, &["XW", "US"]); 75 + if pa != pb { 76 + return bool_true_first(pa, pb); 77 + } 78 + 79 + let la = is_live_release(a); 80 + let lb = is_live_release(b); 81 + if la != lb { 82 + return bool_true_last(la, lb); 83 + } 84 + 85 + let da = date_key(a.date.as_deref()); 86 + let db = date_key(b.date.as_deref()); 87 + if da != db { 88 + return da.cmp(&db); 89 + } 90 + 91 + match a.title.cmp(&b.title) { 92 + Ordering::Equal => a.id.cmp(&b.id), 93 + ord => ord, 94 + } 95 + } 96 + 97 + fn release_tier(status: Option<&str>) -> u8 { 98 + match status.map(|s| s.to_ascii_lowercase()) { 99 + Some(s) if s == "official" => 0, 100 + Some(s) if s == "bootleg" => 1, 101 + _ => 2, 102 + } 103 + } 104 + 105 + fn bool_true_first(a: bool, b: bool) -> Ordering { 106 + match (a, b) { 107 + (true, false) => Ordering::Less, 108 + (false, true) => Ordering::Greater, 109 + _ => Ordering::Equal, 110 + } 111 + } 112 + 113 + fn bool_true_last(a: bool, b: bool) -> Ordering { 114 + match (a, b) { 115 + (true, false) => Ordering::Greater, 116 + (false, true) => Ordering::Less, 117 + _ => Ordering::Equal, 118 + } 119 + } 120 + 121 + fn is_single_release_type(rel: &Release) -> bool { 122 + if let Some(release_group) = &rel.release_group { 123 + if let Some(primary_type) = &release_group.primary_type { 124 + if primary_type.to_ascii_lowercase() == "single" { 125 + return true; 126 + } 127 + } 128 + } 129 + 130 + if rel.track_count == Some(1) { 131 + return true; 132 + } 133 + if let Some(media) = &rel.media { 134 + if media.len() == 1 && media[0].track_count == 1 { 135 + return true; 136 + } 137 + 
let total: u32 = media.iter().map(|m| m.track_count).sum(); 138 + if total == 1 { 139 + return true; 140 + } 141 + } 142 + false 143 + } 144 + 145 + fn has_preferred_country(rel: &Release, prefs: &[&str]) -> bool { 146 + if let Some(c) = rel.country.as_deref() { 147 + if prefs.iter().any(|p| *p == c) { 148 + return true; 149 + } 150 + } 151 + if let Some(events) = rel.release_events.as_ref() { 152 + for ev in events { 153 + if let Some(area) = &ev.area { 154 + if area 155 + .iso_3166_1_codes 156 + .iter() 157 + .any(|codes| prefs.iter().any(|p| codes.contains(&p.to_string()))) 158 + { 159 + return true; 160 + } 161 + } 162 + } 163 + } 164 + false 165 + } 166 + 167 + /// Convert "YYYY[-MM[-DD]]" into YYYYMMDD (missing parts โ†’ 01). Unknown dates sort last. 168 + fn date_key(d: Option<&str>) -> i32 { 169 + if let Some(d) = d { 170 + let mut parts = d.split('-'); 171 + let y = parts.next().unwrap_or("9999"); 172 + let m = parts.next().unwrap_or("01"); 173 + let day = parts.next().unwrap_or("01"); 174 + 175 + let y: i32 = y.parse().unwrap_or(9999); 176 + let m: i32 = m.parse().unwrap_or(1); 177 + let day: i32 = day.parse().unwrap_or(1); 178 + 179 + return y * 10000 + m * 100 + day; 180 + } 181 + 9_999_01_01 182 + } 183 + 184 + fn is_live_release(rel: &Release) -> bool { 185 + let t_live = rel.title.to_ascii_lowercase().contains("live"); 186 + let d_live = rel 187 + .disambiguation 188 + .as_ref() 189 + .map(|d| d.to_ascii_lowercase().contains("live")) 190 + .unwrap_or(false); 191 + t_live || d_live 192 + } 193 + 194 + fn artist_credit_contains(credits: &[artist::ArtistCredit], name: &str) -> bool { 195 + credits.iter().any(|c| c.name.eq_ignore_ascii_case(name)) 196 + } 197 + 198 + #[cfg(test)] 199 + mod tests { 200 + use crate::musicbrainz::client::MusicbrainzClient; 201 + use crate::musicbrainz::release::Media; 202 + use anyhow::Error; 203 + use serial_test::serial; 204 + 205 + use super::*; 206 + 207 + #[test] 208 + fn test_date_key() { 209 + assert_eq!(date_key(Some("2020-05-15")), 20200515); 210 + assert_eq!(date_key(Some("2020-05")), 20200501); 211 + assert_eq!(date_key(Some("2020")), 20200101); 212 + assert_eq!(date_key(None), 99990101); 213 + assert_eq!(date_key(Some("invalid-date")), 99990101); 214 + } 215 + 216 + #[test] 217 + fn test_is_single() { 218 + let rel1 = Release { 219 + track_count: Some(1), 220 + media: None, 221 + ..Default::default() 222 + }; 223 + assert!(is_single_release_type(&rel1)); 224 + let rel2 = Release { 225 + track_count: Some(2), 226 + media: Some(vec![ 227 + Media { 228 + track_count: 1, 229 + ..Default::default() 230 + }, 231 + Media { 232 + track_count: 1, 233 + ..Default::default() 234 + }, 235 + ]), 236 + ..Default::default() 237 + }; 238 + assert!(!is_single_release_type(&rel2)); 239 + } 240 + 241 + #[tokio::test] 242 + #[serial] 243 + async fn test_get_best_release_from_recordings() -> Result<(), Error> { 244 + let client = MusicbrainzClient::new().await?; 245 + let query = format!( 246 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 247 + "Smells Like Teen Spirit", "Nirvana" 248 + ); 249 + let recordings = client.search(&query).await?; 250 + let best = get_best_release_from_recordings(&recordings, "Nirvana"); 251 + assert!(best.is_some()); 252 + let best = best.unwrap(); 253 + assert_eq!(best.title, "Nevermind"); 254 + assert_eq!(best.status.as_deref(), Some("Official")); 255 + assert_eq!(best.country.as_deref(), Some("US")); 256 + 257 + let query = format!( 258 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 259 + "Medicine", 
"Joji" 260 + ); 261 + let recordings = client.search(&query).await?; 262 + let best = get_best_release_from_recordings(&recordings, "Joji"); 263 + assert!(best.is_some()); 264 + let best = best.unwrap(); 265 + assert_eq!(best.title, "Chloe Burbank Vol. 1"); 266 + assert_eq!(best.status.as_deref(), Some("Bootleg")); 267 + assert_eq!(best.country.as_deref(), Some("XW")); 268 + 269 + let query = format!( 270 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 271 + "Don't Stay", "Linkin Park" 272 + ); 273 + let recordings = client.search(&query).await?; 274 + let best = get_best_release_from_recordings(&recordings, "Linkin Park"); 275 + assert!(best.is_some()); 276 + let best = best.unwrap(); 277 + assert_eq!(best.title, "Meteora"); 278 + assert_eq!(best.status.as_deref(), Some("Official")); 279 + assert_eq!(best.country.as_deref(), Some("US")); 280 + 281 + Ok(()) 282 + } 283 + }
+26 -7
crates/scrobbler/src/musicbrainz/release.rs
··· 6 6 recording::Recording, 7 7 }; 8 8 9 - #[derive(Debug, Deserialize, Clone)] 9 + #[derive(Debug, Deserialize, Clone, Default)] 10 10 pub struct Release { 11 11 #[serde(rename = "release-events")] 12 12 pub release_events: Option<Vec<ReleaseEvent>>, ··· 24 24 #[serde(rename = "cover-art-archive")] 25 25 pub cover_art_archive: Option<CoverArtArchive>, 26 26 #[serde(rename = "artist-credit")] 27 - pub artist_credit: Vec<ArtistCredit>, 27 + pub artist_credit: Option<Vec<ArtistCredit>>, 28 28 #[serde(rename = "status-id")] 29 29 pub status_id: Option<String>, 30 30 #[serde(rename = "label-info")] ··· 33 33 pub date: Option<String>, 34 34 pub country: Option<String>, 35 35 pub asin: Option<String>, 36 + #[serde(rename = "track-count")] 37 + pub track_count: Option<u32>, 38 + #[serde(rename = "release-group")] 39 + pub release_group: Option<ReleaseGroup>, 36 40 } 37 41 38 - #[derive(Debug, Deserialize, Clone)] 42 + #[derive(Debug, Deserialize, Clone, Default)] 39 43 pub struct CoverArtArchive { 40 44 pub back: bool, 41 45 pub artwork: bool, ··· 44 48 pub darkened: bool, 45 49 } 46 50 47 - #[derive(Debug, Deserialize, Clone)] 51 + #[derive(Debug, Deserialize, Clone, Default)] 48 52 pub struct ReleaseEvent { 49 53 pub area: Option<Area>, 50 54 pub date: String, 51 55 } 52 56 53 - #[derive(Debug, Deserialize, Clone)] 57 + #[derive(Debug, Deserialize, Clone, Default)] 54 58 pub struct TextRepresentation { 55 59 pub language: Option<String>, 56 60 pub script: Option<String>, 57 61 } 58 62 59 - #[derive(Debug, Deserialize, Clone)] 63 + #[derive(Debug, Deserialize, Clone, Default)] 60 64 pub struct Media { 61 65 #[serde(rename = "format-id")] 62 66 pub format_id: Option<String>, ··· 87 91 pub title: String, 88 92 pub recording: Recording, 89 93 #[serde(rename = "artist-credit")] 90 - pub artist_credit: Vec<ArtistCredit>, 94 + pub artist_credit: Option<Vec<ArtistCredit>>, 91 95 pub number: String, 92 96 } 97 + 98 + #[derive(Debug, Deserialize, Clone, Default)] 99 + pub struct ReleaseGroup { 100 + pub id: String, 101 + pub title: String, 102 + #[serde(rename = "primary-type")] 103 + pub primary_type: Option<String>, 104 + #[serde(rename = "secondary-types")] 105 + pub secondary_types: Option<Vec<String>>, 106 + pub disambiguation: Option<String>, 107 + #[serde(rename = "first-release-date")] 108 + pub first_release_date: Option<String>, 109 + #[serde(rename = "artist-credit")] 110 + pub artist_credit: Option<Vec<ArtistCredit>>, 111 + }
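The new `ReleaseGroup` struct (together with the `track-count` field on `Release`) is what the ranking code reads to tell albums and singles apart; the serde renames map MusicBrainz's kebab-case JSON keys onto Rust field names. A small sketch of that mapping, assuming it runs inside this crate (serde_json is already a dependency); the JSON values are invented:

```rust
use crate::musicbrainz::release::ReleaseGroup;

fn parse_release_group() -> Result<(), serde_json::Error> {
    // Shape of the "release-group" object MusicBrainz returns with a release.
    let json = r#"{
        "id": "an-example-release-group-mbid",
        "title": "Nevermind",
        "primary-type": "Album",
        "secondary-types": []
    }"#;

    let group: ReleaseGroup = serde_json::from_str(json)?;
    assert_eq!(group.primary_type.as_deref(), Some("Album"));
    assert!(group.secondary_types.unwrap_or_default().is_empty());
    Ok(())
}
```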
+118 -69
crates/scrobbler/src/scrobbler.rs
··· 9 9 cache::Cache, 10 10 crypto::decrypt_aes_256_ctr, 11 11 listenbrainz::types::SubmitListensRequest, 12 - musicbrainz::client::MusicbrainzClient, 12 + musicbrainz::{ 13 + client::MusicbrainzClient, get_best_release_from_recordings, recording::Recording, 14 + }, 13 15 repo, rocksky, 14 16 spotify::{client::SpotifyClient, refresh_token}, 15 17 types::{Scrobble, Track}, ··· 94 96 pub async fn scrobble( 95 97 pool: &Pool<Postgres>, 96 98 cache: &Cache, 99 + mb_client: &MusicbrainzClient, 97 100 form: &BTreeMap<String, String>, 98 101 ) -> Result<Vec<Scrobble>, Error> { 99 102 let mut scrobbles = parse_batch(form)?; ··· 110 113 return Err(Error::msg("No Spotify tokens found")); 111 114 } 112 115 113 - let mb_client = MusicbrainzClient::new(); 114 - 115 116 for scrobble in &mut scrobbles { 116 117 /* 117 118 0. check if scrobble is cached ··· 224 225 } 225 226 226 227 let query = format!( 227 - r#"recording:"{}" AND artist:"{}""#, 228 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 228 229 scrobble.track, scrobble.artist 229 230 ); 230 231 let result = mb_client.search(&query).await?; ··· 248 249 pub async fn scrobble_v1( 249 250 pool: &Pool<Postgres>, 250 251 cache: &Cache, 252 + mb_client: &MusicbrainzClient, 251 253 form: &BTreeMap<String, String>, 252 254 ) -> Result<(), Error> { 253 255 let session_id = form.get("s").unwrap().to_string(); ··· 269 271 return Err(Error::msg("No Spotify tokens found")); 270 272 } 271 273 272 - let mb_client = MusicbrainzClient::new(); 273 - 274 274 let mut scrobble = Scrobble { 275 275 artist: artist.trim().to_string(), 276 276 track: track.trim().to_string(), ··· 375 375 376 376 377 377 378 + .await?; 378 379 380 + if let Some(track) = result.tracks.items.first() { 381 + let artists = track 382 + .artists 383 + .iter() 384 + .map(|a| a.name.to_lowercase().clone()) 385 + .collect::<Vec<_>>() 386 + .join(", ") 387 + .to_lowercase(); 379 388 389 + // check if artists don't contain the scrobble artist (to avoid wrong matches) 390 + if !artists.contains(&scrobble.artist.to_lowercase()) { 391 + tracing::warn!(artist = %artist, track = ?track, "Artist mismatch, skipping"); 392 + return Ok(()); 393 + } else { 394 + tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Spotify (track)"); 395 + scrobble.album = Some(track.album.name.clone()); 396 + let mut track = track.clone(); 397 + 398 + if let Some(album) = spotify_client.get_album(&track.album.id).await? { 399 + track.album = album; 400 + } 401 + 402 + if let Some(artist) = spotify_client 403 + .get_artist(&track.album.artists[0].id) 404 + .await? 
405 + { 406 + track.album.artists[0] = artist; 407 + } 380 408 381 - 382 - 383 - 384 - 385 - 386 - 387 - 388 - 389 - 390 - 391 - 392 - 393 - 394 - 395 - 396 - 397 - 398 - 409 + rocksky::scrobble(cache, &did, track.into(), scrobble.timestamp).await?; 410 + tokio::time::sleep(std::time::Duration::from_secs(1)).await; 411 + return Ok(()); 412 + } 399 413 } 400 414 401 415 let query = format!( 402 - r#"recording:"{}" AND artist:"{}""#, 416 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 403 417 scrobble.track, scrobble.artist 404 418 ); 405 - let result = mb_client.search(&query).await?; 406 - 407 - if let Some(recording) = result.recordings.first() { 419 + let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await; 420 + if let Err(e) = result { 421 + tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Musicbrainz search error: {}", e); 422 + return Ok(()); 423 + } 424 + let result = result.unwrap(); 425 + if let Some(recording) = result { 408 426 let result = mb_client.get_recording(&recording.id).await?; 409 427 tracing::info!(%scrobble.artist, %scrobble.track, "Musicbrainz (recording)"); 410 428 scrobble.album = Some(Track::from(result.clone()).album); ··· 421 439 pub async fn scrobble_listenbrainz( 422 440 pool: &Pool<Postgres>, 423 441 cache: &Cache, 442 + mb_client: &MusicbrainzClient, 424 443 req: &SubmitListensRequest, 425 444 token: &str, 426 445 ) -> Result<(), Error> { ··· 503 522 return Err(Error::msg("No Spotify tokens found")); 504 523 } 505 524 506 - let mb_client = MusicbrainzClient::new(); 507 - 508 525 let mut scrobble = Scrobble { 509 526 artist: artist.trim().to_string(), 510 527 track: track.trim().to_string(), ··· 610 627 611 628 612 629 613 - 614 - 615 - 616 - 617 - 618 - 619 - 630 + let artists = track 631 + .artists 632 + .iter() 633 + .map(|a| a.name.to_lowercase().clone()) 634 + .collect::<Vec<_>>() 635 + .join(", ") 636 + .to_lowercase(); 620 637 621 638 // check if artists don't contain the scrobble artist (to avoid wrong matches) 622 639 if !artists.contains(&scrobble.artist.to_lowercase()) { 623 - tracing::warn!(artist = %artist, track = %track, "Artist mismatch, skipping"); 640 + tracing::warn!(artist = %artist, track = ?track, "Artist mismatch, skipping"); 624 641 return Ok(()); 625 - } 626 - 627 - 628 - 629 - 630 - 631 - 632 - 633 - 634 - 635 - 636 - 637 - 638 - 639 - 640 - 641 - 642 + } else { 643 + tracing::info!("Spotify (track)"); 644 + scrobble.album = Some(track.album.name.clone()); 645 + let mut track = track.clone(); 646 + 647 + if let Some(album) = spotify_client.get_album(&track.album.id).await? { 648 + track.album = album; 649 + } 650 + 651 + if let Some(artist) = spotify_client 652 + .get_artist(&track.album.artists[0].id) 653 + .await? 
654 + { 655 + track.album.artists[0] = artist; 656 + } 642 657 643 - 644 - return Ok(()); 658 + rocksky::scrobble(cache, &did, track.into(), scrobble.timestamp).await?; 659 + tokio::time::sleep(std::time::Duration::from_secs(1)).await; 660 + return Ok(()); 661 + } 645 662 } 646 663 647 - // Temporary disable Musicbrainz search to reduce rate limiting issues 648 - // and because it often returns wrong results 649 - // we can re-enable it later with a retry mechanism 650 - /* 651 664 let query = format!( 652 - r#"recording:"{}" AND artist:"{}""#, 665 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 653 666 scrobble.track, scrobble.artist 654 667 ); 655 - let result = mb_client.search(&query).await?; 656 - 657 - if let Some(recording) = result.recordings.first() { 658 - let result = mb_client.get_recording(&recording.id).await?; 659 - println!("{}", "Musicbrainz (recording)".yellow()); 660 - scrobble.album = Some(Track::from(result.clone()).album); 668 + let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await; 669 + if let Err(e) = result { 670 + tracing::warn!(artist = %artist, track = %track, "Musicbrainz search error: {}", e); 671 + return Ok(()); 672 + } 673 + let result = result.unwrap(); 674 + if let Some(result) = result { 675 + tracing::info!("Musicbrainz (recording)"); 661 676 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?; 662 677 tokio::time::sleep(std::time::Duration::from_secs(1)).await; 663 678 return Ok(()); 664 679 } 665 - */ 666 680 667 681 tracing::warn!(artist = %artist, track = %track, "Track not found, skipping"); 668 682 669 683 Ok(()) 670 684 } 685 + 686 + async fn search_musicbrainz_recording( 687 + query: &str, 688 + mb_client: &MusicbrainzClient, 689 + scrobble: &Scrobble, 690 + ) -> Result<Option<Recording>, Error> { 691 + let result = mb_client.search(&query).await; 692 + if let Err(e) = result { 693 + tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Musicbrainz search error: {}", e); 694 + return Ok(None); 695 + } 696 + let result = result.unwrap(); 697 + 698 + let release = get_best_release_from_recordings(&result, &scrobble.artist); 699 + 700 + if let Some(release) = release { 701 + let recording = result.recordings.into_iter().find(|r| { 702 + r.releases 703 + .as_ref() 704 + .map(|releases| releases.iter().any(|rel| rel.id == release.id)) 705 + .unwrap_or(false) 706 + }); 707 + if recording.is_none() { 708 + tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Recording not found in MusicBrainz result, skipping"); 709 + return Ok(None); 710 + } 711 + let recording = recording.unwrap(); 712 + let mut result = mb_client.get_recording(&recording.id).await?; 713 + tracing::info!("Musicbrainz (recording)"); 714 + result.releases = Some(vec![release]); 715 + return Ok(Some(result)); 716 + } 717 + 718 + Ok(None) 719 + }
+20 -4
crates/scrobbler/src/types.rs
··· 69 69 .map(|credit| credit.name.clone()) 70 70 .unwrap_or_default(); 71 71 let releases = recording.releases.unwrap_or_default(); 72 - let album_artist = releases 73 - .first() 74 - .and_then(|release| release.artist_credit.first()) 75 - .map(|credit| credit.name.clone()); 72 + let album_artist = releases.first().and_then(|release| { 73 + let credits = release.artist_credit.clone().unwrap_or_default(); 74 + credits.first().map(|credit| credit.name.clone()) 75 + }); 76 76 let album = releases 77 77 .first() 78 78 .map(|release| release.title.clone()) 79 + .unwrap_or_default(); 80 + let release_date = releases.first().and_then(|release| release.date.clone()); 81 + Track { 82 + title: recording.title.clone(), 83 + album, 84 + artist: artist_credit, 85 + album_artist, 86 + duration: recording.length.unwrap_or_default(), 87 + year: release_date 88 + .as_ref() 89 + .and_then(|date| date.split('-').next()) 90 + .and_then(|year| year.parse::<u32>().ok()), 91 + release_date: release_date.clone(), 92 + track_number: releases 93 + .first() 94 + .and_then(|release| {
+4
crates/webscrobbler/Cargo.toml
··· 39 39 actix-session = "0.10.1" 40 40 actix-limitation = "0.5.1" 41 41 tracing = "0.1.41" 42 + nanoid = "0.4.0" 43 + 44 + [dev-dependencies] 45 + serial_test = "3.0.0"
+7 -2
crates/webscrobbler/src/handlers.rs
··· 1 - use crate::{cache::Cache, consts::BANNER, repo, scrobbler::scrobble, types::ScrobbleRequest}; 1 + use crate::{ 2 + cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient, repo, 3 + scrobbler::scrobble, types::ScrobbleRequest, 4 + }; 2 5 use actix_web::{get, post, web, HttpRequest, HttpResponse, Responder}; 3 6 use owo_colors::OwoColorize; 4 7 use sqlx::{Pool, Postgres}; ··· 28 31 async fn handle_scrobble( 29 32 data: web::Data<Arc<Pool<Postgres>>>, 30 33 cache: web::Data<Cache>, 34 + mb_client: web::Data<Arc<MusicbrainzClient>>, 31 35 mut payload: web::Payload, 32 36 req: HttpRequest, 33 37 ) -> Result<impl Responder, actix_web::Error> { ··· 100 104 } 101 105 } 102 106 103 - scrobble(&pool, &cache, params, &user.did) 107 + let mb_client = mb_client.get_ref().as_ref(); 108 + scrobble(&pool, &cache, mb_client, params, &user.did) 104 109 .await 105 110 .map_err(|err| { 106 111 actix_web::error::ErrorInternalServerError(format!("Failed to scrobble: {}", err))
+5 -1
crates/webscrobbler/src/lib.rs
··· 11 11 use owo_colors::OwoColorize; 12 12 use sqlx::postgres::PgPoolOptions; 13 13 14 - use crate::{cache::Cache, consts::BANNER}; 14 + use crate::{cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient}; 15 15 16 16 pub mod auth; 17 17 pub mod cache; ··· 38 38 39 39 let conn = Arc::new(pool); 40 40 41 + let mb_client = MusicbrainzClient::new().await?; 42 + let mb_client = Arc::new(mb_client); 43 + 41 44 let host = env::var("WEBSCROBBLER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); 42 45 let port = env::var("WEBSCROBBLER_PORT") 43 46 .unwrap_or_else(|_| "7883".to_string()) ··· 65 68 .app_data(limiter.clone()) 66 69 .app_data(Data::new(conn.clone())) 67 70 .app_data(Data::new(cache.clone())) 71 + .app_data(Data::new(mb_client.clone())) 68 72 .service(handlers::index) 69 73 .service(handlers::handle_scrobble) 70 74 })
+2 -2
crates/webscrobbler/src/musicbrainz/artist.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 2 3 - #[derive(Debug, Deserialize, Clone)] 3 + #[derive(Debug, Deserialize, Clone, Default)] 4 4 pub struct Artist { 5 5 pub name: String, 6 6 #[serde(rename = "sort-name")] ··· 29 29 pub aliases: Option<Vec<Alias>>, 30 30 } 31 31 32 - #[derive(Debug, Deserialize, Clone)] 32 + #[derive(Debug, Deserialize, Clone, Default)] 33 33 pub struct ArtistCredit { 34 34 pub joinphrase: Option<String>, 35 35 pub name: String,
+310 -24
crates/webscrobbler/src/musicbrainz/client.rs
··· 1 + use std::env; 2 + 1 3 use super::recording::{Recording, Recordings}; 2 - use anyhow::Error; 4 + use anyhow::{anyhow, Context, Error}; 5 + use redis::aio::MultiplexedConnection; 6 + use redis::AsyncCommands; 7 + use serde::{Deserialize, Serialize}; 8 + use tokio::time::{timeout, Duration, Instant}; 3 9 4 10 pub const BASE_URL: &str = "https://musicbrainz.org/ws/2"; 5 - pub const USER_AGENT: &str = "Rocksky/0.1.0"; 11 + pub const USER_AGENT: &str = "Rocksky/0.1.0 (+https://rocksky.app)"; 12 + 13 + const Q_QUEUE: &str = "mb:queue:v1"; 14 + const CACHE_SEARCH_PREFIX: &str = "mb:cache:search:"; 15 + const CACHE_REC_PREFIX: &str = "mb:cache:rec:"; 16 + const INFLIGHT_PREFIX: &str = "mb:inflight:"; 17 + const RESP_PREFIX: &str = "mb:resp:"; 6 18 7 - pub struct MusicbrainzClient {} 19 + const CACHE_TTL_SECS: u64 = 60 * 60 * 24; 20 + const WAIT_TIMEOUT_SECS: u64 = 20; 21 + const INFLIGHT_TTL_SECS: i64 = 30; // de-dup window while the worker is fetching 22 + 23 + #[derive(Clone)] 24 + pub struct MusicbrainzClient { 25 + http: reqwest::Client, 26 + redis: MultiplexedConnection, 27 + cache_ttl: u64, 28 + } 29 + 30 + #[derive(Debug, Serialize, Deserialize)] 31 + #[serde(tag = "kind", rename_all = "snake_case")] 32 + enum Job { 33 + Search { id: String, query: String }, 34 + GetRecording { id: String, mbid: String }, 35 + } 8 36 9 37 impl MusicbrainzClient { 10 - pub fn new() -> Self { 11 - MusicbrainzClient {} 38 + pub async fn new() -> Result<Self, Error> { 39 + let client = 40 + redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into()))?; 41 + let redis = client.get_multiplexed_tokio_connection().await?; 42 + let http = reqwest::Client::builder() 43 + .user_agent(USER_AGENT) 44 + .build() 45 + .context("build http client")?; 46 + let me = MusicbrainzClient { 47 + http, 48 + redis, 49 + cache_ttl: CACHE_TTL_SECS, 50 + }; 51 + 52 + let mut worker_conn = client.get_multiplexed_async_connection().await?; 53 + 54 + let http = me.http.clone(); 55 + tokio::spawn(async move { worker_loop(http, &mut worker_conn).await }); 56 + 57 + Ok(me) 12 58 } 13 59 14 60 pub async fn search(&self, query: &str) -> Result<Recordings, Error> { 15 - let url = format!("{}/recording", BASE_URL); 16 - let client = reqwest::Client::new(); 17 - let response = client 18 - .get(&url) 19 - .header("Accept", "application/json") 20 - .header("User-Agent", USER_AGENT) 21 - .query(&[("query", query), ("inc", "artist-credits+releases")]) 22 - .send() 61 + if let Some(h) = self.get_cache(&cache_key_search(query)).await? { 62 + return Ok(serde_json::from_str(&h).context("decode cached search")?); 63 + } 64 + let id = nanoid::nanoid!(); 65 + let job = Job::Search { 66 + id: id.clone(), 67 + query: query.to_string(), 68 + }; 69 + self.enqueue_if_needed(&job, &infl_key_search(query)) 23 70 .await?; 24 71 25 - Ok(response.json().await?) 72 + let raw = self.wait_for_response(&id).await?; 73 + let parsed: Recordings = serde_json::from_str(&raw).context("decode search response")?; 74 + 75 + self.set_cache(&cache_key_search(query), &raw).await?; 76 + Ok(parsed) 26 77 } 27 78 28 79 pub async fn get_recording(&self, mbid: &str) -> Result<Recording, Error> { 29 - let url = format!("{}/recording/{}", BASE_URL, mbid); 30 - let client = reqwest::Client::new(); 31 - let response = client 32 - .get(&url) 33 - .header("Accept", "application/json") 34 - .header("User-Agent", USER_AGENT) 35 - .query(&[("inc", "artist-credits+releases")]) 36 - .send() 37 - .await?; 80 + if let Some(h) = self.get_cache(&cache_key_rec(mbid)).await? 
{ 81 + return Ok(serde_json::from_str(&h).context("decode cached recording")?); 82 + } 83 + let id = nanoid::nanoid!(); 84 + let job = Job::GetRecording { 85 + id: id.clone(), 86 + mbid: mbid.to_string(), 87 + }; 88 + self.enqueue_if_needed(&job, &infl_key_rec(mbid)).await?; 89 + let raw = self.wait_for_response(&id).await?; 90 + let parsed: Recording = serde_json::from_str(&raw).context("decode recording response")?; 91 + self.set_cache(&cache_key_rec(mbid), &raw).await?; 92 + Ok(parsed) 93 + } 94 + 95 + // ---------- Redis helpers ---------- 96 + 97 + async fn get_cache(&self, key: &str) -> Result<Option<String>, Error> { 98 + let mut r = self.redis.clone(); 99 + let val: Option<String> = r.get(key).await?; 100 + Ok(val) 101 + } 102 + 103 + async fn set_cache(&self, key: &str, json: &str) -> Result<(), Error> { 104 + let mut r = self.redis.clone(); 105 + let _: () = r 106 + .set_ex(key, json, self.cache_ttl) 107 + .await 108 + .with_context(|| format!("cache set {key}"))?; 109 + Ok(()) 110 + } 111 + 112 + async fn enqueue_if_needed(&self, job: &Job, inflight_key: &str) -> Result<(), Error> { 113 + let mut r = self.redis.clone(); 114 + 115 + // set NX to avoid duplicate work; short TTL 116 + let set: bool = r.set_nx(inflight_key, "1").await.context("set in-flight")?; 117 + if set { 118 + let _: () = r 119 + .expire(inflight_key, INFLIGHT_TTL_SECS) 120 + .await 121 + .context("expire inflight")?; 122 + let payload = serde_json::to_string(job).expect("serialize job"); 123 + let _: () = r.rpush(Q_QUEUE, payload).await.context("enqueue job")?; 124 + } 125 + Ok(()) 126 + } 127 + 128 + async fn wait_for_response(&self, id: &str) -> Result<String, Error> { 129 + let mut r = self.redis.clone(); 130 + let resp_q = resp_key(id); 131 + 132 + let fut = async { 133 + loop { 134 + let popped: Option<(String, String)> = r.brpop(&resp_q, 2.0).await?; 135 + if let Some((_key, json)) = popped { 136 + return Ok::<String, Error>(json); 137 + } 138 + } 139 + }; 140 + 141 + match timeout(Duration::from_secs(WAIT_TIMEOUT_SECS), fut).await { 142 + Ok(res) => res, 143 + Err(_) => Err(anyhow!("timed out waiting for MusicBrainz response")), 144 + } 145 + } 146 + } 147 + 148 + async fn worker_loop( 149 + http: reqwest::Client, 150 + redis: &mut MultiplexedConnection, 151 + ) -> Result<(), Error> { 152 + // pacing ticker: strictly 1 request/second 153 + let mut next_allowed = Instant::now(); 154 + 155 + loop { 156 + tokio::select! { 157 + res = async { 158 + 159 + // finite timeout pop 160 + let v: Option<Vec<String>> = redis.blpop(Q_QUEUE, 2.0).await.ok(); 161 + Ok::<_, Error>(v) 162 + } => { 163 + let Some(mut v) = res? 
else { 164 + continue }; 165 + if v.len() != 2 { 166 + continue; } 167 + let payload = v.pop().unwrap(); 168 + 169 + // 1 rps pacing 170 + let now = Instant::now(); 171 + if now < next_allowed { tokio::time::sleep(next_allowed - now).await; } 172 + next_allowed = Instant::now() + Duration::from_secs(1); 173 + 174 + let payload: Job = match serde_json::from_str(&payload) { 175 + Ok(j) => j, 176 + Err(e) => { 177 + tracing::error!(%e, "invalid job payload"); 178 + continue; 179 + } 180 + }; 181 + if let Err(e) = process_job(&http, redis, payload).await { 182 + tracing::error!(%e, "job failed"); 183 + } 184 + } 185 + } 186 + } 187 + } 188 + 189 + async fn process_job( 190 + http: &reqwest::Client, 191 + redis: &mut MultiplexedConnection, 192 + job: Job, 193 + ) -> Result<(), Error> { 194 + match job { 195 + Job::Search { id, query } => { 196 + let url = format!("{}/recording", BASE_URL); 197 + let resp = http 198 + .get(&url) 199 + .header("Accept", "application/json") 200 + .query(&[ 201 + ("query", query.as_str()), 202 + ("fmt", "json"), 203 + ("inc", "artists+releases+isrcs"), 204 + ]) 205 + .send() 206 + .await 207 + .context("http search")?; 208 + 209 + if !resp.status().is_success() { 210 + // Push an error payload so waiters donโ€™t hang forever 211 + let _ = push_response( 212 + redis, 213 + &id, 214 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 215 + ) 216 + .await; 217 + return Err(anyhow!("musicbrainz search http {}", resp.status())); 218 + } 219 + 220 + let text = resp.text().await.context("read body")?; 221 + push_response(redis, &id, &text).await?; 222 + } 223 + Job::GetRecording { id, mbid } => { 224 + let url = format!("{}/recording/{}", BASE_URL, mbid); 225 + let resp = http 226 + .get(&url) 227 + .header("Accept", "application/json") 228 + .query(&[("fmt", "json"), ("inc", "artists+releases+isrcs")]) 229 + .send() 230 + .await 231 + .context("http get_recording")?; 232 + 233 + if !resp.status().is_success() { 234 + let _ = push_response( 235 + redis, 236 + &id, 237 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 238 + ) 239 + .await; 240 + return Err(anyhow!("musicbrainz get_recording http {}", resp.status())); 241 + } 242 + 243 + let text = resp.text().await.context("read body")?; 244 + push_response(redis, &id, &text).await?; 245 + } 246 + } 247 + Ok(()) 248 + } 249 + 250 + async fn push_response( 251 + redis: &mut MultiplexedConnection, 252 + id: &str, 253 + json: &str, 254 + ) -> Result<(), Error> { 255 + let q = resp_key(id); 256 + // RPUSH then EXPIRE to avoid leaks if a client never BRPOPs 257 + let _: () = redis.rpush(&q, json).await?; 258 + let _: () = redis.expire(&q, WAIT_TIMEOUT_SECS as i64 + 5).await?; 259 + Ok(()) 260 + } 261 + 262 + fn cache_key_search(query: &str) -> String { 263 + format!("{}{}", CACHE_SEARCH_PREFIX, fast_hash(query)) 264 + } 265 + fn cache_key_rec(mbid: &str) -> String { 266 + format!("{}{}", CACHE_REC_PREFIX, mbid) 267 + } 268 + fn infl_key_search(query: &str) -> String { 269 + format!("{}search:{}", INFLIGHT_PREFIX, fast_hash(query)) 270 + } 271 + fn infl_key_rec(mbid: &str) -> String { 272 + format!("{}rec:{}", INFLIGHT_PREFIX, mbid) 273 + } 274 + fn resp_key(id: &str) -> String { 275 + format!("{}{}", RESP_PREFIX, id) 276 + } 277 + 278 + fn fast_hash(s: &str) -> u64 { 279 + use std::hash::{Hash, Hasher}; 280 + let mut h = std::collections::hash_map::DefaultHasher::new(); 281 + s.hash(&mut h); 282 + h.finish() 283 + } 284 + 285 + #[cfg(test)] 286 + mod tests { 287 + use super::*; 288 + use serial_test::serial; 289 + 290 + 
#[test] 291 + fn test_fast_hash() { 292 + let h1 = fast_hash("hello"); 293 + let h2 = fast_hash("hello"); 294 + let h3 = fast_hash("world"); 295 + assert_eq!(h1, h2); 296 + assert_ne!(h1, h3); 297 + } 298 + 299 + #[test] 300 + fn test_cache_keys() { 301 + let q = "test query"; 302 + let mbid = "some-mbid"; 303 + assert!(cache_key_search(q).starts_with(CACHE_SEARCH_PREFIX)); 304 + assert!(cache_key_rec(mbid).starts_with(CACHE_REC_PREFIX)); 305 + assert!(infl_key_search(q).starts_with(INFLIGHT_PREFIX)); 306 + assert!(infl_key_rec(mbid).starts_with(INFLIGHT_PREFIX)); 307 + assert!(resp_key("id").starts_with(RESP_PREFIX)); 308 + } 309 + 310 + #[tokio::test] 311 + #[serial] 312 + async fn test_musicbrainz_client() -> Result<(), Error> { 313 + let client = MusicbrainzClient::new().await?; 314 + let query = format!( 315 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 316 + "Come As You Are", "Nirvana" 317 + ); 318 + let search_res = client.search(&query).await?; 38 319 39 - Ok(response.json().await?) 320 + assert!(!search_res.recordings.is_empty()); 321 + let rec = &search_res.recordings[0]; 322 + let mbid = &rec.id; 323 + let rec_res = client.get_recording(mbid).await?; 324 + assert_eq!(rec_res.id, *mbid); 325 + Ok(()) 40 326 } 41 327 }
+278
crates/webscrobbler/src/musicbrainz/mod.rs
···
1 + use anyhow::Error;
2 +
3 + use crate::musicbrainz::{recording::Recordings, release::Release};
4 + use std::cmp::Ordering;
5 +
1 6 pub mod artist;
2 7 pub mod client;
3 8 pub mod label;
4 9 pub mod recording;
5 10 pub mod release;
11 +
12 + fn get_best_release(releases: &[Release]) -> Option<Release> {
13 +     if releases.is_empty() {
14 +         return None;
15 +     }
16 +
17 +     // Remove the single filtering - this was causing the issue
18 +     let mut candidates: Vec<&Release> = releases.iter().collect();
19 +
20 +     if candidates.is_empty() {
21 +         return None;
22 +     }
23 +
24 +     candidates.sort_by(|a, b| cmp_release(a, b));
25 +     candidates.first().cloned().cloned()
26 + }
27 +
28 + pub fn get_best_release_from_recordings(all: &Recordings, artist: &str) -> Option<Release> {
29 +     use std::collections::HashSet;
30 +
31 +     let mut pool: Vec<Release> = Vec::new();
32 +     let mut seen: HashSet<String> = HashSet::new();
33 +
34 +     let all_recordings: Vec<&recording::Recording> = all
35 +         .recordings
36 +         .iter()
37 +         .filter(|rec| {
38 +             if let Some(credits) = &rec.artist_credit {
39 +                 artist_credit_contains(credits, artist)
40 +             } else {
41 +                 false
42 +             }
43 +         })
44 +         .collect();
45 +
46 +     for rec in &all_recordings {
47 +         if let Some(rels) = &rec.releases {
48 +             for r in rels {
49 +                 if seen.insert(r.id.clone()) {
50 +                     pool.push(r.clone());
51 +                 }
52 +             }
53 +         }
54 +     }
55 +
56 +     get_best_release(&pool)
57 + }
58 +
59 + fn cmp_release(a: &Release, b: &Release) -> Ordering {
60 +     // First priority: prefer albums over singles
61 +     let sa = is_single_release_type(a);
62 +     let sb = is_single_release_type(b);
63 +     if sa != sb {
64 +         return bool_true_last(sa, sb); // Albums (false) come before singles (true)
65 +     }
66 +
67 +     let ta = release_tier(a.status.as_deref());
68 +     let tb = release_tier(b.status.as_deref());
69 +     if ta != tb {
70 +         return ta.cmp(&tb);
71 +     }
72 +
73 +     let pa = has_preferred_country(a, &["XW", "US"]);
74 +     let pb = has_preferred_country(b, &["XW", "US"]);
75 +     if pa != pb {
76 +         return bool_true_first(pa, pb);
77 +     }
78 +
79 +     let la = is_live_release(a);
80 +     let lb = is_live_release(b);
81 +     if la != lb {
82 +         return bool_true_last(la, lb);
83 +     }
84 +
85 +     let da = date_key(a.date.as_deref());
86 +     let db = date_key(b.date.as_deref());
87 +     if da != db {
88 +         return da.cmp(&db);
89 +     }
90 +
91 +     match a.title.cmp(&b.title) {
92 +         Ordering::Equal => a.id.cmp(&b.id),
93 +         ord => ord,
94 +     }
95 + }
96 +
97 + fn release_tier(status: Option<&str>) -> u8 {
98 +     match status.map(|s| s.to_ascii_lowercase()) {
99 +         Some(s) if s == "official" => 0,
100 +         Some(s) if s == "bootleg" => 1,
101 +         _ => 2,
102 +     }
103 + }
104 +
105 + fn bool_true_first(a: bool, b: bool) -> Ordering {
106 +     match (a, b) {
107 +         (true, false) => Ordering::Less,
108 +         (false, true) => Ordering::Greater,
109 +         _ => Ordering::Equal,
110 +     }
111 + }
112 +
113 + fn bool_true_last(a: bool, b: bool) -> Ordering {
114 +     match (a, b) {
115 +         (true, false) => Ordering::Greater,
116 +         (false, true) => Ordering::Less,
117 +         _ => Ordering::Equal,
118 +     }
119 + }
120 +
121 + fn is_single_release_type(rel: &Release) -> bool {
122 +     if let Some(release_group) = &rel.release_group {
123 +         if let Some(primary_type) = &release_group.primary_type {
124 +             if primary_type.to_ascii_lowercase() == "single" {
125 +                 return true;
126 +             }
127 +         }
128 +     }
129 +
130 +     if rel.track_count == Some(1) {
131 +         return true;
132 +     }
133 +     if let Some(media) = &rel.media {
134 +         if media.len() == 1 && media[0].track_count == 1 {
135 +             return true;
136 +         }
137 +         let total: u32 = media.iter().map(|m| m.track_count).sum();
138 +         if total == 1 {
139 +             return true;
140 +         }
141 +     }
142 +     false
143 + }
144 +
145 + fn has_preferred_country(rel: &Release, prefs: &[&str]) -> bool {
146 +     if let Some(c) = rel.country.as_deref() {
147 +         if prefs.iter().any(|p| *p == c) {
148 +             return true;
149 +         }
150 +     }
151 +     if let Some(events) = rel.release_events.as_ref() {
152 +         for ev in events {
153 +             if let Some(area) = &ev.area {
154 +                 if area
155 +                     .iso_3166_1_codes
156 +                     .iter()
157 +                     .any(|codes| prefs.iter().any(|p| codes.contains(&p.to_string())))
158 +                 {
159 +                     return true;
160 +                 }
161 +             }
162 +         }
163 +     }
164 +     false
165 + }
166 +
167 + /// Convert "YYYY[-MM[-DD]]" into YYYYMMDD (missing parts → 01). Unknown dates sort last.
168 + fn date_key(d: Option<&str>) -> i32 {
169 +     if let Some(d) = d {
170 +         let mut parts = d.split('-');
171 +         let y = parts.next().unwrap_or("9999");
172 +         let m = parts.next().unwrap_or("01");
173 +         let day = parts.next().unwrap_or("01");
174 +
175 +         let y: i32 = y.parse().unwrap_or(9999);
176 +         let m: i32 = m.parse().unwrap_or(1);
177 +         let day: i32 = day.parse().unwrap_or(1);
178 +
179 +         return y * 10000 + m * 100 + day;
180 +     }
181 +     9_999_01_01
182 + }
183 +
184 + fn is_live_release(rel: &Release) -> bool {
185 +     let t_live = rel.title.to_ascii_lowercase().contains("live");
186 +     let d_live = rel
187 +         .disambiguation
188 +         .as_ref()
189 +         .map(|d| d.to_ascii_lowercase().contains("live"))
190 +         .unwrap_or(false);
191 +     t_live || d_live
192 + }
193 +
194 + fn artist_credit_contains(credits: &[artist::ArtistCredit], name: &str) -> bool {
195 +     credits.iter().any(|c| c.name.eq_ignore_ascii_case(name))
196 + }
197 +
198 + #[cfg(test)]
199 + mod tests {
200 +     use crate::musicbrainz::client::MusicbrainzClient;
201 +     use crate::musicbrainz::release::Media;
202 +     use anyhow::Error;
203 +     use serial_test::serial;
204 +
205 +     use super::*;
206 +
207 +     #[test]
208 +     fn test_date_key() {
209 +         assert_eq!(date_key(Some("2020-05-15")), 20200515);
210 +         assert_eq!(date_key(Some("2020-05")), 20200501);
211 +         assert_eq!(date_key(Some("2020")), 20200101);
212 +         assert_eq!(date_key(None), 99990101);
213 +         assert_eq!(date_key(Some("invalid-date")), 99990101);
214 +     }
215 +
216 +     #[test]
217 +     fn test_is_single() {
218 +         let rel1 = Release {
219 +             track_count: Some(1),
220 +             media: None,
221 +             ..Default::default()
222 +         };
223 +         assert!(is_single_release_type(&rel1));
224 +         let rel2 = Release {
225 +             track_count: Some(2),
226 +             media: Some(vec![
227 +                 Media {
228 +                     track_count: 1,
229 +                     ..Default::default()
230 +                 },
231 +                 Media {
232 +                     track_count: 1,
233 +                     ..Default::default()
234 +                 },
235 +             ]),
236 +             ..Default::default()
237 +         };
238 +         assert!(!is_single_release_type(&rel2));
239 +     }
240 +
241 +     #[tokio::test]
242 +     #[serial]
243 +     async fn test_get_best_release_from_recordings() -> Result<(), Error> {
244 +         let client = MusicbrainzClient::new().await?;
245 +         let query = format!(
246 +             r#"recording:"{}" AND artist:"{}" AND status:Official"#,
247 +             "Smells Like Teen Spirit", "Nirvana"
248 +         );
249 +         let recordings = client.search(&query).await?;
250 +         let best = get_best_release_from_recordings(&recordings, "Nirvana");
251 +         assert!(best.is_some());
252 +         let best = best.unwrap();
253 +         assert_eq!(best.title, "Nevermind");
254 +         assert_eq!(best.status.as_deref(), Some("Official"));
255 +         assert_eq!(best.country.as_deref(), Some("US"));
256 +
257 +         let query = format!(
258 +             r#"recording:"{}" AND artist:"{}" AND status:Official"#,
259 +             "Medicine", "Joji"
260 +         );
261 +         let recordings = client.search(&query).await?;
262 +         let best = get_best_release_from_recordings(&recordings, "Joji");
263 +         assert!(best.is_some());
264 +         let best = best.unwrap();
265 +         assert_eq!(best.title, "Chloe Burbank Vol. 1");
266 +         assert_eq!(best.status.as_deref(), Some("Bootleg"));
267 +         assert_eq!(best.country.as_deref(), Some("XW"));
268 +
269 +         let query = format!(
270 +             r#"recording:"{}" AND artist:"{}" AND status:Official"#,
271 +             "Don't Stay", "Linkin Park"
272 +         );
273 +         let recordings = client.search(&query).await?;
274 +         let best = get_best_release_from_recordings(&recordings, "Linkin Park");
275 +         assert!(best.is_some());
276 +         let best = best.unwrap();
277 +         assert_eq!(best.title, "Meteora");
278 +         assert_eq!(best.status.as_deref(), Some("Official"));
279 +         assert_eq!(best.country.as_deref(), Some("US"));
280 +
281 +         Ok(())
282 +     }
283 + }
+26 -7
crates/webscrobbler/src/musicbrainz/release.rs
···
6 6     recording::Recording,
7 7 };
8 8
9 - #[derive(Debug, Deserialize, Clone)]
9 + #[derive(Debug, Deserialize, Clone, Default)]
10 10 pub struct Release {
11 11     #[serde(rename = "release-events")]
12 12     pub release_events: Option<Vec<ReleaseEvent>>,
···
24 24     #[serde(rename = "cover-art-archive")]
25 25     pub cover_art_archive: Option<CoverArtArchive>,
26 26     #[serde(rename = "artist-credit")]
27 -     pub artist_credit: Vec<ArtistCredit>,
27 +     pub artist_credit: Option<Vec<ArtistCredit>>,
28 28     #[serde(rename = "status-id")]
29 29     pub status_id: Option<String>,
30 30     #[serde(rename = "label-info")]
···
33 33     pub date: Option<String>,
34 34     pub country: Option<String>,
35 35     pub asin: Option<String>,
36 +     #[serde(rename = "track-count")]
37 +     pub track_count: Option<u32>,
38 +     #[serde(rename = "release-group")]
39 +     pub release_group: Option<ReleaseGroup>,
36 40 }
37 41
38 - #[derive(Debug, Deserialize, Clone)]
42 + #[derive(Debug, Deserialize, Clone, Default)]
39 43 pub struct CoverArtArchive {
40 44     pub back: bool,
41 45     pub artwork: bool,
···
44 48     pub darkened: bool,
45 49
46 50
47 - #[derive(Debug, Deserialize, Clone)]
51 + #[derive(Debug, Deserialize, Clone, Default)]
48 52 pub struct ReleaseEvent {
49 53     pub area: Option<Area>,
50 54     pub date: String,
51 55 }
52 56
53 - #[derive(Debug, Deserialize, Clone)]
57 + #[derive(Debug, Deserialize, Clone, Default)]
54 58 pub struct TextRepresentation {
55 59     pub language: Option<String>,
56 60     pub script: Option<String>,
57 61 }
58 62
59 - #[derive(Debug, Deserialize, Clone)]
63 + #[derive(Debug, Deserialize, Clone, Default)]
60 64 pub struct Media {
61 65     #[serde(rename = "format-id")]
62 66     pub format_id: Option<String>,
···
87 91     pub title: String,
88 92     pub recording: Recording,
89 93     #[serde(rename = "artist-credit")]
90 -     pub artist_credit: Vec<ArtistCredit>,
94 +     pub artist_credit: Option<Vec<ArtistCredit>>,
91 95     pub number: String,
92 96 }
97 +
98 + #[derive(Debug, Deserialize, Clone, Default)]
99 + pub struct ReleaseGroup {
100 +     pub id: String,
101 +     pub title: String,
102 +     #[serde(rename = "primary-type")]
103 +     pub primary_type: Option<String>,
104 +     #[serde(rename = "secondary-types")]
105 +     pub secondary_types: Option<Vec<String>>,
106 +     pub disambiguation: Option<String>,
107 +     #[serde(rename = "first-release-date")]
108 +     pub first_release_date: Option<String>,
109 +     #[serde(rename = "artist-credit")]
110 +     pub artist_credit: Option<Vec<ArtistCredit>>,
111 + }
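For reference, the new ReleaseGroup struct picks up MusicBrainz's kebab-case JSON keys through the serde renames above, and optional fields that are absent simply deserialize to None. A small sketch, with placeholder values and assuming serde_json is available in the crate:

    use crate::musicbrainz::release::ReleaseGroup;

    fn parse_release_group_example() -> Result<(), serde_json::Error> {
        // "primary-type" and "first-release-date" map onto the renamed fields;
        // the missing "artist-credit" key becomes None.
        let json = r#"{
            "id": "00000000-0000-0000-0000-000000000000",
            "title": "Example Album",
            "primary-type": "Album",
            "secondary-types": [],
            "first-release-date": "2001-01-01"
        }"#;
        let group: ReleaseGroup = serde_json::from_str(json)?;
        assert_eq!(group.primary_type.as_deref(), Some("Album"));
        assert_eq!(group.first_release_date.as_deref(), Some("2001-01-01"));
        assert!(group.artist_credit.is_none());
        Ok(())
    }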
+76 -23
crates/webscrobbler/src/scrobbler.rs
···
3 3 use crate::cache::Cache;
4 4 use crate::crypto::decrypt_aes_256_ctr;
5 5 use crate::musicbrainz::client::MusicbrainzClient;
6 + use crate::musicbrainz::get_best_release_from_recordings;
7 + use crate::musicbrainz::recording::Recording;
6 8 use crate::spotify::client::SpotifyClient;
7 9 use crate::spotify::refresh_token;
8 10 use crate::types::{ScrobbleRequest, Track};
···
15 17 pub async fn scrobble(
16 18     pool: &Pool<Postgres>,
17 19     cache: &Cache,
20 +     mb_client: &MusicbrainzClient,
18 21     scrobble: ScrobbleRequest,
19 22     did: &str,
20 23 ) -> Result<(), Error> {
···
24 27         return Err(Error::msg("No Spotify tokens found"));
25 28     }
26 29
27 -     let mb_client = MusicbrainzClient::new();
28 -
29 30     let key = format!(
30 31         "{} - {}",
31 32         scrobble.data.song.parsed.artist.to_lowercase(),
···
127 128     let result = spotify_client.search(&query).await?;
128 129
129 130     if let Some(track) = result.tracks.items.first() {
130 -         tracing::info!("Spotify (track)");
131 -         let mut track = track.clone();
132 -
133 -
134 -
135 -
136 -
137 -
138 -
139 -
140 -
141 -
142 -
143 -
144 -
145 -
146 -
131 +         let artists = track
132 +             .artists
133 +             .iter()
134 +             .map(|a| a.name.to_lowercase().clone())
135 +             .collect::<Vec<_>>()
136 +             .join(", ")
137 +             .to_lowercase();
138 +         let artist = scrobble.data.song.parsed.artist.trim();
139 +         // check if artists don't contain the scrobble artist (to avoid wrong matches)
140 +         if !artists.contains(&scrobble.data.song.parsed.artist.trim().to_lowercase()) {
141 +             tracing::warn!(artist = %artist, track = ?track, "Artist mismatch, skipping");
142 +         } else {
143 +             tracing::info!("Spotify (track)");
144 +             let mut track = track.clone();
145 +
146 +             if let Some(album) = spotify_client.get_album(&track.album.id).await? {
147 +                 track.album = album;
148 +             }
149 +
150 +             if let Some(artist) = spotify_client
151 +                 .get_artist(&track.album.artists[0].id)
152 +                 .await?
153 +             {
154 +                 track.album.artists[0] = artist;
155 +             }
156 +
157 +             rocksky::scrobble(cache, &did, track.into(), scrobble.time).await?;
158 +             tokio::time::sleep(std::time::Duration::from_secs(1)).await;
159 +             return Ok(());
160 +         }
147 161     }
148 162
149 163     let query = format!(
150 -         r#"recording:"{}" AND artist:"{}""#,
164 +         r#"recording:"{}" AND artist:"{}" AND status:Official"#,
151 165         scrobble.data.song.parsed.track, scrobble.data.song.parsed.artist
152 166     );
153 -     let result = mb_client.search(&query).await?;
154 167
155 -     if let Some(recording) = result.recordings.first() {
156 -         let result = mb_client.get_recording(&recording.id).await?;
168 +     let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await;
169 +     if let Err(e) = result {
170 +         tracing::warn!(artist = %scrobble.data.song.parsed.artist, track = %scrobble.data.song.parsed.track, "Musicbrainz search error: {}", e);
171 +         return Ok(());
172 +     }
173 +     let result = result.unwrap();
174 +     if let Some(result) = result {
157 175         tracing::info!("Musicbrainz (recording)");
158 176         rocksky::scrobble(cache, &did, result.into(), scrobble.time).await?;
159 177         tokio::time::sleep(std::time::Duration::from_secs(1)).await;
···
164 182
165 183     Ok(())
166 184 }
185 +
186 + async fn search_musicbrainz_recording(
187 +     query: &str,
188 +     mb_client: &MusicbrainzClient,
189 +     scrobble: &ScrobbleRequest,
190 + ) -> Result<Option<Recording>, Error> {
191 +     let result = mb_client.search(&query).await;
192 +     if let Err(e) = result {
193 +         tracing::warn!(artist = %scrobble.data.song.parsed.artist, track = %scrobble.data.song.parsed.track, "Musicbrainz search error: {}", e);
194 +         return Ok(None);
195 +     }
196 +     let result = result.unwrap();
197 +
198 +     let release = get_best_release_from_recordings(&result, &scrobble.data.song.parsed.artist);
199 +
200 +     if let Some(release) = release {
201 +         let recording = result.recordings.into_iter().find(|r| {
202 +             r.releases
203 +                 .as_ref()
204 +                 .map(|releases| releases.iter().any(|rel| rel.id == release.id))
205 +                 .unwrap_or(false)
206 +         });
207 +         if recording.is_none() {
208 +             tracing::warn!(artist = %scrobble.data.song.parsed.artist, track = %scrobble.data.song.parsed.track, "Recording not found in MusicBrainz result, skipping");
209 +             return Ok(None);
210 +         }
211 +         let recording = recording.unwrap();
212 +         let mut result = mb_client.get_recording(&recording.id).await?;
213 +         tracing::info!("Musicbrainz (recording)");
214 +         result.releases = Some(vec![release]);
215 +         return Ok(Some(result));
216 +     }
217 +
218 +     Ok(None)
219 + }
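The new guard before accepting a Spotify match is a case-insensitive substring check over the joined artist credits. An isolated sketch of that logic (hypothetical helper name and artist strings, not code from this PR):

    // The scrobbled artist must appear somewhere in the joined, lowercased
    // Spotify credits; otherwise scrobble() logs a warning and skips the track.
    fn artists_match(spotify_artists: &[&str], scrobbled_artist: &str) -> bool {
        let joined = spotify_artists
            .iter()
            .map(|a| a.to_lowercase())
            .collect::<Vec<_>>()
            .join(", ");
        joined.contains(&scrobbled_artist.trim().to_lowercase())
    }

    fn main() {
        assert!(artists_match(&["Nirvana"], " nirvana "));        // accepted
        assert!(!artists_match(&["Some Cover Band"], "Nirvana")); // skipped
    }

Because it is a substring check, multi-artist credits still pass as long as the scrobbled artist appears anywhere in the joined list.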
+23 -3
crates/webscrobbler/src/types.rs
···
158 158
159 159
160 160
161 -
162 -
161 +         .map(|credit| credit.name.clone())
162 +         .unwrap_or_default();
163 163     let releases = recording.releases.unwrap_or_default();
164 +     let release_date = releases.first().and_then(|release| release.date.clone());
164 165     let album_artist = releases
165 166         .first()
166 -         .and_then(|release| release.artist_credit.first())
167 +         .and_then(|release| {
168 +             let credits = release.artist_credit.clone().unwrap_or_default();
169 +             credits.first().map(|credit| credit.clone())
170 +         })
167 171         .map(|credit| credit.name.clone());
168 172     let album = releases
169 173         .first()
174 +
175 +
176 +
177 +
178 +
179 +         artist: artist_credit,
180 +         album_artist,
181 +         duration: recording.length.unwrap_or_default(),
182 +         year: release_date
183 +             .as_ref()
184 +             .and_then(|date| date.split('-').next())
185 +             .and_then(|year| year.parse::<u32>().ok()),
186 +         release_date: release_date.clone(),
187 +         track_number: releases
188 +             .first()
189 +             .and_then(|release| {
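The year on the converted track is derived from the first release's date string, as shown in the hunk above. A standalone sketch of that extraction (hypothetical helper name, same split-and-parse logic):

    fn year_from_release_date(release_date: Option<&str>) -> Option<u32> {
        // Keep the leading "YYYY" component of a MusicBrainz date, if it parses.
        release_date
            .and_then(|date| date.split('-').next())
            .and_then(|year| year.parse::<u32>().ok())
    }

    fn main() {
        assert_eq!(year_from_release_date(Some("1991-09-24")), Some(1991));
        assert_eq!(year_from_release_date(Some("1991")), Some(1991));
        assert_eq!(year_from_release_date(None), None);
    }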

Submissions

tsiry-sandratraina.com submitted #1
3 commits
feat: Enhance MusicBrainz integration and improve data handling
fix: update MusicBrainz recording handling to include release information
fix: improve artist matching logic in scrobble functions to prevent mismatches
pull request successfully merged
tsiry-sandratraina.com submitted #0
2 commits
feat: Enhance MusicBrainz integration and improve data handling
fix: update MusicBrainz recording handling to include release information