+8
slingshot/src/error.rs
+8
slingshot/src/error.rs
···
47
}
48
49
#[derive(Debug, Error)]
50
pub enum MainTaskError {
51
#[error(transparent)]
52
ConsumerTaskError(#[from] ConsumerError),
···
54
ServerTaskError(#[from] ServerError),
55
#[error(transparent)]
56
IdentityTaskError(#[from] IdentityError),
57
#[error("firehose cache failed to close: {0}")]
58
FirehoseCacheCloseError(foyer::Error),
59
}
···
47
}
48
49
#[derive(Debug, Error)]
50
+
pub enum HealthCheckError {
51
+
#[error("failed to send checkin: {0}")]
52
+
HealthCheckError(#[from] reqwest::Error),
53
+
}
54
+
55
+
#[derive(Debug, Error)]
56
pub enum MainTaskError {
57
#[error(transparent)]
58
ConsumerTaskError(#[from] ConsumerError),
···
60
ServerTaskError(#[from] ServerError),
61
#[error(transparent)]
62
IdentityTaskError(#[from] IdentityError),
63
+
#[error(transparent)]
64
+
HealthCheckError(#[from] HealthCheckError),
65
#[error("firehose cache failed to close: {0}")]
66
FirehoseCacheCloseError(foyer::Error),
67
}
+32
slingshot/src/healthcheck.rs
+32
slingshot/src/healthcheck.rs
···
···
1
+
use crate::error::HealthCheckError;
2
+
use reqwest::Client;
3
+
use std::time::Duration;
4
+
use tokio::time::sleep;
5
+
use tokio_util::sync::CancellationToken;
6
+
7
+
pub async fn healthcheck(
8
+
endpoint: String,
9
+
shutdown: CancellationToken,
10
+
) -> Result<(), HealthCheckError> {
11
+
let client = Client::builder()
12
+
.user_agent(format!(
13
+
"microcosm slingshot v{} (dev: @bad-example.com)",
14
+
env!("CARGO_PKG_VERSION")
15
+
))
16
+
.no_proxy()
17
+
.timeout(Duration::from_secs(10))
18
+
.build()?;
19
+
20
+
loop {
21
+
tokio::select! {
22
+
res = client.get(&endpoint).send() => {
23
+
let _ = res
24
+
.and_then(|r| r.error_for_status())
25
+
.inspect_err(|e| log::error!("failed to send healthcheck: {e}"));
26
+
},
27
+
_ = shutdown.cancelled() => break,
28
+
}
29
+
sleep(Duration::from_secs(51)).await;
30
+
}
31
+
Ok(())
32
+
}
+2
slingshot/src/lib.rs
+2
slingshot/src/lib.rs
···
1
mod consumer;
2
pub mod error;
3
mod firehose_cache;
4
+
mod healthcheck;
5
mod identity;
6
mod record;
7
mod server;
8
9
pub use consumer::consume;
10
pub use firehose_cache::firehose_cache;
11
+
pub use healthcheck::healthcheck;
12
pub use identity::Identity;
13
pub use record::{CachedRecord, ErrorResponseObject, Repo};
14
pub use server::serve;
+14
-1
slingshot/src/main.rs
+14
-1
slingshot/src/main.rs
···
1
// use foyer::HybridCache;
2
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
-
use slingshot::{Identity, Repo, consume, error::MainTaskError, firehose_cache, serve};
5
use std::path::PathBuf;
6
7
use clap::Parser;
···
44
/// recommended in production, but mind the file permissions.
45
#[arg(long)]
46
certs: Option<PathBuf>,
47
}
48
49
#[tokio::main]
···
126
.await?;
127
Ok(())
128
});
129
130
tokio::select! {
131
_ = shutdown.cancelled() => log::warn!("shutdown requested"),
···
1
// use foyer::HybridCache;
2
// use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder};
3
use metrics_exporter_prometheus::PrometheusBuilder;
4
+
use slingshot::{
5
+
Identity, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve,
6
+
};
7
use std::path::PathBuf;
8
9
use clap::Parser;
···
46
/// recommended in production, but mind the file permissions.
47
#[arg(long)]
48
certs: Option<PathBuf>,
49
+
/// an web address to send healtcheck pings to every ~51s or so
50
+
#[arg(long)]
51
+
healthcheck: Option<String>,
52
}
53
54
#[tokio::main]
···
131
.await?;
132
Ok(())
133
});
134
+
135
+
if let Some(hc) = args.healthcheck {
136
+
let healthcheck_shutdown = shutdown.clone();
137
+
tasks.spawn(async move {
138
+
healthcheck(hc, healthcheck_shutdown).await?;
139
+
Ok(())
140
+
});
141
+
}
142
143
tokio::select! {
144
_ = shutdown.cancelled() => log::warn!("shutdown requested"),
+2
-1
slingshot/src/server.rs
+2
-1
slingshot/src/server.rs
···
19
Listener, TcpListener,
20
acme::{AutoCert, LETS_ENCRYPT_PRODUCTION},
21
},
22
-
middleware::{Cors, Tracing},
23
};
24
use poem_openapi::{
25
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
···
758
.allow_methods([Method::GET])
759
.allow_credentials(false),
760
)
761
.with(Tracing);
762
Server::new(listener)
763
.name("slingshot")
···
19
Listener, TcpListener,
20
acme::{AutoCert, LETS_ENCRYPT_PRODUCTION},
21
},
22
+
middleware::{CatchPanic, Cors, Tracing},
23
};
24
use poem_openapi::{
25
ApiResponse, ContactObject, ExternalDocumentObject, Object, OpenApi, OpenApiService, Tags,
···
758
.allow_methods([Method::GET])
759
.allow_credentials(false),
760
)
761
+
.with(CatchPanic::new())
762
.with(Tracing);
763
Server::new(listener)
764
.name("slingshot")