+37
-13
constellation/src/bin/main.rs
+37
-13
constellation/src/bin/main.rs
···
1
use anyhow::{bail, Result};
2
use clap::{Parser, ValueEnum};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
use std::num::NonZero;
5
use std::path::PathBuf;
6
use std::sync::{atomic::AtomicU32, Arc};
···
21
#[derive(Parser, Debug)]
22
#[command(version, about, long_about = None)]
23
struct Args {
24
-
#[arg(short, long)]
25
/// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
26
/// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
27
#[arg(short, long)]
···
77
78
let stream = jetstream_url(&args.jetstream);
79
println!("using jetstream server {stream:?}...",);
80
81
let stay_alive = CancellationToken::new();
82
83
match args.backend {
84
-
StorageBackend::Memory => run(MemStorage::new(), fixture, None, stream, stay_alive),
85
#[cfg(feature = "rocks")]
86
StorageBackend::Rocks => {
87
let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
···
96
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
97
}
98
println!("rocks ready.");
99
-
run(rocks, fixture, args.data, stream, stay_alive)
100
}
101
}
102
}
···
106
fixture: Option<PathBuf>,
107
data_dir: Option<PathBuf>,
108
stream: String,
109
stay_alive: CancellationToken,
110
) -> Result<()> {
111
ctrlc::set_handler({
···
150
.build()
151
.expect("axum startup")
152
.block_on(async {
153
-
install_metrics_server()?;
154
-
serve(readable, "0.0.0.0:6789", staying_alive).await
155
})
156
.unwrap();
157
stay_alive.drop_guard();
···
218
Ok(())
219
}
220
221
-
fn install_metrics_server() -> Result<()> {
222
println!("installing metrics server...");
223
-
let host = [0, 0, 0, 0];
224
-
let port = 8765;
225
PrometheusBuilder::new()
226
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
227
.set_bucket_duration(time::Duration::from_secs(30))?
228
.set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
229
.set_enable_unit_suffix(true)
230
-
.with_http_listener((host, port))
231
.install()?;
232
-
println!(
233
-
"metrics server installed! listening on http://{}.{}.{}.{}:{port}",
234
-
host[0], host[1], host[2], host[3]
235
-
);
236
Ok(())
237
}
238
···
1
use anyhow::{bail, Result};
2
use clap::{Parser, ValueEnum};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
+
use std::net::SocketAddr;
5
use std::num::NonZero;
6
use std::path::PathBuf;
7
use std::sync::{atomic::AtomicU32, Arc};
···
22
#[derive(Parser, Debug)]
23
#[command(version, about, long_about = None)]
24
struct Args {
25
+
/// constellation server's listen address
26
+
#[arg(long)]
27
+
#[clap(default_value = "0.0.0.0:6789")]
28
+
bind: SocketAddr,
29
+
/// metrics server's listen address
30
+
#[arg(long)]
31
+
#[clap(default_value = "0.0.0.0:8765")]
32
+
bind_metrics: SocketAddr,
33
/// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
34
/// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
35
#[arg(short, long)]
···
85
86
let stream = jetstream_url(&args.jetstream);
87
println!("using jetstream server {stream:?}...",);
88
+
89
+
let bind = args.bind;
90
+
let metrics_bind = args.bind_metrics;
91
92
let stay_alive = CancellationToken::new();
93
94
match args.backend {
95
+
StorageBackend::Memory => run(
96
+
MemStorage::new(),
97
+
fixture,
98
+
None,
99
+
stream,
100
+
bind,
101
+
metrics_bind,
102
+
stay_alive,
103
+
),
104
#[cfg(feature = "rocks")]
105
StorageBackend::Rocks => {
106
let storage_dir = args.data.clone().unwrap_or("rocks.test".into());
···
115
rocks.start_backup(backup_dir, auto_backup, stay_alive.clone())?;
116
}
117
println!("rocks ready.");
118
+
run(
119
+
rocks,
120
+
fixture,
121
+
args.data,
122
+
stream,
123
+
bind,
124
+
metrics_bind,
125
+
stay_alive,
126
+
)
127
}
128
}
129
}
···
133
fixture: Option<PathBuf>,
134
data_dir: Option<PathBuf>,
135
stream: String,
136
+
bind: SocketAddr,
137
+
metrics_bind: SocketAddr,
138
stay_alive: CancellationToken,
139
) -> Result<()> {
140
ctrlc::set_handler({
···
179
.build()
180
.expect("axum startup")
181
.block_on(async {
182
+
install_metrics_server(metrics_bind)?;
183
+
serve(readable, bind, staying_alive).await
184
})
185
.unwrap();
186
stay_alive.drop_guard();
···
247
Ok(())
248
}
249
250
+
fn install_metrics_server(metrics_bind: SocketAddr) -> Result<()> {
251
println!("installing metrics server...");
252
PrometheusBuilder::new()
253
.set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
254
.set_bucket_duration(time::Duration::from_secs(30))?
255
.set_bucket_count(NonZero::new(10).unwrap()) // count * duration = 5 mins. stuff doesn't happen that fast here.
256
.set_enable_unit_suffix(true)
257
+
.with_http_listener(metrics_bind)
258
.install()?;
259
+
println!("metrics server installed! listening at {metrics_bind:?}");
260
Ok(())
261
}
262