+182
-332
crates/spotify/src/lib.rs
+182
-332
crates/spotify/src/lib.rs
···
1
use std::{
2
collections::HashMap,
3
env,
4
sync::{atomic::AtomicBool, Arc, Mutex},
5
thread,
6
};
7
-
8
-
use anyhow::Error;
9
-
use async_nats::connect;
10
-
use owo_colors::OwoColorize;
11
-
use reqwest::Client;
12
-
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
13
use tokio_stream::StreamExt;
14
15
use crate::{
···
31
pub mod types;
32
33
pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1";
34
35
pub async fn run() -> Result<(), Error> {
36
let cache = Cache::new()?;
···
46
let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
47
println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
48
49
-
let users = find_spotify_users(&pool, 0, 100).await?;
50
println!("Found {} users", users.len().bright_green());
51
52
-
// Shared HashMap to manage threads and their stop flags
53
let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
54
Arc::new(Mutex::new(HashMap::new()));
55
56
-
// Start threads for all users
57
for user in users {
58
let email = user.0.clone();
59
let token = user.1.clone();
···
83
email.bright_green(),
84
e.to_string().bright_red()
85
);
86
-
87
// If there's an error, publish a message to restart the thread
88
match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
89
Ok(_) => {
···
106
});
107
}
108
109
-
// Handle subscription messages
110
while let Some(message) = sub.next().await {
111
let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
112
println!(
···
116
117
let mut thread_map = thread_map.lock().unwrap();
118
119
-
// Check if the user exists in the thread map
120
if let Some(stop_flag) = thread_map.get(&user_id) {
121
-
// Stop the existing thread
122
stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
123
124
-
// Create a new stop flag and restart the thread
125
let new_stop_flag = Arc::new(AtomicBool::new(false));
126
thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
127
128
let user = find_spotify_user(&pool, &user_id).await?;
129
-
130
if user.is_none() {
131
println!(
132
"Spotify user not found: {}, skipping",
···
136
}
137
138
let user = user.unwrap();
139
-
140
let email = user.0.clone();
141
let token = user.1.clone();
142
let did = user.2.clone();
···
166
}
167
}
168
});
169
-
170
println!("Restarted thread for user: {}", user_id.bright_green());
171
} else {
172
println!(
···
236
let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
237
238
let client = Client::new();
239
-
240
let response = client
241
.post("https://accounts.spotify.com/api/token")
242
.basic_auth(&client_id, Some(client_secret))
···
247
])
248
.send()
249
.await?;
250
let token = response.json::<AccessToken>().await?;
251
Ok(token)
252
}
···
256
user_id: &str,
257
token: &str,
258
) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
259
if let Ok(Some(data)) = cache.get(user_id) {
260
println!(
261
"{} {}",
···
265
if data == "No content" {
266
return Ok(None);
267
}
268
-
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
269
270
if decoded_data.is_err() {
271
println!(
272
"{} {} {}",
···
276
);
277
cache.setex(user_id, "No content", 10)?;
278
cache.del(&format!("{}:current", user_id))?;
279
return Ok(None);
280
}
281
282
let data: CurrentlyPlaying = decoded_data.unwrap();
283
-
// detect if the song has changed
284
-
let previous = cache.get(&format!("{}:previous", user_id));
285
286
-
if previous.is_err() {
287
-
println!(
288
-
"{} redis error: {}",
289
-
format!("[{}]", user_id).bright_green(),
290
-
previous.unwrap_err().to_string().bright_red()
291
-
);
292
-
return Ok(None);
293
-
}
294
-
295
-
let previous = previous.unwrap();
296
-
297
-
let changed = match previous {
298
-
Some(previous) => {
299
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
300
-
println!(
301
-
"{} {} {}",
302
-
format!("[{}]", user_id).bright_green(),
303
-
"Previous cache is invalid",
304
-
previous
305
-
);
306
-
return Ok(None);
307
-
}
308
-
309
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
310
-
if previous.item.is_none() && data.item.is_some() {
311
-
return Ok(Some((data, true)));
312
-
}
313
-
314
-
if previous.item.is_some() && data.item.is_none() {
315
-
return Ok(Some((data, false)));
316
-
}
317
-
318
-
if previous.item.is_none() && data.item.is_none() {
319
-
return Ok(Some((data, false)));
320
-
}
321
-
322
-
let previous_item = previous.item.unwrap();
323
-
let data_item = data.clone().item.unwrap();
324
-
previous_item.id != data_item.id
325
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
326
-
}
327
-
_ => true,
328
-
};
329
return Ok(Some((data, changed)));
330
}
331
···
343
344
if status == 429 {
345
println!(
346
-
"{} Too many requests, retry-after {}",
347
format!("[{}]", user_id).bright_green(),
348
headers
349
.get("retry-after")
···
355
return Ok(None);
356
}
357
358
-
let previous = cache.get(&format!("{}:previous", user_id));
359
-
if previous.is_err() {
360
-
println!(
361
-
"{} redis error: {}",
362
-
format!("[{}]", user_id).bright_green(),
363
-
previous.unwrap_err().to_string().bright_red()
364
-
);
365
-
return Ok(None);
366
-
}
367
-
368
-
let previous = previous.unwrap();
369
-
370
-
// check if status code is 204
371
if status == 204 {
372
println!("No content");
373
-
match cache.setex(
374
-
user_id,
375
-
"No content",
376
-
match previous.is_none() {
377
-
true => 30,
378
-
false => 10,
379
-
},
380
-
) {
381
-
Ok(_) => {}
382
-
Err(e) => {
383
-
println!(
384
-
"{} redis error: {}",
385
-
format!("[{}]", user_id).bright_green(),
386
-
e.to_string().bright_red()
387
-
);
388
-
return Ok(None);
389
-
}
390
-
}
391
-
match cache.del(&format!("{}:current", user_id)) {
392
-
Ok(_) => {}
393
-
Err(e) => {
394
-
println!(
395
-
"{} redis error: {}",
396
-
format!("[{}]", user_id).bright_green(),
397
-
e.to_string().bright_red()
398
-
);
399
-
return Ok(None);
400
-
}
401
-
}
402
return Ok(None);
403
}
404
···
409
"Invalid data received".red(),
410
data
411
);
412
-
match cache.setex(user_id, "No content", 10) {
413
-
Ok(_) => {}
414
-
Err(e) => {
415
-
println!(
416
-
"{} redis error: {}",
417
-
format!("[{}]", user_id).bright_green(),
418
-
e.to_string().bright_red()
419
-
);
420
-
return Ok(None);
421
-
}
422
-
}
423
-
match cache.del(&format!("{}:current", user_id)) {
424
-
Ok(_) => {}
425
-
Err(e) => {
426
-
println!(
427
-
"{} redis error: {}",
428
-
format!("[{}]", user_id).bright_green(),
429
-
e.to_string().bright_red()
430
-
);
431
-
return Ok(None);
432
-
}
433
-
}
434
return Ok(None);
435
}
436
437
-
let data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
438
-
439
-
match cache.setex(
440
-
user_id,
441
-
&serde_json::to_string(&data)?,
442
-
match previous.is_none() {
443
-
true => 30,
444
-
false => 15,
445
-
},
446
-
) {
447
-
Ok(_) => {}
448
-
Err(e) => {
449
-
println!(
450
-
"{} redis error: {}",
451
-
format!("[{}]", user_id).bright_green(),
452
-
e.to_string().bright_red()
453
-
);
454
-
return Ok(None);
455
-
}
456
-
}
457
-
match cache.del(&format!("{}:current", user_id)) {
458
-
Ok(_) => {}
459
-
Err(e) => {
460
-
println!(
461
-
"{} redis error: {}",
462
-
format!("[{}]", user_id).bright_green(),
463
-
e.to_string().bright_red()
464
-
);
465
-
return Ok(None);
466
-
}
467
-
}
468
469
-
// detect if the song has changed
470
-
let previous = cache.get(&format!("{}:previous", user_id));
471
472
-
if previous.is_err() {
473
-
println!(
474
-
"{} redis error: {}",
475
-
format!("[{}]", user_id).bright_green(),
476
-
previous.unwrap_err().to_string().bright_red()
477
-
);
478
-
return Ok(None);
479
-
}
480
481
-
let previous = previous.unwrap();
482
-
let changed = match previous {
483
-
Some(previous) => {
484
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
485
-
println!(
486
-
"{} {} {}",
487
-
format!("[{}]", user_id).bright_green(),
488
-
"Previous cache is invalid",
489
-
previous
490
-
);
491
-
return Ok(None);
492
-
}
493
494
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
495
-
if previous.item.is_none() || data.item.is_none() {
496
-
return Ok(Some((data, false)));
497
-
}
498
499
-
let previous_item = previous.item.unwrap();
500
-
let data_item = data.clone().item.unwrap();
501
502
-
previous_item.id != data_item.id
503
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
504
}
505
-
_ => false,
506
};
507
508
-
// save as previous song
509
-
match cache.setex(
510
-
&format!("{}:previous", user_id),
511
-
&serde_json::to_string(&data)?,
512
-
600,
513
-
) {
514
-
Ok(_) => {}
515
-
Err(e) => {
516
-
println!(
517
-
"{} redis error: {}",
518
-
format!("[{}]", user_id).bright_green(),
519
-
e.to_string().bright_red()
520
-
);
521
-
return Ok(None);
522
}
523
-
}
524
525
-
Ok(Some((data, changed)))
526
}
527
528
pub async fn get_artist(
···
554
return Ok(None);
555
}
556
557
-
match cache.setex(artist_id, &data, 20) {
558
-
Ok(_) => {}
559
-
Err(e) => {
560
-
println!(
561
-
"{} redis error: {}",
562
-
format!("[{}]", artist_id).bright_green(),
563
-
e.to_string().bright_red()
564
-
);
565
-
return Ok(None);
566
-
}
567
-
}
568
-
569
Ok(Some(serde_json::from_str(&data)?))
570
}
571
···
594
return Ok(None);
595
}
596
597
-
match cache.setex(album_id, &data, 20) {
598
-
Ok(_) => {}
599
-
Err(e) => {
600
-
println!(
601
-
"{} redis error: {}",
602
-
format!("[{}]", album_id).bright_green(),
603
-
e.to_string().bright_red()
604
-
);
605
-
return Ok(None);
606
-
}
607
-
}
608
-
609
Ok(Some(serde_json::from_str(&data)?))
610
}
611
···
637
638
let headers = response.headers().clone();
639
let data = response.text().await?;
640
if data == "Too many requests" {
641
println!(
642
"> retry-after {}",
···
647
}
648
649
let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
650
-
651
if album_tracks.items.is_empty() {
652
break;
653
}
···
657
}
658
659
let all_tracks_json = serde_json::to_string(&all_tracks)?;
660
-
match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) {
661
-
Ok(_) => {}
662
-
Err(e) => {
663
-
println!(
664
-
"{} redis error: {}",
665
-
format!("[{}]", album_id).bright_green(),
666
-
e.to_string().bright_red()
667
-
);
668
-
}
669
-
}
670
671
Ok(AlbumTracks {
672
items: all_tracks,
···
681
) -> Result<Vec<(String, String, String, String)>, Error> {
682
let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
683
r#"
684
-
SELECT * FROM spotify_tokens
685
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
686
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
687
-
LIMIT $1 OFFSET $2
688
-
"#,
689
)
690
.bind(limit as i64)
691
.bind(offset as i64)
···
693
.await?;
694
695
let mut user_tokens = vec![];
696
-
697
for result in &results {
698
let token = decrypt_aes_256_ctr(
699
&result.refresh_token,
···
716
) -> Result<Option<(String, String, String)>, Error> {
717
let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
718
r#"
719
-
SELECT * FROM spotify_tokens
720
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
721
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
722
-
WHERE spotify_accounts.email = $1
723
-
"#,
724
)
725
.bind(email)
726
.fetch_all(pool)
···
751
"Checking currently playing".cyan()
752
);
753
754
-
let stop_flag_clone = stop_flag.clone();
755
-
let spotify_email_clone = spotify_email.clone();
756
-
let cache_clone = cache.clone();
757
-
thread::spawn(move || {
758
-
loop {
759
-
if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
760
-
println!(
761
-
"{} Stopping Thread",
762
-
format!("[{}]", spotify_email_clone).bright_green()
763
-
);
764
-
break;
765
-
}
766
-
if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) {
767
-
if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() {
768
-
thread::sleep(std::time::Duration::from_millis(800));
769
-
continue;
770
-
}
771
-
772
-
let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?;
773
-
774
-
if let Some(item) = current_song.item.clone() {
775
-
if current_song.is_playing
776
-
&& current_song.progress_ms.unwrap_or(0) < item.duration_ms.into()
777
-
{
778
-
current_song.progress_ms =
779
-
Some(current_song.progress_ms.unwrap_or(0) + 800);
780
-
match cache_clone.setex(
781
-
&format!("{}:current", spotify_email_clone),
782
-
&serde_json::to_string(¤t_song)?,
783
-
16,
784
-
) {
785
-
Ok(_) => {}
786
-
Err(e) => {
787
-
println!(
788
-
"{} redis error: {}",
789
-
format!("[{}]", spotify_email_clone).bright_green(),
790
-
e.to_string().bright_red()
791
-
);
792
-
}
793
-
}
794
-
thread::sleep(std::time::Duration::from_millis(800));
795
-
continue;
796
-
}
797
-
}
798
-
continue;
799
-
}
800
-
801
-
if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) {
802
-
if cached == "No content" {
803
-
thread::sleep(std::time::Duration::from_millis(800));
804
-
continue;
805
-
}
806
-
match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) {
807
-
Ok(_) => {}
808
-
Err(e) => {
809
-
println!(
810
-
"{} redis error: {}",
811
-
format!("[{}]", spotify_email_clone).bright_green(),
812
-
e.to_string().bright_red()
813
-
);
814
-
}
815
-
}
816
-
}
817
-
818
-
thread::sleep(std::time::Duration::from_millis(800));
819
-
}
820
-
Ok::<(), Error>(())
821
-
});
822
823
loop {
824
if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
···
828
);
829
break;
830
}
831
let spotify_email = spotify_email.clone();
832
let token = token.clone();
833
let did = did.clone();
···
842
format!("[{}]", spotify_email).bright_green(),
843
e.to_string().bright_red()
844
);
845
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
846
continue;
847
}
848
};
···
854
format!("[{}]", spotify_email).bright_green(),
855
"No song playing".yellow()
856
);
857
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
858
continue;
859
}
860
let data_item = data.item.unwrap();
861
println!(
862
"{} {} is_playing: {} changed: {}",
···
866
changed
867
);
868
869
-
if changed {
870
-
scrobble(cache.clone(), &spotify_email, &did, &token).await?;
871
872
thread::spawn(move || {
873
let rt = tokio::runtime::Runtime::new().unwrap();
874
match rt.block_on(async {
875
-
get_album_tracks(cache.clone(), &data_item.album.id, &token).await?;
876
-
get_album(cache.clone(), &data_item.album.id, &token).await?;
877
-
update_library(cache.clone(), &spotify_email, &did, &token).await?;
878
Ok::<(), Error>(())
879
}) {
880
-
Ok(_) => {}
881
Err(e) => {
882
println!(
883
-
"{} {}",
884
-
format!("[{}]", spotify_email).bright_green(),
885
e.to_string().bright_red()
886
);
887
}
···
890
}
891
}
892
893
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
894
}
895
896
Ok(())
···
1
+
use anyhow::Error;
2
+
use async_nats::connect;
3
+
use owo_colors::OwoColorize;
4
+
use reqwest::Client;
5
+
use serde::{Deserialize, Serialize};
6
+
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
7
use std::{
8
collections::HashMap,
9
env,
10
sync::{atomic::AtomicBool, Arc, Mutex},
11
thread,
12
+
time::{SystemTime, UNIX_EPOCH},
13
};
14
use tokio_stream::StreamExt;
15
16
use crate::{
···
32
pub mod types;
33
34
pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1";
35
+
pub const MAX_USERS: usize = 100;
36
+
37
+
#[derive(Serialize, Deserialize, Debug, Clone)]
38
+
struct TrackState {
39
+
track_id: String,
40
+
progress_ms: u64,
41
+
scrobbled: bool,
42
+
last_updated: u64,
43
+
}
44
45
pub async fn run() -> Result<(), Error> {
46
let cache = Cache::new()?;
···
56
let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
57
println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
58
59
+
let users = find_spotify_users(&pool, 0, MAX_USERS).await?;
60
println!("Found {} users", users.len().bright_green());
61
62
let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
63
Arc::new(Mutex::new(HashMap::new()));
64
65
for user in users {
66
let email = user.0.clone();
67
let token = user.1.clone();
···
91
email.bright_green(),
92
e.to_string().bright_red()
93
);
94
// If there's an error, publish a message to restart the thread
95
match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
96
Ok(_) => {
···
113
});
114
}
115
116
while let Some(message) = sub.next().await {
117
let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
118
println!(
···
122
123
let mut thread_map = thread_map.lock().unwrap();
124
125
if let Some(stop_flag) = thread_map.get(&user_id) {
126
stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
127
128
let new_stop_flag = Arc::new(AtomicBool::new(false));
129
thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
130
131
let user = find_spotify_user(&pool, &user_id).await?;
132
if user.is_none() {
133
println!(
134
"Spotify user not found: {}, skipping",
···
138
}
139
140
let user = user.unwrap();
141
let email = user.0.clone();
142
let token = user.1.clone();
143
let did = user.2.clone();
···
167
}
168
}
169
});
170
println!("Restarted thread for user: {}", user_id.bright_green());
171
} else {
172
println!(
···
236
let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
237
238
let client = Client::new();
239
let response = client
240
.post("https://accounts.spotify.com/api/token")
241
.basic_auth(&client_id, Some(client_secret))
···
246
])
247
.send()
248
.await?;
249
+
250
let token = response.json::<AccessToken>().await?;
251
Ok(token)
252
}
···
256
user_id: &str,
257
token: &str,
258
) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
259
+
// Check if we have cached data
260
if let Ok(Some(data)) = cache.get(user_id) {
261
println!(
262
"{} {}",
···
266
if data == "No content" {
267
return Ok(None);
268
}
269
270
+
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
271
if decoded_data.is_err() {
272
println!(
273
"{} {} {}",
···
277
);
278
cache.setex(user_id, "No content", 10)?;
279
cache.del(&format!("{}:current", user_id))?;
280
+
cache.del(&format!("{}:track_state", user_id))?;
281
return Ok(None);
282
}
283
284
let data: CurrentlyPlaying = decoded_data.unwrap();
285
286
+
let changed = detect_track_change(&cache, user_id, &data)?;
287
return Ok(Some((data, changed)));
288
}
289
···
301
302
if status == 429 {
303
println!(
304
+
"{} Too many requests, retry-after {}",
305
format!("[{}]", user_id).bright_green(),
306
headers
307
.get("retry-after")
···
313
return Ok(None);
314
}
315
316
if status == 204 {
317
println!("No content");
318
+
// Clear track state when nothing is playing
319
+
cache.del(&format!("{}:track_state", user_id))?;
320
+
321
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
322
+
30
323
+
} else {
324
+
10
325
+
};
326
+
327
+
cache.setex(user_id, "No content", ttl)?;
328
+
cache.del(&format!("{}:current", user_id))?;
329
return Ok(None);
330
}
331
···
336
"Invalid data received".red(),
337
data
338
);
339
+
cache.setex(user_id, "No content", 10)?;
340
+
cache.del(&format!("{}:current", user_id))?;
341
+
cache.del(&format!("{}:track_state", user_id))?;
342
return Ok(None);
343
}
344
345
+
let currently_playing_data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
346
347
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
348
+
30
349
+
} else {
350
+
15
351
+
};
352
353
+
cache.setex(
354
+
user_id,
355
+
&serde_json::to_string(¤tly_playing_data)?,
356
+
ttl,
357
+
)?;
358
+
cache.del(&format!("{}:current", user_id))?;
359
360
+
// Detect track change and update track state
361
+
let changed = detect_track_change(&cache, user_id, ¤tly_playing_data)?;
362
363
+
// Update previous song cache
364
+
cache.setex(
365
+
&format!("{}:previous", user_id),
366
+
&serde_json::to_string(¤tly_playing_data)?,
367
+
600,
368
+
)?;
369
370
+
Ok(Some((currently_playing_data, changed)))
371
+
}
372
373
+
fn detect_track_change(
374
+
cache: &Cache,
375
+
user_id: &str,
376
+
current: &CurrentlyPlaying,
377
+
) -> Result<bool, Error> {
378
+
let track_state_key = format!("{}:track_state", user_id);
379
+
380
+
let now = SystemTime::now()
381
+
.duration_since(UNIX_EPOCH)
382
+
.unwrap()
383
+
.as_secs();
384
+
385
+
let current_item = match ¤t.item {
386
+
Some(item) => item,
387
+
None => {
388
+
let _ = cache.del(&track_state_key);
389
+
return Ok(false);
390
}
391
};
392
393
+
let previous_state = cache.get(&track_state_key)?;
394
+
395
+
let changed = match previous_state {
396
+
Some(state_str) => {
397
+
if let Ok(prev_state) = serde_json::from_str::<TrackState>(&state_str) {
398
+
if prev_state.track_id != current_item.id {
399
+
true
400
+
} else {
401
+
// Same track - check if we should scrobble based on progress and time
402
+
let progress_diff =
403
+
current.progress_ms.unwrap_or(0) as i64 - prev_state.progress_ms as i64;
404
+
let time_diff = now - prev_state.last_updated;
405
+
406
+
// Only consider it changed if:
407
+
// 1. We haven't scrobbled this track yet
408
+
// 2. Significant progress was made (more than 10 seconds or reasonable time passed)
409
+
// 3. Track is actually playing
410
+
!prev_state.scrobbled
411
+
&& current.is_playing
412
+
&& (progress_diff > 10000 || (time_diff > 30 && progress_diff > 0))
413
+
}
414
+
} else {
415
+
// Invalid previous state, treat as changed
416
+
true
417
+
}
418
}
419
+
None => {
420
+
// No previous state, treat as new track
421
+
current.is_playing
422
+
}
423
+
};
424
+
425
+
let new_state = TrackState {
426
+
track_id: current_item.id.clone(),
427
+
progress_ms: current.progress_ms.unwrap_or(0),
428
+
scrobbled: changed, // Mark as scrobbled if we're reporting a change
429
+
last_updated: now,
430
+
};
431
432
+
cache.setex(&track_state_key, &serde_json::to_string(&new_state)?, 300)?;
433
+
434
+
Ok(changed)
435
}
436
437
pub async fn get_artist(
···
463
return Ok(None);
464
}
465
466
+
cache.setex(artist_id, &data, 20)?;
467
Ok(Some(serde_json::from_str(&data)?))
468
}
469
···
492
return Ok(None);
493
}
494
495
+
cache.setex(album_id, &data, 20)?;
496
Ok(Some(serde_json::from_str(&data)?))
497
}
498
···
524
525
let headers = response.headers().clone();
526
let data = response.text().await?;
527
+
528
if data == "Too many requests" {
529
println!(
530
"> retry-after {}",
···
535
}
536
537
let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
538
if album_tracks.items.is_empty() {
539
break;
540
}
···
544
}
545
546
let all_tracks_json = serde_json::to_string(&all_tracks)?;
547
+
cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?;
548
549
Ok(AlbumTracks {
550
items: all_tracks,
···
559
) -> Result<Vec<(String, String, String, String)>, Error> {
560
let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
561
r#"
562
+
SELECT * FROM spotify_tokens
563
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
564
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
565
+
LIMIT $1 OFFSET $2
566
+
"#,
567
)
568
.bind(limit as i64)
569
.bind(offset as i64)
···
571
.await?;
572
573
let mut user_tokens = vec![];
574
for result in &results {
575
let token = decrypt_aes_256_ctr(
576
&result.refresh_token,
···
593
) -> Result<Option<(String, String, String)>, Error> {
594
let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
595
r#"
596
+
SELECT * FROM spotify_tokens
597
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
598
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
599
+
WHERE spotify_accounts.email = $1
600
+
"#,
601
)
602
.bind(email)
603
.fetch_all(pool)
···
628
"Checking currently playing".cyan()
629
);
630
631
+
// Remove the separate progress tracking thread - it was causing race conditions
632
+
// and unnecessary complexity
633
634
loop {
635
if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
···
639
);
640
break;
641
}
642
+
643
let spotify_email = spotify_email.clone();
644
let token = token.clone();
645
let did = did.clone();
···
654
format!("[{}]", spotify_email).bright_green(),
655
e.to_string().bright_red()
656
);
657
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
658
continue;
659
}
660
};
···
666
format!("[{}]", spotify_email).bright_green(),
667
"No song playing".yellow()
668
);
669
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
670
continue;
671
}
672
+
673
let data_item = data.item.unwrap();
674
println!(
675
"{} {} is_playing: {} changed: {}",
···
679
changed
680
);
681
682
+
// Only scrobble if there's a genuine track change and the track is playing
683
+
if changed && data.is_playing {
684
+
// Add a small delay to prevent rapid duplicate scrobbles
685
+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
686
+
687
+
match scrobble(cache.clone(), &spotify_email, &did, &token).await {
688
+
Ok(_) => {
689
+
println!(
690
+
"{} {}",
691
+
format!("[{}]", spotify_email).bright_green(),
692
+
"Scrobbled successfully".green()
693
+
);
694
+
}
695
+
Err(e) => {
696
+
println!(
697
+
"{} Scrobble failed: {}",
698
+
format!("[{}]", spotify_email).bright_green(),
699
+
e.to_string().bright_red()
700
+
);
701
+
}
702
+
}
703
+
704
+
// Spawn background task for library updates
705
+
let cache_clone = cache.clone();
706
+
let token_clone = token.clone();
707
+
let spotify_email_clone = spotify_email.clone();
708
+
let did_clone = did.clone();
709
+
let album_id = data_item.album.id.clone();
710
711
thread::spawn(move || {
712
let rt = tokio::runtime::Runtime::new().unwrap();
713
match rt.block_on(async {
714
+
get_album_tracks(cache_clone.clone(), &album_id, &token_clone).await?;
715
+
get_album(cache_clone.clone(), &album_id, &token_clone).await?;
716
+
update_library(
717
+
cache_clone.clone(),
718
+
&spotify_email_clone,
719
+
&did_clone,
720
+
&token_clone,
721
+
)
722
+
.await?;
723
Ok::<(), Error>(())
724
}) {
725
+
Ok(_) => {
726
+
println!(
727
+
"{} Library updated successfully",
728
+
format!("[{}]", spotify_email_clone).bright_green()
729
+
);
730
+
}
731
Err(e) => {
732
println!(
733
+
"{} Library update failed: {}",
734
+
format!("[{}]", spotify_email_clone).bright_green(),
735
e.to_string().bright_red()
736
);
737
}
···
740
}
741
}
742
743
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
744
}
745
746
Ok(())