+182
-332
crates/spotify/src/lib.rs
+182
-332
crates/spotify/src/lib.rs
···
1
+
use anyhow::Error;
2
+
use async_nats::connect;
3
+
use owo_colors::OwoColorize;
4
+
use reqwest::Client;
5
+
use serde::{Deserialize, Serialize};
6
+
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
1
7
use std::{
2
8
collections::HashMap,
3
9
env,
4
10
sync::{atomic::AtomicBool, Arc, Mutex},
5
11
thread,
12
+
time::{SystemTime, UNIX_EPOCH},
6
13
};
7
-
8
-
use anyhow::Error;
9
-
use async_nats::connect;
10
-
use owo_colors::OwoColorize;
11
-
use reqwest::Client;
12
-
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
13
14
use tokio_stream::StreamExt;
14
15
15
16
use crate::{
···
31
32
pub mod types;
32
33
33
34
pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1";
35
+
pub const MAX_USERS: usize = 100;
36
+
37
+
#[derive(Serialize, Deserialize, Debug, Clone)]
38
+
struct TrackState {
39
+
track_id: String,
40
+
progress_ms: u64,
41
+
scrobbled: bool,
42
+
last_updated: u64,
43
+
}
34
44
35
45
pub async fn run() -> Result<(), Error> {
36
46
let cache = Cache::new()?;
···
46
56
let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
47
57
println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
48
58
49
-
let users = find_spotify_users(&pool, 0, 100).await?;
59
+
let users = find_spotify_users(&pool, 0, MAX_USERS).await?;
50
60
println!("Found {} users", users.len().bright_green());
51
61
52
-
// Shared HashMap to manage threads and their stop flags
53
62
let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
54
63
Arc::new(Mutex::new(HashMap::new()));
55
64
56
-
// Start threads for all users
57
65
for user in users {
58
66
let email = user.0.clone();
59
67
let token = user.1.clone();
···
83
91
email.bright_green(),
84
92
e.to_string().bright_red()
85
93
);
86
-
87
94
// If there's an error, publish a message to restart the thread
88
95
match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
89
96
Ok(_) => {
···
106
113
});
107
114
}
108
115
109
-
// Handle subscription messages
110
116
while let Some(message) = sub.next().await {
111
117
let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
112
118
println!(
···
116
122
117
123
let mut thread_map = thread_map.lock().unwrap();
118
124
119
-
// Check if the user exists in the thread map
120
125
if let Some(stop_flag) = thread_map.get(&user_id) {
121
-
// Stop the existing thread
122
126
stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
123
127
124
-
// Create a new stop flag and restart the thread
125
128
let new_stop_flag = Arc::new(AtomicBool::new(false));
126
129
thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
127
130
128
131
let user = find_spotify_user(&pool, &user_id).await?;
129
-
130
132
if user.is_none() {
131
133
println!(
132
134
"Spotify user not found: {}, skipping",
···
136
138
}
137
139
138
140
let user = user.unwrap();
139
-
140
141
let email = user.0.clone();
141
142
let token = user.1.clone();
142
143
let did = user.2.clone();
···
166
167
}
167
168
}
168
169
});
169
-
170
170
println!("Restarted thread for user: {}", user_id.bright_green());
171
171
} else {
172
172
println!(
···
236
236
let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
237
237
238
238
let client = Client::new();
239
-
240
239
let response = client
241
240
.post("https://accounts.spotify.com/api/token")
242
241
.basic_auth(&client_id, Some(client_secret))
···
247
246
])
248
247
.send()
249
248
.await?;
249
+
250
250
let token = response.json::<AccessToken>().await?;
251
251
Ok(token)
252
252
}
···
256
256
user_id: &str,
257
257
token: &str,
258
258
) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
259
+
// Check if we have cached data
259
260
if let Ok(Some(data)) = cache.get(user_id) {
260
261
println!(
261
262
"{} {}",
···
265
266
if data == "No content" {
266
267
return Ok(None);
267
268
}
268
-
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
269
269
270
+
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
270
271
if decoded_data.is_err() {
271
272
println!(
272
273
"{} {} {}",
···
276
277
);
277
278
cache.setex(user_id, "No content", 10)?;
278
279
cache.del(&format!("{}:current", user_id))?;
280
+
cache.del(&format!("{}:track_state", user_id))?;
279
281
return Ok(None);
280
282
}
281
283
282
284
let data: CurrentlyPlaying = decoded_data.unwrap();
283
-
// detect if the song has changed
284
-
let previous = cache.get(&format!("{}:previous", user_id));
285
285
286
-
if previous.is_err() {
287
-
println!(
288
-
"{} redis error: {}",
289
-
format!("[{}]", user_id).bright_green(),
290
-
previous.unwrap_err().to_string().bright_red()
291
-
);
292
-
return Ok(None);
293
-
}
294
-
295
-
let previous = previous.unwrap();
296
-
297
-
let changed = match previous {
298
-
Some(previous) => {
299
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
300
-
println!(
301
-
"{} {} {}",
302
-
format!("[{}]", user_id).bright_green(),
303
-
"Previous cache is invalid",
304
-
previous
305
-
);
306
-
return Ok(None);
307
-
}
308
-
309
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
310
-
if previous.item.is_none() && data.item.is_some() {
311
-
return Ok(Some((data, true)));
312
-
}
313
-
314
-
if previous.item.is_some() && data.item.is_none() {
315
-
return Ok(Some((data, false)));
316
-
}
317
-
318
-
if previous.item.is_none() && data.item.is_none() {
319
-
return Ok(Some((data, false)));
320
-
}
321
-
322
-
let previous_item = previous.item.unwrap();
323
-
let data_item = data.clone().item.unwrap();
324
-
previous_item.id != data_item.id
325
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
326
-
}
327
-
_ => true,
328
-
};
286
+
let changed = detect_track_change(&cache, user_id, &data)?;
329
287
return Ok(Some((data, changed)));
330
288
}
331
289
···
343
301
344
302
if status == 429 {
345
303
println!(
346
-
"{} Too many requests, retry-after {}",
304
+
"{} Too many requests, retry-after {}",
347
305
format!("[{}]", user_id).bright_green(),
348
306
headers
349
307
.get("retry-after")
···
355
313
return Ok(None);
356
314
}
357
315
358
-
let previous = cache.get(&format!("{}:previous", user_id));
359
-
if previous.is_err() {
360
-
println!(
361
-
"{} redis error: {}",
362
-
format!("[{}]", user_id).bright_green(),
363
-
previous.unwrap_err().to_string().bright_red()
364
-
);
365
-
return Ok(None);
366
-
}
367
-
368
-
let previous = previous.unwrap();
369
-
370
-
// check if status code is 204
371
316
if status == 204 {
372
317
println!("No content");
373
-
match cache.setex(
374
-
user_id,
375
-
"No content",
376
-
match previous.is_none() {
377
-
true => 30,
378
-
false => 10,
379
-
},
380
-
) {
381
-
Ok(_) => {}
382
-
Err(e) => {
383
-
println!(
384
-
"{} redis error: {}",
385
-
format!("[{}]", user_id).bright_green(),
386
-
e.to_string().bright_red()
387
-
);
388
-
return Ok(None);
389
-
}
390
-
}
391
-
match cache.del(&format!("{}:current", user_id)) {
392
-
Ok(_) => {}
393
-
Err(e) => {
394
-
println!(
395
-
"{} redis error: {}",
396
-
format!("[{}]", user_id).bright_green(),
397
-
e.to_string().bright_red()
398
-
);
399
-
return Ok(None);
400
-
}
401
-
}
318
+
// Clear track state when nothing is playing
319
+
cache.del(&format!("{}:track_state", user_id))?;
320
+
321
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
322
+
30
323
+
} else {
324
+
10
325
+
};
326
+
327
+
cache.setex(user_id, "No content", ttl)?;
328
+
cache.del(&format!("{}:current", user_id))?;
402
329
return Ok(None);
403
330
}
404
331
···
409
336
"Invalid data received".red(),
410
337
data
411
338
);
412
-
match cache.setex(user_id, "No content", 10) {
413
-
Ok(_) => {}
414
-
Err(e) => {
415
-
println!(
416
-
"{} redis error: {}",
417
-
format!("[{}]", user_id).bright_green(),
418
-
e.to_string().bright_red()
419
-
);
420
-
return Ok(None);
421
-
}
422
-
}
423
-
match cache.del(&format!("{}:current", user_id)) {
424
-
Ok(_) => {}
425
-
Err(e) => {
426
-
println!(
427
-
"{} redis error: {}",
428
-
format!("[{}]", user_id).bright_green(),
429
-
e.to_string().bright_red()
430
-
);
431
-
return Ok(None);
432
-
}
433
-
}
339
+
cache.setex(user_id, "No content", 10)?;
340
+
cache.del(&format!("{}:current", user_id))?;
341
+
cache.del(&format!("{}:track_state", user_id))?;
434
342
return Ok(None);
435
343
}
436
344
437
-
let data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
438
-
439
-
match cache.setex(
440
-
user_id,
441
-
&serde_json::to_string(&data)?,
442
-
match previous.is_none() {
443
-
true => 30,
444
-
false => 15,
445
-
},
446
-
) {
447
-
Ok(_) => {}
448
-
Err(e) => {
449
-
println!(
450
-
"{} redis error: {}",
451
-
format!("[{}]", user_id).bright_green(),
452
-
e.to_string().bright_red()
453
-
);
454
-
return Ok(None);
455
-
}
456
-
}
457
-
match cache.del(&format!("{}:current", user_id)) {
458
-
Ok(_) => {}
459
-
Err(e) => {
460
-
println!(
461
-
"{} redis error: {}",
462
-
format!("[{}]", user_id).bright_green(),
463
-
e.to_string().bright_red()
464
-
);
465
-
return Ok(None);
466
-
}
467
-
}
345
+
let currently_playing_data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
468
346
469
-
// detect if the song has changed
470
-
let previous = cache.get(&format!("{}:previous", user_id));
347
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
348
+
30
349
+
} else {
350
+
15
351
+
};
471
352
472
-
if previous.is_err() {
473
-
println!(
474
-
"{} redis error: {}",
475
-
format!("[{}]", user_id).bright_green(),
476
-
previous.unwrap_err().to_string().bright_red()
477
-
);
478
-
return Ok(None);
479
-
}
353
+
cache.setex(
354
+
user_id,
355
+
&serde_json::to_string(¤tly_playing_data)?,
356
+
ttl,
357
+
)?;
358
+
cache.del(&format!("{}:current", user_id))?;
480
359
481
-
let previous = previous.unwrap();
482
-
let changed = match previous {
483
-
Some(previous) => {
484
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
485
-
println!(
486
-
"{} {} {}",
487
-
format!("[{}]", user_id).bright_green(),
488
-
"Previous cache is invalid",
489
-
previous
490
-
);
491
-
return Ok(None);
492
-
}
360
+
// Detect track change and update track state
361
+
let changed = detect_track_change(&cache, user_id, ¤tly_playing_data)?;
493
362
494
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
495
-
if previous.item.is_none() || data.item.is_none() {
496
-
return Ok(Some((data, false)));
497
-
}
363
+
// Update previous song cache
364
+
cache.setex(
365
+
&format!("{}:previous", user_id),
366
+
&serde_json::to_string(¤tly_playing_data)?,
367
+
600,
368
+
)?;
498
369
499
-
let previous_item = previous.item.unwrap();
500
-
let data_item = data.clone().item.unwrap();
370
+
Ok(Some((currently_playing_data, changed)))
371
+
}
501
372
502
-
previous_item.id != data_item.id
503
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
373
+
fn detect_track_change(
374
+
cache: &Cache,
375
+
user_id: &str,
376
+
current: &CurrentlyPlaying,
377
+
) -> Result<bool, Error> {
378
+
let track_state_key = format!("{}:track_state", user_id);
379
+
380
+
let now = SystemTime::now()
381
+
.duration_since(UNIX_EPOCH)
382
+
.unwrap()
383
+
.as_secs();
384
+
385
+
let current_item = match ¤t.item {
386
+
Some(item) => item,
387
+
None => {
388
+
let _ = cache.del(&track_state_key);
389
+
return Ok(false);
504
390
}
505
-
_ => false,
506
391
};
507
392
508
-
// save as previous song
509
-
match cache.setex(
510
-
&format!("{}:previous", user_id),
511
-
&serde_json::to_string(&data)?,
512
-
600,
513
-
) {
514
-
Ok(_) => {}
515
-
Err(e) => {
516
-
println!(
517
-
"{} redis error: {}",
518
-
format!("[{}]", user_id).bright_green(),
519
-
e.to_string().bright_red()
520
-
);
521
-
return Ok(None);
393
+
let previous_state = cache.get(&track_state_key)?;
394
+
395
+
let changed = match previous_state {
396
+
Some(state_str) => {
397
+
if let Ok(prev_state) = serde_json::from_str::<TrackState>(&state_str) {
398
+
if prev_state.track_id != current_item.id {
399
+
true
400
+
} else {
401
+
// Same track - check if we should scrobble based on progress and time
402
+
let progress_diff =
403
+
current.progress_ms.unwrap_or(0) as i64 - prev_state.progress_ms as i64;
404
+
let time_diff = now - prev_state.last_updated;
405
+
406
+
// Only consider it changed if:
407
+
// 1. We haven't scrobbled this track yet
408
+
// 2. Significant progress was made (more than 10 seconds or reasonable time passed)
409
+
// 3. Track is actually playing
410
+
!prev_state.scrobbled
411
+
&& current.is_playing
412
+
&& (progress_diff > 10000 || (time_diff > 30 && progress_diff > 0))
413
+
}
414
+
} else {
415
+
// Invalid previous state, treat as changed
416
+
true
417
+
}
522
418
}
523
-
}
419
+
None => {
420
+
// No previous state, treat as new track
421
+
current.is_playing
422
+
}
423
+
};
424
+
425
+
let new_state = TrackState {
426
+
track_id: current_item.id.clone(),
427
+
progress_ms: current.progress_ms.unwrap_or(0),
428
+
scrobbled: changed, // Mark as scrobbled if we're reporting a change
429
+
last_updated: now,
430
+
};
524
431
525
-
Ok(Some((data, changed)))
432
+
cache.setex(&track_state_key, &serde_json::to_string(&new_state)?, 300)?;
433
+
434
+
Ok(changed)
526
435
}
527
436
528
437
pub async fn get_artist(
···
554
463
return Ok(None);
555
464
}
556
465
557
-
match cache.setex(artist_id, &data, 20) {
558
-
Ok(_) => {}
559
-
Err(e) => {
560
-
println!(
561
-
"{} redis error: {}",
562
-
format!("[{}]", artist_id).bright_green(),
563
-
e.to_string().bright_red()
564
-
);
565
-
return Ok(None);
566
-
}
567
-
}
568
-
466
+
cache.setex(artist_id, &data, 20)?;
569
467
Ok(Some(serde_json::from_str(&data)?))
570
468
}
571
469
···
594
492
return Ok(None);
595
493
}
596
494
597
-
match cache.setex(album_id, &data, 20) {
598
-
Ok(_) => {}
599
-
Err(e) => {
600
-
println!(
601
-
"{} redis error: {}",
602
-
format!("[{}]", album_id).bright_green(),
603
-
e.to_string().bright_red()
604
-
);
605
-
return Ok(None);
606
-
}
607
-
}
608
-
495
+
cache.setex(album_id, &data, 20)?;
609
496
Ok(Some(serde_json::from_str(&data)?))
610
497
}
611
498
···
637
524
638
525
let headers = response.headers().clone();
639
526
let data = response.text().await?;
527
+
640
528
if data == "Too many requests" {
641
529
println!(
642
530
"> retry-after {}",
···
647
535
}
648
536
649
537
let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
650
-
651
538
if album_tracks.items.is_empty() {
652
539
break;
653
540
}
···
657
544
}
658
545
659
546
let all_tracks_json = serde_json::to_string(&all_tracks)?;
660
-
match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) {
661
-
Ok(_) => {}
662
-
Err(e) => {
663
-
println!(
664
-
"{} redis error: {}",
665
-
format!("[{}]", album_id).bright_green(),
666
-
e.to_string().bright_red()
667
-
);
668
-
}
669
-
}
547
+
cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?;
670
548
671
549
Ok(AlbumTracks {
672
550
items: all_tracks,
···
681
559
) -> Result<Vec<(String, String, String, String)>, Error> {
682
560
let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
683
561
r#"
684
-
SELECT * FROM spotify_tokens
685
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
686
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
687
-
LIMIT $1 OFFSET $2
688
-
"#,
562
+
SELECT * FROM spotify_tokens
563
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
564
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
565
+
LIMIT $1 OFFSET $2
566
+
"#,
689
567
)
690
568
.bind(limit as i64)
691
569
.bind(offset as i64)
···
693
571
.await?;
694
572
695
573
let mut user_tokens = vec![];
696
-
697
574
for result in &results {
698
575
let token = decrypt_aes_256_ctr(
699
576
&result.refresh_token,
···
716
593
) -> Result<Option<(String, String, String)>, Error> {
717
594
let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
718
595
r#"
719
-
SELECT * FROM spotify_tokens
720
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
721
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
722
-
WHERE spotify_accounts.email = $1
723
-
"#,
596
+
SELECT * FROM spotify_tokens
597
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
598
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
599
+
WHERE spotify_accounts.email = $1
600
+
"#,
724
601
)
725
602
.bind(email)
726
603
.fetch_all(pool)
···
751
628
"Checking currently playing".cyan()
752
629
);
753
630
754
-
let stop_flag_clone = stop_flag.clone();
755
-
let spotify_email_clone = spotify_email.clone();
756
-
let cache_clone = cache.clone();
757
-
thread::spawn(move || {
758
-
loop {
759
-
if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
760
-
println!(
761
-
"{} Stopping Thread",
762
-
format!("[{}]", spotify_email_clone).bright_green()
763
-
);
764
-
break;
765
-
}
766
-
if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) {
767
-
if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() {
768
-
thread::sleep(std::time::Duration::from_millis(800));
769
-
continue;
770
-
}
771
-
772
-
let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?;
773
-
774
-
if let Some(item) = current_song.item.clone() {
775
-
if current_song.is_playing
776
-
&& current_song.progress_ms.unwrap_or(0) < item.duration_ms.into()
777
-
{
778
-
current_song.progress_ms =
779
-
Some(current_song.progress_ms.unwrap_or(0) + 800);
780
-
match cache_clone.setex(
781
-
&format!("{}:current", spotify_email_clone),
782
-
&serde_json::to_string(¤t_song)?,
783
-
16,
784
-
) {
785
-
Ok(_) => {}
786
-
Err(e) => {
787
-
println!(
788
-
"{} redis error: {}",
789
-
format!("[{}]", spotify_email_clone).bright_green(),
790
-
e.to_string().bright_red()
791
-
);
792
-
}
793
-
}
794
-
thread::sleep(std::time::Duration::from_millis(800));
795
-
continue;
796
-
}
797
-
}
798
-
continue;
799
-
}
800
-
801
-
if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) {
802
-
if cached == "No content" {
803
-
thread::sleep(std::time::Duration::from_millis(800));
804
-
continue;
805
-
}
806
-
match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) {
807
-
Ok(_) => {}
808
-
Err(e) => {
809
-
println!(
810
-
"{} redis error: {}",
811
-
format!("[{}]", spotify_email_clone).bright_green(),
812
-
e.to_string().bright_red()
813
-
);
814
-
}
815
-
}
816
-
}
817
-
818
-
thread::sleep(std::time::Duration::from_millis(800));
819
-
}
820
-
Ok::<(), Error>(())
821
-
});
631
+
// Remove the separate progress tracking thread - it was causing race conditions
632
+
// and unnecessary complexity
822
633
823
634
loop {
824
635
if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
···
828
639
);
829
640
break;
830
641
}
642
+
831
643
let spotify_email = spotify_email.clone();
832
644
let token = token.clone();
833
645
let did = did.clone();
···
842
654
format!("[{}]", spotify_email).bright_green(),
843
655
e.to_string().bright_red()
844
656
);
845
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
657
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
846
658
continue;
847
659
}
848
660
};
···
854
666
format!("[{}]", spotify_email).bright_green(),
855
667
"No song playing".yellow()
856
668
);
857
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
669
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
858
670
continue;
859
671
}
672
+
860
673
let data_item = data.item.unwrap();
861
674
println!(
862
675
"{} {} is_playing: {} changed: {}",
···
866
679
changed
867
680
);
868
681
869
-
if changed {
870
-
scrobble(cache.clone(), &spotify_email, &did, &token).await?;
682
+
// Only scrobble if there's a genuine track change and the track is playing
683
+
if changed && data.is_playing {
684
+
// Add a small delay to prevent rapid duplicate scrobbles
685
+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
686
+
687
+
match scrobble(cache.clone(), &spotify_email, &did, &token).await {
688
+
Ok(_) => {
689
+
println!(
690
+
"{} {}",
691
+
format!("[{}]", spotify_email).bright_green(),
692
+
"Scrobbled successfully".green()
693
+
);
694
+
}
695
+
Err(e) => {
696
+
println!(
697
+
"{} Scrobble failed: {}",
698
+
format!("[{}]", spotify_email).bright_green(),
699
+
e.to_string().bright_red()
700
+
);
701
+
}
702
+
}
703
+
704
+
// Spawn background task for library updates
705
+
let cache_clone = cache.clone();
706
+
let token_clone = token.clone();
707
+
let spotify_email_clone = spotify_email.clone();
708
+
let did_clone = did.clone();
709
+
let album_id = data_item.album.id.clone();
871
710
872
711
thread::spawn(move || {
873
712
let rt = tokio::runtime::Runtime::new().unwrap();
874
713
match rt.block_on(async {
875
-
get_album_tracks(cache.clone(), &data_item.album.id, &token).await?;
876
-
get_album(cache.clone(), &data_item.album.id, &token).await?;
877
-
update_library(cache.clone(), &spotify_email, &did, &token).await?;
714
+
get_album_tracks(cache_clone.clone(), &album_id, &token_clone).await?;
715
+
get_album(cache_clone.clone(), &album_id, &token_clone).await?;
716
+
update_library(
717
+
cache_clone.clone(),
718
+
&spotify_email_clone,
719
+
&did_clone,
720
+
&token_clone,
721
+
)
722
+
.await?;
878
723
Ok::<(), Error>(())
879
724
}) {
880
-
Ok(_) => {}
725
+
Ok(_) => {
726
+
println!(
727
+
"{} Library updated successfully",
728
+
format!("[{}]", spotify_email_clone).bright_green()
729
+
);
730
+
}
881
731
Err(e) => {
882
732
println!(
883
-
"{} {}",
884
-
format!("[{}]", spotify_email).bright_green(),
733
+
"{} Library update failed: {}",
734
+
format!("[{}]", spotify_email_clone).bright_green(),
885
735
e.to_string().bright_red()
886
736
);
887
737
}
···
890
740
}
891
741
}
892
742
893
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
743
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
894
744
}
895
745
896
746
Ok(())