Our Personal Data Server from scratch! tranquil.farm
atproto pds rust postgresql fun oauth

fix(sync): dont keep websockets arround indefinetly if client abruptly closes connection #100

merged opened by nel.pet targeting main from fix/zombie-websocket-connections
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:h5wsnqetncv6lu2weom35lg2/sh.tangled.repo.pull/3mhupgudy7b22
+43 -33
Diff #1
+43 -33
crates/tranquil-sync/src/subscribe_repos.rs
··· 73 73 if let Some(cursor) = params.cursor { 74 74 let cursor_seq = SequenceNumber::from_raw(cursor); 75 75 let current_seq = state 76 - .repos.repo 76 + .repos 77 + .repo 77 78 .get_max_seq() 78 79 .await 79 80 .unwrap_or(SequenceNumber::ZERO); ··· 91 92 let backfill_time = chrono::Utc::now() - chrono::Duration::hours(get_backfill_hours()); 92 93 93 94 let first_event = state 94 - .repos.repo 95 + .repos 96 + .repo 95 97 .get_events_since_cursor(cursor_seq, 1) 96 98 .await 97 99 .ok() ··· 110 112 } 111 113 112 114 let earliest = state 113 - .repos.repo 115 + .repos 116 + .repo 114 117 .get_min_seq_since(backfill_time) 115 118 .await 116 119 .ok() ··· 125 128 126 129 loop { 127 130 let events = state 128 - .repos.repo 131 + .repos 132 + .repo 129 133 .get_events_since_cursor(current_cursor, BACKFILL_BATCH_SIZE) 130 134 .await; 131 135 match events { ··· 204 208 let max_lag_before_disconnect: u64 = tranquil_config::get().firehose.max_lag; 205 209 loop { 206 210 tokio::select! { 207 - result = rx.recv() => { 208 - match result { 209 - Ok(event) => { 210 - if event.seq <= last_seen { 211 - continue; 212 - } 213 - last_seen = event.seq; 214 - if let Err(e) = send_event(socket, state, event).await { 215 - warn!("Failed to send event: {}", e); 216 - break; 217 - } 218 - tranquil_pds::metrics::record_firehose_event(); 211 + result = rx.recv() => match result { 212 + Ok(event) => { 213 + if event.seq <= last_seen { 214 + continue; 219 215 } 220 - Err(RecvError::Lagged(skipped)) => { 221 - warn!(skipped = skipped, "Firehose subscriber lagged behind"); 222 - if skipped > max_lag_before_disconnect { 223 - warn!(skipped = skipped, max_lag = max_lag_before_disconnect, 224 - "Disconnecting slow firehose consumer"); 225 - break; 226 - } 216 + last_seen = event.seq; 217 + if let Err(e) = send_event(socket, state, event).await { 218 + warn!("Failed to send event: {}", e); 219 + break; 227 220 } 228 - Err(RecvError::Closed) => { 229 - info!("Firehose channel closed"); 221 + tranquil_pds::metrics::record_firehose_event(); 222 + } 223 + Err(RecvError::Lagged(skipped)) => { 224 + warn!(skipped = skipped, "Firehose subscriber lagged behind"); 225 + if skipped > max_lag_before_disconnect { 226 + warn!(skipped = skipped, max_lag = max_lag_before_disconnect, 227 + "Disconnecting slow firehose consumer"); 230 228 break; 231 229 } 232 230 } 233 - } 234 - Some(Ok(msg)) = socket.next() => { 235 - if let Message::Close(_) = msg { 236 - info!("Client closed connection"); 231 + Err(RecvError::Closed) => { 232 + info!("Firehose channel closed"); 237 233 break; 238 234 } 239 - } 240 - else => { 241 - break; 242 - } 235 + }, 236 + next = socket.next() => match next { 237 + None => { 238 + info!("Client closed connection abruptly"); 239 + break; 240 + } 241 + Some(msg) => { 242 + let Ok(msg) = msg else { 243 + info!("Client closed connection abruptly"); 244 + break; 245 + }; 246 + 247 + if let Message::Close(_) = msg { 248 + info!("Client closed connection"); 249 + break; 250 + } 251 + } 252 + }, 243 253 } 244 254 } 245 255 Ok(())

History

2 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
fix(sync): dont keep websockets arround indefinetly if client abruptly closes connection
expand 0 comments
pull request successfully merged
nel.pet submitted #0
1 commit
expand
fix(sync): dont keep websockets arround indefinetly if client abruptly closes connection
expand 0 comments