atproto blogging
1use std::time::Duration;
2
3use futures_util::{SinkExt, StreamExt};
4use tokio::sync::mpsc;
5use tokio_tungstenite::{connect_async, tungstenite::Message};
6use tracing::{debug, error, info, trace, warn};
7use url::Url;
8
9use crate::error::IndexError;
10
11use super::{TapAck, TapEvent};
12
13/// Messages sent to the writer task
14enum WriteCommand {
15 #[allow(dead_code)]
16 Ack(u64),
17 Pong(bytes::Bytes),
18}
19
20/// Configuration for tap consumer
21#[derive(Debug, Clone)]
22pub struct TapConfig {
23 /// WebSocket URL for tap (e.g., ws://localhost:2480/channel)
24 pub url: Url,
25 /// Whether to send acks (disable for fire-and-forget mode)
26 pub send_acks: bool,
27 /// Reconnect delay on connection failure
28 pub reconnect_delay: Duration,
29}
30
31impl TapConfig {
32 pub fn new(url: Url) -> Self {
33 Self {
34 url,
35 send_acks: true,
36 reconnect_delay: Duration::from_secs(5),
37 }
38 }
39
40 pub fn with_acks(mut self, send_acks: bool) -> Self {
41 self.send_acks = send_acks;
42 self
43 }
44}
45
46/// Consumer that connects to tap's websocket and yields events
47pub struct TapConsumer {
48 config: TapConfig,
49}
50
51impl TapConsumer {
52 pub fn new(config: TapConfig) -> Self {
53 Self { config }
54 }
55
56 /// Connect to tap and return channels for events and acks
57 ///
58 /// Returns a receiver for events and a sender for acks.
59 /// The consumer handles reconnection internally.
60 pub async fn connect(
61 &self,
62 ) -> Result<(mpsc::Receiver<TapEvent>, mpsc::Sender<u64>), IndexError> {
63 let (event_tx, event_rx) = mpsc::channel::<TapEvent>(10000);
64 let (ack_tx, ack_rx) = mpsc::channel::<u64>(10000);
65
66 let config = self.config.clone();
67 tokio::spawn(async move {
68 run_connection_loop(config, event_tx, ack_rx).await;
69 });
70
71 Ok((event_rx, ack_tx))
72 }
73}
74
75async fn run_connection_loop(
76 config: TapConfig,
77 event_tx: mpsc::Sender<TapEvent>,
78 ack_rx: mpsc::Receiver<u64>,
79) {
80 loop {
81 info!(url = %config.url, "connecting to tap");
82
83 match connect_async(config.url.as_str()).await {
84 Ok((ws_stream, _response)) => {
85 info!("connected to tap");
86
87 let (write, read) = ws_stream.split();
88
89 // Channel for reader -> writer communication (pongs, etc)
90 let (write_tx, write_rx) = mpsc::channel::<WriteCommand>(10000);
91
92 // Spawn writer task
93 let send_acks = config.send_acks;
94 let writer_handle = tokio::spawn(run_writer(write, write_rx, ack_rx, send_acks));
95
96 // Run reader in current task
97 let reader_result = run_reader(read, event_tx.clone(), write_tx, send_acks).await;
98
99 // Reader finished - abort writer and wait for it
100 writer_handle.abort();
101 let _ = writer_handle.await;
102
103 // Get back the ack_rx from... wait, we moved it. Need to restructure.
104 // For now, if reader dies we'll reconnect with a fresh ack channel state
105
106 match reader_result {
107 ReaderResult::Closed => {
108 info!("tap connection closed");
109 }
110 ReaderResult::Error(e) => {
111 warn!(error = %e, "tap reader error");
112 }
113 ReaderResult::ChannelClosed => {
114 error!("event channel closed, stopping tap consumer");
115 return;
116 }
117 }
118
119 // We lost the ack_rx to the writer task, need to break out
120 // and let caller reconnect if needed
121 break;
122 }
123 Err(e) => {
124 error!(error = ?e, "failed to connect to tap");
125 }
126 }
127
128 // Reconnect after delay
129 info!(delay = ?config.reconnect_delay, "reconnecting to tap");
130 tokio::time::sleep(config.reconnect_delay).await;
131 }
132}
133
/// Why `run_reader` stopped.
enum ReaderResult {
    /// A close frame was received or the stream ended.
    Closed,
    /// The websocket reported an error, or the writer's command channel was
    /// closed; carries a human-readable description.
    Error(String),
    /// The event channel has no receiver any more.
    ChannelClosed,
}
139
140async fn run_reader<S>(
141 mut read: S,
142 event_tx: mpsc::Sender<TapEvent>,
143 write_tx: mpsc::Sender<WriteCommand>,
144 send_acks: bool,
145) -> ReaderResult
146where
147 S: StreamExt<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin,
148{
149 while let Some(msg) = read.next().await {
150 match msg {
151 Ok(Message::Text(text)) => match serde_json::from_str::<TapEvent>(&text) {
152 Ok(event) => {
153 let event_id = event.id();
154 if event_tx.send(event).await.is_err() {
155 return ReaderResult::ChannelClosed;
156 }
157
158 if !send_acks {
159 debug!(id = event_id, "event received (fire-and-forget)");
160 }
161 }
162 Err(e) => {
163 warn!(error = ?e, text = %text, "failed to parse tap event");
164 }
165 },
166 Ok(Message::Ping(data)) => {
167 if write_tx.send(WriteCommand::Pong(data)).await.is_err() {
168 return ReaderResult::Error("writer channel closed".into());
169 }
170 }
171 Ok(Message::Close(_)) => {
172 return ReaderResult::Closed;
173 }
174 Ok(_) => {
175 // Ignore binary, pong, etc.
176 }
177 Err(e) => {
178 return ReaderResult::Error(e.to_string());
179 }
180 }
181 }
182 ReaderResult::Closed
183}
184
185async fn run_writer<S>(
186 mut write: S,
187 mut write_rx: mpsc::Receiver<WriteCommand>,
188 mut ack_rx: mpsc::Receiver<u64>,
189 send_acks: bool,
190) where
191 S: SinkExt<Message> + Unpin,
192 S::Error: std::fmt::Display,
193{
194 loop {
195 tokio::select! {
196 biased;
197
198 // Handle pongs and other write commands from reader
199 cmd = write_rx.recv() => {
200 match cmd {
201 Some(WriteCommand::Pong(data)) => {
202 if let Err(e) = write.send(Message::Pong(data)).await {
203 warn!(error = %e, "failed to send pong");
204 return;
205 }
206 }
207 Some(WriteCommand::Ack(id)) => {
208 if send_acks {
209 if let Err(e) = send_ack(&mut write, id).await {
210 warn!(error = %e, id, "failed to send ack");
211 return;
212 }
213 }
214 }
215 None => {
216 // Reader closed the channel, we're done
217 return;
218 }
219 }
220 }
221
222 // Handle acks from the indexer
223 id = ack_rx.recv(), if send_acks => {
224 match id {
225 Some(id) => {
226 if let Err(e) = send_ack(&mut write, id).await {
227 warn!(error = %e, id, "failed to send ack");
228 return;
229 }
230 }
231 None => {
232 // Ack channel closed, indexer is done
233 return;
234 }
235 }
236 }
237 }
238 }
239}
240
241async fn send_ack<S>(write: &mut S, id: u64) -> Result<(), String>
242where
243 S: SinkExt<Message> + Unpin,
244 S::Error: std::fmt::Display,
245{
246 let ack = TapAck::new(id);
247 let json = serde_json::to_string(&ack).map_err(|e| e.to_string())?;
248 write
249 .send(Message::Text(json.into()))
250 .await
251 .map_err(|e| e.to_string())?;
252 trace!(id, "sent ack");
253 Ok(())
254}