+6
-2
api/src/main.rs
+6
-2
api/src/main.rs
···
187
if now.duration_since(window_start) >= RECONNECT_WINDOW {
188
reconnect_count = 0;
189
window_start = now;
190
}
191
192
// Check rate limit
···
202
}
203
204
reconnect_count += 1;
205
206
// Read cursor position from database
207
let initial_cursor =
···
265
let cancellation_token = atproto_jetstream::CancellationToken::new();
266
match consumer_arc.start_consuming(cancellation_token).await {
267
Ok(_) => {
268
-
tracing::info!("Jetstream consumer shut down normally");
269
jetstream_connected_clone
270
.store(false, std::sync::atomic::Ordering::Relaxed);
271
}
272
Err(e) => {
273
-
tracing::error!("Jetstream consumer failed: {} - will reconnect", e);
274
jetstream_connected_clone
275
.store(false, std::sync::atomic::Ordering::Relaxed);
276
tokio::time::sleep(retry_delay).await;
···
187
if now.duration_since(window_start) >= RECONNECT_WINDOW {
188
reconnect_count = 0;
189
window_start = now;
190
+
retry_delay = tokio::time::Duration::from_secs(5); // Reset delay after window passes
191
}
192
193
// Check rate limit
···
203
}
204
205
reconnect_count += 1;
206
+
tracing::info!("Jetstream connection attempt #{} (retry delay: {:?})", reconnect_count, retry_delay);
207
208
// Read cursor position from database
209
let initial_cursor =
···
267
let cancellation_token = atproto_jetstream::CancellationToken::new();
268
match consumer_arc.start_consuming(cancellation_token).await {
269
Ok(_) => {
270
+
tracing::info!("Jetstream consumer shut down normally - reconnecting in {:?}", retry_delay);
271
jetstream_connected_clone
272
.store(false, std::sync::atomic::Ordering::Relaxed);
273
+
tokio::time::sleep(retry_delay).await;
274
+
retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
275
}
276
Err(e) => {
277
+
tracing::error!("Jetstream consumer failed: {} - reconnecting in {:?}", e, retry_delay);
278
jetstream_connected_clone
279
.store(false, std::sync::atomic::Ordering::Relaxed);
280
tokio::time::sleep(retry_delay).await;