at main 254 lines 7.8 kB view raw
1use std::time::Duration; 2 3use futures_util::{SinkExt, StreamExt}; 4use tokio::sync::mpsc; 5use tokio_tungstenite::{connect_async, tungstenite::Message}; 6use tracing::{debug, error, info, trace, warn}; 7use url::Url; 8 9use crate::error::IndexError; 10 11use super::{TapAck, TapEvent}; 12 13/// Messages sent to the writer task 14enum WriteCommand { 15 #[allow(dead_code)] 16 Ack(u64), 17 Pong(bytes::Bytes), 18} 19 20/// Configuration for tap consumer 21#[derive(Debug, Clone)] 22pub struct TapConfig { 23 /// WebSocket URL for tap (e.g., ws://localhost:2480/channel) 24 pub url: Url, 25 /// Whether to send acks (disable for fire-and-forget mode) 26 pub send_acks: bool, 27 /// Reconnect delay on connection failure 28 pub reconnect_delay: Duration, 29} 30 31impl TapConfig { 32 pub fn new(url: Url) -> Self { 33 Self { 34 url, 35 send_acks: true, 36 reconnect_delay: Duration::from_secs(5), 37 } 38 } 39 40 pub fn with_acks(mut self, send_acks: bool) -> Self { 41 self.send_acks = send_acks; 42 self 43 } 44} 45 46/// Consumer that connects to tap's websocket and yields events 47pub struct TapConsumer { 48 config: TapConfig, 49} 50 51impl TapConsumer { 52 pub fn new(config: TapConfig) -> Self { 53 Self { config } 54 } 55 56 /// Connect to tap and return channels for events and acks 57 /// 58 /// Returns a receiver for events and a sender for acks. 59 /// The consumer handles reconnection internally. 60 pub async fn connect( 61 &self, 62 ) -> Result<(mpsc::Receiver<TapEvent>, mpsc::Sender<u64>), IndexError> { 63 let (event_tx, event_rx) = mpsc::channel::<TapEvent>(10000); 64 let (ack_tx, ack_rx) = mpsc::channel::<u64>(10000); 65 66 let config = self.config.clone(); 67 tokio::spawn(async move { 68 run_connection_loop(config, event_tx, ack_rx).await; 69 }); 70 71 Ok((event_rx, ack_tx)) 72 } 73} 74 75async fn run_connection_loop( 76 config: TapConfig, 77 event_tx: mpsc::Sender<TapEvent>, 78 ack_rx: mpsc::Receiver<u64>, 79) { 80 loop { 81 info!(url = %config.url, "connecting to tap"); 82 83 match connect_async(config.url.as_str()).await { 84 Ok((ws_stream, _response)) => { 85 info!("connected to tap"); 86 87 let (write, read) = ws_stream.split(); 88 89 // Channel for reader -> writer communication (pongs, etc) 90 let (write_tx, write_rx) = mpsc::channel::<WriteCommand>(10000); 91 92 // Spawn writer task 93 let send_acks = config.send_acks; 94 let writer_handle = tokio::spawn(run_writer(write, write_rx, ack_rx, send_acks)); 95 96 // Run reader in current task 97 let reader_result = run_reader(read, event_tx.clone(), write_tx, send_acks).await; 98 99 // Reader finished - abort writer and wait for it 100 writer_handle.abort(); 101 let _ = writer_handle.await; 102 103 // Get back the ack_rx from... wait, we moved it. Need to restructure. 104 // For now, if reader dies we'll reconnect with a fresh ack channel state 105 106 match reader_result { 107 ReaderResult::Closed => { 108 info!("tap connection closed"); 109 } 110 ReaderResult::Error(e) => { 111 warn!(error = %e, "tap reader error"); 112 } 113 ReaderResult::ChannelClosed => { 114 error!("event channel closed, stopping tap consumer"); 115 return; 116 } 117 } 118 119 // We lost the ack_rx to the writer task, need to break out 120 // and let caller reconnect if needed 121 break; 122 } 123 Err(e) => { 124 error!(error = ?e, "failed to connect to tap"); 125 } 126 } 127 128 // Reconnect after delay 129 info!(delay = ?config.reconnect_delay, "reconnecting to tap"); 130 tokio::time::sleep(config.reconnect_delay).await; 131 } 132} 133 134enum ReaderResult { 135 Closed, 136 Error(String), 137 ChannelClosed, 138} 139 140async fn run_reader<S>( 141 mut read: S, 142 event_tx: mpsc::Sender<TapEvent>, 143 write_tx: mpsc::Sender<WriteCommand>, 144 send_acks: bool, 145) -> ReaderResult 146where 147 S: StreamExt<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin, 148{ 149 while let Some(msg) = read.next().await { 150 match msg { 151 Ok(Message::Text(text)) => match serde_json::from_str::<TapEvent>(&text) { 152 Ok(event) => { 153 let event_id = event.id(); 154 if event_tx.send(event).await.is_err() { 155 return ReaderResult::ChannelClosed; 156 } 157 158 if !send_acks { 159 debug!(id = event_id, "event received (fire-and-forget)"); 160 } 161 } 162 Err(e) => { 163 warn!(error = ?e, text = %text, "failed to parse tap event"); 164 } 165 }, 166 Ok(Message::Ping(data)) => { 167 if write_tx.send(WriteCommand::Pong(data)).await.is_err() { 168 return ReaderResult::Error("writer channel closed".into()); 169 } 170 } 171 Ok(Message::Close(_)) => { 172 return ReaderResult::Closed; 173 } 174 Ok(_) => { 175 // Ignore binary, pong, etc. 176 } 177 Err(e) => { 178 return ReaderResult::Error(e.to_string()); 179 } 180 } 181 } 182 ReaderResult::Closed 183} 184 185async fn run_writer<S>( 186 mut write: S, 187 mut write_rx: mpsc::Receiver<WriteCommand>, 188 mut ack_rx: mpsc::Receiver<u64>, 189 send_acks: bool, 190) where 191 S: SinkExt<Message> + Unpin, 192 S::Error: std::fmt::Display, 193{ 194 loop { 195 tokio::select! { 196 biased; 197 198 // Handle pongs and other write commands from reader 199 cmd = write_rx.recv() => { 200 match cmd { 201 Some(WriteCommand::Pong(data)) => { 202 if let Err(e) = write.send(Message::Pong(data)).await { 203 warn!(error = %e, "failed to send pong"); 204 return; 205 } 206 } 207 Some(WriteCommand::Ack(id)) => { 208 if send_acks { 209 if let Err(e) = send_ack(&mut write, id).await { 210 warn!(error = %e, id, "failed to send ack"); 211 return; 212 } 213 } 214 } 215 None => { 216 // Reader closed the channel, we're done 217 return; 218 } 219 } 220 } 221 222 // Handle acks from the indexer 223 id = ack_rx.recv(), if send_acks => { 224 match id { 225 Some(id) => { 226 if let Err(e) = send_ack(&mut write, id).await { 227 warn!(error = %e, id, "failed to send ack"); 228 return; 229 } 230 } 231 None => { 232 // Ack channel closed, indexer is done 233 return; 234 } 235 } 236 } 237 } 238 } 239} 240 241async fn send_ack<S>(write: &mut S, id: u64) -> Result<(), String> 242where 243 S: SinkExt<Message> + Unpin, 244 S::Error: std::fmt::Display, 245{ 246 let ack = TapAck::new(id); 247 let json = serde_json::to_string(&ack).map_err(|e| e.to_string())?; 248 write 249 .send(Message::Text(json.into())) 250 .await 251 .map_err(|e| e.to_string())?; 252 trace!(id, "sent ack"); 253 Ok(()) 254}