Tap drinker

fix: modify ping behaviour

Signed-off-by: tjh <x@tjh.dev>

tjh.dev 4745e9a9 80375940

verified
+11 -21
+11 -21
src/tap/channel.rs
··· 1 - use std::{ 2 - collections::HashSet, 3 - time::{Duration, SystemTime}, 4 - }; 1 + use std::time::{Duration, SystemTime}; 5 2 6 3 use futures_util::{SinkExt as _, StreamExt}; 7 4 use serde::Serialize; ··· 13 10 use tokio_util::sync::{CancellationToken, DropGuard}; 14 11 15 12 use crate::tap::{TapClient, TapEvent}; 16 - 17 - /// Maximum number of unanswered Ping messages to allow before the connection is 18 - /// considered broken. 19 - const MAX_INFLIGHT_PINGS: usize = 2; 20 13 21 14 const TIMEOUT: Duration = Duration::from_secs(30); 22 15 ··· 142 135 ClearAcks, 143 136 } 144 137 145 - let mut pings: HashSet<Bytes> = HashSet::with_capacity(MAX_INFLIGHT_PINGS + 1); 146 - let mut timeout = tokio::time::interval(TIMEOUT); 147 - timeout.tick().await; 148 - 149 138 let (ack_tx, mut ack_rx) = mpsc::channel(capacity); 150 139 let mut acks: Vec<_> = Default::default(); 151 140 152 141 'outer: loop { 142 + let mut ping_inflight = false; 143 + let mut timeout = tokio::time::interval(TIMEOUT); 144 + timeout.tick().await; 145 + 153 146 let request = request_builder.clone(); 154 147 let (mut socket, _) = match tokio_tungstenite::connect_async(request).await { 155 148 Ok(result) => result, ··· 214 207 Message::Ping(bytes) => { 215 208 if let Err(error) = socket.send(Message::Pong(bytes)).await { 216 209 tracing::error!(?error, "failed to send Pong"); 217 - continue 'outer; 210 + break; 218 211 } 219 212 } 220 213 Message::Pong(bytes) => { 221 214 tracing::trace!(?bytes, "received Pong from server"); 222 - if !pings.remove(&bytes) { 223 - tracing::error!("unsolicited Pong"); 224 - break; 225 - } 215 + ping_inflight = false; 226 216 } 227 217 Message::Close(close_frame) => { 228 218 tracing::debug!(?close_frame, "received close frame"); ··· 236 226 } 237 227 } 238 228 Action::Timeout => { 239 - if pings.len() > MAX_INFLIGHT_PINGS { 240 - tracing::error!("too many missed pings"); 229 + if ping_inflight { 230 + tracing::error!("missed ping"); 241 231 break; 242 232 } 243 233 ··· 248 238 249 239 let payload = format!("{timestamp}"); 250 240 let payload: Bytes = payload.into(); 251 - pings.insert(payload.clone()); 252 - 253 241 if socket.send(Message::Ping(payload)).await.is_err() { 254 242 tracing::error!("failed to send Ping to server"); 255 243 break; 256 244 } 245 + 246 + ping_inflight = true; 257 247 } 258 248 Action::ClearAcks => { 259 249 drop(ack_tx);