A decentralized music tracking and discovery platform built on AT Protocol 🎵
at main 47 lines 1.6 kB view raw
1use anyhow::Error; 2use std::{env, sync::Arc}; 3use subscriber::ScrobbleSubscriber; 4use tokio::sync::Mutex; 5 6use crate::webhook_worker::{start_worker, AppState}; 7 8pub mod profile; 9pub mod repo; 10pub mod subscriber; 11pub mod types; 12pub mod webhook; 13pub mod webhook_worker; 14pub mod xata; 15 16pub async fn subscribe() -> Result<(), Error> { 17 let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); 18 let redis = redis::Client::open(redis_url)?; 19 let queue_key = 20 env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string()); 21 22 let state = Arc::new(Mutex::new(AppState { redis, queue_key })); 23 24 start_worker(state.clone()).await?; 25 26 let jetstream_server = env::var("JETSTREAM_SERVER") 27 .unwrap_or_else(|_| "wss://jetstream2.us-west.bsky.network".to_string()); 28 let url = format!( 29 "{}/subscribe?wantedCollections=app.rocksky.*", 30 jetstream_server 31 ); 32 let subscriber = ScrobbleSubscriber::new(&url); 33 34 // loop, reconnecting on failure 35 loop { 36 match subscriber.run(state.clone()).await { 37 Ok(_) => tracing::info!("Connected to jetstream server"), 38 Err(e) => { 39 tracing::error!(error = %e, "Failed to connect to jetstream server, retrying in 1 second..."); 40 tokio::time::sleep(std::time::Duration::from_secs(1)).await; 41 continue; 42 } 43 } 44 tracing::warn!("Disconnected from jetstream server, reconnecting in 1 second..."); 45 tokio::time::sleep(std::time::Duration::from_secs(1)).await; 46 } 47}