+11
-7
ufos/src/main.rs
+11
-7
ufos/src/main.rs
···
80
80
cursor: Option<Cursor>,
81
81
sketch_secret: SketchSecretPrefix,
82
82
) -> anyhow::Result<()> {
83
-
let mut tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
83
+
let mut whatever_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
84
+
let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new();
84
85
85
86
println!("starting server with storage...");
86
87
let serving = server::serve(read_store.clone());
87
-
tasks.spawn(async move {
88
+
whatever_tasks.spawn(async move {
88
89
serving.await.map_err(|e| {
89
90
log::warn!("server ended: {e}");
90
91
anyhow::anyhow!(e)
···
93
94
94
95
if args.pause_writer {
95
96
log::info!("not starting jetstream or the write loop.");
96
-
for t in tasks.join_all().await {
97
+
for t in whatever_tasks.join_all().await {
97
98
if let Err(e) = t {
98
99
return Err(anyhow::anyhow!(e));
99
100
}
···
115
116
let rolling = write_store
116
117
.background_tasks(args.reroll)?
117
118
.run(args.backfill);
118
-
tasks.spawn(async move {
119
+
consumer_tasks.spawn(async move {
119
120
rolling
120
121
.await
121
122
.inspect_err(|e| log::warn!("rollup ended: {e}"))?;
122
123
Ok(())
123
124
});
124
125
125
-
tasks.spawn(async move {
126
+
consumer_tasks.spawn(async move {
126
127
write_store
127
128
.receive_batches(batches)
128
129
.await
···
130
131
Ok(())
131
132
});
132
133
133
-
tasks.spawn(async move {
134
+
whatever_tasks.spawn(async move {
134
135
do_update_stuff(read_store).await;
135
136
log::warn!("status task ended");
136
137
Ok(())
···
138
139
139
140
install_metrics_server()?;
140
141
141
-
for (i, t) in tasks.join_all().await.iter().enumerate() {
142
+
for (i, t) in consumer_tasks.join_all().await.iter().enumerate() {
142
143
log::warn!("task {i} done: {t:?}");
143
144
}
145
+
146
+
println!("consumer tasks all completed, killing the others");
147
+
whatever_tasks.shutdown().await;
144
148
145
149
println!("bye!");
146
150