#![allow(async_fn_in_trait)] // We use native async fn in traits for Worker/WorkerFactory

//! Unified worker supervision and lifecycle management
//!
//! This module provides a trait-based interface for background workers with:
//! - Consistent initialization and shutdown
//! - Automatic restart on failure with configurable policies
//! - Error collection and reporting
//! - Health monitoring
//!
//! ## Usage
//!
//! ```rust,ignore
//! // Define a worker
//! struct MyWorker { pool: Pool }
//!
//! impl Worker for MyWorker {
//!     fn name(&self) -> &'static str { "my_worker" }
//!
//!     async fn run(self, stop: WatchReceiver<()>) -> eyre::Result<()> {
//!         // Worker logic here
//!         Ok(())
//!     }
//! }
//!
//! // Spawn without restart support
//! let supervisor = WorkerSupervisor::new();
//! supervisor.spawn_oneshot(MyWorker { pool }, stop.clone());
//!
//! // Or spawn with restarts via a `WorkerFactory` implementation
//! supervisor.spawn(
//!     MyWorkerFactory { pool },
//!     stop.clone(),
//!     RestartPolicy::Backoff { max_retries: 5 },
//! );
//! ```

use eyre::Result;
use std::time::Duration;
use tokio::sync::watch::Receiver as WatchReceiver;
use tokio::task::JoinHandle;
use tokio_util::task::TaskTracker;
use tracing::{error, info, warn};

/// Worker trait for background tasks
///
/// All workers implement this trait to provide consistent lifecycle management.
///
/// Note: Uses native async fn in traits (stable since Rust 1.75).
/// The returned future must be Send because workers are spawned onto the
/// runtime; the `+ Send` bound on `run` enforces this.
pub trait Worker: Send + 'static {
    /// Worker name for logging and metrics
    ///
    /// Should be a static string like "cleanup_worker" or "handle_resolution"
    fn name(&self) -> &'static str;

    /// Run the worker until stopped
    ///
    /// This is the main worker loop. Workers should:
    /// - Poll the stop signal regularly (via select!)
    /// - Return Ok(()) on graceful shutdown
    /// - Return Err(_) only for unrecoverable errors
    ///
    /// Transient errors should be handled internally with retries.
    fn run(self, stop: WatchReceiver<()>) -> impl std::future::Future<Output = Result<()>> + Send
    where
        Self: Sized;
}

/// Factory trait for creating restartable workers
///
/// Workers that support restart must implement this trait.
/// A fresh worker is created from the factory on each restart attempt.
///
/// Note: Uses native async fn in traits.
pub trait WorkerFactory: Clone + Send + 'static {
    /// The worker type this factory creates
    type Worker: Worker;

    /// Worker name (used for logging before worker is created)
    fn name(&self) -> &'static str;

    /// Create a new worker instance
    ///
    /// Called on initial spawn and each restart attempt.
    /// Should return Err only if worker creation is impossible.
    fn create(&self) -> impl std::future::Future<Output = Result<Self::Worker>> + Send;
}
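// Illustrative sketch (hypothetical types, not part of this module): a factory
// for a worker that holds a database pool. `Pool` and `MyWorker` stand in for
// whatever state a real worker needs; `Pool` is assumed to be cheap to clone.
//
//     #[derive(Clone)]
//     struct MyWorkerFactory {
//         pool: Pool,
//     }
//
//     impl WorkerFactory for MyWorkerFactory {
//         type Worker = MyWorker;
//
//         fn name(&self) -> &'static str {
//             "my_worker"
//         }
//
//         // `async fn` in the impl satisfies the `impl Future + Send` return
//         // type declared on the trait, provided the captured state is Send.
//         async fn create(&self) -> Result<MyWorker> {
//             Ok(MyWorker { pool: self.pool.clone() })
//         }
//     }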
/// Restart policy for worker supervision
#[derive(Debug, Clone, Copy)]
pub enum RestartPolicy {
    /// Never restart - worker runs once
    ///
    /// Use for:
    /// - One-shot initialization tasks
    /// - Workers that should fail-fast
    Never,

    /// Restart immediately up to max_retries times
    ///
    /// Use for:
    /// - Quick recovery from transient errors
    /// - Workers with external rate limiting
    Immediate { max_retries: u32 },

    /// Restart with exponential backoff (1s, 2s, 4s, 8s... up to 60s)
    ///
    /// Use for:
    /// - Network-dependent workers
    /// - Workers that might overwhelm external services
    Backoff { max_retries: u32 },

    /// Always restart, no limit
    ///
    /// Use for:
    /// - Critical infrastructure workers
    /// - Workers that must never stop
    Always,
}

impl RestartPolicy {
    /// Get delay for given attempt number (0-indexed)
    fn delay_for_attempt(&self, attempt: u32) -> Option<Duration> {
        match self {
            RestartPolicy::Never => None,
            RestartPolicy::Immediate { max_retries } => {
                if attempt < *max_retries {
                    Some(Duration::ZERO)
                } else {
                    None
                }
            }
            RestartPolicy::Backoff { max_retries } => {
                if attempt >= *max_retries {
                    return None;
                }
                // Exponential: 1s, 2s, 4s, 8s, 16s, 32s, capped at 60s.
                // Clamp the shift so large attempt counts can't overflow;
                // every attempt >= 6 hits the 60s cap anyway.
                let delay_secs = (1_u64 << attempt.min(6)).min(60);
                Some(Duration::from_secs(delay_secs))
            }
            RestartPolicy::Always => Some(Duration::from_secs(1)),
        }
    }

    /// Check if restart is allowed for given attempt
    fn should_restart(&self, attempt: u32) -> bool {
        self.delay_for_attempt(attempt).is_some()
    }
}

/// Worker status after completion
#[derive(Debug)]
pub enum WorkerStatus {
    /// Worker completed successfully
    Completed { name: &'static str },

    /// Worker failed and will not restart
    Failed {
        name: &'static str,
        error: eyre::Report,
        attempts: u32,
    },

    /// Worker stopped by signal
    Stopped { name: &'static str },

    /// Worker panicked
    Panicked { name: &'static str },
}

/// Handle to a supervised worker
pub struct WorkerHandle {
    name: &'static str,
    join_handle: JoinHandle<WorkerStatus>,
}

impl WorkerHandle {
    /// Wait for worker to complete and get final status
    pub async fn wait(self) -> WorkerStatus {
        match self.join_handle.await {
            Ok(status) => status,
            Err(_) => WorkerStatus::Panicked { name: self.name },
        }
    }

    /// Worker name
    pub fn name(&self) -> &'static str {
        self.name
    }
}
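// Illustrative sketch: reacting to a worker's final status. `handle` would come
// from `WorkerSupervisor::spawn` or `spawn_oneshot` (defined below).
//
//     match handle.wait().await {
//         WorkerStatus::Completed { name } => info!(worker = name, "worker done"),
//         WorkerStatus::Stopped { name } => info!(worker = name, "worker stopped"),
//         WorkerStatus::Failed { name, error, attempts } => {
//             error!(worker = name, attempts, error = ?error, "worker gave up");
//         }
//         WorkerStatus::Panicked { name } => error!(worker = name, "worker panicked"),
//     }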
/// Supervisor for managing worker lifecycle with restart policies
pub struct WorkerSupervisor {
    tracker: TaskTracker,
}

impl WorkerSupervisor {
    /// Create a new worker supervisor
    pub fn new() -> Self {
        Self {
            tracker: TaskTracker::new(),
        }
    }

    /// Spawn a one-shot worker (no restart support)
    ///
    /// Use this for workers that should never restart.
    /// The worker is consumed on first run.
    pub fn spawn_oneshot<W: Worker>(&self, worker: W, stop: WatchReceiver<()>) -> WorkerHandle {
        let name = worker.name();
        let handle = self.tracker.spawn(Self::supervise_oneshot(worker, stop));
        WorkerHandle {
            name,
            join_handle: handle,
        }
    }

    /// Spawn a restartable worker using a factory
    ///
    /// The factory will be used to create new worker instances on restart.
    pub fn spawn<F: WorkerFactory>(
        &self,
        factory: F,
        stop: WatchReceiver<()>,
        policy: RestartPolicy,
    ) -> WorkerHandle {
        let name = factory.name();
        let handle = self.tracker.spawn(Self::supervise(factory, stop, policy));
        WorkerHandle {
            name,
            join_handle: handle,
        }
    }

    /// Supervise a one-shot worker (no restart)
    async fn supervise_oneshot<W: Worker>(worker: W, stop: WatchReceiver<()>) -> WorkerStatus {
        let name = worker.name();
        info!(worker = name, "Worker starting (one-shot)");

        match worker.run(stop).await {
            Ok(()) => {
                info!(worker = name, "Worker completed successfully");
                WorkerStatus::Completed { name }
            }
            Err(e) => {
                error!(worker = name, error = ?e, "Worker failed");
                WorkerStatus::Failed {
                    name,
                    error: e,
                    attempts: 1,
                }
            }
        }
    }

    /// Supervise a restartable worker with restart logic
    async fn supervise<F: WorkerFactory>(
        factory: F,
        stop: WatchReceiver<()>,
        policy: RestartPolicy,
    ) -> WorkerStatus {
        let name = factory.name();
        let mut attempt = 0_u32;

        loop {
            // Create worker instance
            let worker = match factory.create().await {
                Ok(w) => w,
                Err(e) => {
                    error!(
                        worker = name,
                        attempt = attempt + 1,
                        error = ?e,
                        "Failed to create worker"
                    );

                    // If we can't create the worker, check restart policy
                    if !policy.should_restart(attempt) {
                        return WorkerStatus::Failed {
                            name,
                            error: e.wrap_err("Failed to create worker"),
                            attempts: attempt + 1,
                        };
                    }

                    // Wait before retry
                    if let Some(delay) = policy.delay_for_attempt(attempt) {
                        if delay > Duration::ZERO {
                            warn!(
                                worker = name,
                                next_attempt = attempt + 2,
                                delay_secs = delay.as_secs(),
                                "Waiting before worker creation retry"
                            );
                            tokio::time::sleep(delay).await;
                        }
                    }

                    attempt += 1;
                    continue;
                }
            };

            // Run worker
            info!(worker = name, attempt = attempt + 1, "Worker starting");

            match worker.run(stop.clone()).await {
                Ok(()) => {
                    info!(worker = name, "Worker completed successfully");
                    return WorkerStatus::Completed { name };
                }
                Err(e) => {
                    error!(
                        worker = name,
                        attempt = attempt + 1,
                        error = ?e,
                        "Worker failed"
                    );

                    // Check if we should restart
                    if !policy.should_restart(attempt) {
                        error!(
                            worker = name,
                            attempts = attempt + 1,
                            "Worker will not restart (max attempts reached)"
                        );
                        return WorkerStatus::Failed {
                            name,
                            error: e,
                            attempts: attempt + 1,
                        };
                    }

                    // Wait before restart
                    if let Some(delay) = policy.delay_for_attempt(attempt) {
                        if delay > Duration::ZERO {
                            warn!(
                                worker = name,
                                next_attempt = attempt + 2,
                                delay_secs = delay.as_secs(),
                                "Waiting before worker restart"
                            );
                            tokio::time::sleep(delay).await;
                        }
                    }

                    attempt += 1;
                }
            }
        }
    }

    /// Wait for all workers to complete
    ///
    /// Note: This does not collect individual worker statuses (the returned
    /// Vec is always empty), because TaskTracker does not expose its task
    /// handles. Use WorkerHandle::wait() on each handle for per-worker status.
    pub async fn wait_all(self) -> Vec<WorkerStatus> {
        self.tracker.close();
        self.tracker.wait().await;
        Vec::new()
    }

    /// Close the supervisor (no new workers can be spawned)
    pub fn close(&self) {
        self.tracker.close();
    }

    /// Wait for all workers without consuming self
    pub async fn wait(&self) {
        self.tracker.wait().await;
    }
}
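// Illustrative sketch: a typical shutdown sequence. The stop signal is a
// `tokio::sync::watch` channel; notifying it tells every worker holding a
// clone of the receiver to wind down, and `wait_all` blocks until they have.
// `factory` is a hypothetical `WorkerFactory` implementation.
//
//     let (stop_tx, stop_rx) = tokio::sync::watch::channel(());
//     let supervisor = WorkerSupervisor::new();
//     let handle = supervisor.spawn(
//         factory,
//         stop_rx.clone(),
//         RestartPolicy::Backoff { max_retries: 5 },
//     );
//     // ... later, on shutdown:
//     stop_tx.send(()).ok();
//     supervisor.wait_all().await;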
impl Default for WorkerSupervisor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_restart_policy_never() {
        let policy = RestartPolicy::Never;
        assert!(!policy.should_restart(0));
        assert!(!policy.should_restart(1));
    }

    #[test]
    fn test_restart_policy_immediate() {
        let policy = RestartPolicy::Immediate { max_retries: 3 };
        assert!(policy.should_restart(0));
        assert!(policy.should_restart(1));
        assert!(policy.should_restart(2));
        assert!(!policy.should_restart(3));
    }

    #[test]
    fn test_restart_policy_backoff() {
        let policy = RestartPolicy::Backoff { max_retries: 5 };
        // Delays should be: 1s, 2s, 4s, 8s, 16s
        assert_eq!(policy.delay_for_attempt(0), Some(Duration::from_secs(1)));
        assert_eq!(policy.delay_for_attempt(1), Some(Duration::from_secs(2)));
        assert_eq!(policy.delay_for_attempt(2), Some(Duration::from_secs(4)));
        assert_eq!(policy.delay_for_attempt(3), Some(Duration::from_secs(8)));
        assert_eq!(policy.delay_for_attempt(4), Some(Duration::from_secs(16)));
        assert_eq!(policy.delay_for_attempt(5), None); // max_retries reached
    }

    #[test]
    fn test_restart_policy_backoff_capped() {
        let policy = RestartPolicy::Backoff { max_retries: 10 };
        // 1s, 2s, 4s, 8s, 16s, 32s, then capped at 60s
        assert_eq!(policy.delay_for_attempt(5), Some(Duration::from_secs(32)));
        assert_eq!(policy.delay_for_attempt(6), Some(Duration::from_secs(60)));
        assert_eq!(policy.delay_for_attempt(7), Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_restart_policy_always() {
        let policy = RestartPolicy::Always;
        assert!(policy.should_restart(0));
        assert!(policy.should_restart(100));
        assert!(policy.should_restart(1000));
    }
}
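// A minimal end-to-end check of the one-shot supervision path, assuming tokio's
// test macro is available (`tokio` with the `macros` and `rt` features in
// dev-dependencies). `TrivialWorker` completes immediately, so this only
// exercises the status mapping, not restart or shutdown behavior.
#[cfg(test)]
mod oneshot_tests {
    use super::*;

    struct TrivialWorker;

    impl Worker for TrivialWorker {
        fn name(&self) -> &'static str {
            "trivial"
        }

        // `async fn` in the impl satisfies the trait's `impl Future + Send`
        // return type; the future captures nothing non-Send.
        async fn run(self, _stop: WatchReceiver<()>) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn spawn_oneshot_reports_completion() {
        let (_stop_tx, stop_rx) = tokio::sync::watch::channel(());
        let supervisor = WorkerSupervisor::new();

        let handle = supervisor.spawn_oneshot(TrivialWorker, stop_rx);
        match handle.wait().await {
            WorkerStatus::Completed { name } => assert_eq!(name, "trivial"),
            other => panic!("unexpected worker status: {other:?}"),
        }
    }
}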