A decentralized music tracking and discovery platform built on AT Protocol 🎵
listenbrainz spotify atproto lastfm musicbrainz scrobbling

feat: Enhance MusicBrainz integration and improve data handling

- Updated Release struct to include optional fields for artist credit, track count, and release group.
- Introduced ReleaseGroup struct for better organization of release data.
- Modified scrobble function to utilize MusicBrainzClient passed as a parameter, improving dependency management.
- Implemented search_musicbrainz_recording function to streamline MusicBrainz recording searches and handle errors gracefully.
- Enhanced caching mechanism for MusicBrainz responses to reduce redundant API calls.
- Improved query construction for MusicBrainz searches to include status filtering.
- Added tests for MusicBrainz client and release selection logic to ensure reliability.
- Refactored artist credit handling in Track conversion for better safety and clarity.
- Updated dependencies in Cargo.toml for improved functionality and testing capabilities.

+57 -4
Cargo.lock
··· 3185 3185 ] 3186 3186 3187 3187 [[package]] 3188 + name = "nanoid" 3189 + version = "0.4.0" 3190 + source = "registry+https://github.com/rust-lang/crates.io-index" 3191 + checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" 3192 + dependencies = [ 3193 + "rand 0.8.5", 3194 + ] 3195 + 3196 + [[package]] 3188 3197 name = "nkeys" 3189 3198 version = "0.4.4" 3190 3199 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 4697 4706 4698 4707 [[package]] 4699 4708 name = "regex" 4700 - version = "1.11.1" 4709 + version = "1.11.3" 4701 4710 source = "registry+https://github.com/rust-lang/crates.io-index" 4702 - checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" 4711 + checksum = "8b5288124840bee7b386bc413c487869b360b2b4ec421ea56425128692f2a82c" 4703 4712 dependencies = [ 4704 4713 "aho-corasick", 4705 4714 "memchr", ··· 4709 4718 4710 4719 [[package]] 4711 4720 name = "regex-automata" 4712 - version = "0.4.9" 4721 + version = "0.4.11" 4713 4722 source = "registry+https://github.com/rust-lang/crates.io-index" 4714 - checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" 4723 + checksum = "833eb9ce86d40ef33cb1306d8accf7bc8ec2bfea4355cbdebb3df68b40925cad" 4715 4724 dependencies = [ 4716 4725 "aho-corasick", 4717 4726 "memchr", ··· 4999 5008 "hex", 5000 5009 "jsonwebtoken", 5001 5010 "md5", 5011 + "nanoid", 5002 5012 "owo-colors", 5003 5013 "quick-xml 0.37.5", 5004 5014 "rand 0.9.2", ··· 5006 5016 "reqwest", 5007 5017 "serde", 5008 5018 "serde_json", 5019 + "serial_test", 5009 5020 "sqlx", 5010 5021 "tokio", 5011 5022 "tokio-stream", ··· 5089 5100 "hex", 5090 5101 "jsonwebtoken", 5091 5102 "md5", 5103 + "nanoid", 5092 5104 "owo-colors", 5093 5105 "rand 0.9.2", 5094 5106 "redis 0.29.5", 5095 5107 "reqwest", 5096 5108 "serde", 5097 5109 "serde_json", 5110 + "serial_test", 5098 5111 "sqlx", 5099 5112 "tokio", 5100 5113 "tokio-stream", ··· 5439 5452 ] 5440 5453 5441 5454 [[package]] 5455 + name = "scc" 5456 + version = "2.4.0" 5457 + source = "registry+https://github.com/rust-lang/crates.io-index" 5458 + checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc" 5459 + dependencies = [ 5460 + "sdd", 5461 + ] 5462 + 5463 + [[package]] 5442 5464 name = "schannel" 5443 5465 version = "0.1.27" 5444 5466 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5464 5486 ] 5465 5487 5466 5488 [[package]] 5489 + name = "sdd" 5490 + version = "3.0.10" 5491 + source = "registry+https://github.com/rust-lang/crates.io-index" 5492 + checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca" 5493 + 5494 + [[package]] 5467 5495 name = "seahash" 5468 5496 version = "4.1.0" 5469 5497 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5610 5638 "itoa", 5611 5639 "ryu", 5612 5640 "serde", 5641 + ] 5642 + 5643 + [[package]] 5644 + name = "serial_test" 5645 + version = "3.2.0" 5646 + source = "registry+https://github.com/rust-lang/crates.io-index" 5647 + checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9" 5648 + dependencies = [ 5649 + "futures", 5650 + "log", 5651 + "once_cell", 5652 + "parking_lot", 5653 + "scc", 5654 + "serial_test_derive", 5655 + ] 5656 + 5657 + [[package]] 5658 + name = "serial_test_derive" 5659 + version = "3.2.0" 5660 + source = "registry+https://github.com/rust-lang/crates.io-index" 5661 + checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" 5662 + dependencies = [ 5663 + "proc-macro2", 5664 + "quote", 5665 + "syn 2.0.101", 5613 5666 ] 5614 5667 5615 5668 [[package]]
+5 -1
crates/scrobbler/Cargo.toml
··· 22 22 dotenv = "0.15.0" 23 23 anyhow = "1.0.96" 24 24 actix-web = "4.9.0" 25 - redis = "0.29.0" 25 + redis = { version = "0.29.0", features = ["tokio-comp"] } 26 26 hex = "0.4.3" 27 27 jsonwebtoken = "9.3.1" 28 28 md5 = "0.7.0" ··· 41 41 actix-session = "0.10.1" 42 42 tokio-stream = { version = "0.1.17", features = ["full"] } 43 43 tracing = "0.1.41" 44 + nanoid = "0.4.0" 45 + 46 + [dev-dependencies] 47 + serial_test = "3.0.0"
+15 -5
crates/scrobbler/src/handlers/mod.rs
··· 9 9 use v1::submission::submission; 10 10 11 11 use crate::cache::Cache; 12 + use crate::musicbrainz::client::MusicbrainzClient; 12 13 use crate::BANNER; 13 14 14 15 pub mod scrobble; ··· 43 44 pub async fn handle_submission( 44 45 data: web::Data<Arc<Pool<Postgres>>>, 45 46 cache: web::Data<Cache>, 47 + mb_client: web::Data<Arc<MusicbrainzClient>>, 46 48 form: web::Form<BTreeMap<String, String>>, 47 49 ) -> impl Responder { 48 - submission(form.into_inner(), cache.get_ref(), data.get_ref()) 49 - .await 50 - .map_err(actix_web::error::ErrorInternalServerError) 50 + submission( 51 + form.into_inner(), 52 + cache.get_ref(), 53 + data.get_ref(), 54 + mb_client.get_ref(), 55 + ) 56 + .await 57 + .map_err(actix_web::error::ErrorInternalServerError) 51 58 } 52 59 53 60 #[get("/2.0")] ··· 60 67 data: web::Data<Arc<Pool<Postgres>>>, 61 68 cache: web::Data<Cache>, 62 69 form: web::Form<BTreeMap<String, String>>, 70 + mb_client: web::Data<Arc<MusicbrainzClient>>, 63 71 ) -> impl Responder { 64 72 let conn = data.get_ref(); 65 73 let cache = cache.get_ref(); 74 + let mb_client = mb_client.get_ref(); 66 75 67 76 let method = form.get("method").unwrap_or(&"".to_string()).to_string(); 68 - call_method(&method, conn, cache, form.into_inner()) 77 + call_method(&method, conn, cache, mb_client, form.into_inner()) 69 78 .await 70 79 .map_err(actix_web::error::ErrorInternalServerError) 71 80 } ··· 74 83 method: &str, 75 84 pool: &Arc<Pool<Postgres>>, 76 85 cache: &Cache, 86 + mb_client: &Arc<MusicbrainzClient>, 77 87 form: BTreeMap<String, String>, 78 88 ) -> Result<HttpResponse, Error> { 79 89 match method { 80 - "track.scrobble" => handle_scrobble(form, pool, cache).await, 90 + "track.scrobble" => handle_scrobble(form, pool, cache, mb_client).await, 81 91 _ => Err(Error::msg(format!("Unsupported method: {}", method))), 82 92 } 83 93 }
+4 -3
crates/scrobbler/src/handlers/scrobble.rs
··· 5 5 use std::collections::BTreeMap; 6 6 7 7 use crate::{ 8 - auth::authenticate, cache::Cache, params::validate_scrobble_params, response::build_response, 9 - scrobbler::scrobble, 8 + auth::authenticate, cache::Cache, musicbrainz::client::MusicbrainzClient, 9 + params::validate_scrobble_params, response::build_response, scrobbler::scrobble, 10 10 }; 11 11 12 12 pub async fn handle_scrobble( 13 13 form: BTreeMap<String, String>, 14 14 conn: &Pool<sqlx::Postgres>, 15 15 cache: &Cache, 16 + mb_client: &MusicbrainzClient, 16 17 ) -> Result<HttpResponse, Error> { 17 18 let params = match validate_scrobble_params(&form, &["api_key", "api_sig", "sk", "method"]) { 18 19 Ok(params) => params, ··· 31 32 }))); 32 33 } 33 34 34 - match scrobble(&conn, cache, &form).await { 35 + match scrobble(&conn, cache, mb_client, &form).await { 35 36 Ok(scrobbles) => Ok(HttpResponse::Ok().json(build_response(scrobbles))), 36 37 Err(e) => { 37 38 if e.to_string().contains("Timestamp") {
+4 -2
crates/scrobbler/src/handlers/v1/submission.rs
··· 4 4 use std::{collections::BTreeMap, sync::Arc}; 5 5 6 6 use crate::{ 7 - auth::verify_session_id, cache::Cache, params::validate_required_params, scrobbler::scrobble_v1, 7 + auth::verify_session_id, cache::Cache, musicbrainz::client::MusicbrainzClient, 8 + params::validate_required_params, scrobbler::scrobble_v1, 8 9 }; 9 10 10 11 pub async fn submission( 11 12 form: BTreeMap<String, String>, 12 13 cache: &Cache, 13 14 pool: &Arc<sqlx::Pool<sqlx::Postgres>>, 15 + mb_client: &Arc<MusicbrainzClient>, 14 16 ) -> Result<HttpResponse, Error> { 15 17 match validate_required_params(&form, &["s", "a[0]", "t[0]", "i[0]"]) { 16 18 Ok(_) => { ··· 30 32 let user_id = user_id.unwrap(); 31 33 tracing::info!(artist = %a, track = %t, timestamp = %i, user_id = %user_id, "Submission"); 32 34 33 - match scrobble_v1(pool, cache, &form).await { 35 + match scrobble_v1(pool, cache, mb_client, &form).await { 34 36 Ok(_) => Ok(HttpResponse::Ok().body("OK\n")), 35 37 Err(e) => Ok(HttpResponse::BadRequest().json(json!({ 36 38 "error": 4,
+5 -1
crates/scrobbler/src/lib.rs
··· 27 27 use owo_colors::OwoColorize; 28 28 use sqlx::postgres::PgPoolOptions; 29 29 30 - use crate::cache::Cache; 30 + use crate::{cache::Cache, musicbrainz::client::MusicbrainzClient}; 31 31 32 32 pub const BANNER: &str = r#" 33 33 ___ ___ _____ __ __ __ ··· 71 71 .unwrap(), 72 72 ); 73 73 74 + let mb_client = MusicbrainzClient::new().await?; 75 + let mb_client = Arc::new(mb_client); 76 + 74 77 HttpServer::new(move || { 75 78 App::new() 76 79 .wrap(RateLimiter::default()) 77 80 .app_data(limiter.clone()) 78 81 .app_data(Data::new(conn.clone())) 79 82 .app_data(Data::new(cache.clone())) 83 + .app_data(Data::new(mb_client.clone())) 80 84 .service(handlers::handle_methods) 81 85 .service(handlers::handle_nowplaying) 82 86 .service(handlers::handle_submission)
+3 -1
crates/scrobbler/src/listenbrainz/core/submit.rs
··· 5 5 use std::sync::Arc; 6 6 7 7 use crate::auth::decode_token; 8 + use crate::musicbrainz::client::MusicbrainzClient; 8 9 use crate::repo; 9 10 use crate::{cache::Cache, scrobbler::scrobble_listenbrainz}; 10 11 ··· 14 15 payload: SubmitListensRequest, 15 16 cache: &Cache, 16 17 pool: &Arc<sqlx::Pool<sqlx::Postgres>>, 18 + mb_client: &Arc<MusicbrainzClient>, 17 19 token: &str, 18 20 ) -> Result<HttpResponse, Error> { 19 21 if payload.listen_type != "playing_now" { ··· 29 31 30 32 const RETRIES: usize = 15; 31 33 for attempt in 1..=RETRIES { 32 - match scrobble_listenbrainz(pool, cache, &payload, token).await { 34 + match scrobble_listenbrainz(pool, cache, mb_client, &payload, token).await { 33 35 Ok(_) => { 34 36 return Ok(HttpResponse::Ok().json(json!({ 35 37 "status": "ok",
+4 -1
crates/scrobbler/src/listenbrainz/handlers.rs
··· 16 16 }, 17 17 types::SubmitListensRequest, 18 18 }, 19 + musicbrainz::client::MusicbrainzClient, 19 20 read_payload, repo, 20 21 }; 21 22 use tokio_stream::StreamExt; ··· 39 40 req: HttpRequest, 40 41 data: web::Data<Arc<Pool<Postgres>>>, 41 42 cache: web::Data<Cache>, 43 + mb_client: web::Data<Arc<MusicbrainzClient>>, 42 44 mut payload: web::Payload, 43 45 ) -> impl Responder { 44 46 let token = match req.headers().get("Authorization") { ··· 63 65 }) 64 66 .map_err(actix_web::error::ErrorBadRequest)?; 65 67 66 - submit_listens(req, cache.get_ref(), data.get_ref(), token) 68 + let mb_client = mb_client.get_ref(); 69 + submit_listens(req, cache.get_ref(), data.get_ref(), &mb_client, token) 67 70 .await 68 71 .map_err(actix_web::error::ErrorInternalServerError) 69 72 }
+2 -2
crates/scrobbler/src/musicbrainz/artist.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 2 3 - #[derive(Debug, Deserialize, Clone)] 3 + #[derive(Debug, Deserialize, Clone, Default)] 4 4 pub struct Artist { 5 5 pub name: String, 6 6 #[serde(rename = "sort-name")] ··· 29 29 pub aliases: Option<Vec<Alias>>, 30 30 } 31 31 32 - #[derive(Debug, Deserialize, Clone)] 32 + #[derive(Debug, Deserialize, Clone, Default)] 33 33 pub struct ArtistCredit { 34 34 pub joinphrase: Option<String>, 35 35 pub name: String,
+310 -24
crates/scrobbler/src/musicbrainz/client.rs
··· 1 + use std::env; 2 + 1 3 use super::recording::{Recording, Recordings}; 2 - use anyhow::Error; 4 + use anyhow::{anyhow, Context, Error}; 5 + use redis::aio::MultiplexedConnection; 6 + use redis::AsyncCommands; 7 + use serde::{Deserialize, Serialize}; 8 + use tokio::time::{timeout, Duration, Instant}; 3 9 4 10 pub const BASE_URL: &str = "https://musicbrainz.org/ws/2"; 5 - pub const USER_AGENT: &str = "Rocksky/0.1.0"; 11 + pub const USER_AGENT: &str = "Rocksky/0.1.0 (+https://rocksky.app)"; 12 + 13 + const Q_QUEUE: &str = "mb:queue:v1"; 14 + const CACHE_SEARCH_PREFIX: &str = "mb:cache:search:"; 15 + const CACHE_REC_PREFIX: &str = "mb:cache:rec:"; 16 + const INFLIGHT_PREFIX: &str = "mb:inflight:"; 17 + const RESP_PREFIX: &str = "mb:resp:"; 6 18 7 - pub struct MusicbrainzClient {} 19 + const CACHE_TTL_SECS: u64 = 60 * 60 * 24; 20 + const WAIT_TIMEOUT_SECS: u64 = 20; 21 + const INFLIGHT_TTL_SECS: i64 = 30; // de-dup window while the worker is fetching 22 + 23 + #[derive(Clone)] 24 + pub struct MusicbrainzClient { 25 + http: reqwest::Client, 26 + redis: MultiplexedConnection, 27 + cache_ttl: u64, 28 + } 29 + 30 + #[derive(Debug, Serialize, Deserialize)] 31 + #[serde(tag = "kind", rename_all = "snake_case")] 32 + enum Job { 33 + Search { id: String, query: String }, 34 + GetRecording { id: String, mbid: String }, 35 + } 8 36 9 37 impl MusicbrainzClient { 10 - pub fn new() -> Self { 11 - MusicbrainzClient {} 38 + pub async fn new() -> Result<Self, Error> { 39 + let client = 40 + redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into()))?; 41 + let redis = client.get_multiplexed_tokio_connection().await?; 42 + let http = reqwest::Client::builder() 43 + .user_agent(USER_AGENT) 44 + .build() 45 + .context("build http client")?; 46 + let me = MusicbrainzClient { 47 + http, 48 + redis, 49 + cache_ttl: CACHE_TTL_SECS, 50 + }; 51 + 52 + let mut worker_conn = client.get_multiplexed_async_connection().await?; 53 + 54 + let http = me.http.clone(); 55 + tokio::spawn(async move { worker_loop(http, &mut worker_conn).await }); 56 + 57 + Ok(me) 12 58 } 13 59 14 60 pub async fn search(&self, query: &str) -> Result<Recordings, Error> { 15 - let url = format!("{}/recording", BASE_URL); 16 - let client = reqwest::Client::new(); 17 - let response = client 18 - .get(&url) 19 - .header("Accept", "application/json") 20 - .header("User-Agent", USER_AGENT) 21 - .query(&[("query", query), ("inc", "artist-credits+releases")]) 22 - .send() 61 + if let Some(h) = self.get_cache(&cache_key_search(query)).await? { 62 + return Ok(serde_json::from_str(&h).context("decode cached search")?); 63 + } 64 + let id = nanoid::nanoid!(); 65 + let job = Job::Search { 66 + id: id.clone(), 67 + query: query.to_string(), 68 + }; 69 + self.enqueue_if_needed(&job, &infl_key_search(query)) 23 70 .await?; 24 71 25 - Ok(response.json().await?) 72 + let raw = self.wait_for_response(&id).await?; 73 + let parsed: Recordings = serde_json::from_str(&raw).context("decode search response")?; 74 + 75 + self.set_cache(&cache_key_search(query), &raw).await?; 76 + Ok(parsed) 26 77 } 27 78 28 79 pub async fn get_recording(&self, mbid: &str) -> Result<Recording, Error> { 29 - let url = format!("{}/recording/{}", BASE_URL, mbid); 30 - let client = reqwest::Client::new(); 31 - let response = client 32 - .get(&url) 33 - .header("Accept", "application/json") 34 - .header("User-Agent", USER_AGENT) 35 - .query(&[("inc", "artist-credits+releases")]) 36 - .send() 37 - .await?; 80 + if let Some(h) = self.get_cache(&cache_key_rec(mbid)).await? { 81 + return Ok(serde_json::from_str(&h).context("decode cached recording")?); 82 + } 83 + let id = nanoid::nanoid!(); 84 + let job = Job::GetRecording { 85 + id: id.clone(), 86 + mbid: mbid.to_string(), 87 + }; 88 + self.enqueue_if_needed(&job, &infl_key_rec(mbid)).await?; 89 + let raw = self.wait_for_response(&id).await?; 90 + let parsed: Recording = serde_json::from_str(&raw).context("decode recording response")?; 91 + self.set_cache(&cache_key_rec(mbid), &raw).await?; 92 + Ok(parsed) 93 + } 94 + 95 + // ---------- Redis helpers ---------- 96 + 97 + async fn get_cache(&self, key: &str) -> Result<Option<String>, Error> { 98 + let mut r = self.redis.clone(); 99 + let val: Option<String> = r.get(key).await?; 100 + Ok(val) 101 + } 102 + 103 + async fn set_cache(&self, key: &str, json: &str) -> Result<(), Error> { 104 + let mut r = self.redis.clone(); 105 + let _: () = r 106 + .set_ex(key, json, self.cache_ttl) 107 + .await 108 + .with_context(|| format!("cache set {key}"))?; 109 + Ok(()) 110 + } 111 + 112 + async fn enqueue_if_needed(&self, job: &Job, inflight_key: &str) -> Result<(), Error> { 113 + let mut r = self.redis.clone(); 114 + 115 + // set NX to avoid duplicate work; short TTL 116 + let set: bool = r.set_nx(inflight_key, "1").await.context("set in-flight")?; 117 + if set { 118 + let _: () = r 119 + .expire(inflight_key, INFLIGHT_TTL_SECS) 120 + .await 121 + .context("expire inflight")?; 122 + let payload = serde_json::to_string(job).expect("serialize job"); 123 + let _: () = r.rpush(Q_QUEUE, payload).await.context("enqueue job")?; 124 + } 125 + Ok(()) 126 + } 127 + 128 + async fn wait_for_response(&self, id: &str) -> Result<String, Error> { 129 + let mut r = self.redis.clone(); 130 + let resp_q = resp_key(id); 131 + 132 + let fut = async { 133 + loop { 134 + let popped: Option<(String, String)> = r.brpop(&resp_q, 2.0).await?; 135 + if let Some((_key, json)) = popped { 136 + return Ok::<String, Error>(json); 137 + } 138 + } 139 + }; 140 + 141 + match timeout(Duration::from_secs(WAIT_TIMEOUT_SECS), fut).await { 142 + Ok(res) => res, 143 + Err(_) => Err(anyhow!("timed out waiting for MusicBrainz response")), 144 + } 145 + } 146 + } 147 + 148 + async fn worker_loop( 149 + http: reqwest::Client, 150 + redis: &mut MultiplexedConnection, 151 + ) -> Result<(), Error> { 152 + // pacing ticker: strictly 1 request/second 153 + let mut next_allowed = Instant::now(); 38 154 39 - Ok(response.json().await?) 155 + loop { 156 + tokio::select! { 157 + res = async { 158 + 159 + // finite timeout pop 160 + let v: Option<Vec<String>> = redis.blpop(Q_QUEUE, 2.0).await.ok(); 161 + Ok::<_, Error>(v) 162 + } => { 163 + let Some(mut v) = res? else { 164 + continue }; 165 + if v.len() != 2 { 166 + continue; } 167 + let payload = v.pop().unwrap(); 168 + 169 + // 1 rps pacing 170 + let now = Instant::now(); 171 + if now < next_allowed { tokio::time::sleep(next_allowed - now).await; } 172 + next_allowed = Instant::now() + Duration::from_secs(1); 173 + 174 + let payload: Job = match serde_json::from_str(&payload) { 175 + Ok(j) => j, 176 + Err(e) => { 177 + tracing::error!(%e, "invalid job payload"); 178 + continue; 179 + } 180 + }; 181 + if let Err(e) = process_job(&http, redis, payload).await { 182 + tracing::error!(%e, "job failed"); 183 + } 184 + } 185 + } 186 + } 187 + } 188 + 189 + async fn process_job( 190 + http: &reqwest::Client, 191 + redis: &mut MultiplexedConnection, 192 + job: Job, 193 + ) -> Result<(), Error> { 194 + match job { 195 + Job::Search { id, query } => { 196 + let url = format!("{}/recording", BASE_URL); 197 + let resp = http 198 + .get(&url) 199 + .header("Accept", "application/json") 200 + .query(&[ 201 + ("query", query.as_str()), 202 + ("fmt", "json"), 203 + ("inc", "artists+releases+isrcs"), 204 + ]) 205 + .send() 206 + .await 207 + .context("http search")?; 208 + 209 + if !resp.status().is_success() { 210 + // Push an error payload so waiters don’t hang forever 211 + let _ = push_response( 212 + redis, 213 + &id, 214 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 215 + ) 216 + .await; 217 + return Err(anyhow!("musicbrainz search http {}", resp.status())); 218 + } 219 + 220 + let text = resp.text().await.context("read body")?; 221 + push_response(redis, &id, &text).await?; 222 + } 223 + Job::GetRecording { id, mbid } => { 224 + let url = format!("{}/recording/{}", BASE_URL, mbid); 225 + let resp = http 226 + .get(&url) 227 + .header("Accept", "application/json") 228 + .query(&[("fmt", "json"), ("inc", "artists+releases+isrcs")]) 229 + .send() 230 + .await 231 + .context("http get_recording")?; 232 + 233 + if !resp.status().is_success() { 234 + let _ = push_response( 235 + redis, 236 + &id, 237 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 238 + ) 239 + .await; 240 + return Err(anyhow!("musicbrainz get_recording http {}", resp.status())); 241 + } 242 + 243 + let text = resp.text().await.context("read body")?; 244 + push_response(redis, &id, &text).await?; 245 + } 246 + } 247 + Ok(()) 248 + } 249 + 250 + async fn push_response( 251 + redis: &mut MultiplexedConnection, 252 + id: &str, 253 + json: &str, 254 + ) -> Result<(), Error> { 255 + let q = resp_key(id); 256 + // RPUSH then EXPIRE to avoid leaks if a client never BRPOPs 257 + let _: () = redis.rpush(&q, json).await?; 258 + let _: () = redis.expire(&q, WAIT_TIMEOUT_SECS as i64 + 5).await?; 259 + Ok(()) 260 + } 261 + 262 + fn cache_key_search(query: &str) -> String { 263 + format!("{}{}", CACHE_SEARCH_PREFIX, fast_hash(query)) 264 + } 265 + fn cache_key_rec(mbid: &str) -> String { 266 + format!("{}{}", CACHE_REC_PREFIX, mbid) 267 + } 268 + fn infl_key_search(query: &str) -> String { 269 + format!("{}search:{}", INFLIGHT_PREFIX, fast_hash(query)) 270 + } 271 + fn infl_key_rec(mbid: &str) -> String { 272 + format!("{}rec:{}", INFLIGHT_PREFIX, mbid) 273 + } 274 + fn resp_key(id: &str) -> String { 275 + format!("{}{}", RESP_PREFIX, id) 276 + } 277 + 278 + fn fast_hash(s: &str) -> u64 { 279 + use std::hash::{Hash, Hasher}; 280 + let mut h = std::collections::hash_map::DefaultHasher::new(); 281 + s.hash(&mut h); 282 + h.finish() 283 + } 284 + 285 + #[cfg(test)] 286 + mod tests { 287 + use super::*; 288 + use serial_test::serial; 289 + 290 + #[test] 291 + fn test_fast_hash() { 292 + let h1 = fast_hash("hello"); 293 + let h2 = fast_hash("hello"); 294 + let h3 = fast_hash("world"); 295 + assert_eq!(h1, h2); 296 + assert_ne!(h1, h3); 297 + } 298 + 299 + #[test] 300 + fn test_cache_keys() { 301 + let q = "test query"; 302 + let mbid = "some-mbid"; 303 + assert!(cache_key_search(q).starts_with(CACHE_SEARCH_PREFIX)); 304 + assert!(cache_key_rec(mbid).starts_with(CACHE_REC_PREFIX)); 305 + assert!(infl_key_search(q).starts_with(INFLIGHT_PREFIX)); 306 + assert!(infl_key_rec(mbid).starts_with(INFLIGHT_PREFIX)); 307 + assert!(resp_key("id").starts_with(RESP_PREFIX)); 308 + } 309 + 310 + #[tokio::test] 311 + #[serial] 312 + async fn test_musicbrainz_client() -> Result<(), Error> { 313 + let client = MusicbrainzClient::new().await?; 314 + let query = format!( 315 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 316 + "Come As You Are", "Nirvana" 317 + ); 318 + let search_res = client.search(&query).await?; 319 + 320 + assert!(!search_res.recordings.is_empty()); 321 + let rec = &search_res.recordings[0]; 322 + let mbid = &rec.id; 323 + let rec_res = client.get_recording(mbid).await?; 324 + assert_eq!(rec_res.id, *mbid); 325 + Ok(()) 40 326 } 41 327 }
+276
crates/scrobbler/src/musicbrainz/mod.rs
··· 1 + use crate::musicbrainz::{recording::Recordings, release::Release}; 2 + use std::cmp::Ordering; 3 + 1 4 pub mod artist; 2 5 pub mod client; 3 6 pub mod label; 4 7 pub mod recording; 5 8 pub mod release; 9 + 10 + fn get_best_release(releases: &[Release]) -> Option<Release> { 11 + if releases.is_empty() { 12 + return None; 13 + } 14 + 15 + // Remove the single filtering - this was causing the issue 16 + let mut candidates: Vec<&Release> = releases.iter().collect(); 17 + 18 + if candidates.is_empty() { 19 + return None; 20 + } 21 + 22 + candidates.sort_by(|a, b| cmp_release(a, b)); 23 + candidates.first().cloned().cloned() 24 + } 25 + 26 + pub fn get_best_release_from_recordings(all: &Recordings, artist: &str) -> Option<Release> { 27 + use std::collections::HashSet; 28 + 29 + let mut pool: Vec<Release> = Vec::new(); 30 + let mut seen: HashSet<String> = HashSet::new(); 31 + 32 + let all_recordings: Vec<&recording::Recording> = all 33 + .recordings 34 + .iter() 35 + .filter(|rec| { 36 + if let Some(credits) = &rec.artist_credit { 37 + artist_credit_contains(credits, artist) 38 + } else { 39 + false 40 + } 41 + }) 42 + .collect(); 43 + 44 + for rec in &all_recordings { 45 + if let Some(rels) = &rec.releases { 46 + for r in rels { 47 + if seen.insert(r.id.clone()) { 48 + pool.push(r.clone()); 49 + } 50 + } 51 + } 52 + } 53 + 54 + get_best_release(&pool) 55 + } 56 + 57 + fn cmp_release(a: &Release, b: &Release) -> Ordering { 58 + // First priority: prefer albums over singles 59 + let sa = is_single_release_type(a); 60 + let sb = is_single_release_type(b); 61 + if sa != sb { 62 + return bool_true_last(sa, sb); // Albums (false) come before singles (true) 63 + } 64 + 65 + let ta = release_tier(a.status.as_deref()); 66 + let tb = release_tier(b.status.as_deref()); 67 + if ta != tb { 68 + return ta.cmp(&tb); 69 + } 70 + 71 + let pa = has_preferred_country(a, &["XW", "US"]); 72 + let pb = has_preferred_country(b, &["XW", "US"]); 73 + if pa != pb { 74 + return bool_true_first(pa, pb); 75 + } 76 + 77 + let la = is_live_release(a); 78 + let lb = is_live_release(b); 79 + if la != lb { 80 + return bool_true_last(la, lb); 81 + } 82 + 83 + let da = date_key(a.date.as_deref()); 84 + let db = date_key(b.date.as_deref()); 85 + if da != db { 86 + return da.cmp(&db); 87 + } 88 + 89 + match a.title.cmp(&b.title) { 90 + Ordering::Equal => a.id.cmp(&b.id), 91 + ord => ord, 92 + } 93 + } 94 + 95 + fn release_tier(status: Option<&str>) -> u8 { 96 + match status.map(|s| s.to_ascii_lowercase()) { 97 + Some(s) if s == "official" => 0, 98 + Some(s) if s == "bootleg" => 1, 99 + _ => 2, 100 + } 101 + } 102 + 103 + fn bool_true_first(a: bool, b: bool) -> Ordering { 104 + match (a, b) { 105 + (true, false) => Ordering::Less, 106 + (false, true) => Ordering::Greater, 107 + _ => Ordering::Equal, 108 + } 109 + } 110 + 111 + fn bool_true_last(a: bool, b: bool) -> Ordering { 112 + match (a, b) { 113 + (true, false) => Ordering::Greater, 114 + (false, true) => Ordering::Less, 115 + _ => Ordering::Equal, 116 + } 117 + } 118 + 119 + fn is_single_release_type(rel: &Release) -> bool { 120 + if let Some(release_group) = &rel.release_group { 121 + if let Some(primary_type) = &release_group.primary_type { 122 + if primary_type.to_ascii_lowercase() == "single" { 123 + return true; 124 + } 125 + } 126 + } 127 + 128 + if rel.track_count == Some(1) { 129 + return true; 130 + } 131 + if let Some(media) = &rel.media { 132 + if media.len() == 1 && media[0].track_count == 1 { 133 + return true; 134 + } 135 + let total: u32 = media.iter().map(|m| m.track_count).sum(); 136 + if total == 1 { 137 + return true; 138 + } 139 + } 140 + false 141 + } 142 + 143 + fn has_preferred_country(rel: &Release, prefs: &[&str]) -> bool { 144 + if let Some(c) = rel.country.as_deref() { 145 + if prefs.iter().any(|p| *p == c) { 146 + return true; 147 + } 148 + } 149 + if let Some(events) = rel.release_events.as_ref() { 150 + for ev in events { 151 + if let Some(area) = &ev.area { 152 + if area 153 + .iso_3166_1_codes 154 + .iter() 155 + .any(|codes| prefs.iter().any(|p| codes.contains(&p.to_string()))) 156 + { 157 + return true; 158 + } 159 + } 160 + } 161 + } 162 + false 163 + } 164 + 165 + /// Convert "YYYY[-MM[-DD]]" into YYYYMMDD (missing parts → 01). Unknown dates sort last. 166 + fn date_key(d: Option<&str>) -> i32 { 167 + if let Some(d) = d { 168 + let mut parts = d.split('-'); 169 + let y = parts.next().unwrap_or("9999"); 170 + let m = parts.next().unwrap_or("01"); 171 + let day = parts.next().unwrap_or("01"); 172 + 173 + let y: i32 = y.parse().unwrap_or(9999); 174 + let m: i32 = m.parse().unwrap_or(1); 175 + let day: i32 = day.parse().unwrap_or(1); 176 + 177 + return y * 10000 + m * 100 + day; 178 + } 179 + 9_999_01_01 180 + } 181 + 182 + fn is_live_release(rel: &Release) -> bool { 183 + let t_live = rel.title.to_ascii_lowercase().contains("live"); 184 + let d_live = rel 185 + .disambiguation 186 + .as_ref() 187 + .map(|d| d.to_ascii_lowercase().contains("live")) 188 + .unwrap_or(false); 189 + t_live || d_live 190 + } 191 + 192 + fn artist_credit_contains(credits: &[artist::ArtistCredit], name: &str) -> bool { 193 + credits.iter().any(|c| c.name.eq_ignore_ascii_case(name)) 194 + } 195 + 196 + #[cfg(test)] 197 + mod tests { 198 + use crate::musicbrainz::client::MusicbrainzClient; 199 + use crate::musicbrainz::release::Media; 200 + use anyhow::Error; 201 + use serial_test::serial; 202 + 203 + use super::*; 204 + 205 + #[test] 206 + fn test_date_key() { 207 + assert_eq!(date_key(Some("2020-05-15")), 20200515); 208 + assert_eq!(date_key(Some("2020-05")), 20200501); 209 + assert_eq!(date_key(Some("2020")), 20200101); 210 + assert_eq!(date_key(None), 99990101); 211 + assert_eq!(date_key(Some("invalid-date")), 99990101); 212 + } 213 + 214 + #[test] 215 + fn test_is_single() { 216 + let rel1 = Release { 217 + track_count: Some(1), 218 + media: None, 219 + ..Default::default() 220 + }; 221 + assert!(is_single_release_type(&rel1)); 222 + let rel2 = Release { 223 + track_count: Some(2), 224 + media: Some(vec![ 225 + Media { 226 + track_count: 1, 227 + ..Default::default() 228 + }, 229 + Media { 230 + track_count: 1, 231 + ..Default::default() 232 + }, 233 + ]), 234 + ..Default::default() 235 + }; 236 + assert!(!is_single_release_type(&rel2)); 237 + } 238 + 239 + #[tokio::test] 240 + #[serial] 241 + async fn test_get_best_release_from_recordings() -> Result<(), Error> { 242 + let client = MusicbrainzClient::new().await?; 243 + let query = format!( 244 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 245 + "Smells Like Teen Spirit", "Nirvana" 246 + ); 247 + let recordings = client.search(&query).await?; 248 + let best = get_best_release_from_recordings(&recordings, "Nirvana"); 249 + assert!(best.is_some()); 250 + let best = best.unwrap(); 251 + assert_eq!(best.title, "Nevermind"); 252 + assert_eq!(best.status.as_deref(), Some("Official")); 253 + assert_eq!(best.country.as_deref(), Some("US")); 254 + 255 + let query = format!( 256 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 257 + "Medicine", "Joji" 258 + ); 259 + let recordings = client.search(&query).await?; 260 + let best = get_best_release_from_recordings(&recordings, "Joji"); 261 + assert!(best.is_some()); 262 + let best = best.unwrap(); 263 + assert_eq!(best.title, "Chloe Burbank Vol. 1"); 264 + assert_eq!(best.status.as_deref(), Some("Bootleg")); 265 + assert_eq!(best.country.as_deref(), Some("XW")); 266 + 267 + let query = format!( 268 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 269 + "Don't Stay", "Linkin Park" 270 + ); 271 + let recordings = client.search(&query).await?; 272 + let best = get_best_release_from_recordings(&recordings, "Linkin Park"); 273 + assert!(best.is_some()); 274 + let best = best.unwrap(); 275 + assert_eq!(best.title, "Meteora"); 276 + assert_eq!(best.status.as_deref(), Some("Official")); 277 + assert_eq!(best.country.as_deref(), Some("US")); 278 + 279 + Ok(()) 280 + } 281 + }
+26 -7
crates/scrobbler/src/musicbrainz/release.rs
··· 6 6 recording::Recording, 7 7 }; 8 8 9 - #[derive(Debug, Deserialize, Clone)] 9 + #[derive(Debug, Deserialize, Clone, Default)] 10 10 pub struct Release { 11 11 #[serde(rename = "release-events")] 12 12 pub release_events: Option<Vec<ReleaseEvent>>, ··· 24 24 #[serde(rename = "cover-art-archive")] 25 25 pub cover_art_archive: Option<CoverArtArchive>, 26 26 #[serde(rename = "artist-credit")] 27 - pub artist_credit: Vec<ArtistCredit>, 27 + pub artist_credit: Option<Vec<ArtistCredit>>, 28 28 #[serde(rename = "status-id")] 29 29 pub status_id: Option<String>, 30 30 #[serde(rename = "label-info")] ··· 33 33 pub date: Option<String>, 34 34 pub country: Option<String>, 35 35 pub asin: Option<String>, 36 + #[serde(rename = "track-count")] 37 + pub track_count: Option<u32>, 38 + #[serde(rename = "release-group")] 39 + pub release_group: Option<ReleaseGroup>, 36 40 } 37 41 38 - #[derive(Debug, Deserialize, Clone)] 42 + #[derive(Debug, Deserialize, Clone, Default)] 39 43 pub struct CoverArtArchive { 40 44 pub back: bool, 41 45 pub artwork: bool, ··· 44 48 pub darkened: bool, 45 49 } 46 50 47 - #[derive(Debug, Deserialize, Clone)] 51 + #[derive(Debug, Deserialize, Clone, Default)] 48 52 pub struct ReleaseEvent { 49 53 pub area: Option<Area>, 50 54 pub date: String, 51 55 } 52 56 53 - #[derive(Debug, Deserialize, Clone)] 57 + #[derive(Debug, Deserialize, Clone, Default)] 54 58 pub struct TextRepresentation { 55 59 pub language: Option<String>, 56 60 pub script: Option<String>, 57 61 } 58 62 59 - #[derive(Debug, Deserialize, Clone)] 63 + #[derive(Debug, Deserialize, Clone, Default)] 60 64 pub struct Media { 61 65 #[serde(rename = "format-id")] 62 66 pub format_id: Option<String>, ··· 87 91 pub title: String, 88 92 pub recording: Recording, 89 93 #[serde(rename = "artist-credit")] 90 - pub artist_credit: Vec<ArtistCredit>, 94 + pub artist_credit: Option<Vec<ArtistCredit>>, 91 95 pub number: String, 92 96 } 97 + 98 + #[derive(Debug, Deserialize, Clone, Default)] 99 + pub struct ReleaseGroup { 100 + pub id: String, 101 + pub title: String, 102 + #[serde(rename = "primary-type")] 103 + pub primary_type: Option<String>, 104 + #[serde(rename = "secondary-types")] 105 + pub secondary_types: Option<Vec<String>>, 106 + pub disambiguation: Option<String>, 107 + #[serde(rename = "first-release-date")] 108 + pub first_release_date: Option<String>, 109 + #[serde(rename = "artist-credit")] 110 + pub artist_credit: Option<Vec<ArtistCredit>>, 111 + }
+59 -25
crates/scrobbler/src/scrobbler.rs
··· 9 9 cache::Cache, 10 10 crypto::decrypt_aes_256_ctr, 11 11 listenbrainz::types::SubmitListensRequest, 12 - musicbrainz::client::MusicbrainzClient, 12 + musicbrainz::{ 13 + client::MusicbrainzClient, get_best_release_from_recordings, recording::Recording, 14 + }, 13 15 repo, rocksky, 14 16 spotify::{client::SpotifyClient, refresh_token}, 15 17 types::{Scrobble, Track}, ··· 94 96 pub async fn scrobble( 95 97 pool: &Pool<Postgres>, 96 98 cache: &Cache, 99 + mb_client: &MusicbrainzClient, 97 100 form: &BTreeMap<String, String>, 98 101 ) -> Result<Vec<Scrobble>, Error> { 99 102 let mut scrobbles = parse_batch(form)?; ··· 110 113 return Err(Error::msg("No Spotify tokens found")); 111 114 } 112 115 113 - let mb_client = MusicbrainzClient::new(); 114 - 115 116 for scrobble in &mut scrobbles { 116 117 /* 117 118 0. check if scrobble is cached ··· 224 225 } 225 226 226 227 let query = format!( 227 - r#"recording:"{}" AND artist:"{}""#, 228 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 228 229 scrobble.track, scrobble.artist 229 230 ); 230 231 let result = mb_client.search(&query).await?; ··· 248 249 pub async fn scrobble_v1( 249 250 pool: &Pool<Postgres>, 250 251 cache: &Cache, 252 + mb_client: &MusicbrainzClient, 251 253 form: &BTreeMap<String, String>, 252 254 ) -> Result<(), Error> { 253 255 let session_id = form.get("s").unwrap().to_string(); ··· 269 271 return Err(Error::msg("No Spotify tokens found")); 270 272 } 271 273 272 - let mb_client = MusicbrainzClient::new(); 273 - 274 274 let mut scrobble = Scrobble { 275 275 artist: artist.trim().to_string(), 276 276 track: track.trim().to_string(), ··· 399 399 } 400 400 401 401 let query = format!( 402 - r#"recording:"{}" AND artist:"{}""#, 402 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 403 403 scrobble.track, scrobble.artist 404 404 ); 405 - let result = mb_client.search(&query).await?; 406 - 407 - if let Some(recording) = result.recordings.first() { 405 + let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await; 406 + if let Err(e) = result { 407 + tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Musicbrainz search error: {}", e); 408 + return Ok(()); 409 + } 410 + let result = result.unwrap(); 411 + if let Some(recording) = result { 408 412 let result = mb_client.get_recording(&recording.id).await?; 409 413 tracing::info!(%scrobble.artist, %scrobble.track, "Musicbrainz (recording)"); 410 414 scrobble.album = Some(Track::from(result.clone()).album); ··· 421 425 pub async fn scrobble_listenbrainz( 422 426 pool: &Pool<Postgres>, 423 427 cache: &Cache, 428 + mb_client: &MusicbrainzClient, 424 429 req: &SubmitListensRequest, 425 430 token: &str, 426 431 ) -> Result<(), Error> { ··· 502 507 if spofity_tokens.is_empty() { 503 508 return Err(Error::msg("No Spotify tokens found")); 504 509 } 505 - 506 - let mb_client = MusicbrainzClient::new(); 507 510 508 511 let mut scrobble = Scrobble { 509 512 artist: artist.trim().to_string(), ··· 620 623 621 624 // check if artists don't contain the scrobble artist (to avoid wrong matches) 622 625 if !artists.contains(&scrobble.artist.to_lowercase()) { 623 - tracing::warn!(artist = %artist, track = %track, "Artist mismatch, skipping"); 626 + tracing::warn!(artist = %artist, track = ?track, "Artist mismatch, skipping"); 624 627 return Ok(()); 625 628 } 626 629 ··· 644 647 return Ok(()); 645 648 } 646 649 647 - // Temporary disable Musicbrainz search to reduce rate limiting issues 648 - // and because it often returns wrong results 649 - // we can re-enable it later with a retry mechanism 650 - /* 651 650 let query = format!( 652 - r#"recording:"{}" AND artist:"{}""#, 651 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 653 652 scrobble.track, scrobble.artist 654 653 ); 655 - let result = mb_client.search(&query).await?; 656 - 657 - if let Some(recording) = result.recordings.first() { 658 - let result = mb_client.get_recording(&recording.id).await?; 659 - println!("{}", "Musicbrainz (recording)".yellow()); 660 - scrobble.album = Some(Track::from(result.clone()).album); 654 + let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await; 655 + if let Err(e) = result { 656 + tracing::warn!(artist = %artist, track = %track, "Musicbrainz search error: {}", e); 657 + return Ok(()); 658 + } 659 + let result = result.unwrap(); 660 + if let Some(result) = result { 661 + tracing::info!("Musicbrainz (recording)"); 661 662 rocksky::scrobble(cache, &did, result.into(), scrobble.timestamp).await?; 662 663 tokio::time::sleep(std::time::Duration::from_secs(1)).await; 663 664 return Ok(()); 664 665 } 665 - */ 666 666 667 667 tracing::warn!(artist = %artist, track = %track, "Track not found, skipping"); 668 668 669 669 Ok(()) 670 670 } 671 + 672 + async fn search_musicbrainz_recording( 673 + query: &str, 674 + mb_client: &MusicbrainzClient, 675 + scrobble: &Scrobble, 676 + ) -> Result<Option<Recording>, Error> { 677 + let result = mb_client.search(&query).await; 678 + if let Err(e) = result { 679 + tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Musicbrainz search error: {}", e); 680 + return Ok(None); 681 + } 682 + let result = result.unwrap(); 683 + 684 + let release = get_best_release_from_recordings(&result, &scrobble.artist); 685 + 686 + if let Some(release) = release { 687 + let recording = result.recordings.into_iter().find(|r| { 688 + r.releases 689 + .as_ref() 690 + .map(|releases| releases.iter().any(|rel| rel.id == release.id)) 691 + .unwrap_or(false) 692 + }); 693 + if recording.is_none() { 694 + tracing::warn!(artist = %scrobble.artist, track = %scrobble.track, "Recording not found in MusicBrainz result, skipping"); 695 + return Ok(None); 696 + } 697 + let recording = recording.unwrap(); 698 + let result = mb_client.get_recording(&recording.id).await?; 699 + tracing::info!("Musicbrainz (recording)"); 700 + return Ok(Some(result)); 701 + } 702 + 703 + Ok(None) 704 + }
+4 -4
crates/scrobbler/src/types.rs
··· 69 69 .map(|credit| credit.name.clone()) 70 70 .unwrap_or_default(); 71 71 let releases = recording.releases.unwrap_or_default(); 72 - let album_artist = releases 73 - .first() 74 - .and_then(|release| release.artist_credit.first()) 75 - .map(|credit| credit.name.clone()); 72 + let album_artist = releases.first().and_then(|release| { 73 + let credits = release.artist_credit.clone().unwrap_or_default(); 74 + credits.first().map(|credit| credit.name.clone()) 75 + }); 76 76 let album = releases 77 77 .first() 78 78 .map(|release| release.title.clone())
+4
crates/webscrobbler/Cargo.toml
··· 39 39 actix-session = "0.10.1" 40 40 actix-limitation = "0.5.1" 41 41 tracing = "0.1.41" 42 + nanoid = "0.4.0" 43 + 44 + [dev-dependencies] 45 + serial_test = "3.0.0"
+7 -2
crates/webscrobbler/src/handlers.rs
··· 1 - use crate::{cache::Cache, consts::BANNER, repo, scrobbler::scrobble, types::ScrobbleRequest}; 1 + use crate::{ 2 + cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient, repo, 3 + scrobbler::scrobble, types::ScrobbleRequest, 4 + }; 2 5 use actix_web::{get, post, web, HttpRequest, HttpResponse, Responder}; 3 6 use owo_colors::OwoColorize; 4 7 use sqlx::{Pool, Postgres}; ··· 28 31 async fn handle_scrobble( 29 32 data: web::Data<Arc<Pool<Postgres>>>, 30 33 cache: web::Data<Cache>, 34 + mb_client: web::Data<Arc<MusicbrainzClient>>, 31 35 mut payload: web::Payload, 32 36 req: HttpRequest, 33 37 ) -> Result<impl Responder, actix_web::Error> { ··· 100 104 } 101 105 } 102 106 103 - scrobble(&pool, &cache, params, &user.did) 107 + let mb_client = mb_client.get_ref().as_ref(); 108 + scrobble(&pool, &cache, mb_client, params, &user.did) 104 109 .await 105 110 .map_err(|err| { 106 111 actix_web::error::ErrorInternalServerError(format!("Failed to scrobble: {}", err))
+5 -1
crates/webscrobbler/src/lib.rs
··· 11 11 use owo_colors::OwoColorize; 12 12 use sqlx::postgres::PgPoolOptions; 13 13 14 - use crate::{cache::Cache, consts::BANNER}; 14 + use crate::{cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient}; 15 15 16 16 pub mod auth; 17 17 pub mod cache; ··· 38 38 39 39 let conn = Arc::new(pool); 40 40 41 + let mb_client = MusicbrainzClient::new().await?; 42 + let mb_client = Arc::new(mb_client); 43 + 41 44 let host = env::var("WEBSCROBBLER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); 42 45 let port = env::var("WEBSCROBBLER_PORT") 43 46 .unwrap_or_else(|_| "7883".to_string()) ··· 65 68 .app_data(limiter.clone()) 66 69 .app_data(Data::new(conn.clone())) 67 70 .app_data(Data::new(cache.clone())) 71 + .app_data(Data::new(mb_client.clone())) 68 72 .service(handlers::index) 69 73 .service(handlers::handle_scrobble) 70 74 })
+2 -2
crates/webscrobbler/src/musicbrainz/artist.rs
··· 1 1 use serde::{Deserialize, Serialize}; 2 2 3 - #[derive(Debug, Deserialize, Clone)] 3 + #[derive(Debug, Deserialize, Clone, Default)] 4 4 pub struct Artist { 5 5 pub name: String, 6 6 #[serde(rename = "sort-name")] ··· 29 29 pub aliases: Option<Vec<Alias>>, 30 30 } 31 31 32 - #[derive(Debug, Deserialize, Clone)] 32 + #[derive(Debug, Deserialize, Clone, Default)] 33 33 pub struct ArtistCredit { 34 34 pub joinphrase: Option<String>, 35 35 pub name: String,
+310 -24
crates/webscrobbler/src/musicbrainz/client.rs
··· 1 + use std::env; 2 + 1 3 use super::recording::{Recording, Recordings}; 2 - use anyhow::Error; 4 + use anyhow::{anyhow, Context, Error}; 5 + use redis::aio::MultiplexedConnection; 6 + use redis::AsyncCommands; 7 + use serde::{Deserialize, Serialize}; 8 + use tokio::time::{timeout, Duration, Instant}; 3 9 4 10 pub const BASE_URL: &str = "https://musicbrainz.org/ws/2"; 5 - pub const USER_AGENT: &str = "Rocksky/0.1.0"; 11 + pub const USER_AGENT: &str = "Rocksky/0.1.0 (+https://rocksky.app)"; 12 + 13 + const Q_QUEUE: &str = "mb:queue:v1"; 14 + const CACHE_SEARCH_PREFIX: &str = "mb:cache:search:"; 15 + const CACHE_REC_PREFIX: &str = "mb:cache:rec:"; 16 + const INFLIGHT_PREFIX: &str = "mb:inflight:"; 17 + const RESP_PREFIX: &str = "mb:resp:"; 6 18 7 - pub struct MusicbrainzClient {} 19 + const CACHE_TTL_SECS: u64 = 60 * 60 * 24; 20 + const WAIT_TIMEOUT_SECS: u64 = 20; 21 + const INFLIGHT_TTL_SECS: i64 = 30; // de-dup window while the worker is fetching 22 + 23 + #[derive(Clone)] 24 + pub struct MusicbrainzClient { 25 + http: reqwest::Client, 26 + redis: MultiplexedConnection, 27 + cache_ttl: u64, 28 + } 29 + 30 + #[derive(Debug, Serialize, Deserialize)] 31 + #[serde(tag = "kind", rename_all = "snake_case")] 32 + enum Job { 33 + Search { id: String, query: String }, 34 + GetRecording { id: String, mbid: String }, 35 + } 8 36 9 37 impl MusicbrainzClient { 10 - pub fn new() -> Self { 11 - MusicbrainzClient {} 38 + pub async fn new() -> Result<Self, Error> { 39 + let client = 40 + redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into()))?; 41 + let redis = client.get_multiplexed_tokio_connection().await?; 42 + let http = reqwest::Client::builder() 43 + .user_agent(USER_AGENT) 44 + .build() 45 + .context("build http client")?; 46 + let me = MusicbrainzClient { 47 + http, 48 + redis, 49 + cache_ttl: CACHE_TTL_SECS, 50 + }; 51 + 52 + let mut worker_conn = client.get_multiplexed_async_connection().await?; 53 + 54 + let http = me.http.clone(); 55 + tokio::spawn(async move { worker_loop(http, &mut worker_conn).await }); 56 + 57 + Ok(me) 12 58 } 13 59 14 60 pub async fn search(&self, query: &str) -> Result<Recordings, Error> { 15 - let url = format!("{}/recording", BASE_URL); 16 - let client = reqwest::Client::new(); 17 - let response = client 18 - .get(&url) 19 - .header("Accept", "application/json") 20 - .header("User-Agent", USER_AGENT) 21 - .query(&[("query", query), ("inc", "artist-credits+releases")]) 22 - .send() 61 + if let Some(h) = self.get_cache(&cache_key_search(query)).await? { 62 + return Ok(serde_json::from_str(&h).context("decode cached search")?); 63 + } 64 + let id = nanoid::nanoid!(); 65 + let job = Job::Search { 66 + id: id.clone(), 67 + query: query.to_string(), 68 + }; 69 + self.enqueue_if_needed(&job, &infl_key_search(query)) 23 70 .await?; 24 71 25 - Ok(response.json().await?) 72 + let raw = self.wait_for_response(&id).await?; 73 + let parsed: Recordings = serde_json::from_str(&raw).context("decode search response")?; 74 + 75 + self.set_cache(&cache_key_search(query), &raw).await?; 76 + Ok(parsed) 26 77 } 27 78 28 79 pub async fn get_recording(&self, mbid: &str) -> Result<Recording, Error> { 29 - let url = format!("{}/recording/{}", BASE_URL, mbid); 30 - let client = reqwest::Client::new(); 31 - let response = client 32 - .get(&url) 33 - .header("Accept", "application/json") 34 - .header("User-Agent", USER_AGENT) 35 - .query(&[("inc", "artist-credits+releases")]) 36 - .send() 37 - .await?; 80 + if let Some(h) = self.get_cache(&cache_key_rec(mbid)).await? { 81 + return Ok(serde_json::from_str(&h).context("decode cached recording")?); 82 + } 83 + let id = nanoid::nanoid!(); 84 + let job = Job::GetRecording { 85 + id: id.clone(), 86 + mbid: mbid.to_string(), 87 + }; 88 + self.enqueue_if_needed(&job, &infl_key_rec(mbid)).await?; 89 + let raw = self.wait_for_response(&id).await?; 90 + let parsed: Recording = serde_json::from_str(&raw).context("decode recording response")?; 91 + self.set_cache(&cache_key_rec(mbid), &raw).await?; 92 + Ok(parsed) 93 + } 94 + 95 + // ---------- Redis helpers ---------- 96 + 97 + async fn get_cache(&self, key: &str) -> Result<Option<String>, Error> { 98 + let mut r = self.redis.clone(); 99 + let val: Option<String> = r.get(key).await?; 100 + Ok(val) 101 + } 102 + 103 + async fn set_cache(&self, key: &str, json: &str) -> Result<(), Error> { 104 + let mut r = self.redis.clone(); 105 + let _: () = r 106 + .set_ex(key, json, self.cache_ttl) 107 + .await 108 + .with_context(|| format!("cache set {key}"))?; 109 + Ok(()) 110 + } 111 + 112 + async fn enqueue_if_needed(&self, job: &Job, inflight_key: &str) -> Result<(), Error> { 113 + let mut r = self.redis.clone(); 114 + 115 + // set NX to avoid duplicate work; short TTL 116 + let set: bool = r.set_nx(inflight_key, "1").await.context("set in-flight")?; 117 + if set { 118 + let _: () = r 119 + .expire(inflight_key, INFLIGHT_TTL_SECS) 120 + .await 121 + .context("expire inflight")?; 122 + let payload = serde_json::to_string(job).expect("serialize job"); 123 + let _: () = r.rpush(Q_QUEUE, payload).await.context("enqueue job")?; 124 + } 125 + Ok(()) 126 + } 127 + 128 + async fn wait_for_response(&self, id: &str) -> Result<String, Error> { 129 + let mut r = self.redis.clone(); 130 + let resp_q = resp_key(id); 131 + 132 + let fut = async { 133 + loop { 134 + let popped: Option<(String, String)> = r.brpop(&resp_q, 2.0).await?; 135 + if let Some((_key, json)) = popped { 136 + return Ok::<String, Error>(json); 137 + } 138 + } 139 + }; 140 + 141 + match timeout(Duration::from_secs(WAIT_TIMEOUT_SECS), fut).await { 142 + Ok(res) => res, 143 + Err(_) => Err(anyhow!("timed out waiting for MusicBrainz response")), 144 + } 145 + } 146 + } 147 + 148 + async fn worker_loop( 149 + http: reqwest::Client, 150 + redis: &mut MultiplexedConnection, 151 + ) -> Result<(), Error> { 152 + // pacing ticker: strictly 1 request/second 153 + let mut next_allowed = Instant::now(); 38 154 39 - Ok(response.json().await?) 155 + loop { 156 + tokio::select! { 157 + res = async { 158 + 159 + // finite timeout pop 160 + let v: Option<Vec<String>> = redis.blpop(Q_QUEUE, 2.0).await.ok(); 161 + Ok::<_, Error>(v) 162 + } => { 163 + let Some(mut v) = res? else { 164 + continue }; 165 + if v.len() != 2 { 166 + continue; } 167 + let payload = v.pop().unwrap(); 168 + 169 + // 1 rps pacing 170 + let now = Instant::now(); 171 + if now < next_allowed { tokio::time::sleep(next_allowed - now).await; } 172 + next_allowed = Instant::now() + Duration::from_secs(1); 173 + 174 + let payload: Job = match serde_json::from_str(&payload) { 175 + Ok(j) => j, 176 + Err(e) => { 177 + tracing::error!(%e, "invalid job payload"); 178 + continue; 179 + } 180 + }; 181 + if let Err(e) = process_job(&http, redis, payload).await { 182 + tracing::error!(%e, "job failed"); 183 + } 184 + } 185 + } 186 + } 187 + } 188 + 189 + async fn process_job( 190 + http: &reqwest::Client, 191 + redis: &mut MultiplexedConnection, 192 + job: Job, 193 + ) -> Result<(), Error> { 194 + match job { 195 + Job::Search { id, query } => { 196 + let url = format!("{}/recording", BASE_URL); 197 + let resp = http 198 + .get(&url) 199 + .header("Accept", "application/json") 200 + .query(&[ 201 + ("query", query.as_str()), 202 + ("fmt", "json"), 203 + ("inc", "artists+releases+isrcs"), 204 + ]) 205 + .send() 206 + .await 207 + .context("http search")?; 208 + 209 + if !resp.status().is_success() { 210 + // Push an error payload so waiters don’t hang forever 211 + let _ = push_response( 212 + redis, 213 + &id, 214 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 215 + ) 216 + .await; 217 + return Err(anyhow!("musicbrainz search http {}", resp.status())); 218 + } 219 + 220 + let text = resp.text().await.context("read body")?; 221 + push_response(redis, &id, &text).await?; 222 + } 223 + Job::GetRecording { id, mbid } => { 224 + let url = format!("{}/recording/{}", BASE_URL, mbid); 225 + let resp = http 226 + .get(&url) 227 + .header("Accept", "application/json") 228 + .query(&[("fmt", "json"), ("inc", "artists+releases+isrcs")]) 229 + .send() 230 + .await 231 + .context("http get_recording")?; 232 + 233 + if !resp.status().is_success() { 234 + let _ = push_response( 235 + redis, 236 + &id, 237 + &format!(r#"{{"error":"http {}"}}"#, resp.status()), 238 + ) 239 + .await; 240 + return Err(anyhow!("musicbrainz get_recording http {}", resp.status())); 241 + } 242 + 243 + let text = resp.text().await.context("read body")?; 244 + push_response(redis, &id, &text).await?; 245 + } 246 + } 247 + Ok(()) 248 + } 249 + 250 + async fn push_response( 251 + redis: &mut MultiplexedConnection, 252 + id: &str, 253 + json: &str, 254 + ) -> Result<(), Error> { 255 + let q = resp_key(id); 256 + // RPUSH then EXPIRE to avoid leaks if a client never BRPOPs 257 + let _: () = redis.rpush(&q, json).await?; 258 + let _: () = redis.expire(&q, WAIT_TIMEOUT_SECS as i64 + 5).await?; 259 + Ok(()) 260 + } 261 + 262 + fn cache_key_search(query: &str) -> String { 263 + format!("{}{}", CACHE_SEARCH_PREFIX, fast_hash(query)) 264 + } 265 + fn cache_key_rec(mbid: &str) -> String { 266 + format!("{}{}", CACHE_REC_PREFIX, mbid) 267 + } 268 + fn infl_key_search(query: &str) -> String { 269 + format!("{}search:{}", INFLIGHT_PREFIX, fast_hash(query)) 270 + } 271 + fn infl_key_rec(mbid: &str) -> String { 272 + format!("{}rec:{}", INFLIGHT_PREFIX, mbid) 273 + } 274 + fn resp_key(id: &str) -> String { 275 + format!("{}{}", RESP_PREFIX, id) 276 + } 277 + 278 + fn fast_hash(s: &str) -> u64 { 279 + use std::hash::{Hash, Hasher}; 280 + let mut h = std::collections::hash_map::DefaultHasher::new(); 281 + s.hash(&mut h); 282 + h.finish() 283 + } 284 + 285 + #[cfg(test)] 286 + mod tests { 287 + use super::*; 288 + use serial_test::serial; 289 + 290 + #[test] 291 + fn test_fast_hash() { 292 + let h1 = fast_hash("hello"); 293 + let h2 = fast_hash("hello"); 294 + let h3 = fast_hash("world"); 295 + assert_eq!(h1, h2); 296 + assert_ne!(h1, h3); 297 + } 298 + 299 + #[test] 300 + fn test_cache_keys() { 301 + let q = "test query"; 302 + let mbid = "some-mbid"; 303 + assert!(cache_key_search(q).starts_with(CACHE_SEARCH_PREFIX)); 304 + assert!(cache_key_rec(mbid).starts_with(CACHE_REC_PREFIX)); 305 + assert!(infl_key_search(q).starts_with(INFLIGHT_PREFIX)); 306 + assert!(infl_key_rec(mbid).starts_with(INFLIGHT_PREFIX)); 307 + assert!(resp_key("id").starts_with(RESP_PREFIX)); 308 + } 309 + 310 + #[tokio::test] 311 + #[serial] 312 + async fn test_musicbrainz_client() -> Result<(), Error> { 313 + let client = MusicbrainzClient::new().await?; 314 + let query = format!( 315 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 316 + "Come As You Are", "Nirvana" 317 + ); 318 + let search_res = client.search(&query).await?; 319 + 320 + assert!(!search_res.recordings.is_empty()); 321 + let rec = &search_res.recordings[0]; 322 + let mbid = &rec.id; 323 + let rec_res = client.get_recording(mbid).await?; 324 + assert_eq!(rec_res.id, *mbid); 325 + Ok(()) 40 326 } 41 327 }
+275
crates/webscrobbler/src/musicbrainz/mod.rs
··· 1 + use crate::musicbrainz::{recording::Recordings, release::Release}; 2 + use std::cmp::Ordering; 3 + 1 4 pub mod artist; 2 5 pub mod client; 3 6 pub mod label; 4 7 pub mod recording; 5 8 pub mod release; 9 + 10 + fn get_best_release(releases: &[Release]) -> Option<Release> { 11 + if releases.is_empty() { 12 + return None; 13 + } 14 + 15 + let mut candidates: Vec<&Release> = releases.iter().collect(); 16 + 17 + if candidates.is_empty() { 18 + return None; 19 + } 20 + 21 + candidates.sort_by(|a, b| cmp_release(a, b)); 22 + candidates.first().cloned().cloned() 23 + } 24 + 25 + pub fn get_best_release_from_recordings(all: &Recordings, artist: &str) -> Option<Release> { 26 + use std::collections::HashSet; 27 + 28 + let mut pool: Vec<Release> = Vec::new(); 29 + let mut seen: HashSet<String> = HashSet::new(); 30 + 31 + let all_recordings: Vec<&recording::Recording> = all 32 + .recordings 33 + .iter() 34 + .filter(|rec| { 35 + if let Some(credits) = &rec.artist_credit { 36 + artist_credit_contains(credits, artist) 37 + } else { 38 + false 39 + } 40 + }) 41 + .collect(); 42 + 43 + for rec in &all_recordings { 44 + if let Some(rels) = &rec.releases { 45 + for r in rels { 46 + if seen.insert(r.id.clone()) { 47 + pool.push(r.clone()); 48 + } 49 + } 50 + } 51 + } 52 + 53 + get_best_release(&pool) 54 + } 55 + 56 + fn cmp_release(a: &Release, b: &Release) -> Ordering { 57 + // First priority: prefer albums over singles 58 + let sa = is_single_release_type(a); 59 + let sb = is_single_release_type(b); 60 + if sa != sb { 61 + return bool_true_last(sa, sb); // Albums (false) come before singles (true) 62 + } 63 + 64 + let ta = release_tier(a.status.as_deref()); 65 + let tb = release_tier(b.status.as_deref()); 66 + if ta != tb { 67 + return ta.cmp(&tb); 68 + } 69 + 70 + let pa = has_preferred_country(a, &["XW", "US"]); 71 + let pb = has_preferred_country(b, &["XW", "US"]); 72 + if pa != pb { 73 + return bool_true_first(pa, pb); 74 + } 75 + 76 + let la = is_live_release(a); 77 + let lb = is_live_release(b); 78 + if la != lb { 79 + return bool_true_last(la, lb); 80 + } 81 + 82 + let da = date_key(a.date.as_deref()); 83 + let db = date_key(b.date.as_deref()); 84 + if da != db { 85 + return da.cmp(&db); 86 + } 87 + 88 + match a.title.cmp(&b.title) { 89 + Ordering::Equal => a.id.cmp(&b.id), 90 + ord => ord, 91 + } 92 + } 93 + 94 + fn release_tier(status: Option<&str>) -> u8 { 95 + match status.map(|s| s.to_ascii_lowercase()) { 96 + Some(s) if s == "official" => 0, 97 + Some(s) if s == "bootleg" => 1, 98 + _ => 2, 99 + } 100 + } 101 + 102 + fn bool_true_first(a: bool, b: bool) -> Ordering { 103 + match (a, b) { 104 + (true, false) => Ordering::Less, 105 + (false, true) => Ordering::Greater, 106 + _ => Ordering::Equal, 107 + } 108 + } 109 + 110 + fn bool_true_last(a: bool, b: bool) -> Ordering { 111 + match (a, b) { 112 + (true, false) => Ordering::Greater, 113 + (false, true) => Ordering::Less, 114 + _ => Ordering::Equal, 115 + } 116 + } 117 + 118 + fn is_single_release_type(rel: &Release) -> bool { 119 + if let Some(release_group) = &rel.release_group { 120 + if let Some(primary_type) = &release_group.primary_type { 121 + if primary_type.to_ascii_lowercase() == "single" { 122 + return true; 123 + } 124 + } 125 + } 126 + 127 + if rel.track_count == Some(1) { 128 + return true; 129 + } 130 + if let Some(media) = &rel.media { 131 + if media.len() == 1 && media[0].track_count == 1 { 132 + return true; 133 + } 134 + let total: u32 = media.iter().map(|m| m.track_count).sum(); 135 + if total == 1 { 136 + return true; 137 + } 138 + } 139 + false 140 + } 141 + 142 + fn has_preferred_country(rel: &Release, prefs: &[&str]) -> bool { 143 + if let Some(c) = rel.country.as_deref() { 144 + if prefs.iter().any(|p| *p == c) { 145 + return true; 146 + } 147 + } 148 + if let Some(events) = rel.release_events.as_ref() { 149 + for ev in events { 150 + if let Some(area) = &ev.area { 151 + if area 152 + .iso_3166_1_codes 153 + .iter() 154 + .any(|codes| prefs.iter().any(|p| codes.contains(&p.to_string()))) 155 + { 156 + return true; 157 + } 158 + } 159 + } 160 + } 161 + false 162 + } 163 + 164 + /// Convert "YYYY[-MM[-DD]]" into YYYYMMDD (missing parts → 01). Unknown dates sort last. 165 + fn date_key(d: Option<&str>) -> i32 { 166 + if let Some(d) = d { 167 + let mut parts = d.split('-'); 168 + let y = parts.next().unwrap_or("9999"); 169 + let m = parts.next().unwrap_or("01"); 170 + let day = parts.next().unwrap_or("01"); 171 + 172 + let y: i32 = y.parse().unwrap_or(9999); 173 + let m: i32 = m.parse().unwrap_or(1); 174 + let day: i32 = day.parse().unwrap_or(1); 175 + 176 + return y * 10000 + m * 100 + day; 177 + } 178 + 9_999_01_01 179 + } 180 + 181 + fn is_live_release(rel: &Release) -> bool { 182 + let t_live = rel.title.to_ascii_lowercase().contains("live"); 183 + let d_live = rel 184 + .disambiguation 185 + .as_ref() 186 + .map(|d| d.to_ascii_lowercase().contains("live")) 187 + .unwrap_or(false); 188 + t_live || d_live 189 + } 190 + 191 + fn artist_credit_contains(credits: &[artist::ArtistCredit], name: &str) -> bool { 192 + credits.iter().any(|c| c.name.eq_ignore_ascii_case(name)) 193 + } 194 + 195 + #[cfg(test)] 196 + mod tests { 197 + use crate::musicbrainz::client::MusicbrainzClient; 198 + use crate::musicbrainz::release::Media; 199 + use anyhow::Error; 200 + use serial_test::serial; 201 + 202 + use super::*; 203 + 204 + #[test] 205 + fn test_date_key() { 206 + assert_eq!(date_key(Some("2020-05-15")), 20200515); 207 + assert_eq!(date_key(Some("2020-05")), 20200501); 208 + assert_eq!(date_key(Some("2020")), 20200101); 209 + assert_eq!(date_key(None), 99990101); 210 + assert_eq!(date_key(Some("invalid-date")), 99990101); 211 + } 212 + 213 + #[test] 214 + fn test_is_single() { 215 + let rel1 = Release { 216 + track_count: Some(1), 217 + media: None, 218 + ..Default::default() 219 + }; 220 + assert!(is_single_release_type(&rel1)); 221 + let rel2 = Release { 222 + track_count: Some(2), 223 + media: Some(vec![ 224 + Media { 225 + track_count: 1, 226 + ..Default::default() 227 + }, 228 + Media { 229 + track_count: 1, 230 + ..Default::default() 231 + }, 232 + ]), 233 + ..Default::default() 234 + }; 235 + assert!(!is_single_release_type(&rel2)); 236 + } 237 + 238 + #[tokio::test] 239 + #[serial] 240 + async fn test_get_best_release_from_recordings() -> Result<(), Error> { 241 + let client = MusicbrainzClient::new().await?; 242 + let query = format!( 243 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 244 + "Smells Like Teen Spirit", "Nirvana" 245 + ); 246 + let recordings = client.search(&query).await?; 247 + let best = get_best_release_from_recordings(&recordings, "Nirvana"); 248 + assert!(best.is_some()); 249 + let best = best.unwrap(); 250 + assert_eq!(best.title, "Nevermind"); 251 + assert_eq!(best.status.as_deref(), Some("Official")); 252 + assert_eq!(best.country.as_deref(), Some("US")); 253 + 254 + let query = format!( 255 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 256 + "Medicine", "Joji" 257 + ); 258 + let recordings = client.search(&query).await?; 259 + let best = get_best_release_from_recordings(&recordings, "Joji"); 260 + assert!(best.is_some()); 261 + let best = best.unwrap(); 262 + assert_eq!(best.title, "Chloe Burbank Vol. 1"); 263 + assert_eq!(best.status.as_deref(), Some("Bootleg")); 264 + assert_eq!(best.country.as_deref(), Some("XW")); 265 + 266 + let query = format!( 267 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 268 + "Don't Stay", "Linkin Park" 269 + ); 270 + let recordings = client.search(&query).await?; 271 + let best = get_best_release_from_recordings(&recordings, "Linkin Park"); 272 + assert!(best.is_some()); 273 + let best = best.unwrap(); 274 + assert_eq!(best.title, "Meteora"); 275 + assert_eq!(best.status.as_deref(), Some("Official")); 276 + assert_eq!(best.country.as_deref(), Some("US")); 277 + 278 + Ok(()) 279 + } 280 + }
+26 -7
crates/webscrobbler/src/musicbrainz/release.rs
··· 6 6 recording::Recording, 7 7 }; 8 8 9 - #[derive(Debug, Deserialize, Clone)] 9 + #[derive(Debug, Deserialize, Clone, Default)] 10 10 pub struct Release { 11 11 #[serde(rename = "release-events")] 12 12 pub release_events: Option<Vec<ReleaseEvent>>, ··· 24 24 #[serde(rename = "cover-art-archive")] 25 25 pub cover_art_archive: Option<CoverArtArchive>, 26 26 #[serde(rename = "artist-credit")] 27 - pub artist_credit: Vec<ArtistCredit>, 27 + pub artist_credit: Option<Vec<ArtistCredit>>, 28 28 #[serde(rename = "status-id")] 29 29 pub status_id: Option<String>, 30 30 #[serde(rename = "label-info")] ··· 33 33 pub date: Option<String>, 34 34 pub country: Option<String>, 35 35 pub asin: Option<String>, 36 + #[serde(rename = "track-count")] 37 + pub track_count: Option<u32>, 38 + #[serde(rename = "release-group")] 39 + pub release_group: Option<ReleaseGroup>, 36 40 } 37 41 38 - #[derive(Debug, Deserialize, Clone)] 42 + #[derive(Debug, Deserialize, Clone, Default)] 39 43 pub struct CoverArtArchive { 40 44 pub back: bool, 41 45 pub artwork: bool, ··· 44 48 pub darkened: bool, 45 49 } 46 50 47 - #[derive(Debug, Deserialize, Clone)] 51 + #[derive(Debug, Deserialize, Clone, Default)] 48 52 pub struct ReleaseEvent { 49 53 pub area: Option<Area>, 50 54 pub date: String, 51 55 } 52 56 53 - #[derive(Debug, Deserialize, Clone)] 57 + #[derive(Debug, Deserialize, Clone, Default)] 54 58 pub struct TextRepresentation { 55 59 pub language: Option<String>, 56 60 pub script: Option<String>, 57 61 } 58 62 59 - #[derive(Debug, Deserialize, Clone)] 63 + #[derive(Debug, Deserialize, Clone, Default)] 60 64 pub struct Media { 61 65 #[serde(rename = "format-id")] 62 66 pub format_id: Option<String>, ··· 87 91 pub title: String, 88 92 pub recording: Recording, 89 93 #[serde(rename = "artist-credit")] 90 - pub artist_credit: Vec<ArtistCredit>, 94 + pub artist_credit: Option<Vec<ArtistCredit>>, 91 95 pub number: String, 92 96 } 97 + 98 + #[derive(Debug, Deserialize, Clone, Default)] 99 + pub struct ReleaseGroup { 100 + pub id: String, 101 + pub title: String, 102 + #[serde(rename = "primary-type")] 103 + pub primary_type: Option<String>, 104 + #[serde(rename = "secondary-types")] 105 + pub secondary_types: Option<Vec<String>>, 106 + pub disambiguation: Option<String>, 107 + #[serde(rename = "first-release-date")] 108 + pub first_release_date: Option<String>, 109 + #[serde(rename = "artist-credit")] 110 + pub artist_credit: Option<Vec<ArtistCredit>>, 111 + }
+59 -6
crates/webscrobbler/src/scrobbler.rs
··· 3 3 use crate::cache::Cache; 4 4 use crate::crypto::decrypt_aes_256_ctr; 5 5 use crate::musicbrainz::client::MusicbrainzClient; 6 + use crate::musicbrainz::get_best_release_from_recordings; 7 + use crate::musicbrainz::recording::Recording; 6 8 use crate::spotify::client::SpotifyClient; 7 9 use crate::spotify::refresh_token; 8 10 use crate::types::{ScrobbleRequest, Track}; ··· 15 17 pub async fn scrobble( 16 18 pool: &Pool<Postgres>, 17 19 cache: &Cache, 20 + mb_client: &MusicbrainzClient, 18 21 scrobble: ScrobbleRequest, 19 22 did: &str, 20 23 ) -> Result<(), Error> { ··· 23 26 if spofity_tokens.is_empty() { 24 27 return Err(Error::msg("No Spotify tokens found")); 25 28 } 26 - 27 - let mb_client = MusicbrainzClient::new(); 28 29 29 30 let key = format!( 30 31 "{} - {}", ··· 127 128 let result = spotify_client.search(&query).await?; 128 129 129 130 if let Some(track) = result.tracks.items.first() { 131 + let artists = track 132 + .artists 133 + .iter() 134 + .map(|a| a.name.clone()) 135 + .collect::<Vec<_>>() 136 + .join(", ") 137 + .to_lowercase(); 138 + let artist = scrobble.data.song.parsed.artist.trim(); 139 + // check if artists don't contain the scrobble artist (to avoid wrong matches) 140 + if !artists.contains(&scrobble.data.song.parsed.artist.trim().to_lowercase()) { 141 + tracing::warn!(artist = %artist, track = ?track, "Artist mismatch, skipping"); 142 + return Ok(()); 143 + } 144 + 130 145 tracing::info!("Spotify (track)"); 131 146 let mut track = track.clone(); 132 147 ··· 147 162 } 148 163 149 164 let query = format!( 150 - r#"recording:"{}" AND artist:"{}""#, 165 + r#"recording:"{}" AND artist:"{}" AND status:Official"#, 151 166 scrobble.data.song.parsed.track, scrobble.data.song.parsed.artist 152 167 ); 153 - let result = mb_client.search(&query).await?; 154 168 155 - if let Some(recording) = result.recordings.first() { 156 - let result = mb_client.get_recording(&recording.id).await?; 169 + let result = search_musicbrainz_recording(&query, mb_client, &scrobble).await; 170 + if let Err(e) = result { 171 + tracing::warn!(artist = %scrobble.data.song.parsed.artist, track = %scrobble.data.song.parsed.track, "Musicbrainz search error: {}", e); 172 + return Ok(()); 173 + } 174 + let result = result.unwrap(); 175 + if let Some(result) = result { 157 176 tracing::info!("Musicbrainz (recording)"); 158 177 rocksky::scrobble(cache, &did, result.into(), scrobble.time).await?; 159 178 tokio::time::sleep(std::time::Duration::from_secs(1)).await; ··· 164 183 165 184 Ok(()) 166 185 } 186 + 187 + async fn search_musicbrainz_recording( 188 + query: &str, 189 + mb_client: &MusicbrainzClient, 190 + scrobble: &ScrobbleRequest, 191 + ) -> Result<Option<Recording>, Error> { 192 + let result = mb_client.search(&query).await; 193 + if let Err(e) = result { 194 + tracing::warn!(artist = %scrobble.data.song.parsed.artist, track = %scrobble.data.song.parsed.track, "Musicbrainz search error: {}", e); 195 + return Ok(None); 196 + } 197 + let result = result.unwrap(); 198 + 199 + let release = get_best_release_from_recordings(&result, &scrobble.data.song.parsed.artist); 200 + 201 + if let Some(release) = release { 202 + let recording = result.recordings.into_iter().find(|r| { 203 + r.releases 204 + .as_ref() 205 + .map(|releases| releases.iter().any(|rel| rel.id == release.id)) 206 + .unwrap_or(false) 207 + }); 208 + if recording.is_none() { 209 + tracing::warn!(artist = %scrobble.data.song.parsed.artist, track = %scrobble.data.song.parsed.track, "Recording not found in MusicBrainz result, skipping"); 210 + return Ok(None); 211 + } 212 + let recording = recording.unwrap(); 213 + let result = mb_client.get_recording(&recording.id).await?; 214 + tracing::info!("Musicbrainz (recording)"); 215 + return Ok(Some(result)); 216 + } 217 + 218 + Ok(None) 219 + }
+4 -1
crates/webscrobbler/src/types.rs
··· 163 163 let releases = recording.releases.unwrap_or_default(); 164 164 let album_artist = releases 165 165 .first() 166 - .and_then(|release| release.artist_credit.first()) 166 + .and_then(|release| { 167 + let credits = release.artist_credit.clone().unwrap_or_default(); 168 + credits.first().map(|credit| credit.clone()) 169 + }) 167 170 .map(|credit| credit.name.clone()); 168 171 let album = releases 169 172 .first()