A decentralized music tracking and discovery platform built on AT Protocol ๐ŸŽต
listenbrainz spotify atproto lastfm musicbrainz scrobbling

fix(spotify): refactor run function and improve track change detection logic #6

merged opened by tsiry-sandratraina.com targeting main from fix/spotify
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/sh.tangled.repo.pull/3lzmtmdd5fm22
+182 -332
Diff #0
+182 -332
crates/spotify/src/lib.rs
··· 1 + use anyhow::Error; 2 + use async_nats::connect; 3 + use owo_colors::OwoColorize; 4 + use reqwest::Client; 5 + use serde::{Deserialize, Serialize}; 6 + use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; 1 7 use std::{ 2 8 collections::HashMap, 3 9 env, 4 10 sync::{atomic::AtomicBool, Arc, Mutex}, 5 11 thread, 12 + time::{SystemTime, UNIX_EPOCH}, 6 13 }; 7 - 8 - use anyhow::Error; 9 - use async_nats::connect; 10 - use owo_colors::OwoColorize; 11 - use reqwest::Client; 12 - use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; 13 14 use tokio_stream::StreamExt; 14 15 15 16 use crate::{ ··· 31 32 pub mod types; 32 33 33 34 pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1"; 35 + pub const MAX_USERS: usize = 100; 36 + 37 + #[derive(Serialize, Deserialize, Debug, Clone)] 38 + struct TrackState { 39 + track_id: String, 40 + progress_ms: u64, 41 + scrobbled: bool, 42 + last_updated: u64, 43 + } 34 44 35 45 pub async fn run() -> Result<(), Error> { 36 46 let cache = Cache::new()?; ··· 46 56 let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?; 47 57 println!("Subscribed to {}", "rocksky.spotify.user".bright_green()); 48 58 49 - let users = find_spotify_users(&pool, 0, 100).await?; 59 + let users = find_spotify_users(&pool, 0, MAX_USERS).await?; 50 60 println!("Found {} users", users.len().bright_green()); 51 61 52 - // Shared HashMap to manage threads and their stop flags 53 62 let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> = 54 63 Arc::new(Mutex::new(HashMap::new())); 55 64 56 - // Start threads for all users 57 65 for user in users { 58 66 let email = user.0.clone(); 59 67 let token = user.1.clone(); ··· 83 91 email.bright_green(), 84 92 e.to_string().bright_red() 85 93 ); 86 - 87 94 // If there's an error, publish a message to restart the thread 88 95 match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) { 89 96 Ok(_) => { ··· 106 113 }); 107 114 } 108 115 109 - // Handle subscription messages 110 116 while let Some(message) = sub.next().await { 111 117 let user_id = String::from_utf8(message.payload.to_vec()).unwrap(); 112 118 println!( ··· 116 122 117 123 let mut thread_map = thread_map.lock().unwrap(); 118 124 119 - // Check if the user exists in the thread map 120 125 if let Some(stop_flag) = thread_map.get(&user_id) { 121 - // Stop the existing thread 122 126 stop_flag.store(true, std::sync::atomic::Ordering::Relaxed); 123 127 124 - // Create a new stop flag and restart the thread 125 128 let new_stop_flag = Arc::new(AtomicBool::new(false)); 126 129 thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag)); 127 130 128 131 let user = find_spotify_user(&pool, &user_id).await?; 129 - 130 132 if user.is_none() { 131 133 println!( 132 134 "Spotify user not found: {}, skipping", ··· 136 138 } 137 139 138 140 let user = user.unwrap(); 139 - 140 141 let email = user.0.clone(); 141 142 let token = user.1.clone(); 142 143 let did = user.2.clone(); ··· 166 167 } 167 168 } 168 169 }); 169 - 170 170 println!("Restarted thread for user: {}", user_id.bright_green()); 171 171 } else { 172 172 println!( ··· 236 236 let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?; 237 237 238 238 let client = Client::new(); 239 - 240 239 let response = client 241 240 .post("https://accounts.spotify.com/api/token") 242 241 .basic_auth(&client_id, Some(client_secret)) ··· 247 246 ]) 248 247 .send() 249 248 .await?; 249 + 250 250 let token = response.json::<AccessToken>().await?; 251 251 Ok(token) 252 252 } ··· 256 256 user_id: &str, 257 257 token: &str, 258 258 ) -> Result<Option<(CurrentlyPlaying, bool)>, Error> { 259 + // Check if we have cached data 259 260 if let Ok(Some(data)) = cache.get(user_id) { 260 261 println!( 261 262 "{} {}", ··· 265 266 if data == "No content" { 266 267 return Ok(None); 267 268 } 268 - let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data); 269 269 270 + let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data); 270 271 if decoded_data.is_err() { 271 272 println!( 272 273 "{} {} {}", ··· 276 277 ); 277 278 cache.setex(user_id, "No content", 10)?; 278 279 cache.del(&format!("{}:current", user_id))?; 280 + cache.del(&format!("{}:track_state", user_id))?; 279 281 return Ok(None); 280 282 } 281 283 282 284 let data: CurrentlyPlaying = decoded_data.unwrap(); 283 - // detect if the song has changed 284 - let previous = cache.get(&format!("{}:previous", user_id)); 285 285 286 - if previous.is_err() { 287 - println!( 288 - "{} redis error: {}", 289 - format!("[{}]", user_id).bright_green(), 290 - previous.unwrap_err().to_string().bright_red() 291 - ); 292 - return Ok(None); 293 - } 294 - 295 - let previous = previous.unwrap(); 296 - 297 - let changed = match previous { 298 - Some(previous) => { 299 - if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() { 300 - println!( 301 - "{} {} {}", 302 - format!("[{}]", user_id).bright_green(), 303 - "Previous cache is invalid", 304 - previous 305 - ); 306 - return Ok(None); 307 - } 308 - 309 - let previous: CurrentlyPlaying = serde_json::from_str(&previous)?; 310 - if previous.item.is_none() && data.item.is_some() { 311 - return Ok(Some((data, true))); 312 - } 313 - 314 - if previous.item.is_some() && data.item.is_none() { 315 - return Ok(Some((data, false))); 316 - } 317 - 318 - if previous.item.is_none() && data.item.is_none() { 319 - return Ok(Some((data, false))); 320 - } 321 - 322 - let previous_item = previous.item.unwrap(); 323 - let data_item = data.clone().item.unwrap(); 324 - previous_item.id != data_item.id 325 - && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0) 326 - } 327 - _ => true, 328 - }; 286 + let changed = detect_track_change(&cache, user_id, &data)?; 329 287 return Ok(Some((data, changed))); 330 288 } 331 289 ··· 343 301 344 302 if status == 429 { 345 303 println!( 346 - "{} Too many requests, retry-after {}", 304 + "{} Too many requests, retry-after {}", 347 305 format!("[{}]", user_id).bright_green(), 348 306 headers 349 307 .get("retry-after") ··· 355 313 return Ok(None); 356 314 } 357 315 358 - let previous = cache.get(&format!("{}:previous", user_id)); 359 - if previous.is_err() { 360 - println!( 361 - "{} redis error: {}", 362 - format!("[{}]", user_id).bright_green(), 363 - previous.unwrap_err().to_string().bright_red() 364 - ); 365 - return Ok(None); 366 - } 367 - 368 - let previous = previous.unwrap(); 369 - 370 - // check if status code is 204 371 316 if status == 204 { 372 317 println!("No content"); 373 - match cache.setex( 374 - user_id, 375 - "No content", 376 - match previous.is_none() { 377 - true => 30, 378 - false => 10, 379 - }, 380 - ) { 381 - Ok(_) => {} 382 - Err(e) => { 383 - println!( 384 - "{} redis error: {}", 385 - format!("[{}]", user_id).bright_green(), 386 - e.to_string().bright_red() 387 - ); 388 - return Ok(None); 389 - } 390 - } 391 - match cache.del(&format!("{}:current", user_id)) { 392 - Ok(_) => {} 393 - Err(e) => { 394 - println!( 395 - "{} redis error: {}", 396 - format!("[{}]", user_id).bright_green(), 397 - e.to_string().bright_red() 398 - ); 399 - return Ok(None); 400 - } 401 - } 318 + // Clear track state when nothing is playing 319 + cache.del(&format!("{}:track_state", user_id))?; 320 + 321 + let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() { 322 + 30 323 + } else { 324 + 10 325 + }; 326 + 327 + cache.setex(user_id, "No content", ttl)?; 328 + cache.del(&format!("{}:current", user_id))?; 402 329 return Ok(None); 403 330 } 404 331 ··· 409 336 "Invalid data received".red(), 410 337 data 411 338 ); 412 - match cache.setex(user_id, "No content", 10) { 413 - Ok(_) => {} 414 - Err(e) => { 415 - println!( 416 - "{} redis error: {}", 417 - format!("[{}]", user_id).bright_green(), 418 - e.to_string().bright_red() 419 - ); 420 - return Ok(None); 421 - } 422 - } 423 - match cache.del(&format!("{}:current", user_id)) { 424 - Ok(_) => {} 425 - Err(e) => { 426 - println!( 427 - "{} redis error: {}", 428 - format!("[{}]", user_id).bright_green(), 429 - e.to_string().bright_red() 430 - ); 431 - return Ok(None); 432 - } 433 - } 339 + cache.setex(user_id, "No content", 10)?; 340 + cache.del(&format!("{}:current", user_id))?; 341 + cache.del(&format!("{}:track_state", user_id))?; 434 342 return Ok(None); 435 343 } 436 344 437 - let data = serde_json::from_str::<CurrentlyPlaying>(&data)?; 438 - 439 - match cache.setex( 440 - user_id, 441 - &serde_json::to_string(&data)?, 442 - match previous.is_none() { 443 - true => 30, 444 - false => 15, 445 - }, 446 - ) { 447 - Ok(_) => {} 448 - Err(e) => { 449 - println!( 450 - "{} redis error: {}", 451 - format!("[{}]", user_id).bright_green(), 452 - e.to_string().bright_red() 453 - ); 454 - return Ok(None); 455 - } 456 - } 457 - match cache.del(&format!("{}:current", user_id)) { 458 - Ok(_) => {} 459 - Err(e) => { 460 - println!( 461 - "{} redis error: {}", 462 - format!("[{}]", user_id).bright_green(), 463 - e.to_string().bright_red() 464 - ); 465 - return Ok(None); 466 - } 467 - } 345 + let currently_playing_data = serde_json::from_str::<CurrentlyPlaying>(&data)?; 468 346 469 - // detect if the song has changed 470 - let previous = cache.get(&format!("{}:previous", user_id)); 347 + let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() { 348 + 30 349 + } else { 350 + 15 351 + }; 471 352 472 - if previous.is_err() { 473 - println!( 474 - "{} redis error: {}", 475 - format!("[{}]", user_id).bright_green(), 476 - previous.unwrap_err().to_string().bright_red() 477 - ); 478 - return Ok(None); 479 - } 353 + cache.setex( 354 + user_id, 355 + &serde_json::to_string(&currently_playing_data)?, 356 + ttl, 357 + )?; 358 + cache.del(&format!("{}:current", user_id))?; 480 359 481 - let previous = previous.unwrap(); 482 - let changed = match previous { 483 - Some(previous) => { 484 - if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() { 485 - println!( 486 - "{} {} {}", 487 - format!("[{}]", user_id).bright_green(), 488 - "Previous cache is invalid", 489 - previous 490 - ); 491 - return Ok(None); 492 - } 360 + // Detect track change and update track state 361 + let changed = detect_track_change(&cache, user_id, &currently_playing_data)?; 493 362 494 - let previous: CurrentlyPlaying = serde_json::from_str(&previous)?; 495 - if previous.item.is_none() || data.item.is_none() { 496 - return Ok(Some((data, false))); 497 - } 363 + // Update previous song cache 364 + cache.setex( 365 + &format!("{}:previous", user_id), 366 + &serde_json::to_string(&currently_playing_data)?, 367 + 600, 368 + )?; 498 369 499 - let previous_item = previous.item.unwrap(); 500 - let data_item = data.clone().item.unwrap(); 370 + Ok(Some((currently_playing_data, changed))) 371 + } 501 372 502 - previous_item.id != data_item.id 503 - && previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0) 373 + fn detect_track_change( 374 + cache: &Cache, 375 + user_id: &str, 376 + current: &CurrentlyPlaying, 377 + ) -> Result<bool, Error> { 378 + let track_state_key = format!("{}:track_state", user_id); 379 + 380 + let now = SystemTime::now() 381 + .duration_since(UNIX_EPOCH) 382 + .unwrap() 383 + .as_secs(); 384 + 385 + let current_item = match &current.item { 386 + Some(item) => item, 387 + None => { 388 + let _ = cache.del(&track_state_key); 389 + return Ok(false); 504 390 } 505 - _ => false, 506 391 }; 507 392 508 - // save as previous song 509 - match cache.setex( 510 - &format!("{}:previous", user_id), 511 - &serde_json::to_string(&data)?, 512 - 600, 513 - ) { 514 - Ok(_) => {} 515 - Err(e) => { 516 - println!( 517 - "{} redis error: {}", 518 - format!("[{}]", user_id).bright_green(), 519 - e.to_string().bright_red() 520 - ); 521 - return Ok(None); 393 + let previous_state = cache.get(&track_state_key)?; 394 + 395 + let changed = match previous_state { 396 + Some(state_str) => { 397 + if let Ok(prev_state) = serde_json::from_str::<TrackState>(&state_str) { 398 + if prev_state.track_id != current_item.id { 399 + true 400 + } else { 401 + // Same track - check if we should scrobble based on progress and time 402 + let progress_diff = 403 + current.progress_ms.unwrap_or(0) as i64 - prev_state.progress_ms as i64; 404 + let time_diff = now - prev_state.last_updated; 405 + 406 + // Only consider it changed if: 407 + // 1. We haven't scrobbled this track yet 408 + // 2. Significant progress was made (more than 10 seconds or reasonable time passed) 409 + // 3. Track is actually playing 410 + !prev_state.scrobbled 411 + && current.is_playing 412 + && (progress_diff > 10000 || (time_diff > 30 && progress_diff > 0)) 413 + } 414 + } else { 415 + // Invalid previous state, treat as changed 416 + true 417 + } 522 418 } 523 - } 419 + None => { 420 + // No previous state, treat as new track 421 + current.is_playing 422 + } 423 + }; 424 + 425 + let new_state = TrackState { 426 + track_id: current_item.id.clone(), 427 + progress_ms: current.progress_ms.unwrap_or(0), 428 + scrobbled: changed, // Mark as scrobbled if we're reporting a change 429 + last_updated: now, 430 + }; 524 431 525 - Ok(Some((data, changed))) 432 + cache.setex(&track_state_key, &serde_json::to_string(&new_state)?, 300)?; 433 + 434 + Ok(changed) 526 435 } 527 436 528 437 pub async fn get_artist( ··· 554 463 return Ok(None); 555 464 } 556 465 557 - match cache.setex(artist_id, &data, 20) { 558 - Ok(_) => {} 559 - Err(e) => { 560 - println!( 561 - "{} redis error: {}", 562 - format!("[{}]", artist_id).bright_green(), 563 - e.to_string().bright_red() 564 - ); 565 - return Ok(None); 566 - } 567 - } 568 - 466 + cache.setex(artist_id, &data, 20)?; 569 467 Ok(Some(serde_json::from_str(&data)?)) 570 468 } 571 469 ··· 594 492 return Ok(None); 595 493 } 596 494 597 - match cache.setex(album_id, &data, 20) { 598 - Ok(_) => {} 599 - Err(e) => { 600 - println!( 601 - "{} redis error: {}", 602 - format!("[{}]", album_id).bright_green(), 603 - e.to_string().bright_red() 604 - ); 605 - return Ok(None); 606 - } 607 - } 608 - 495 + cache.setex(album_id, &data, 20)?; 609 496 Ok(Some(serde_json::from_str(&data)?)) 610 497 } 611 498 ··· 637 524 638 525 let headers = response.headers().clone(); 639 526 let data = response.text().await?; 527 + 640 528 if data == "Too many requests" { 641 529 println!( 642 530 "> retry-after {}", ··· 647 535 } 648 536 649 537 let album_tracks: AlbumTracks = serde_json::from_str(&data)?; 650 - 651 538 if album_tracks.items.is_empty() { 652 539 break; 653 540 } ··· 657 544 } 658 545 659 546 let all_tracks_json = serde_json::to_string(&all_tracks)?; 660 - match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) { 661 - Ok(_) => {} 662 - Err(e) => { 663 - println!( 664 - "{} redis error: {}", 665 - format!("[{}]", album_id).bright_green(), 666 - e.to_string().bright_red() 667 - ); 668 - } 669 - } 547 + cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?; 670 548 671 549 Ok(AlbumTracks { 672 550 items: all_tracks, ··· 681 559 ) -> Result<Vec<(String, String, String, String)>, Error> { 682 560 let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 683 561 r#" 684 - SELECT * FROM spotify_tokens 685 - LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 686 - LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 687 - LIMIT $1 OFFSET $2 688 - "#, 562 + SELECT * FROM spotify_tokens 563 + LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 564 + LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 565 + LIMIT $1 OFFSET $2 566 + "#, 689 567 ) 690 568 .bind(limit as i64) 691 569 .bind(offset as i64) ··· 693 571 .await?; 694 572 695 573 let mut user_tokens = vec![]; 696 - 697 574 for result in &results { 698 575 let token = decrypt_aes_256_ctr( 699 576 &result.refresh_token, ··· 716 593 ) -> Result<Option<(String, String, String)>, Error> { 717 594 let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as( 718 595 r#" 719 - SELECT * FROM spotify_tokens 720 - LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 721 - LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 722 - WHERE spotify_accounts.email = $1 723 - "#, 596 + SELECT * FROM spotify_tokens 597 + LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id 598 + LEFT JOIN users ON spotify_accounts.user_id = users.xata_id 599 + WHERE spotify_accounts.email = $1 600 + "#, 724 601 ) 725 602 .bind(email) 726 603 .fetch_all(pool) ··· 751 628 "Checking currently playing".cyan() 752 629 ); 753 630 754 - let stop_flag_clone = stop_flag.clone(); 755 - let spotify_email_clone = spotify_email.clone(); 756 - let cache_clone = cache.clone(); 757 - thread::spawn(move || { 758 - loop { 759 - if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) { 760 - println!( 761 - "{} Stopping Thread", 762 - format!("[{}]", spotify_email_clone).bright_green() 763 - ); 764 - break; 765 - } 766 - if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) { 767 - if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() { 768 - thread::sleep(std::time::Duration::from_millis(800)); 769 - continue; 770 - } 771 - 772 - let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?; 773 - 774 - if let Some(item) = current_song.item.clone() { 775 - if current_song.is_playing 776 - && current_song.progress_ms.unwrap_or(0) < item.duration_ms.into() 777 - { 778 - current_song.progress_ms = 779 - Some(current_song.progress_ms.unwrap_or(0) + 800); 780 - match cache_clone.setex( 781 - &format!("{}:current", spotify_email_clone), 782 - &serde_json::to_string(&current_song)?, 783 - 16, 784 - ) { 785 - Ok(_) => {} 786 - Err(e) => { 787 - println!( 788 - "{} redis error: {}", 789 - format!("[{}]", spotify_email_clone).bright_green(), 790 - e.to_string().bright_red() 791 - ); 792 - } 793 - } 794 - thread::sleep(std::time::Duration::from_millis(800)); 795 - continue; 796 - } 797 - } 798 - continue; 799 - } 800 - 801 - if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) { 802 - if cached == "No content" { 803 - thread::sleep(std::time::Duration::from_millis(800)); 804 - continue; 805 - } 806 - match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) { 807 - Ok(_) => {} 808 - Err(e) => { 809 - println!( 810 - "{} redis error: {}", 811 - format!("[{}]", spotify_email_clone).bright_green(), 812 - e.to_string().bright_red() 813 - ); 814 - } 815 - } 816 - } 817 - 818 - thread::sleep(std::time::Duration::from_millis(800)); 819 - } 820 - Ok::<(), Error>(()) 821 - }); 631 + // Remove the separate progress tracking thread - it was causing race conditions 632 + // and unnecessary complexity 822 633 823 634 loop { 824 635 if stop_flag.load(std::sync::atomic::Ordering::Relaxed) { ··· 828 639 ); 829 640 break; 830 641 } 642 + 831 643 let spotify_email = spotify_email.clone(); 832 644 let token = token.clone(); 833 645 let did = did.clone(); ··· 842 654 format!("[{}]", spotify_email).bright_green(), 843 655 e.to_string().bright_red() 844 656 ); 845 - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 657 + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; 846 658 continue; 847 659 } 848 660 }; ··· 854 666 format!("[{}]", spotify_email).bright_green(), 855 667 "No song playing".yellow() 856 668 ); 857 - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 669 + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; 858 670 continue; 859 671 } 672 + 860 673 let data_item = data.item.unwrap(); 861 674 println!( 862 675 "{} {} is_playing: {} changed: {}", ··· 866 679 changed 867 680 ); 868 681 869 - if changed { 870 - scrobble(cache.clone(), &spotify_email, &did, &token).await?; 682 + // Only scrobble if there's a genuine track change and the track is playing 683 + if changed && data.is_playing { 684 + // Add a small delay to prevent rapid duplicate scrobbles 685 + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; 686 + 687 + match scrobble(cache.clone(), &spotify_email, &did, &token).await { 688 + Ok(_) => { 689 + println!( 690 + "{} {}", 691 + format!("[{}]", spotify_email).bright_green(), 692 + "Scrobbled successfully".green() 693 + ); 694 + } 695 + Err(e) => { 696 + println!( 697 + "{} Scrobble failed: {}", 698 + format!("[{}]", spotify_email).bright_green(), 699 + e.to_string().bright_red() 700 + ); 701 + } 702 + } 703 + 704 + // Spawn background task for library updates 705 + let cache_clone = cache.clone(); 706 + let token_clone = token.clone(); 707 + let spotify_email_clone = spotify_email.clone(); 708 + let did_clone = did.clone(); 709 + let album_id = data_item.album.id.clone(); 871 710 872 711 thread::spawn(move || { 873 712 let rt = tokio::runtime::Runtime::new().unwrap(); 874 713 match rt.block_on(async { 875 - get_album_tracks(cache.clone(), &data_item.album.id, &token).await?; 876 - get_album(cache.clone(), &data_item.album.id, &token).await?; 877 - update_library(cache.clone(), &spotify_email, &did, &token).await?; 714 + get_album_tracks(cache_clone.clone(), &album_id, &token_clone).await?; 715 + get_album(cache_clone.clone(), &album_id, &token_clone).await?; 716 + update_library( 717 + cache_clone.clone(), 718 + &spotify_email_clone, 719 + &did_clone, 720 + &token_clone, 721 + ) 722 + .await?; 878 723 Ok::<(), Error>(()) 879 724 }) { 880 - Ok(_) => {} 725 + Ok(_) => { 726 + println!( 727 + "{} Library updated successfully", 728 + format!("[{}]", spotify_email_clone).bright_green() 729 + ); 730 + } 881 731 Err(e) => { 882 732 println!( 883 - "{} {}", 884 - format!("[{}]", spotify_email).bright_green(), 733 + "{} Library update failed: {}", 734 + format!("[{}]", spotify_email_clone).bright_green(), 885 735 e.to_string().bright_red() 886 736 ); 887 737 } ··· 890 740 } 891 741 } 892 742 893 - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; 743 + tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; 894 744 } 895 745 896 746 Ok(())

Submissions

sign up or login to add to the discussion
tsiry-sandratraina.com submitted #0
1 commit
expand
fix: refactor run function and improve track change detection logic
pull request successfully merged