this repo has no description
1use std::thread;
2use std::time::Duration;
3
4use futures_util::stream::StreamExt;
5use jacquard::api::com_atproto::sync::subscribe_repos::{SubscribeRepos, SubscribeReposMessage};
6use jacquard::url::Url;
7use jacquard::{common::xrpc::TungsteniteSubscriptionClient, xrpc::SubscriptionClient};
8use tokio::sync::broadcast;
9use tokio::task::{self, JoinHandle};
10
11use crate::config;
12
13pub async fn queue<'a>(tx: broadcast::Sender<SubscribeReposMessage<'static>>) -> JoinHandle<()> {
14 // let queue = Arc::new(Mutex::new(VecDeque::new()));
15
16 // USER_SUBSCRIBE_URL is formatted as a domain
17 let uri = Url::parse(&format!("wss://{}/", config::USER_SUBSCRIBE_URL))
18 .expect("Env var USER_SUBSCRIBE_URL should be formated as a domain.");
19 let client = TungsteniteSubscriptionClient::from_base_uri(uri);
20 let (_sink, mut messages) = client
21 .subscribe(&SubscribeRepos::new().build())
22 .await
23 .expect("Could not subscribe to new events")
24 .into_stream();
25
26 // let queue_clone = queue.clone();
27 task::spawn(async move {
28 loop {
29 if let Some(msg) = messages.next().await {
30 let msg = match msg {
31 Ok(val) => val,
32 Err(err) => {
33 eprintln!("Warning: Websocket error: {} ({:?})", err, err.source());
34 if &jacquard::StreamErrorKind::Closed == err.kind() {
35 let stream = client.subscribe(&SubscribeRepos::new().build()).await;
36 // if it reconnected successfully, just continue
37 let new_messages = match stream {
38 Ok(val) => val.into_stream().1,
39 // if it failed, try reconnect 10 times, waiting a second between each attempt
40 Err(_) => {
41 let mut new_messages = None;
42 for i in 0..config::MAX_RECONNECT_ATTEMPTS {
43 new_messages = match client
44 .subscribe(&SubscribeRepos::new().build())
45 .await
46 {
47 Ok(val) => Some(val.into_stream().1),
48 Err(err) => {
49 eprintln!(
50 "Warning: Error: {} ({}/{})",
51 err,
52 i + 1,
53 config::MAX_RECONNECT_ATTEMPTS
54 );
55 thread::sleep(Duration::from_secs(1));
56 continue;
57 }
58 }
59 }
60
61 if let Some(new_messages) = new_messages {
62 println!("Reconnected.");
63 new_messages
64 } else {
65 // could not reconnect so just die lmao
66 panic!("Could not reconnect to client. Fatal");
67 }
68 }
69 };
70 // for some reason new_messages doesnt need to be mut ?
71 messages = new_messages;
72 }
73 continue;
74 }
75 };
76
77 // filter messages by user did
78 // note that #identity #account #info and #unknown will probably be ignored
79 let ev = match msg.clone() {
80 SubscribeReposMessage::Commit(commit) => {
81 if commit.repo != *config::USER_DID {
82 continue;
83 } else {
84 SubscribeReposMessage::Commit(commit)
85 }
86 }
87 SubscribeReposMessage::Sync(sync) => {
88 if sync.did != *config::USER_DID {
89 continue;
90 } else {
91 SubscribeReposMessage::Sync(sync)
92 }
93 }
94
95 SubscribeReposMessage::Identity(identity) => {
96 if identity.did != *config::USER_DID {
97 continue;
98 } else {
99 eprintln!(
100 "Warning: Recieved #identity event. Configuration may be out of date"
101 );
102 SubscribeReposMessage::Identity(identity)
103 }
104 }
105 SubscribeReposMessage::Account(account) => {
106 if account.did != *config::USER_DID {
107 continue;
108 } else {
109 eprintln!(
110 "Warning: Recieved #account event. Account active: `{}`. Account status: `{}`",
111 account.active,
112 account.status.clone().unwrap_or("Unknown".into())
113 );
114 SubscribeReposMessage::Account(account)
115 }
116 }
117 SubscribeReposMessage::Info(info) => SubscribeReposMessage::Info(info),
118 SubscribeReposMessage::Unknown(data) => SubscribeReposMessage::Unknown(data),
119 };
120
121 let _ = tx.send(ev);
122 }
123 }
124 })
125}