A decentralized music tracking and discovery platform built on AT Protocol ๐ŸŽต
listenbrainz spotify atproto lastfm musicbrainz scrobbling

feat: implement Discord webhook integration for scrobble events #1

merged opened by tsiry-sandratraina.com targeting main from feat/discord-webhook
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/sh.tangled.repo.pull/3lzftgidljp22
+375 -18
Diff #0
+10 -8
Cargo.lock
··· 1509 1509 1510 1510 [[package]] 1511 1511 name = "deranged" 1512 - version = "0.4.0" 1512 + version = "0.5.3" 1513 1513 source = "registry+https://github.com/rust-lang/crates.io-index" 1514 - checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" 1514 + checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc" 1515 1515 dependencies = [ 1516 1516 "powerfmt", 1517 1517 "serde", ··· 4931 4931 "dotenv", 4932 4932 "futures-util", 4933 4933 "owo-colors", 4934 + "redis 0.29.5", 4934 4935 "reqwest", 4935 4936 "serde", 4936 4937 "serde_json", 4937 4938 "sha256", 4938 4939 "sqlx", 4940 + "time", 4939 4941 "tokio", 4940 4942 "tokio-stream", 4941 4943 "tokio-tungstenite", ··· 6440 6442 6441 6443 [[package]] 6442 6444 name = "time" 6443 - version = "0.3.41" 6445 + version = "0.3.44" 6444 6446 source = "registry+https://github.com/rust-lang/crates.io-index" 6445 - checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" 6447 + checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" 6446 6448 dependencies = [ 6447 6449 "deranged", 6448 6450 "itoa", ··· 6455 6457 6456 6458 [[package]] 6457 6459 name = "time-core" 6458 - version = "0.1.4" 6460 + version = "0.1.6" 6459 6461 source = "registry+https://github.com/rust-lang/crates.io-index" 6460 - checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" 6462 + checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" 6461 6463 6462 6464 [[package]] 6463 6465 name = "time-macros" 6464 - version = "0.2.22" 6466 + version = "0.2.24" 6465 6467 source = "registry+https://github.com/rust-lang/crates.io-index" 6466 - checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" 6468 + checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" 6467 6469 dependencies = [ 6468 6470 "num-conv", 6469 6471 "time-core",
+2
crates/jetstream/Cargo.toml
··· 36 36 "json", 37 37 ], default-features = false } 38 38 sha256 = "1.6.0" 39 + time = { version = "0.3.44", features = ["formatting", "macros"] } 40 + redis = { version = "0.29.0", features = ["aio", "tokio-comp"] }
+16 -3
crates/jetstream/src/lib.rs
··· 1 1 use anyhow::Error; 2 - use std::env; 3 - 2 + use std::{env, sync::Arc}; 4 3 use subscriber::ScrobbleSubscriber; 4 + use tokio::sync::Mutex; 5 + 6 + use crate::webhook_worker::{start_worker, AppState}; 5 7 6 8 pub mod profile; 7 9 pub mod repo; 8 10 pub mod subscriber; 9 11 pub mod types; 12 + pub mod webhook; 13 + pub mod webhook_worker; 10 14 pub mod xata; 11 15 12 16 pub async fn subscribe() -> Result<(), Error> { 17 + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); 18 + let redis = redis::Client::open(redis_url)?; 19 + let queue_key = 20 + env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string()); 21 + 22 + let state = Arc::new(Mutex::new(AppState { redis, queue_key })); 23 + 24 + start_worker(state.clone()).await?; 25 + 13 26 let jetstream_server = env::var("JETSTREAM_SERVER") 14 27 .unwrap_or_else(|_| "wss://jetstream2.us-west.bsky.network".to_string()); 15 28 let url = format!( ··· 18 31 ); 19 32 let subscriber = ScrobbleSubscriber::new(&url); 20 33 21 - subscriber.run().await?; 34 + subscriber.run(state).await?; 22 35 23 36 Ok(()) 24 37 }
+14 -2
crates/jetstream/src/main.rs
··· 1 - use std::env; 1 + use std::{env, sync::Arc}; 2 2 3 3 use dotenv::dotenv; 4 4 use subscriber::ScrobbleSubscriber; 5 + use tokio::sync::Mutex; 6 + 7 + use crate::webhook_worker::AppState; 5 8 6 9 pub mod profile; 7 10 pub mod repo; 8 11 pub mod subscriber; 9 12 pub mod types; 13 + pub mod webhook; 14 + pub mod webhook_worker; 10 15 pub mod xata; 11 16 12 17 #[tokio::main] ··· 20 25 ); 21 26 let subscriber = ScrobbleSubscriber::new(&url); 22 27 23 - subscriber.run().await?; 28 + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); 29 + let redis = redis::Client::open(redis_url)?; 30 + let queue_key = 31 + env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string()); 32 + 33 + let state = Arc::new(Mutex::new(AppState { redis, queue_key })); 34 + 35 + subscriber.run(state).await?; 24 36 Ok(()) 25 37 }
+61
crates/jetstream/src/repo.rs
··· 10 10 profile::did_to_profile, 11 11 subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID}, 12 12 types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord}, 13 + webhook::discord::{ 14 + self, 15 + model::{ScrobbleData, WebhookEnvelope}, 16 + }, 17 + webhook_worker::{push_to_queue, AppState}, 13 18 xata::{ 14 19 album::Album, album_track::AlbumTrack, artist::Artist, artist_album::ArtistAlbum, 15 20 artist_track::ArtistTrack, track::Track, user::User, user_album::UserAlbum, ··· 18 23 }; 19 24 20 25 pub async fn save_scrobble( 26 + state: Arc<Mutex<AppState>>, 21 27 pool: Arc<Mutex<Pool<Postgres>>>, 22 28 did: &str, 23 29 commit: Commit, ··· 85 91 .await?; 86 92 87 93 tx.commit().await?; 94 + 95 + let users: Vec<User> = 96 + sqlx::query_as::<_, User>("SELECT * FROM users WHERE did = $1") 97 + .bind(did) 98 + .fetch_all(&*pool) 99 + .await?; 100 + 101 + if users.is_empty() { 102 + return Err(anyhow::anyhow!( 103 + "User with DID {} not found in database", 104 + did 105 + )); 106 + } 107 + 108 + // Push to webhook queue (Discord) 109 + match push_to_queue( 110 + state, 111 + &WebhookEnvelope { 112 + r#type: "scrobble.created".to_string(), 113 + id: commit.rkey.clone(), 114 + data: ScrobbleData { 115 + user: discord::model::User { 116 + did: did.to_string(), 117 + display_name: users[0].display_name.clone(), 118 + handle: users[0].handle.clone(), 119 + }, 120 + track: discord::model::Track { 121 + title: scrobble_record.title.clone(), 122 + artist: scrobble_record.artist.clone(), 123 + album: scrobble_record.album.clone(), 124 + duration: scrobble_record.duration, 125 + artwork_url: scrobble_record.album_art.clone().map(|x| { 126 + format!( 127 + "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", 128 + did, 129 + x.r#ref.link, 130 + x.mime_type.split('/').last().unwrap_or("jpeg") 131 + ) 132 + }), 133 + spotify_url: scrobble_record.spotify_link.clone(), 134 + tidal_url: scrobble_record.tidal_link.clone(), 135 + youtube_url: scrobble_record.youtube_link.clone(), 136 + }, 137 + played_at: scrobble_record.created_at.clone(), 138 + }, 139 + delivered_at: Some(chrono::Utc::now().to_rfc3339()), 140 + }, 141 + ) 142 + .await 143 + { 144 + Ok(_) => {} 145 + Err(e) => { 146 + eprintln!("Failed to push to webhook queue: {}", e); 147 + } 148 + } 88 149 } 89 150 90 151 if commit.collection == ARTIST_NSID {
+9 -5
crates/jetstream/src/subscriber.rs
··· 7 7 use tokio::sync::Mutex; 8 8 use tokio_tungstenite::{connect_async, tungstenite::Message}; 9 9 10 - use crate::{repo::save_scrobble, types::Root}; 10 + use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState}; 11 11 12 12 pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble"; 13 13 pub const ARTIST_NSID: &str = "app.rocksky.artist"; ··· 28 28 } 29 29 } 30 30 31 - pub async fn run(&self) -> Result<(), Error> { 31 + pub async fn run(&self, state: Arc<Mutex<AppState>>) -> Result<(), Error> { 32 32 // Get the connection string outside of the task 33 33 let db_url = env::var("XATA_POSTGRES_URL") 34 34 .context("Failed to get XATA_POSTGRES_URL environment variable")?; ··· 48 48 while let Some(msg) = ws_stream.next().await { 49 49 match msg { 50 50 Ok(msg) => { 51 - if let Err(e) = handle_message(pool.clone(), msg).await { 51 + if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await { 52 52 eprintln!("Error handling message: {}", e); 53 53 } 54 54 } ··· 63 63 } 64 64 } 65 65 66 - async fn handle_message(pool: Arc<Mutex<sqlx::PgPool>>, msg: Message) -> Result<(), Error> { 66 + async fn handle_message( 67 + state: Arc<Mutex<AppState>>, 68 + pool: Arc<Mutex<sqlx::PgPool>>, 69 + msg: Message, 70 + ) -> Result<(), Error> { 67 71 tokio::spawn(async move { 68 72 if let Message::Text(text) = msg { 69 73 let message: Root = serde_json::from_str(&text)?; ··· 74 78 75 79 println!("Received message: {:#?}", message); 76 80 if let Some(commit) = message.commit { 77 - match save_scrobble(pool, &message.did, commit).await { 81 + match save_scrobble(state, pool, &message.did, commit).await { 78 82 Ok(_) => { 79 83 println!("Scrobble saved successfully"); 80 84 }
+48
crates/jetstream/src/webhook/discord/mod.rs
··· 1 + pub mod model; 2 + 3 + use crate::webhook::discord::model::*; 4 + use reqwest::Client; 5 + 6 + pub fn embed_from_scrobble(s: &ScrobbleData, rkey: &str) -> DiscordEmbed { 7 + let url = format!("https://rocksky.app/{}/scrobble/{}", s.user.did, rkey); 8 + 9 + let mut desc = format!("**{}**\nby {}", esc(&s.track.title), esc(&s.track.artist)); 10 + desc.push_str(&format!("\non *{}*", esc(&s.track.album))); 11 + 12 + DiscordEmbed { 13 + title: s.user.display_name.clone(), 14 + url, 15 + description: Some(desc), 16 + timestamp: Some(s.played_at.clone()), 17 + thumbnail: s.track.artwork_url.clone().map(|u| DiscordThumb { url: u }), 18 + footer: Some(DiscordFooter { 19 + text: format!("Rocksky โ€ข {}", s.user.handle.clone()), 20 + }), 21 + } 22 + } 23 + 24 + pub async fn post_embeds( 25 + http: &Client, 26 + discord_webhook_url: &str, 27 + embeds: Vec<DiscordEmbed>, 28 + ) -> reqwest::Result<()> { 29 + if discord_webhook_url.is_empty() { 30 + println!("DISCORD_WEBHOOK_URL is not set, skipping webhook post"); 31 + return Ok(()); 32 + } 33 + 34 + let body = DiscordWebhookPayload { 35 + content: String::new(), 36 + embeds, 37 + }; 38 + let res = http.post(discord_webhook_url).json(&body).send().await?; 39 + if !res.status().is_success() { 40 + let text = res.text().await.unwrap_or_default(); 41 + eprintln!("Failed to post to Discord webhook: {}", text); 42 + } 43 + Ok(()) 44 + } 45 + 46 + fn esc(s: &str) -> String { 47 + s.replace(['*', '_', '~', '`', '>'], "\\$0") 48 + }
+75
crates/jetstream/src/webhook/discord/model.rs
··· 1 + use serde::{Deserialize, Serialize}; 2 + 3 + #[derive(Debug, Deserialize, Serialize, Clone)] 4 + pub struct WebhookEnvelope { 5 + #[serde(default)] 6 + pub r#type: String, 7 + pub id: String, 8 + #[serde(default)] 9 + pub delivered_at: Option<String>, 10 + pub data: ScrobbleData, 11 + } 12 + 13 + #[derive(Debug, Deserialize, Serialize, Clone)] 14 + pub struct ScrobbleData { 15 + pub user: User, 16 + pub track: Track, 17 + pub played_at: String, // ISO 8601 18 + } 19 + 20 + #[derive(Debug, Deserialize, Serialize, Clone)] 21 + pub struct User { 22 + pub did: String, 23 + pub display_name: String, 24 + pub handle: String, 25 + } 26 + 27 + #[derive(Debug, Deserialize, Serialize, Clone)] 28 + pub struct Track { 29 + pub title: String, 30 + pub artist: String, 31 + pub album: String, 32 + pub duration: i32, 33 + #[serde(default)] 34 + pub artwork_url: Option<String>, 35 + #[serde(default)] 36 + pub spotify_url: Option<String>, 37 + #[serde(default)] 38 + pub tidal_url: Option<String>, 39 + #[serde(default)] 40 + pub youtube_url: Option<String>, 41 + } 42 + 43 + /* ---------- Discord payloads ---------- */ 44 + 45 + #[derive(Debug, Serialize)] 46 + pub struct DiscordWebhookPayload { 47 + #[serde(default)] 48 + pub content: String, 49 + #[serde(default)] 50 + pub embeds: Vec<DiscordEmbed>, 51 + } 52 + 53 + #[derive(Debug, Serialize)] 54 + pub struct DiscordEmbed { 55 + pub title: String, 56 + pub url: String, 57 + #[serde(skip_serializing_if = "Option::is_none")] 58 + pub description: Option<String>, 59 + #[serde(skip_serializing_if = "Option::is_none")] 60 + pub timestamp: Option<String>, 61 + #[serde(skip_serializing_if = "Option::is_none")] 62 + pub thumbnail: Option<DiscordThumb>, 63 + #[serde(skip_serializing_if = "Option::is_none")] 64 + pub footer: Option<DiscordFooter>, 65 + } 66 + 67 + #[derive(Debug, Serialize)] 68 + pub struct DiscordThumb { 69 + pub url: String, 70 + } 71 + 72 + #[derive(Debug, Serialize)] 73 + pub struct DiscordFooter { 74 + pub text: String, 75 + }
+1
crates/jetstream/src/webhook/mod.rs
··· 1 + pub mod discord;
+139
crates/jetstream/src/webhook_worker.rs
··· 1 + use crate::webhook::discord::{self, model::WebhookEnvelope}; 2 + use anyhow::Error; 3 + use std::{ 4 + env, 5 + sync::Arc, 6 + time::{Duration, Instant}, 7 + }; 8 + use tokio::{sync::Mutex, time::interval}; 9 + 10 + #[derive(Clone)] 11 + pub struct AppState { 12 + pub redis: redis::Client, 13 + pub queue_key: String, 14 + } 15 + 16 + pub async fn start_worker(state: Arc<Mutex<AppState>>) -> Result<(), Error> { 17 + let max_rps: u32 = env::var("MAX_REQUESTS_PER_SEC") 18 + .ok() 19 + .and_then(|s| s.parse().ok()) 20 + .unwrap_or(5); 21 + let max_embeds_per: usize = env::var("MAX_EMBEDS_PER_REQUEST") 22 + .ok() 23 + .and_then(|s| s.parse().ok()) 24 + .unwrap_or(10); 25 + let batch_window_ms: u64 = env::var("BATCH_WINDOW_MS") 26 + .ok() 27 + .and_then(|s| s.parse().ok()) 28 + .unwrap_or(400); 29 + let discord_webhook_url = env::var("DISCORD_WEBHOOK_URL").unwrap_or(String::new()); 30 + 31 + tokio::spawn(run_worker( 32 + state.clone(), 33 + discord_webhook_url, 34 + max_rps, 35 + max_embeds_per, 36 + Duration::from_millis(batch_window_ms), 37 + )); 38 + 39 + Ok(()) 40 + } 41 + 42 + async fn run_worker( 43 + st: Arc<Mutex<AppState>>, 44 + discord_webhook_url: String, 45 + max_rps: u32, 46 + max_embeds_per: usize, 47 + batch_window: Duration, 48 + ) { 49 + let http = reqwest::Client::builder() 50 + .user_agent("rocksky-discord-bridge/0.1") 51 + .build() 52 + .expect("http client"); 53 + 54 + let mut tokens = max_rps as i32; 55 + let mut refill = interval(Duration::from_secs(1)); 56 + refill.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 57 + 58 + loop { 59 + tokio::select! { 60 + _ = refill.tick() => { 61 + tokens = (tokens + max_rps as i32).min(max_rps as i32); 62 + } 63 + _ = tokio::time::sleep(Duration::from_millis(10)) => { /* tick */ } 64 + } 65 + 66 + if tokens <= 0 { 67 + continue; 68 + } 69 + 70 + let start = Instant::now(); 71 + let mut embeds = Vec::with_capacity(max_embeds_per); 72 + 73 + while embeds.len() < max_embeds_per && start.elapsed() < batch_window { 74 + match brpop_once(st.clone(), 1).await { 75 + Ok(Some(json_str)) => { 76 + if let Ok(env) = serde_json::from_str::<WebhookEnvelope>(&json_str) { 77 + embeds.push(discord::embed_from_scrobble(&env.data, &env.id)); 78 + } 79 + } 80 + Ok(None) => break, 81 + Err(e) => { 82 + eprintln!("Failed to pop from Redis: {}", e); 83 + break; 84 + } 85 + } 86 + } 87 + 88 + if embeds.is_empty() { 89 + tokio::time::sleep(Duration::from_millis(50)).await; 90 + continue; 91 + } 92 + 93 + tokens -= 1; 94 + 95 + if let Err(e) = discord::post_embeds(&http, &discord_webhook_url, embeds).await { 96 + eprintln!("Failed to post to Discord webhook: {}", e); 97 + } 98 + } 99 + } 100 + 101 + async fn brpop_once( 102 + state: Arc<Mutex<AppState>>, 103 + timeout_secs: u64, 104 + ) -> redis::RedisResult<Option<String>> { 105 + let AppState { 106 + redis: client, 107 + queue_key: key, 108 + } = &*state.lock().await; 109 + let mut conn = client.get_multiplexed_async_connection().await?; 110 + let res: Option<(String, String)> = redis::cmd("BRPOP") 111 + .arg(key) 112 + .arg(timeout_secs as usize) 113 + .query_async(&mut conn) 114 + .await?; 115 + Ok(res.map(|(_, v)| v)) 116 + } 117 + 118 + pub async fn push_to_queue( 119 + state: Arc<Mutex<AppState>>, 120 + item: &WebhookEnvelope, 121 + ) -> redis::RedisResult<()> { 122 + let payload = serde_json::to_string(item).unwrap(); 123 + let AppState { 124 + redis: client, 125 + queue_key: key, 126 + } = &*state.lock().await; 127 + let mut conn = client.get_multiplexed_async_connection().await?; 128 + let _: () = redis::pipe() 129 + .cmd("RPUSH") 130 + .arg(key) 131 + .arg(payload) 132 + .ignore() 133 + .cmd("EXPIRE") 134 + .arg(key) 135 + .arg(60 * 60 * 24) // 24h 136 + .query_async(&mut conn) 137 + .await?; 138 + Ok(()) 139 + }

Submissions

sign up or login to add to the discussion
tsiry-sandratraina.com submitted #0
2 commits
expand
feat: implement Discord webhook integration for scrobble events
fix: update Discord webhook integration to use non-optional IDs and improve embed generation
pull request successfully merged