+15
-7
api/src/jetstream.rs
+15
-7
api/src/jetstream.rs
···
864
864
}
865
865
})?;
866
866
867
-
// Start periodic status reporting
867
+
// Start periodic status reporting (with cancellation support)
868
868
let event_count_for_status = self.event_count.clone();
869
+
let cancellation_token_for_status = cancellation_token.clone();
869
870
tokio::spawn(async move {
870
871
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); // Every minute
871
872
loop {
872
-
interval.tick().await;
873
-
let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed);
874
-
info!(
875
-
"Jetstream consumer status: {} total events processed",
876
-
count
877
-
);
873
+
tokio::select! {
874
+
_ = interval.tick() => {
875
+
let count = event_count_for_status.load(std::sync::atomic::Ordering::Relaxed);
876
+
info!(
877
+
"Jetstream consumer status: {} total events processed",
878
+
count
879
+
);
880
+
}
881
+
_ = cancellation_token_for_status.cancelled() => {
882
+
info!("Status reporting task cancelled");
883
+
break;
884
+
}
885
+
}
878
886
}
879
887
});
880
888