High-performance implementation of plcbundle written in Rust
1//! Graceful shutdown coordination for server and background tasks, with unified shutdown future and fatal-error handling
2// Runtime module - shutdown coordination for server and background tasks
3use std::future::Future;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use tokio::signal;
7use tokio::sync::watch;
8use tokio::task::JoinSet;
9
10/// Lightweight coordination for bundle operations shutdown and background tasks
11/// Used by both server and sync continuous mode for graceful shutdown handling
12#[derive(Clone)]
13pub struct BundleRuntime {
14 shutdown_tx: watch::Sender<bool>,
15 shutdown_rx: watch::Receiver<bool>,
16 fatal_error: Arc<AtomicBool>,
17}
18
19impl BundleRuntime {
20 /// Create a new bundle runtime coordinator
21 pub fn new() -> Self {
22 let (shutdown_tx, shutdown_rx) = watch::channel(false);
23 Self {
24 shutdown_tx,
25 shutdown_rx,
26 fatal_error: Arc::new(AtomicBool::new(false)),
27 }
28 }
29
30 /// Get a receiver to watch for shutdown signals
31 /// Clone this and pass it to background tasks
32 pub fn shutdown_signal(&self) -> watch::Receiver<bool> {
33 self.shutdown_rx.clone()
34 }
35
36 /// Get a sender to trigger shutdown programmatically
37 /// Use this when you need to pass the sender to other components
38 pub fn shutdown_sender(&self) -> watch::Sender<bool> {
39 self.shutdown_tx.clone()
40 }
41
42 /// Trigger a programmatic shutdown
43 /// Call this from background tasks when they encounter fatal errors
44 pub fn trigger_shutdown(&self) {
45 let _ = self.shutdown_tx.send(true);
46 }
47
48 /// Trigger shutdown due to a fatal error
49 /// This marks the shutdown as fatal, which will cause tasks to be aborted immediately
50 pub fn trigger_fatal_shutdown(&self) {
51 self.fatal_error.store(true, Ordering::Relaxed);
52 self.trigger_shutdown();
53 }
54
55 /// Check if shutdown was triggered by a fatal error
56 pub fn is_fatal_shutdown(&self) -> bool {
57 self.fatal_error.load(Ordering::Relaxed)
58 }
59
60 /// Create a unified shutdown future that responds to both Ctrl+C and programmatic shutdown
61 /// Use this with axum's `with_graceful_shutdown()`
62 pub fn create_shutdown_future(&self) -> impl Future<Output = ()> + Send + 'static {
63 let mut shutdown_rx = self.shutdown_rx.clone();
64
65 async move {
66 tokio::select! {
67 _ = signal::ctrl_c() => {
68 eprintln!("\nShutdown signal (Ctrl+C) received...");
69 }
70 _ = shutdown_rx.changed() => {
71 if *shutdown_rx.borrow() {
72 eprintln!("\nShutdown triggered by background task...");
73 }
74 }
75 }
76 }
77 }
78
79 /// Common shutdown cleanup handler for server and sync commands
80 ///
81 /// This method handles the common pattern of:
82 /// 1. Triggering shutdown (if not already triggered)
83 /// 2. Aborting resolver tasks immediately (if any)
84 /// 3. Handling background tasks based on shutdown type (fatal vs normal)
85 /// 4. Printing completion message
86 ///
87 /// # Arguments
88 /// * `service_name` - Name of the service (e.g., "Server", "Sync") for messages
89 /// * `resolver_tasks` - Optional resolver tasks to abort immediately
90 /// * `background_tasks` - Optional background tasks to wait for or abort
91 pub async fn wait_for_shutdown_cleanup<T: 'static>(
92 &self,
93 service_name: &str,
94 resolver_tasks: Option<&mut JoinSet<T>>,
95 background_tasks: Option<&mut JoinSet<T>>,
96 ) {
97 // Ensure every background task sees the shutdown flag
98 self.trigger_shutdown();
99
100 // Always abort resolver tasks immediately - they're just keep-alive pings
101 if let Some(resolver_tasks) = resolver_tasks
102 && !resolver_tasks.is_empty()
103 {
104 resolver_tasks.abort_all();
105 while let Some(result) = resolver_tasks.join_next().await {
106 if let Err(e) = result
107 && !e.is_cancelled()
108 {
109 eprintln!("Resolver task error: {}", e);
110 }
111 }
112 }
113
114 // If shutdown was triggered by a fatal error, abort all tasks immediately
115 // Otherwise, wait for them to finish gracefully
116 if let Some(background_tasks) = background_tasks {
117 if self.is_fatal_shutdown() {
118 eprintln!("\nFatal error detected - aborting background tasks...");
119 background_tasks.abort_all();
120 // Wait briefly for aborted tasks to finish
121 while let Some(result) = background_tasks.join_next().await {
122 if let Err(e) = result
123 && !e.is_cancelled()
124 {
125 eprintln!("Background task error: {}", e);
126 }
127 }
128 } else {
129 // Normal shutdown - wait for tasks to finish gracefully
130 if !background_tasks.is_empty() {
131 eprintln!("\nWaiting for background tasks to finish...");
132 while let Some(result) = background_tasks.join_next().await {
133 if let Err(e) = result {
134 eprintln!("Background task error: {}", e);
135 }
136 }
137 }
138 }
139 }
140
141 eprintln!("{} stopped", service_name);
142 }
143}
144
145impl Default for BundleRuntime {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use tokio::time::{Duration, sleep};
155
156 #[tokio::test]
157 async fn test_programmatic_shutdown() {
158 let runtime = BundleRuntime::new();
159 let mut rx = runtime.shutdown_signal();
160
161 // Spawn task to trigger shutdown
162 let rt_clone = runtime.clone();
163 tokio::spawn(async move {
164 sleep(Duration::from_millis(50)).await;
165 rt_clone.trigger_shutdown();
166 });
167
168 // Wait for shutdown signal
169 rx.changed().await.unwrap();
170 assert!(*rx.borrow());
171 }
172
173 #[tokio::test]
174 async fn test_shutdown_signal_cloning() {
175 let runtime = BundleRuntime::new();
176 let mut rx1 = runtime.shutdown_signal();
177 let mut rx2 = runtime.shutdown_signal();
178
179 runtime.trigger_shutdown();
180
181 // Both receivers should see the change
182 rx1.changed().await.unwrap();
183 rx2.changed().await.unwrap();
184
185 assert!(*rx1.borrow());
186 assert!(*rx2.borrow());
187 }
188
189 #[tokio::test]
190 async fn test_fatal_shutdown() {
191 let runtime = BundleRuntime::new();
192 let mut rx = runtime.shutdown_signal();
193
194 // Initially not a fatal shutdown
195 assert!(!runtime.is_fatal_shutdown());
196
197 // Trigger fatal shutdown
198 runtime.trigger_fatal_shutdown();
199
200 // Should be marked as fatal
201 assert!(runtime.is_fatal_shutdown());
202
203 // Shutdown signal should be set
204 rx.changed().await.unwrap();
205 assert!(*rx.borrow());
206 }
207}