Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

jetstream: time out recv after a ttl

sometimes the socket gets disrupted and we otherwise just wait forever

this *could* (and maybe should) be done by detecting missed pongs instead.

Changed files
+31 -4
jetstream
+2
jetstream/src/error.rs
··· 40 40 CompressionDictionaryError(io::Error), 41 41 #[error("failed to send ping or pong: {0}")] 42 42 PingPongError(#[from] tokio_tungstenite::tungstenite::Error), 43 + #[error("no messages received within ttl")] 44 + NoMessagesReceived, 43 45 #[error("jetstream event receiver closed")] 44 46 ReceiverClosedError, 45 47 }
+29 -4
jetstream/src/lib.rs
··· 27 27 Receiver, 28 28 Sender, 29 29 }, 30 + time::timeout, 30 31 }; 31 32 use tokio_tungstenite::{ 32 33 connect_async, ··· 200 201 /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory 201 202 /// usage while events are buffered. 202 203 pub channel_size: usize, 204 + /// How long since the last jetstream message before we consider the connection dead 205 + /// 206 + /// Default: 15s 207 + pub liveliness_ttl: Duration, 203 208 } 204 209 205 210 impl Default for JetstreamConfig { ··· 213 218 omit_user_agent_jetstream_info: false, 214 219 replay_on_reconnect: false, 215 220 channel_size: 4096, // a few seconds of firehose buffer 221 + liveliness_ttl: Duration::from_secs(15), 216 222 } 217 223 } 218 224 } ··· 380 386 381 387 let (send_channel, receive_channel) = channel(self.config.channel_size); 382 388 let replay_on_reconnect = self.config.replay_on_reconnect; 389 + let liveliness_ttl = self.config.liveliness_ttl; 383 390 let build_request = self.config.get_request_builder(); 384 391 385 392 tokio::task::spawn(async move { ··· 414 421 if let Ok((ws_stream, _)) = connect_async(req).await { 415 422 let t_connected = Instant::now(); 416 423 log::info!("jetstream connected. starting websocket task..."); 417 - if let Err(e) = 418 - websocket_task(dict, ws_stream, send_channel.clone(), &mut last_cursor) 419 - .await 424 + if let Err(e) = websocket_task( 425 + dict, 426 + ws_stream, 427 + send_channel.clone(), 428 + &mut last_cursor, 429 + liveliness_ttl, 430 + ) 431 + .await 420 432 { 421 433 match e { 422 434 JetstreamEventError::ReceiverClosedError => { ··· 428 440 JetstreamEventError::CompressionDictionaryError(_) => { 429 441 #[cfg(feature="metrics")] 430 442 counter!("jetstream_disconnects", "reason" => "zstd", "fatal" => "no").increment(1); 443 + } 444 + JetstreamEventError::NoMessagesReceived => { 445 + #[cfg(feature="metrics")] 446 + counter!("jetstream_disconnects", "reason" => "ttl", "fatal" => "no").increment(1); 431 447 } 432 448 JetstreamEventError::PingPongError(_) => { 433 449 #[cfg(feature="metrics")] ··· 481 497 ws: WebSocketStream<MaybeTlsStream<TcpStream>>, 482 498 send_channel: JetstreamSender, 483 499 last_cursor: &mut Option<Cursor>, 500 + liveliness_ttl: Duration, 484 501 ) -> Result<(), JetstreamEventError> { 485 502 // TODO: Use the write half to allow the user to change configuration settings on the fly. 486 503 let (mut socket_write, mut socket_read) = ws.split(); 487 504 488 505 let mut closing_connection = false; 489 506 loop { 490 - match socket_read.next().await { 507 + let next = match timeout(liveliness_ttl, socket_read.next()).await { 508 + Ok(n) => n, 509 + Err(_) => { 510 + log::warn!("jetstream no events for {liveliness_ttl:?}, closing"); 511 + _ = socket_write.close().await; 512 + return Err(JetstreamEventError::NoMessagesReceived); 513 + } 514 + }; 515 + match next { 491 516 Some(Ok(message)) => match message { 492 517 Message::Text(json) => { 493 518 #[cfg(feature = "metrics")]