1mod shutdown;
2
3use core::time::Duration;
4
5use futures_concurrency::future::TryJoin;
6use nailstate::ServerState;
7use tokio_util::sync::CancellationToken;
8
9async fn worker_task<Fut, F>(
10 worker: usize,
11 app: ServerState,
12 shutdown: CancellationToken,
13 main_fn: &F,
14) -> color_eyre::Result<()>
15where
16 Fut: Future<Output = color_eyre::Result<()>>,
17 F: Fn(ServerState, CancellationToken) -> Fut + Sync + Send,
18{
19 while let Err(e) = main_fn(app.clone(), shutdown.clone()).await {
20 tracing::error!(error = %e, worker = worker, "Failed to start");
21 // Wait a moment before trying again
22 tokio::time::sleep(Duration::from_secs(1)).await;
23 tracing::info!(worker = worker, "Restarting Worker...");
24 }
25
26 Ok(())
27}
28
29pub fn start<Fut, F>(app: ServerState, main_fn: F) -> color_eyre::Result<()>
30where
31 Fut: Future<Output = color_eyre::Result<()>>,
32 F: Fn(ServerState, CancellationToken) -> Fut + Sync + Send,
33{
34 let workers = std::thread::available_parallelism()?.min(app.config.server.worker_threads);
35
36 let shutdown_notifier = CancellationToken::new();
37
38 let core_ids = core_affinity::get_core_ids().unwrap_or_default();
39
40 let mut id: usize = 0;
41
42 core_ids
43 .get({
44 let pin = id;
45 id += 1;
46 pin
47 })
48 .copied()
49 .map(core_affinity::set_for_current);
50
51 // Main worker MUST start, else we just error out.
52 let rt = tokio::runtime::Builder::new_current_thread()
53 .enable_all()
54 .build()?;
55
56 std::thread::scope(|s| {
57 for num in 1..workers.get() {
58 let cloned = &main_fn;
59 let app = &app;
60 let shutdown_notifier = &shutdown_notifier;
61 let core_id = core_ids
62 .get({
63 let pin = id;
64 id += 1;
65 pin
66 })
67 .copied();
68
69 // If any worker threads fail to be created, the program will terminate. If the
70 // runtime within the worker thread fails to be created, this will also terminate the
71 // program, and the error will get logged.
72 std::thread::Builder::new()
73 .name(format!("worker {num}"))
74 .spawn_scoped(s, move || {
75 core_id.map(core_affinity::set_for_current);
76
77 match tokio::runtime::Builder::new_current_thread()
78 .enable_all()
79 .build()
80 {
81 Ok(rt) => {
82 rt.block_on(worker_task(
83 num,
84 app.clone(),
85 shutdown_notifier.clone(),
86 cloned,
87 ))
88 .ok();
89
90 rt.shutdown_timeout(Duration::from_secs(60));
91 }
92 Err(e) => {
93 tracing::error!(error = %e, worker = num, "Failed to start");
94 shutdown_notifier.cancel();
95 }
96 }
97 })
98 .inspect_err(|_| shutdown_notifier.cancel())?;
99 }
100
101 rt.block_on(async {
102 let guard = nailotel::init_telemetry(app.config.clone_inner())?;
103
104 (
105 worker_task(0, app.clone(), shutdown_notifier.clone(), &main_fn),
106 crate::shutdown::shutdown_task(shutdown_notifier.clone()),
107 )
108 .try_join()
109 .await?;
110
111 tracing::info!("Waiting for background tasks to complete...");
112
113 rt.spawn_blocking(|| drop(guard));
114
115 color_eyre::eyre::Ok(())
116 })
117 })?;
118
119 rt.shutdown_timeout(Duration::from_secs(60));
120
121 Ok(())
122}