+37
-10
ufos/src/main.rs
+37
-10
ufos/src/main.rs
···
4
4
use metrics_exporter_prometheus::PrometheusBuilder;
5
5
use std::path::PathBuf;
6
6
use std::time::{Duration, SystemTime};
7
+
use tokio::task::JoinSet;
7
8
use ufos::consumer;
8
9
use ufos::file_consumer;
9
10
use ufos::server;
···
72
73
Ok(())
73
74
}
74
75
75
-
async fn go<B: StoreBackground>(
76
+
async fn go<B: StoreBackground + 'static>(
76
77
args: Args,
77
78
read_store: impl StoreReader + 'static + Clone,
78
79
mut write_store: impl StoreWriter<B> + 'static,
79
80
cursor: Option<Cursor>,
80
81
sketch_secret: SketchSecretPrefix,
81
82
) -> anyhow::Result<()> {
83
+
let mut tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
84
+
82
85
println!("starting server with storage...");
83
86
let serving = server::serve(read_store.clone());
87
+
tasks.spawn(async move {
88
+
serving.await.map_err(|e| {
89
+
log::warn!("server ended: {e}");
90
+
anyhow::anyhow!(e)
91
+
})
92
+
});
84
93
85
94
if args.pause_writer {
86
95
log::info!("not starting jetstream or the write loop.");
87
-
serving.await.map_err(|e| anyhow::anyhow!(e))?;
96
+
for t in tasks.join_all().await {
97
+
if let Err(e) = t {
98
+
return Err(anyhow::anyhow!(e));
99
+
}
100
+
}
88
101
return Ok(());
89
102
}
90
103
···
102
115
let rolling = write_store
103
116
.background_tasks(args.reroll)?
104
117
.run(args.backfill);
105
-
let consuming = write_store.receive_batches(batches);
118
+
tasks.spawn(async move {
119
+
rolling
120
+
.await
121
+
.inspect_err(|e| log::warn!("rollup ended: {e}"))?;
122
+
Ok(())
123
+
});
124
+
125
+
tasks.spawn(async move {
126
+
write_store
127
+
.receive_batches(batches)
128
+
.await
129
+
.inspect_err(|e| log::warn!("consumer ended: {e}"))?;
130
+
Ok(())
131
+
});
106
132
107
-
let stating = do_update_stuff(read_store);
133
+
tasks.spawn(async move {
134
+
do_update_stuff(read_store).await;
135
+
log::warn!("status task ended");
136
+
Ok(())
137
+
});
108
138
109
139
install_metrics_server()?;
110
140
111
-
tokio::select! {
112
-
z = serving => log::warn!("serve task ended: {z:?}"),
113
-
z = rolling => log::warn!("rollup task ended: {z:?}"),
114
-
z = consuming => log::warn!("consuming task ended: {z:?}"),
115
-
z = stating => log::warn!("status task ended: {z:?}"),
116
-
};
141
+
for (i, t) in tasks.join_all().await.iter().enumerate() {
142
+
log::warn!("task {i} done: {t:?}");
143
+
}
117
144
118
145
println!("bye!");
119
146