use std::thread; use std::time::Duration; use futures_util::stream::StreamExt; use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; use jacquard::url::Url; use jacquard::{common::xrpc::TungsteniteSubscriptionClient, xrpc::SubscriptionClient}; use tokio::sync::broadcast; use tokio::task::{self, JoinHandle}; use crate::config; pub async fn queue<'a>(tx: broadcast::Sender>) -> JoinHandle<()> { // let queue = Arc::new(Mutex::new(VecDeque::new())); // USER_SUBSCRIBE_URL is formatted as a domain let uri = Url::parse(&format!("wss://{}/", config::USER_SUBSCRIBE_URL)) .expect("Env var USER_SUBSCRIBE_URL should be formated as a domain."); let client = TungsteniteSubscriptionClient::from_base_uri(uri); let (_sink, mut messages) = client .subscribe(&SubscribeRepos::new().build()) .await .expect("Could not subscribe to new events") .into_stream(); // let queue_clone = queue.clone(); task::spawn(async move { loop { if let Some(msg) = messages.next().await { let msg = match msg { Ok(val) => val, Err(err) => { eprintln!("Warning: Websocket error: {} ({:?})", err, err.source()); if &jacquard::StreamErrorKind::Closed == err.kind() { let stream = client.subscribe(&SubscribeRepos::new().build()).await; // if it reconnected successfully, just continue let new_messages = match stream { Ok(val) => val.into_stream().1, // if it failed, try reconnect 10 times, waiting a second between each attempt Err(_) => { let mut new_messages = None; for i in 0..config::MAX_RECONNECT_ATTEMPTS { new_messages = match client .subscribe(&SubscribeRepos::new().build()) .await { Ok(val) => Some(val.into_stream().1), Err(err) => { eprintln!( "Warning: Error: {} ({}/{})", err, i + 1, config::MAX_RECONNECT_ATTEMPTS ); thread::sleep(Duration::from_secs(1)); continue; } } } if let Some(new_messages) = new_messages { println!("Reconnected."); new_messages } else { // could not reconnect so just die lmao panic!("Could not reconnect to client. Fatal"); } } }; // for some reason new_messages doesnt need to be mut ? messages = new_messages; } continue; } }; // filter messages by user did // note that #identity #account #info and #unknown will probably be ignored let ev = match msg.clone() { SubscribeReposMessage::Commit(commit) => { if commit.repo != *config::USER_DID { continue; } else { SubscribeReposMessage::Commit(commit) } } SubscribeReposMessage::Sync(sync) => { if sync.did != *config::USER_DID { continue; } else { SubscribeReposMessage::Sync(sync) } } SubscribeReposMessage::Identity(identity) => { if identity.did != *config::USER_DID { continue; } else { eprintln!( "Warning: Recieved #identity event. Configuration may be out of date" ); SubscribeReposMessage::Identity(identity) } } SubscribeReposMessage::Account(account) => { if account.did != *config::USER_DID { continue; } else { eprintln!( "Warning: Recieved #account event. Account active: `{}`. Account status: `{}`", account.active, account.status.clone().unwrap_or("Unknown".into()) ); SubscribeReposMessage::Account(account) } } SubscribeReposMessage::Info(info) => SubscribeReposMessage::Info(info), SubscribeReposMessage::Unknown(data) => SubscribeReposMessage::Unknown(data), }; let _ = tx.send(ev); } } }) }