+43
-33
Diff
round #1
+43
-33
crates/tranquil-sync/src/subscribe_repos.rs
+43
-33
crates/tranquil-sync/src/subscribe_repos.rs
···
73
73
if let Some(cursor) = params.cursor {
74
74
let cursor_seq = SequenceNumber::from_raw(cursor);
75
75
let current_seq = state
76
-
.repos.repo
76
+
.repos
77
+
.repo
77
78
.get_max_seq()
78
79
.await
79
80
.unwrap_or(SequenceNumber::ZERO);
···
91
92
let backfill_time = chrono::Utc::now() - chrono::Duration::hours(get_backfill_hours());
92
93
93
94
let first_event = state
94
-
.repos.repo
95
+
.repos
96
+
.repo
95
97
.get_events_since_cursor(cursor_seq, 1)
96
98
.await
97
99
.ok()
···
110
112
}
111
113
112
114
let earliest = state
113
-
.repos.repo
115
+
.repos
116
+
.repo
114
117
.get_min_seq_since(backfill_time)
115
118
.await
116
119
.ok()
···
125
128
126
129
loop {
127
130
let events = state
128
-
.repos.repo
131
+
.repos
132
+
.repo
129
133
.get_events_since_cursor(current_cursor, BACKFILL_BATCH_SIZE)
130
134
.await;
131
135
match events {
···
204
208
let max_lag_before_disconnect: u64 = tranquil_config::get().firehose.max_lag;
205
209
loop {
206
210
tokio::select! {
207
-
result = rx.recv() => {
208
-
match result {
209
-
Ok(event) => {
210
-
if event.seq <= last_seen {
211
-
continue;
212
-
}
213
-
last_seen = event.seq;
214
-
if let Err(e) = send_event(socket, state, event).await {
215
-
warn!("Failed to send event: {}", e);
216
-
break;
217
-
}
218
-
tranquil_pds::metrics::record_firehose_event();
211
+
result = rx.recv() => match result {
212
+
Ok(event) => {
213
+
if event.seq <= last_seen {
214
+
continue;
219
215
}
220
-
Err(RecvError::Lagged(skipped)) => {
221
-
warn!(skipped = skipped, "Firehose subscriber lagged behind");
222
-
if skipped > max_lag_before_disconnect {
223
-
warn!(skipped = skipped, max_lag = max_lag_before_disconnect,
224
-
"Disconnecting slow firehose consumer");
225
-
break;
226
-
}
216
+
last_seen = event.seq;
217
+
if let Err(e) = send_event(socket, state, event).await {
218
+
warn!("Failed to send event: {}", e);
219
+
break;
227
220
}
228
-
Err(RecvError::Closed) => {
229
-
info!("Firehose channel closed");
221
+
tranquil_pds::metrics::record_firehose_event();
222
+
}
223
+
Err(RecvError::Lagged(skipped)) => {
224
+
warn!(skipped = skipped, "Firehose subscriber lagged behind");
225
+
if skipped > max_lag_before_disconnect {
226
+
warn!(skipped = skipped, max_lag = max_lag_before_disconnect,
227
+
"Disconnecting slow firehose consumer");
230
228
break;
231
229
}
232
230
}
233
-
}
234
-
Some(Ok(msg)) = socket.next() => {
235
-
if let Message::Close(_) = msg {
236
-
info!("Client closed connection");
231
+
Err(RecvError::Closed) => {
232
+
info!("Firehose channel closed");
237
233
break;
238
234
}
239
-
}
240
-
else => {
241
-
break;
242
-
}
235
+
},
236
+
next = socket.next() => match next {
237
+
None => {
238
+
info!("Client closed connection abruptly");
239
+
break;
240
+
}
241
+
Some(msg) => {
242
+
let Ok(msg) = msg else {
243
+
info!("Client closed connection abruptly");
244
+
break;
245
+
};
246
+
247
+
if let Message::Close(_) = msg {
248
+
info!("Client closed connection");
249
+
break;
250
+
}
251
+
}
252
+
},
243
253
}
244
254
}
245
255
Ok(())
History
2 rounds
0 comments
expand 0 comments
pull request successfully merged