High-performance implementation of plcbundle written in Rust
at main 207 lines 7.1 kB view raw
1//! Graceful shutdown coordination for server and background tasks, with unified shutdown future and fatal-error handling 2// Runtime module - shutdown coordination for server and background tasks 3use std::future::Future; 4use std::sync::Arc; 5use std::sync::atomic::{AtomicBool, Ordering}; 6use tokio::signal; 7use tokio::sync::watch; 8use tokio::task::JoinSet; 9 10/// Lightweight coordination for bundle operations shutdown and background tasks 11/// Used by both server and sync continuous mode for graceful shutdown handling 12#[derive(Clone)] 13pub struct BundleRuntime { 14 shutdown_tx: watch::Sender<bool>, 15 shutdown_rx: watch::Receiver<bool>, 16 fatal_error: Arc<AtomicBool>, 17} 18 19impl BundleRuntime { 20 /// Create a new bundle runtime coordinator 21 pub fn new() -> Self { 22 let (shutdown_tx, shutdown_rx) = watch::channel(false); 23 Self { 24 shutdown_tx, 25 shutdown_rx, 26 fatal_error: Arc::new(AtomicBool::new(false)), 27 } 28 } 29 30 /// Get a receiver to watch for shutdown signals 31 /// Clone this and pass it to background tasks 32 pub fn shutdown_signal(&self) -> watch::Receiver<bool> { 33 self.shutdown_rx.clone() 34 } 35 36 /// Get a sender to trigger shutdown programmatically 37 /// Use this when you need to pass the sender to other components 38 pub fn shutdown_sender(&self) -> watch::Sender<bool> { 39 self.shutdown_tx.clone() 40 } 41 42 /// Trigger a programmatic shutdown 43 /// Call this from background tasks when they encounter fatal errors 44 pub fn trigger_shutdown(&self) { 45 let _ = self.shutdown_tx.send(true); 46 } 47 48 /// Trigger shutdown due to a fatal error 49 /// This marks the shutdown as fatal, which will cause tasks to be aborted immediately 50 pub fn trigger_fatal_shutdown(&self) { 51 self.fatal_error.store(true, Ordering::Relaxed); 52 self.trigger_shutdown(); 53 } 54 55 /// Check if shutdown was triggered by a fatal error 56 pub fn is_fatal_shutdown(&self) -> bool { 57 self.fatal_error.load(Ordering::Relaxed) 58 } 59 60 /// Create a unified shutdown future that responds to both Ctrl+C and programmatic shutdown 61 /// Use this with axum's `with_graceful_shutdown()` 62 pub fn create_shutdown_future(&self) -> impl Future<Output = ()> + Send + 'static { 63 let mut shutdown_rx = self.shutdown_rx.clone(); 64 65 async move { 66 tokio::select! { 67 _ = signal::ctrl_c() => { 68 eprintln!("\nShutdown signal (Ctrl+C) received..."); 69 } 70 _ = shutdown_rx.changed() => { 71 if *shutdown_rx.borrow() { 72 eprintln!("\nShutdown triggered by background task..."); 73 } 74 } 75 } 76 } 77 } 78 79 /// Common shutdown cleanup handler for server and sync commands 80 /// 81 /// This method handles the common pattern of: 82 /// 1. Triggering shutdown (if not already triggered) 83 /// 2. Aborting resolver tasks immediately (if any) 84 /// 3. Handling background tasks based on shutdown type (fatal vs normal) 85 /// 4. Printing completion message 86 /// 87 /// # Arguments 88 /// * `service_name` - Name of the service (e.g., "Server", "Sync") for messages 89 /// * `resolver_tasks` - Optional resolver tasks to abort immediately 90 /// * `background_tasks` - Optional background tasks to wait for or abort 91 pub async fn wait_for_shutdown_cleanup<T: 'static>( 92 &self, 93 service_name: &str, 94 resolver_tasks: Option<&mut JoinSet<T>>, 95 background_tasks: Option<&mut JoinSet<T>>, 96 ) { 97 // Ensure every background task sees the shutdown flag 98 self.trigger_shutdown(); 99 100 // Always abort resolver tasks immediately - they're just keep-alive pings 101 if let Some(resolver_tasks) = resolver_tasks 102 && !resolver_tasks.is_empty() 103 { 104 resolver_tasks.abort_all(); 105 while let Some(result) = resolver_tasks.join_next().await { 106 if let Err(e) = result 107 && !e.is_cancelled() 108 { 109 eprintln!("Resolver task error: {}", e); 110 } 111 } 112 } 113 114 // If shutdown was triggered by a fatal error, abort all tasks immediately 115 // Otherwise, wait for them to finish gracefully 116 if let Some(background_tasks) = background_tasks { 117 if self.is_fatal_shutdown() { 118 eprintln!("\nFatal error detected - aborting background tasks..."); 119 background_tasks.abort_all(); 120 // Wait briefly for aborted tasks to finish 121 while let Some(result) = background_tasks.join_next().await { 122 if let Err(e) = result 123 && !e.is_cancelled() 124 { 125 eprintln!("Background task error: {}", e); 126 } 127 } 128 } else { 129 // Normal shutdown - wait for tasks to finish gracefully 130 if !background_tasks.is_empty() { 131 eprintln!("\nWaiting for background tasks to finish..."); 132 while let Some(result) = background_tasks.join_next().await { 133 if let Err(e) = result { 134 eprintln!("Background task error: {}", e); 135 } 136 } 137 } 138 } 139 } 140 141 eprintln!("{} stopped", service_name); 142 } 143} 144 145impl Default for BundleRuntime { 146 fn default() -> Self { 147 Self::new() 148 } 149} 150 151#[cfg(test)] 152mod tests { 153 use super::*; 154 use tokio::time::{Duration, sleep}; 155 156 #[tokio::test] 157 async fn test_programmatic_shutdown() { 158 let runtime = BundleRuntime::new(); 159 let mut rx = runtime.shutdown_signal(); 160 161 // Spawn task to trigger shutdown 162 let rt_clone = runtime.clone(); 163 tokio::spawn(async move { 164 sleep(Duration::from_millis(50)).await; 165 rt_clone.trigger_shutdown(); 166 }); 167 168 // Wait for shutdown signal 169 rx.changed().await.unwrap(); 170 assert!(*rx.borrow()); 171 } 172 173 #[tokio::test] 174 async fn test_shutdown_signal_cloning() { 175 let runtime = BundleRuntime::new(); 176 let mut rx1 = runtime.shutdown_signal(); 177 let mut rx2 = runtime.shutdown_signal(); 178 179 runtime.trigger_shutdown(); 180 181 // Both receivers should see the change 182 rx1.changed().await.unwrap(); 183 rx2.changed().await.unwrap(); 184 185 assert!(*rx1.borrow()); 186 assert!(*rx2.borrow()); 187 } 188 189 #[tokio::test] 190 async fn test_fatal_shutdown() { 191 let runtime = BundleRuntime::new(); 192 let mut rx = runtime.shutdown_signal(); 193 194 // Initially not a fatal shutdown 195 assert!(!runtime.is_fatal_shutdown()); 196 197 // Trigger fatal shutdown 198 runtime.trigger_fatal_shutdown(); 199 200 // Should be marked as fatal 201 assert!(runtime.is_fatal_shutdown()); 202 203 // Shutdown signal should be set 204 rx.changed().await.unwrap(); 205 assert!(*rx.borrow()); 206 } 207}