A decentralized music tracking and discovery platform built on AT Protocol 🎵
at main 139 lines 3.9 kB view raw
1use crate::webhook::discord::{self, model::WebhookEnvelope}; 2use anyhow::Error; 3use std::{ 4 env, 5 sync::Arc, 6 time::{Duration, Instant}, 7}; 8use tokio::{sync::Mutex, time::interval}; 9 10#[derive(Clone)] 11pub struct AppState { 12 pub redis: redis::Client, 13 pub queue_key: String, 14} 15 16pub async fn start_worker(state: Arc<Mutex<AppState>>) -> Result<(), Error> { 17 let max_rps: u32 = env::var("MAX_REQUESTS_PER_SEC") 18 .ok() 19 .and_then(|s| s.parse().ok()) 20 .unwrap_or(5); 21 let max_embeds_per: usize = env::var("MAX_EMBEDS_PER_REQUEST") 22 .ok() 23 .and_then(|s| s.parse().ok()) 24 .unwrap_or(10); 25 let batch_window_ms: u64 = env::var("BATCH_WINDOW_MS") 26 .ok() 27 .and_then(|s| s.parse().ok()) 28 .unwrap_or(400); 29 let discord_webhook_url = env::var("DISCORD_WEBHOOK_URL").unwrap_or(String::new()); 30 31 tokio::spawn(run_worker( 32 state.clone(), 33 discord_webhook_url, 34 max_rps, 35 max_embeds_per, 36 Duration::from_millis(batch_window_ms), 37 )); 38 39 Ok(()) 40} 41 42async fn run_worker( 43 st: Arc<Mutex<AppState>>, 44 discord_webhook_url: String, 45 max_rps: u32, 46 max_embeds_per: usize, 47 batch_window: Duration, 48) { 49 let http = reqwest::Client::builder() 50 .user_agent("rocksky-discord-bridge/0.1") 51 .build() 52 .expect("http client"); 53 54 let mut tokens = max_rps as i32; 55 let mut refill = interval(Duration::from_secs(1)); 56 refill.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 57 58 loop { 59 tokio::select! 
{ 60 _ = refill.tick() => { 61 tokens = (tokens + max_rps as i32).min(max_rps as i32); 62 } 63 _ = tokio::time::sleep(Duration::from_millis(10)) => { /* tick */ } 64 } 65 66 if tokens <= 0 { 67 continue; 68 } 69 70 let start = Instant::now(); 71 let mut embeds = Vec::with_capacity(max_embeds_per); 72 73 while embeds.len() < max_embeds_per && start.elapsed() < batch_window { 74 match brpop_once(st.clone(), 1).await { 75 Ok(Some(json_str)) => { 76 if let Ok(env) = serde_json::from_str::<WebhookEnvelope>(&json_str) { 77 embeds.push(discord::embed_from_scrobble(&env.data, &env.id)); 78 } 79 } 80 Ok(None) => break, 81 Err(e) => { 82 tracing::error!(error = %e, "Failed to pop from Redis"); 83 break; 84 } 85 } 86 } 87 88 if embeds.is_empty() { 89 tokio::time::sleep(Duration::from_millis(50)).await; 90 continue; 91 } 92 93 tokens -= 1; 94 95 if let Err(e) = discord::post_embeds(&http, &discord_webhook_url, embeds).await { 96 tracing::error!(error = %e, "Failed to post to Discord webhook"); 97 } 98 } 99} 100 101async fn brpop_once( 102 state: Arc<Mutex<AppState>>, 103 timeout_secs: u64, 104) -> redis::RedisResult<Option<String>> { 105 let AppState { 106 redis: client, 107 queue_key: key, 108 } = &*state.lock().await; 109 let mut conn = client.get_multiplexed_async_connection().await?; 110 let res: Option<(String, String)> = redis::cmd("BRPOP") 111 .arg(key) 112 .arg(timeout_secs as usize) 113 .query_async(&mut conn) 114 .await?; 115 Ok(res.map(|(_, v)| v)) 116} 117 118pub async fn push_to_queue( 119 state: Arc<Mutex<AppState>>, 120 item: &WebhookEnvelope, 121) -> redis::RedisResult<()> { 122 let payload = serde_json::to_string(item).unwrap(); 123 let AppState { 124 redis: client, 125 queue_key: key, 126 } = &*state.lock().await; 127 let mut conn = client.get_multiplexed_async_connection().await?; 128 let _: () = redis::pipe() 129 .cmd("RPUSH") 130 .arg(key) 131 .arg(payload) 132 .ignore() 133 .cmd("EXPIRE") 134 .arg(key) 135 .arg(60 * 60 * 24) // 24h 136 .query_async(&mut 
conn) 137 .await?; 138 Ok(()) 139}