this repo has no description
at main 125 lines 6.0 kB view raw
1use std::thread; 2use std::time::Duration; 3 4use futures_util::stream::StreamExt; 5use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage}; 6use jacquard::url::Url; 7use jacquard::{common::xrpc::TungsteniteSubscriptionClient, xrpc::SubscriptionClient}; 8use tokio::sync::broadcast; 9use tokio::task::{self, JoinHandle}; 10 11use crate::config; 12 13pub async fn queue<'a>(tx: broadcast::Sender<SubscribeReposMessage<'static>>) -> JoinHandle<()> { 14 // let queue = Arc::new(Mutex::new(VecDeque::new())); 15 16 // USER_SUBSCRIBE_URL is formatted as a domain 17 let uri = Url::parse(&format!("wss://{}/", config::USER_SUBSCRIBE_URL)) 18 .expect("Env var USER_SUBSCRIBE_URL should be formated as a domain."); 19 let client = TungsteniteSubscriptionClient::from_base_uri(uri); 20 let (_sink, mut messages) = client 21 .subscribe(&SubscribeRepos::new().build()) 22 .await 23 .expect("Could not subscribe to new events") 24 .into_stream(); 25 26 // let queue_clone = queue.clone(); 27 task::spawn(async move { 28 loop { 29 if let Some(msg) = messages.next().await { 30 let msg = match msg { 31 Ok(val) => val, 32 Err(err) => { 33 eprintln!("Warning: Websocket error: {} ({:?})", err, err.source()); 34 if &jacquard::StreamErrorKind::Closed == err.kind() { 35 let stream = client.subscribe(&SubscribeRepos::new().build()).await; 36 // if it reconnected successfully, just continue 37 let new_messages = match stream { 38 Ok(val) => val.into_stream().1, 39 // if it failed, try reconnect 10 times, waiting a second between each attempt 40 Err(_) => { 41 let mut new_messages = None; 42 for i in 0..config::MAX_RECONNECT_ATTEMPTS { 43 new_messages = match client 44 .subscribe(&SubscribeRepos::new().build()) 45 .await 46 { 47 Ok(val) => Some(val.into_stream().1), 48 Err(err) => { 49 eprintln!( 50 "Warning: Error: {} ({}/{})", 51 err, 52 i + 1, 53 config::MAX_RECONNECT_ATTEMPTS 54 ); 55 thread::sleep(Duration::from_secs(1)); 56 continue; 57 } 58 } 59 } 60 61 if let Some(new_messages) = new_messages { 62 println!("Reconnected."); 63 new_messages 64 } else { 65 // could not reconnect so just die lmao 66 panic!("Could not reconnect to client. Fatal"); 67 } 68 } 69 }; 70 // for some reason new_messages doesnt need to be mut ? 71 messages = new_messages; 72 } 73 continue; 74 } 75 }; 76 77 // filter messages by user did 78 // note that #identity #account #info and #unknown will probably be ignored 79 let ev = match msg.clone() { 80 SubscribeReposMessage::Commit(commit) => { 81 if commit.repo != *config::USER_DID { 82 continue; 83 } else { 84 SubscribeReposMessage::Commit(commit) 85 } 86 } 87 SubscribeReposMessage::Sync(sync) => { 88 if sync.did != *config::USER_DID { 89 continue; 90 } else { 91 SubscribeReposMessage::Sync(sync) 92 } 93 } 94 95 SubscribeReposMessage::Identity(identity) => { 96 if identity.did != *config::USER_DID { 97 continue; 98 } else { 99 eprintln!( 100 "Warning: Recieved #identity event. Configuration may be out of date" 101 ); 102 SubscribeReposMessage::Identity(identity) 103 } 104 } 105 SubscribeReposMessage::Account(account) => { 106 if account.did != *config::USER_DID { 107 continue; 108 } else { 109 eprintln!( 110 "Warning: Recieved #account event. Account active: `{}`. Account status: `{}`", 111 account.active, 112 account.status.clone().unwrap_or("Unknown".into()) 113 ); 114 SubscribeReposMessage::Account(account) 115 } 116 } 117 SubscribeReposMessage::Info(info) => SubscribeReposMessage::Info(info), 118 SubscribeReposMessage::Unknown(data) => SubscribeReposMessage::Unknown(data), 119 }; 120 121 let _ = tx.send(ev); 122 } 123 } 124 }) 125}