A decentralized music tracking and discovery platform built on AT Protocol 🎵

Add Spotify API retries and HTTP client timeouts

+208 -47
+108 -30
crates/scrobbler/src/scrobbler.rs
··· 19 xata::user::User, 20 }; 21 22 fn parse_batch(form: &BTreeMap<String, String>) -> Result<Vec<Scrobble>, Error> { 23 let mut result = vec![]; 24 let mut index = 0; ··· 203 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?; 204 let spotify_client = SpotifyClient::new(&spotify_token.access_token); 205 206 - let result = spotify_client 207 - .search(&format!( 208 - r#"track:"{}" artist:"{}""#, 209 - scrobble.track, scrobble.artist 210 - )) 211 - .await?; 212 213 if let Some(track) = result.tracks.items.first() { 214 tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Spotify (track)"); 215 scrobble.album = Some(track.album.name.clone()); 216 let mut track = track.clone(); 217 218 - if let Some(album) = spotify_client.get_album(&track.album.id).await? { 219 track.album = album; 220 } 221 222 - if let Some(artist) = spotify_client 223 - .get_artist(&track.album.artists[0].id) 224 - .await? 225 { 226 track.album.artists[0] = artist; 227 } ··· 383 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?; 384 let spotify_client = SpotifyClient::new(&spotify_token.access_token); 385 386 - let result = spotify_client 387 - .search(&format!( 388 - r#"track:"{}" artist:"{}""#, 389 - scrobble.track, scrobble.artist 390 - )) 391 - .await?; 392 393 if let Some(track) = result.tracks.items.first() { 394 let normalize = |s: &str| -> String { ··· 434 scrobble.album = Some(track.album.name.clone()); 435 let mut track = track.clone(); 436 437 - if let Some(album) = spotify_client.get_album(&track.album.id).await? { 438 track.album = album; 439 } 440 441 - if let Some(artist) = spotify_client 442 - .get_artist(&track.album.artists[0].id) 443 - .await? 444 { 445 track.album.artists[0] = artist; 446 } ··· 668 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?; 669 let spotify_client = SpotifyClient::new(&spotify_token.access_token); 670 671 - let result = spotify_client 672 - .search(&format!( 673 - r#"track:"{}" artist:"{}""#, 674 - scrobble.track, scrobble.artist 675 - )) 676 - .await?; 677 678 if let Some(track) = result.tracks.items.first() { 679 let normalize = |s: &str| -> String { ··· 712 scrobble.album = Some(track.album.name.clone()); 713 let mut track = track.clone(); 714 715 - if let Some(album) = spotify_client.get_album(&track.album.id).await? { 716 track.album = album; 717 } 718 719 - if let Some(artist) = spotify_client 720 - .get_artist(&track.album.artists[0].id) 721 - .await? 722 { 723 track.album.artists[0] = artist; 724 } ··· 785 786 Ok(None) 787 }
··· 19 xata::user::User, 20 }; 21 22 + const MAX_SPOTIFY_RETRIES: u32 = 3; 23 + const INITIAL_RETRY_DELAY_MS: u64 = 1000; 24 + 25 fn parse_batch(form: &BTreeMap<String, String>) -> Result<Vec<Scrobble>, Error> { 26 let mut result = vec![]; 27 let mut index = 0; ··· 206 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?; 207 let spotify_client = SpotifyClient::new(&spotify_token.access_token); 208 209 + let result = retry_spotify_call( 210 + || async { 211 + spotify_client 212 + .search(&format!( 213 + r#"track:"{}" artist:"{}""#, 214 + scrobble.track, scrobble.artist 215 + )) 216 + .await 217 + }, 218 + "search", 219 + ) 220 + .await?; 221 222 if let Some(track) = result.tracks.items.first() { 223 tracing::info!(artist = %scrobble.artist, track = %scrobble.track, "Spotify (track)"); 224 scrobble.album = Some(track.album.name.clone()); 225 let mut track = track.clone(); 226 227 + if let Some(album) = retry_spotify_call( 228 + || async { spotify_client.get_album(&track.album.id).await }, 229 + "get_album", 230 + ) 231 + .await? 232 + { 233 track.album = album; 234 } 235 236 + if let Some(artist) = retry_spotify_call( 237 + || async { spotify_client.get_artist(&track.album.artists[0].id).await }, 238 + "get_artist", 239 + ) 240 + .await? 241 { 242 track.album.artists[0] = artist; 243 } ··· 399 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?; 400 let spotify_client = SpotifyClient::new(&spotify_token.access_token); 401 402 + let result = retry_spotify_call( 403 + || async { 404 + spotify_client 405 + .search(&format!( 406 + r#"track:"{}" artist:"{}""#, 407 + scrobble.track, scrobble.artist 408 + )) 409 + .await 410 + }, 411 + "search", 412 + ) 413 + .await?; 414 415 if let Some(track) = result.tracks.items.first() { 416 let normalize = |s: &str| -> String { ··· 456 scrobble.album = Some(track.album.name.clone()); 457 let mut track = track.clone(); 458 459 + if let Some(album) = retry_spotify_call( 460 + || async { spotify_client.get_album(&track.album.id).await }, 461 + "get_album", 462 + ) 463 + .await? 464 + { 465 track.album = album; 466 } 467 468 + if let Some(artist) = retry_spotify_call( 469 + || async { spotify_client.get_artist(&track.album.artists[0].id).await }, 470 + "get_artist", 471 + ) 472 + .await? 473 { 474 track.album.artists[0] = artist; 475 } ··· 697 let spotify_token = refresh_token(&spotify_token, &client_id, &client_secret).await?; 698 let spotify_client = SpotifyClient::new(&spotify_token.access_token); 699 700 + let result = retry_spotify_call( 701 + || async { 702 + spotify_client 703 + .search(&format!( 704 + r#"track:"{}" artist:"{}""#, 705 + scrobble.track, scrobble.artist 706 + )) 707 + .await 708 + }, 709 + "search", 710 + ) 711 + .await?; 712 713 if let Some(track) = result.tracks.items.first() { 714 let normalize = |s: &str| -> String { ··· 747 scrobble.album = Some(track.album.name.clone()); 748 let mut track = track.clone(); 749 750 + if let Some(album) = retry_spotify_call( 751 + || async { spotify_client.get_album(&track.album.id).await }, 752 + "get_album", 753 + ) 754 + .await? 755 + { 756 track.album = album; 757 } 758 759 + if let Some(artist) = retry_spotify_call( 760 + || async { spotify_client.get_artist(&track.album.artists[0].id).await }, 761 + "get_artist", 762 + ) 763 + .await? 764 { 765 track.album.artists[0] = artist; 766 } ··· 827 828 Ok(None) 829 } 830 + 831 + async fn retry_spotify_call<F, Fut, T>(mut f: F, operation: &str) -> Result<T, Error> 832 + where 833 + F: FnMut() -> Fut, 834 + Fut: std::future::Future<Output = Result<T, Error>>, 835 + { 836 + let mut last_error = None; 837 + 838 + for attempt in 0..MAX_SPOTIFY_RETRIES { 839 + match f().await { 840 + Ok(result) => return Ok(result), 841 + Err(e) => { 842 + let is_timeout = e.to_string().contains("timed out") 843 + || e.to_string().contains("timeout") 844 + || e.to_string().contains("operation timed out"); 845 + 846 + if is_timeout && attempt < MAX_SPOTIFY_RETRIES - 1 { 847 + let delay = INITIAL_RETRY_DELAY_MS * 2_u64.pow(attempt); 848 + tracing::warn!( 849 + attempt = attempt + 1, 850 + max_attempts = MAX_SPOTIFY_RETRIES, 851 + delay_ms = delay, 852 + operation = operation, 853 + "Spotify API timeout, retrying..." 854 + ); 855 + tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await; 856 + last_error = Some(e); 857 + } else { 858 + return Err(e); 859 + } 860 + } 861 + } 862 + } 863 + 864 + Err(last_error.unwrap_or_else(|| Error::msg("Max retries exceeded"))) 865 + }
+23 -6
crates/scrobbler/src/spotify/client.rs
··· 1 use super::types::{Album, Artist, SearchResponse}; 2 use anyhow::Error; 3 4 pub const BASE_URL: &str = "https://api.spotify.com/v1"; 5 6 pub struct SpotifyClient { 7 token: String, 8 } 9 10 impl SpotifyClient { 11 pub fn new(token: &str) -> Self { 12 SpotifyClient { 13 token: token.to_string(), 14 } 15 } 16 17 pub async fn search(&self, query: &str) -> Result<SearchResponse, Error> { 18 let url = format!("{}/search", BASE_URL); 19 - let client = reqwest::Client::new(); 20 - let response = client 21 .get(&url) 22 .bearer_auth(&self.token) 23 .query(&[("type", "track"), ("q", query)]) ··· 29 30 pub async fn get_album(&self, id: &str) -> Result<Option<Album>, Error> { 31 let url = format!("{}/albums/{}", BASE_URL, id); 32 - let client = reqwest::Client::new(); 33 - let response = client.get(&url).bearer_auth(&self.token).send().await?; 34 35 let headers = response.headers().clone(); 36 let data = response.text().await?; ··· 45 46 pub async fn get_artist(&self, id: &str) -> Result<Option<Artist>, Error> { 47 let url = format!("{}/artists/{}", BASE_URL, id); 48 - let client = reqwest::Client::new(); 49 - let response = client.get(&url).bearer_auth(&self.token).send().await?; 50 51 let headers = response.headers().clone(); 52 let data = response.text().await?;
··· 1 use super::types::{Album, Artist, SearchResponse}; 2 use anyhow::Error; 3 + use std::time::Duration; 4 5 pub const BASE_URL: &str = "https://api.spotify.com/v1"; 6 7 pub struct SpotifyClient { 8 token: String, 9 + client: reqwest::Client, 10 } 11 12 impl SpotifyClient { 13 pub fn new(token: &str) -> Self { 14 + let client = reqwest::Client::builder() 15 + .timeout(Duration::from_secs(30)) 16 + .connect_timeout(Duration::from_secs(10)) 17 + .build() 18 + .expect("Failed to build HTTP client"); 19 + 20 SpotifyClient { 21 token: token.to_string(), 22 + client, 23 } 24 } 25 26 pub async fn search(&self, query: &str) -> Result<SearchResponse, Error> { 27 let url = format!("{}/search", BASE_URL); 28 + let response = self 29 + .client 30 .get(&url) 31 .bearer_auth(&self.token) 32 .query(&[("type", "track"), ("q", query)]) ··· 38 39 pub async fn get_album(&self, id: &str) -> Result<Option<Album>, Error> { 40 let url = format!("{}/albums/{}", BASE_URL, id); 41 + let response = self 42 + .client 43 + .get(&url) 44 + .bearer_auth(&self.token) 45 + .send() 46 + .await?; 47 48 let headers = response.headers().clone(); 49 let data = response.text().await?; ··· 58 59 pub async fn get_artist(&self, id: &str) -> Result<Option<Artist>, Error> { 60 let url = format!("{}/artists/{}", BASE_URL, id); 61 + let response = self 62 + .client 63 + .get(&url) 64 + .bearer_auth(&self.token) 65 + .send() 66 + .await?; 67 68 let headers = response.headers().clone(); 69 let data = response.text().await?;
+2
crates/webscrobbler/src/musicbrainz/client.rs
··· 41 let redis = client.get_multiplexed_tokio_connection().await?; 42 let http = reqwest::Client::builder() 43 .user_agent(USER_AGENT) 44 .build() 45 .context("build http client")?; 46 let me = MusicbrainzClient {
··· 41 let redis = client.get_multiplexed_tokio_connection().await?; 42 let http = reqwest::Client::builder() 43 .user_agent(USER_AGENT) 44 + .timeout(Duration::from_secs(30)) 45 + .connect_timeout(Duration::from_secs(10)) 46 .build() 47 .context("build http client")?; 48 let me = MusicbrainzClient {
+52 -5
crates/webscrobbler/src/scrobbler.rs
··· 14 use rand::Rng; 15 use sqlx::{Pool, Postgres}; 16 17 pub async fn scrobble( 18 pool: &Pool<Postgres>, 19 cache: &Cache, ··· 135 136 tracing::info!(query = %query, "Searching on Spotify"); 137 138 - let result = spotify_client.search(&query).await?; 139 140 tracing::info!(total = %result.tracks.total, "Spotify search results"); 141 ··· 186 tracing::info!("Spotify (track)"); 187 let mut track = track.clone(); 188 189 - if let Some(album) = spotify_client.get_album(&track.album.id).await? { 190 track.album = album; 191 } 192 193 - if let Some(artist) = spotify_client 194 - .get_artist(&track.album.artists[0].id) 195 - .await? 196 { 197 track.album.artists[0] = artist; 198 } ··· 260 261 Ok(None) 262 }
··· 14 use rand::Rng; 15 use sqlx::{Pool, Postgres}; 16 17 + const MAX_SPOTIFY_RETRIES: u32 = 3; 18 + const INITIAL_RETRY_DELAY_MS: u64 = 1000; 19 + 20 pub async fn scrobble( 21 pool: &Pool<Postgres>, 22 cache: &Cache, ··· 138 139 tracing::info!(query = %query, "Searching on Spotify"); 140 141 + let result = 142 + retry_spotify_call(|| async { spotify_client.search(&query).await }, "search").await?; 143 144 tracing::info!(total = %result.tracks.total, "Spotify search results"); 145 ··· 190 tracing::info!("Spotify (track)"); 191 let mut track = track.clone(); 192 193 + if let Some(album) = retry_spotify_call( 194 + || async { spotify_client.get_album(&track.album.id).await }, 195 + "get_album", 196 + ) 197 + .await? 198 + { 199 track.album = album; 200 } 201 202 + if let Some(artist) = retry_spotify_call( 203 + || async { spotify_client.get_artist(&track.album.artists[0].id).await }, 204 + "get_artist", 205 + ) 206 + .await? 207 { 208 track.album.artists[0] = artist; 209 } ··· 271 272 Ok(None) 273 } 274 + 275 + async fn retry_spotify_call<F, Fut, T>(mut f: F, operation: &str) -> Result<T, Error> 276 + where 277 + F: FnMut() -> Fut, 278 + Fut: std::future::Future<Output = Result<T, Error>>, 279 + { 280 + let mut last_error = None; 281 + 282 + for attempt in 0..MAX_SPOTIFY_RETRIES { 283 + match f().await { 284 + Ok(result) => return Ok(result), 285 + Err(e) => { 286 + let is_timeout = e.to_string().contains("timed out") 287 + || e.to_string().contains("timeout") 288 + || e.to_string().contains("operation timed out"); 289 + 290 + if is_timeout && attempt < MAX_SPOTIFY_RETRIES - 1 { 291 + let delay = INITIAL_RETRY_DELAY_MS * 2_u64.pow(attempt); 292 + tracing::warn!( 293 + attempt = attempt + 1, 294 + max_attempts = MAX_SPOTIFY_RETRIES, 295 + delay_ms = delay, 296 + operation = operation, 297 + "Spotify API timeout, retrying..." 298 + ); 299 + tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await; 300 + last_error = Some(e); 301 + } else { 302 + return Err(e); 303 + } 304 + } 305 + } 306 + } 307 + 308 + Err(last_error.unwrap_or_else(|| Error::msg("Max retries exceeded"))) 309 + }
+23 -6
crates/webscrobbler/src/spotify/client.rs
··· 1 use super::types::{Album, Artist, SearchResponse}; 2 use anyhow::Error; 3 4 pub const BASE_URL: &str = "https://api.spotify.com/v1"; 5 6 pub struct SpotifyClient { 7 token: String, 8 } 9 10 impl SpotifyClient { 11 pub fn new(token: &str) -> Self { 12 SpotifyClient { 13 token: token.to_string(), 14 } 15 } 16 17 pub async fn search(&self, query: &str) -> Result<SearchResponse, Error> { 18 let url = format!("{}/search", BASE_URL); 19 - let client = reqwest::Client::new(); 20 - let response = client 21 .get(&url) 22 .bearer_auth(&self.token) 23 .query(&[("type", "track"), ("q", query)]) ··· 29 30 pub async fn get_album(&self, id: &str) -> Result<Option<Album>, Error> { 31 let url = format!("{}/albums/{}", BASE_URL, id); 32 - let client = reqwest::Client::new(); 33 - let response = client.get(&url).bearer_auth(&self.token).send().await?; 34 35 let headers = response.headers().clone(); 36 let data = response.text().await?; ··· 45 46 pub async fn get_artist(&self, id: &str) -> Result<Option<Artist>, Error> { 47 let url = format!("{}/artists/{}", BASE_URL, id); 48 - let client = reqwest::Client::new(); 49 - let response = client.get(&url).bearer_auth(&self.token).send().await?; 50 51 let headers = response.headers().clone(); 52 let data = response.text().await?;
··· 1 use super::types::{Album, Artist, SearchResponse}; 2 use anyhow::Error; 3 + use std::time::Duration; 4 5 pub const BASE_URL: &str = "https://api.spotify.com/v1"; 6 7 pub struct SpotifyClient { 8 token: String, 9 + client: reqwest::Client, 10 } 11 12 impl SpotifyClient { 13 pub fn new(token: &str) -> Self { 14 + let client = reqwest::Client::builder() 15 + .timeout(Duration::from_secs(30)) 16 + .connect_timeout(Duration::from_secs(10)) 17 + .build() 18 + .expect("Failed to build HTTP client"); 19 + 20 SpotifyClient { 21 token: token.to_string(), 22 + client, 23 } 24 } 25 26 pub async fn search(&self, query: &str) -> Result<SearchResponse, Error> { 27 let url = format!("{}/search", BASE_URL); 28 + let response = self 29 + .client 30 .get(&url) 31 .bearer_auth(&self.token) 32 .query(&[("type", "track"), ("q", query)]) ··· 38 39 pub async fn get_album(&self, id: &str) -> Result<Option<Album>, Error> { 40 let url = format!("{}/albums/{}", BASE_URL, id); 41 + let response = self 42 + .client 43 + .get(&url) 44 + .bearer_auth(&self.token) 45 + .send() 46 + .await?; 47 48 let headers = response.headers().clone(); 49 let data = response.text().await?; ··· 58 59 pub async fn get_artist(&self, id: &str) -> Result<Option<Artist>, Error> { 60 let url = format!("{}/artists/{}", BASE_URL, id); 61 + let response = self 62 + .client 63 + .get(&url) 64 + .bearer_auth(&self.token) 65 + .send() 66 + .await?; 67 68 let headers = response.headers().clone(); 69 let data = response.text().await?;