A pit full of rusty nails
at main 122 lines 3.9 kB view raw
1mod shutdown; 2 3use core::time::Duration; 4 5use futures_concurrency::future::TryJoin; 6use nailstate::ServerState; 7use tokio_util::sync::CancellationToken; 8 9async fn worker_task<Fut, F>( 10 worker: usize, 11 app: ServerState, 12 shutdown: CancellationToken, 13 main_fn: &F, 14) -> color_eyre::Result<()> 15where 16 Fut: Future<Output = color_eyre::Result<()>>, 17 F: Fn(ServerState, CancellationToken) -> Fut + Sync + Send, 18{ 19 while let Err(e) = main_fn(app.clone(), shutdown.clone()).await { 20 tracing::error!(error = %e, worker = worker, "Failed to start"); 21 // Wait a moment before trying again 22 tokio::time::sleep(Duration::from_secs(1)).await; 23 tracing::info!(worker = worker, "Restarting Worker..."); 24 } 25 26 Ok(()) 27} 28 29pub fn start<Fut, F>(app: ServerState, main_fn: F) -> color_eyre::Result<()> 30where 31 Fut: Future<Output = color_eyre::Result<()>>, 32 F: Fn(ServerState, CancellationToken) -> Fut + Sync + Send, 33{ 34 let workers = std::thread::available_parallelism()?.min(app.config.server.worker_threads); 35 36 let shutdown_notifier = CancellationToken::new(); 37 38 let core_ids = core_affinity::get_core_ids().unwrap_or_default(); 39 40 let mut id: usize = 0; 41 42 core_ids 43 .get({ 44 let pin = id; 45 id += 1; 46 pin 47 }) 48 .copied() 49 .map(core_affinity::set_for_current); 50 51 // Main worker MUST start, else we just error out. 52 let rt = tokio::runtime::Builder::new_current_thread() 53 .enable_all() 54 .build()?; 55 56 std::thread::scope(|s| { 57 for num in 1..workers.get() { 58 let cloned = &main_fn; 59 let app = &app; 60 let shutdown_notifier = &shutdown_notifier; 61 let core_id = core_ids 62 .get({ 63 let pin = id; 64 id += 1; 65 pin 66 }) 67 .copied(); 68 69 // If any worker threads fail to be created, the program will terminate. If the 70 // runtime within the worker thread fails to be created, this will also terminate the 71 // program, and the error will get logged. 72 std::thread::Builder::new() 73 .name(format!("worker {num}")) 74 .spawn_scoped(s, move || { 75 core_id.map(core_affinity::set_for_current); 76 77 match tokio::runtime::Builder::new_current_thread() 78 .enable_all() 79 .build() 80 { 81 Ok(rt) => { 82 rt.block_on(worker_task( 83 num, 84 app.clone(), 85 shutdown_notifier.clone(), 86 cloned, 87 )) 88 .ok(); 89 90 rt.shutdown_timeout(Duration::from_secs(60)); 91 } 92 Err(e) => { 93 tracing::error!(error = %e, worker = num, "Failed to start"); 94 shutdown_notifier.cancel(); 95 } 96 } 97 }) 98 .inspect_err(|_| shutdown_notifier.cancel())?; 99 } 100 101 rt.block_on(async { 102 let guard = nailotel::init_telemetry(app.config.clone_inner())?; 103 104 ( 105 worker_task(0, app.clone(), shutdown_notifier.clone(), &main_fn), 106 crate::shutdown::shutdown_task(shutdown_notifier.clone()), 107 ) 108 .try_join() 109 .await?; 110 111 tracing::info!("Waiting for background tasks to complete..."); 112 113 rt.spawn_blocking(|| drop(guard)); 114 115 color_eyre::eyre::Ok(()) 116 }) 117 })?; 118 119 rt.shutdown_timeout(Duration::from_secs(60)); 120 121 Ok(()) 122}