···
 name = "allegedly"
 description = "public ledger server tools and services (for the PLC)"
 license = "MIT OR Apache-2.0"
-version = "0.2.1"
+version = "0.3.3"
 edition = "2024"
 default-run = "allegedly"
···
 http-body-util = "0.1.3"
 log = "0.4.28"
 native-tls = "0.2.14"
+opentelemetry = "0.30.0"
+opentelemetry-otlp = { version = "0.30.0" }
+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
 poem = { version = "3.1.12", features = ["acme", "compression"] }
 postgres-native-tls = "0.5.1"
-reqwest = { version = "0.12.23", features = ["stream", "json"] }
+reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
 reqwest-middleware = "0.4.2"
 reqwest-retry = "0.7.0"
 rustls = "0.23.32"
···
 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
 tokio-stream = { version = "0.1.17", features = ["io-util"] }
 tokio-util = { version = "0.7.16", features = ["compat"] }
+tracing = "0.1.41"
+tracing-opentelemetry = "0.31.0"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
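The new opentelemetry/tracing dependencies line up with the removal of `bin_init` from src/lib.rs below, which used to install a plain `tracing_subscriber::fmt()` writer. A minimal sketch of how these crate versions typically compose, assuming an OTLP collector on the default endpoint; this is not the repo's actual init code, which isn't shown in this diff:

```rust
// Sketch only: wire tracing-subscriber to an OTLP exporter the way
// opentelemetry 0.30 / tracing-opentelemetry 0.31 usually fit together.
// Endpoint and service-name details are assumptions.
use opentelemetry::trace::TracerProvider as _;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

fn init_telemetry() -> anyhow::Result<()> {
    // export spans over OTLP/gRPC to a collector on the default endpoint
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .build()?;
    let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .build();

    // layer the OTel bridge alongside the usual env-filtered stderr output
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::from_default_env())
        .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
        .with(tracing_opentelemetry::layer().with_tracer(provider.tracer("allegedly")))
        .init();
    Ok(())
}
```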
+37
examples/poll.rs
+use allegedly::{ExportPage, poll_upstream};
+
+#[tokio::main]
+async fn main() {
+    // set to `None` to replay from the beginning of the PLC history
+    let after = Some(chrono::Utc::now());
+
+    // the PLC server to poll for new ops
+    let upstream = "https://plc.wtf/export".parse().unwrap();
+
+    // self-rate-limit (plc.directory's limit interval is 600ms)
+    let throttle = std::time::Duration::from_millis(300);
+
+    // pages are sent out of the poller via a tokio mpsc channel
+    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+
+    // spawn a tokio task to run the poller
+    tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
+
+    // receive pages of plc ops from the poller
+    while let Some(ExportPage { ops }) = rx.recv().await {
+        println!("received {} plc ops", ops.len());
+
+        for op in ops {
+            // in this example we're alerting when changes are found for one
+            // specific identity
+            if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
+                println!(
+                    "Update found for {}! cid={}\n  -> operation: {}",
+                    op.did,
+                    op.cid,
+                    op.operation.get()
+                );
+            }
+        }
+    }
+}
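Assuming the standard Cargo examples layout, this runs with:

```bash
cargo run --example poll
```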
+25-7
readme.md
···
   sudo allegedly mirror \
     --upstream "https://plc.directory" \
     --wrap "http://127.0.0.1:3000" \
+    --wrap-pg-cert "/opt/allegedly/postgres-cert.pem" \
     --acme-domain "plc.wtf" \
+    --acme-domain "alt.plc.wtf" \
+    --experimental-acme-domain "experimental.plc.wtf" \
     --acme-cache-path ./acme-cache \
-    --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory"
+    --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" \
+    --acme-ipv6 \
+    --experimental-write-upstream
+  ```
+
+- Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream
+
+  ```bash
+  sudo allegedly wrap \
+    --wrap "http://127.0.0.1:3000" \
+    --acme-ipv6 \
+    --acme-cache-path ./acme-cache \
+    --acme-domain "plc.wtf" \
+    --experimental-acme-domain "experimental.plc.wtf" \
+    --experimental-write-upstream \
+    --upstream "https://plc.wtf" \
   ```
···
 - monitoring of the various tasks
 - health check pings
 - expose metrics/tracing
-- read-only flag for mirror wrapper
+- [x] read-only flag for mirror wrapper
 - bundle: write directly to s3-compatible object storage
 - helpers for automating periodic `bundle` runs
 
 
 ### new things
 
-- experimental: websocket version of /export
-- experimental: accept writes by forwarding them upstream
-- experimental: serve a tlog
-- experimental: embed a log database directly for fast and efficient mirroring
-- experimental: support multiple upstreams?
+- [ ] experimental: websocket version of /export
+- [x] experimental: accept writes by forwarding them upstream
+- [ ] experimental: serve a tlog
+- [ ] experimental: embed a log database directly for fast and efficient mirroring
+- [ ] experimental: support multiple upstreams?
 
 - [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range
 - [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
+34-6
src/bin/allegedly.rs
···
-use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream};
 use clap::{CommandFactory, Parser, Subcommand};
-use std::{path::PathBuf, time::Instant};
 use tokio::fs::create_dir_all;
 use tokio::sync::mpsc;
···
     Mirror {
         #[command(flatten)]
         args: mirror::Args,
     },
     /// Poll an upstream PLC server and log new ops to stdout
     Tail {
···
     },
 }
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
     let args = Cli::parse();
     let matches = Cli::command().get_matches();
     let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
-    bin_init(name);
 
     let globals = args.globals.clone();
···
         } => {
             let mut url = globals.upstream;
             url.set_path("/export");
             let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
             tokio::task::spawn(async move {
-                poll_upstream(Some(after), url, tx)
                     .await
                     .expect("to poll upstream")
             });
···
                 .await
                 .expect("to write bundles to output files");
         }
-        Commands::Mirror { args } => mirror::run(globals, args).await?,
         Commands::Tail { after } => {
             let mut url = globals.upstream;
             url.set_path("/export");
             let start_at = after.or_else(|| Some(chrono::Utc::now()));
             let (tx, rx) = mpsc::channel(1);
             tokio::task::spawn(async move {
-                poll_upstream(start_at, url, tx)
                     .await
                     .expect("to poll upstream")
             });
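Only the removed side of this file was captured above; the added lines are not shown. The `poll_upstream` signature change is visible in src/poll.rs below (a new `throttle` parameter), so the Tail arm's call presumably becomes something like this hypothetical sketch:

```rust
// Hypothetical updated call site, assuming the new poll_upstream signature
// from src/poll.rs. The 600ms figure is plc.directory's /export interval
// noted elsewhere in this diff, not necessarily what the binary passes.
let throttle = std::time::Duration::from_millis(600);
tokio::task::spawn(async move {
    poll_upstream(start_at, url, throttle, tx)
        .await
        .expect("to poll upstream")
});
```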
src/lib.rs

···
 use tokio::sync::{mpsc, oneshot};
 
 mod backfill;
+mod cached_value;
 mod client;
 mod mirror;
 mod plc_pg;
···
 pub mod bin;
 
 pub use backfill::backfill;
+pub use cached_value::{CachedValue, Fetcher};
 pub use client::{CLIENT, UA};
-pub use mirror::{ListenConf, serve};
+pub use mirror::{ExperimentalConf, ListenConf, serve};
 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
 pub use poll::{PageBoundaryState, get_page, poll_upstream};
-pub use ratelimit::GovernorMiddleware;
+pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
 
 pub type Dt = chrono::DateTime<chrono::Utc>;
···
         env!("CARGO_PKG_VERSION"),
     )
 }
-
-pub fn bin_init(name: &str) {
-    if std::env::var_os("RUST_LOG").is_none() {
-        unsafe { std::env::set_var("RUST_LOG", "info") };
-    }
-    let filter = tracing_subscriber::EnvFilter::from_default_env();
-    tracing_subscriber::fmt()
-        .with_writer(std::io::stderr)
-        .with_env_filter(filter)
-        .init();
-
-    log::info!("{}", logo(name));
-}
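The new `cached_value` module itself isn't part of this diff, but its contract can be read off the `Fetcher` impls and the `CachedValue::new(fetcher, ttl)` / `.get().await` call sites in src/mirror.rs below. A plausible shape, as a sketch only:

```rust
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

/// Sketch of the trait implied by `impl Fetcher<Dt> for GetLatestAt`
/// in src/mirror.rs (assumed, not the actual module).
pub trait Fetcher<T> {
    async fn fetch(&self) -> Result<T, Box<dyn std::error::Error>>;
}

/// A value that is re-fetched at most once per `ttl` (assumed semantics).
#[derive(Clone)]
pub struct CachedValue<T, F> {
    fetcher: F,
    ttl: Duration,
    slot: Arc<Mutex<Option<(Instant, T)>>>,
}

impl<T: Clone, F: Fetcher<T>> CachedValue<T, F> {
    pub fn new(fetcher: F, ttl: Duration) -> Self {
        Self {
            fetcher,
            ttl,
            slot: Arc::new(Mutex::new(None)),
        }
    }

    /// Return the cached value if fresh enough, otherwise fetch and store.
    pub async fn get(&self) -> Result<T, Box<dyn std::error::Error>> {
        let mut slot = self.slot.lock().await;
        if let Some((at, v)) = slot.as_ref() {
            if at.elapsed() < self.ttl {
                return Ok(v.clone());
            }
        }
        let fresh = self.fetcher.fetch().await?;
        *slot = Some((Instant::now(), fresh.clone()));
        Ok(fresh)
    }
}
```

Holding the mutex across the fetch (as sketched) would also serialize concurrent refreshes, which fits how mirror.rs uses it to shield the db and the upstream from per-request load.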
+287-61
src/mirror.rs
···
-use crate::{GovernorMiddleware, UA, logo};
+use crate::{
+    CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo,
+};
 use futures::TryStreamExt;
 use governor::Quota;
 use poem::{
-    Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get,
-    handler,
-    http::StatusCode,
+    Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server,
+    get, handler,
+    http::{StatusCode, header::USER_AGENT},
     listener::{Listener, TcpListener, acme::AutoCert},
     middleware::{AddData, CatchPanic, Compression, Cors, Tracing},
-    web::{Data, Json},
+    web::{Data, Json, Path},
 };
 use reqwest::{Client, Url};
 use std::{net::SocketAddr, path::PathBuf, time::Duration};
 
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 struct State {
     client: Client,
     plc: Url,
     upstream: Url,
+    sync_info: Option<SyncInfo>,
+    experimental: ExperimentalConf,
 }
 
+/// server info that only applies in mirror (synchronizing) mode
+#[derive(Clone)]
+struct SyncInfo {
+    latest_at: CachedValue<Dt, GetLatestAt>,
+    upstream_status: CachedValue<PlcStatus, CheckUpstream>,
+}
+
 #[handler]
-fn hello(Data(State { upstream, .. }): Data<&State>) -> String {
-    format!(
-        r#"{}
-
+fn hello(
+    Data(State {
+        sync_info,
+        upstream,
+        experimental: exp,
+        ..
+    }): Data<&State>,
+    req: &Request,
+) -> String {
+    // let mode = if sync_info.is_some() { "mirror" } else { "wrap" };
+    let pre_info = if sync_info.is_some() {
+        format!(
+            r#"
 This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
 synchronizes a local PLC reference server instance[2] (why?[3]).
···
     {upstream}
 
+"#
+        )
+    } else {
+        format!(
+            r#"
+This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse-
+proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy.
+
+
+Configured upstream (only used if experimental op forwarding is enabled):
+
+    {upstream}
+
+"#
+        )
+    };
+
+    let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) {
+        (false, _, _) => "  - POST /*          Always rejected. This is a mirror.".to_string(),
+        (_, None, _) => {
+            "  - POST /:did       Create a PLC op. Allegedly will forward it upstream.".to_string()
+        }
+        (_, Some(d), Some(f)) if f == d => {
+            "  - POST /:did       Create a PLC op. Allegedly will forward it upstream.".to_string()
+        }
+        (_, Some(d), _) => format!(
+            r#"  - POST /*          Rejected, but experimental upstream op forwarding is
+                     available at `POST https://{d}/:did`!"#
+        ),
+    };
+
+    format!(
+        r#"{}
+{pre_info}
 
 Available APIs:
···
   - GET /*           Proxies to wrapped server; see PLC API docs:
                      https://web.plc.directory/api/redoc
 
-  - POST /*          Always rejected. This is a mirror.
-
-    tip: try `GET /{{did}}` to resolve an identity
-
-Allegedly is a suit of open-source CLI tools for working with PLC logs:
+    tip: try `GET /{{did}}` to resolve an identity
+
+{post_info}
+
+
+Allegedly is a suite of open-source CLI tools for working with PLC logs,
+from microcosm:
 
     https://tangled.org/@microcosm.blue/Allegedly
+
+    https://microcosm.blue
 
 
 [1] https://web.plc.directory
···
         include_bytes!("../favicon.ico").with_content_type("image/x-icon")
 }
 
-fn failed_to_reach_wrapped() -> String {
+fn failed_to_reach_named(name: &str) -> String {
     format!(
         r#"{}
 
-Failed to reach the wrapped reference PLC server. Sorry.
+Failed to reach the {name} server. Sorry.
 "#,
         logo("mirror 502 :( ")
     )
 }
 
+fn bad_create_op(reason: &str) -> Response {
+    Response::builder()
+        .status(StatusCode::BAD_REQUEST)
+        .body(format!(
+            r#"{}
+
+NooOOOooooo: {reason}
+"#,
+            logo("mirror 400 >:( ")
+        ))
+}
+
+type PlcStatus = (bool, serde_json::Value);
+
-async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) {
+async fn plc_status(url: &Url, client: &Client) -> PlcStatus {
     use serde_json::json;
 
     let mut url = url.clone();
···
     }
 }
 
+#[derive(Clone)]
+struct GetLatestAt(Db);
+impl Fetcher<Dt> for GetLatestAt {
+    async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> {
+        let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!(
+            "expected to find at least one thing in the db"
+        ))?;
+        Ok(now)
+    }
+}
+
+#[derive(Clone)]
+struct CheckUpstream(Url, Client);
+impl Fetcher<PlcStatus> for CheckUpstream {
+    async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> {
+        Ok(plc_status(&self.0, &self.1).await)
+    }
+}
+
 #[handler]
 async fn health(
     Data(State {
         plc,
         client,
-        upstream,
+        sync_info,
+        ..
     }): Data<&State>,
 ) -> impl IntoResponse {
     let mut overall_status = StatusCode::OK;
···
     if !ok {
         overall_status = StatusCode::BAD_GATEWAY;
     }
-    let (ok, upstream_status) = plc_status(upstream, client).await;
-    if !ok {
-        overall_status = StatusCode::BAD_GATEWAY;
-    }
-    (
-        overall_status,
-        Json(serde_json::json!({
-            "server": "allegedly (mirror)",
-            "version": env!("CARGO_PKG_VERSION"),
-            "wrapped_plc": wrapped_status,
-            "upstream_plc": upstream_status,
-        })),
-    )
+    if let Some(SyncInfo {
+        latest_at,
+        upstream_status,
+    }) = sync_info
+    {
+        // mirror mode
+        let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
+        if !ok {
+            overall_status = StatusCode::BAD_GATEWAY;
+        }
+        let latest = latest_at.get().await.ok();
+        (
+            overall_status,
+            Json(serde_json::json!({
+                "server": "allegedly (mirror)",
+                "version": env!("CARGO_PKG_VERSION"),
+                "wrapped_plc": wrapped_status,
+                "upstream_plc": upstream_status,
+                "latest_at": latest,
+            })),
+        )
+    } else {
+        // wrap mode
+        (
+            overall_status,
+            Json(serde_json::json!({
+                "server": "allegedly (mirror)",
+                "version": env!("CARGO_PKG_VERSION"),
+                "wrapped_plc": wrapped_status,
+            })),
+        )
+    }
 }
 
+fn proxy_response(res: reqwest::Response) -> Response {
+    let http_res: poem::http::Response<reqwest::Body> = res.into();
+    let (parts, reqw_body) = http_res.into_parts();
+
+    let parts = poem::ResponseParts {
+        status: parts.status,
+        version: parts.version,
+        headers: parts.headers,
+        extensions: parts.extensions,
+    };
+
+    let body = http_body_util::BodyDataStream::new(reqw_body)
+        .map_err(|e| std::io::Error::other(Box::new(e)));
+
+    Response::from_parts(parts, poem::Body::from_bytes_stream(body))
+}
+
 #[handler]
-async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> {
+async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> {
     let mut target = state.plc.clone();
     target.set_path(req.uri().path());
-    let upstream_res = state
+    target.set_query(req.uri().query());
+    let wrapped_res = state
         .client
         .get(target)
         .timeout(Duration::from_secs(3)) // should be low latency to wrapped server
···
         .await
         .map_err(|e| {
             log::error!("upstream req fail: {e}");
-            Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY)
+            Error::from_string(
+                failed_to_reach_named("wrapped reference PLC"),
+                StatusCode::BAD_GATEWAY,
+            )
         })?;
 
-    let http_res: poem::http::Response<reqwest::Body> = upstream_res.into();
-    let (parts, reqw_body) = http_res.into_parts();
-
-    let parts = poem::ResponseParts {
-        status: parts.status,
-        version: parts.version,
-        headers: parts.headers,
-        extensions: parts.extensions,
-    };
-
-    let body = http_body_util::BodyDataStream::new(reqw_body)
-        .map_err(|e| std::io::Error::other(Box::new(e)));
-
-    Ok(Response::from_parts(
-        parts,
-        poem::Body::from_bytes_stream(body),
-    ))
+    Ok(proxy_response(wrapped_res))
 }
 
+#[handler]
+async fn forward_create_op_upstream(
+    Data(State {
+        upstream,
+        client,
+        experimental,
+        ..
+    }): Data<&State>,
+    Path(did): Path<String>,
+    req: &Request,
+    body: Body,
+) -> Result<Response> {
+    if let Some(expected_domain) = &experimental.acme_domain {
+        let Some(found_host) = req.uri().host() else {
+            return Ok(bad_create_op(&format!(
+                "missing `Host` header, expected {expected_domain:?} for experimental requests."
+            )));
+        };
+        if found_host != expected_domain {
+            return Ok(bad_create_op(&format!(
+                "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}"
+            )));
+        }
+    }
+
+    // adjust proxied headers
+    let mut headers: reqwest::header::HeaderMap = req.headers().clone();
+    log::trace!("original request headers: {headers:?}");
+    headers.insert("Host", upstream.host_str().unwrap().parse().unwrap());
+    let client_ua = headers
+        .get(USER_AGENT)
+        .map(|h| h.to_str().unwrap())
+        .unwrap_or("unknown");
+    headers.insert(
+        USER_AGENT,
+        format!("{UA} (forwarding from {client_ua:?})")
+            .parse()
+            .unwrap(),
+    );
+    log::trace!("adjusted request headers: {headers:?}");
+
+    let mut target = upstream.clone();
+    target.set_path(&did);
+    let upstream_res = client
+        .post(target)
+        .timeout(Duration::from_secs(15)) // be a little generous
+        .headers(headers)
+        .body(reqwest::Body::wrap_stream(body.into_bytes_stream()))
+        .send()
+        .await
+        .map_err(|e| {
+            log::warn!("upstream write fail: {e}");
+            Error::from_string(
+                failed_to_reach_named("upstream PLC"),
+                StatusCode::BAD_GATEWAY,
+            )
+        })?;
+
+    Ok(proxy_response(upstream_res))
+}
 
 #[handler]
···
 
 Sorry, this server does not accept POST requests.
 
-You may wish to try upstream: {upstream}
+You may wish to try sending that to our upstream: {upstream}.
+
+If you operate this server, try running with `--experimental-write-upstream`.
 "#,
         logo("mirror (nope)")
     ),
···
         domains: Vec<String>,
         cache_path: PathBuf,
         directory_url: String,
+        ipv6: bool,
     },
     Bind(SocketAddr),
 }
 
+#[derive(Debug, Clone)]
+pub struct ExperimentalConf {
+    pub acme_domain: Option<String>,
+    pub write_upstream: bool,
+}
+
-pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> {
+pub async fn serve(
+    upstream: Url,
+    plc: Url,
+    listen: ListenConf,
+    experimental: ExperimentalConf,
+    db: Option<Db>,
+) -> anyhow::Result<&'static str> {
     log::info!("starting server...");
 
     // not using crate CLIENT: don't want the retries etc
···
         .build()
         .expect("reqwest client to build");
 
+    // when `db` is None, we're running in wrap mode. no db access, no upstream sync
+    let sync_info = db.map(|db| SyncInfo {
+        latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)),
+        upstream_status: CachedValue::new(
+            CheckUpstream(upstream.clone(), client.clone()),
+            Duration::from_secs(6),
+        ),
+    });
+
     let state = State {
         client,
         plc,
         upstream: upstream.clone(),
+        sync_info,
+        experimental: experimental.clone(),
     };
 
-    let app = Route::new()
+    let mut app = Route::new()
         .at("/", get(hello))
         .at("/favicon.ico", get(favicon))
         .at("/_health", get(health))
-        .at("/:any", get(proxy).post(nope))
+        .at("/export", get(proxy));
+
+    if experimental.write_upstream {
+        log::info!("enabling experimental write forwarding to upstream");
+
+        let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap()));
+        let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap()));
+
+        let upstream_proxier = forward_create_op_upstream
+            .with(GovernorMiddleware::new(did_limiter))
+            .with(GovernorMiddleware::new(ip_limiter));
+
+        app = app.at("/did:plc:*", get(proxy).post(upstream_proxier));
+    } else {
+        app = app.at("/did:plc:*", get(proxy).post(nope));
+    }
+
+    let app = app
         .with(AddData::new(state))
         .with(Cors::new().allow_credentials(false))
         .with(Compression::new())
-        .with(GovernorMiddleware::new(Quota::per_minute(
-            3000.try_into().expect("ratelimit middleware to build"),
-        )))
+        .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute(
+            3000.try_into().expect("ratelimit middleware to build"),
+        ))))
         .with(CatchPanic::new())
         .with(Tracing);
···
             domains,
             cache_path,
             directory_url,
+            ipv6,
         } => {
             rustls::crypto::aws_lc_rs::default_provider()
                 .install_default()
···
             }
             let auto_cert = auto_cert.build().expect("acme config to build");
 
+            log::trace!("auto_cert: {auto_cert:?}");
+
-            let notice_task = tokio::task::spawn(run_insecure_notice());
-            let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await;
+            let notice_task = tokio::task::spawn(run_insecure_notice(ipv6));
+            let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" });
+            let app_res = run(app, listener.acme(auto_cert)).await;
             log::warn!("server task ended, aborting insecure server task...");
             notice_task.abort();
             app_res?;
···
 }
 
 /// kick off a tiny little server on a tokio task to tell people to use 443
-async fn run_insecure_notice() -> Result<(), std::io::Error> {
+async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> {
     #[handler]
     fn oop_plz_be_secure() -> (StatusCode, String) {
         (
···
     }
 
     let app = Route::new()
-        .at("/", get(oop_plz_be_secure))
         .at("/favicon.ico", get(favicon))
+        .nest("/", get(oop_plz_be_secure))
         .with(Tracing);
-    Server::new(TcpListener::bind("0.0.0.0:80"))
-        .name("allegedly (mirror:80 helper)")
-        .run(app)
-        .await
+    Server::new(TcpListener::bind(if ipv6 {
+        "[::]:80"
+    } else {
+        "0.0.0.0:80"
+    }))
+    .name("allegedly (mirror:80 helper)")
+    .run(app)
+    .await
 }
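With write forwarding enabled, a signed PLC operation POSTed to the configured experimental domain is relayed upstream with an adjusted `Host` and `User-Agent`, per `forward_create_op_upstream` above. A hypothetical request using the doc's own placeholder domain and DID (`signed-op.json` is an assumed local file holding the operation):

```bash
curl -i -X POST "https://experimental.plc.wtf/did:plc:hdhoaan3xa3jiuq4fg4mefid" \
  -H "Content-Type: application/json" \
  --data @signed-op.json
```

Note the quotas wired in above: roughly 10 forwarded writes per hour per client IP and 4 per hour per DID.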
+53-8
src/poll.rs
···
 use thiserror::Error;
 use tokio::sync::mpsc;
 
-// plc.directory ratelimit on /export is 500 per 5 mins
-const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600);
-
 #[derive(Debug, Error)]
 pub enum GetPageError {
     #[error(transparent)]
···
     }
 }
 
-/// PLC
+/// State for removing duplicate ops between PLC export page boundaries
 #[derive(Debug, PartialEq)]
 pub struct PageBoundaryState {
+    /// The previous page's last timestamp
+    ///
+    /// Duplicate ops from /export only occur for the same exact timestamp
     pub last_at: Dt,
+    /// The previous page's ops at its last timestamp
     keys_at: Vec<OpKey>, // expected to ~always be length one
 }
 
-/// track keys at final createdAt to deduplicate the start of the next page
 impl PageBoundaryState {
+    /// Initialize the boundary state with a PLC page
     pub fn new(page: &ExportPage) -> Option<Self> {
         // grab the very last op
         let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
···
         Some(me)
     }
+    /// Apply the deduplication and update state
+    ///
+    /// The beginning of the page will be modified to remove duplicates from the
+    /// previous page.
+    ///
+    /// The end of the page is inspected to update the deduplicator state for
+    /// the next page.
     fn apply_to_next(&mut self, page: &mut ExportPage) {
         // walk ops forward, kicking previously-seen ops until created_at advances
         let to_remove: Vec<usize> = page
···
     }
 }
 
+/// Get one PLC export page
+///
+/// Extracts the final op so it can be used to fetch the following page
 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
     log::trace!("Getting page: {url}");
···
         .split('\n')
         .filter_map(|s| {
             serde_json::from_str::<Op>(s)
-                .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
+                .inspect_err(|e| {
+                    if !s.is_empty() {
+                        log::warn!("failed to parse op: {e} ({s})")
+                    }
+                })
                 .ok()
         })
         .collect();
···
     Ok((ExportPage { ops }, last_op))
 }
 
+/// Poll an upstream PLC server for new ops
+///
+/// Pages of operations are written to the `dest` channel.
+///
+/// ```no_run
+/// # #[tokio::main]
+/// # async fn main() {
+/// use allegedly::{ExportPage, Op, poll_upstream};
+///
+/// let after = Some(chrono::Utc::now());
+/// let upstream = "https://plc.wtf/export".parse().unwrap();
+/// let throttle = std::time::Duration::from_millis(300);
+///
+/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
+/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
+///
+/// while let Some(ExportPage { ops }) = rx.recv().await {
+///     println!("received {} plc ops", ops.len());
+///
+///     for Op { did, cid, operation, .. } in ops {
+///         // in this example we're alerting when changes are found for one
+///         // specific identity
+///         if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
+///             println!("Update found for {did}! cid={cid}\n  -> operation: {}", operation.get());
+///         }
+///     }
+/// }
+/// # }
+/// ```
 pub async fn poll_upstream(
     after: Option<Dt>,
     base: Url,
+    throttle: Duration,
     dest: mpsc::Sender<ExportPage>,
 ) -> anyhow::Result<&'static str> {
-    log::info!("starting upstream poller after {after:?}");
-    let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
+    log::info!("starting upstream poller at {base} after {after:?}");
+    let mut tick = tokio::time::interval(throttle);
     let mut prev_last: Option<LastOp> = after.map(Into::into);
     let mut boundary_state: Option<PageBoundaryState> = None;
     loop {
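The deleted constant baked plc.directory's /export budget into the poller; callers now choose their own pacing. The arithmetic behind the old 600ms default, for reference:

```rust
// plc.directory allows 500 /export requests per 5 minutes:
// 300 s / 500 = 0.6 s, so ~600 ms is the fastest safe interval there.
// Other upstreams (e.g. your own mirror) can be polled harder.
let throttle = std::time::Duration::from_millis(600);
```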
+88-33
src/ratelimit.rs
···
 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode};
 use std::{
     convert::TryInto,
+    hash::Hash,
     net::{IpAddr, Ipv6Addr},
     sync::{Arc, LazyLock},
     time::Duration,
···
 type IP6_56 = [u8; 7];
 type IP6_48 = [u8; 6];
 
+pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static {
+    fn extract_key(&self, req: &Request) -> Result<K>;
+    fn check_key(&self, ip: &K) -> Result<(), Duration>;
+    fn housekeep(&self);
+}
+
 fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> {
     let period = quota.replenish_interval() / factor;
     let burst = quota
···
 }
 
 #[derive(Debug)]
+pub struct CreatePlcOpLimiter {
+    limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>,
+}
+
+impl CreatePlcOpLimiter {
+    pub fn new(quota: Quota) -> Self {
+        Self {
+            limiter: RateLimiter::keyed(quota),
+        }
+    }
+}
+
+/// this must be used with an endpoint with a single path param for the did
+impl Limiter<String> for CreatePlcOpLimiter {
+    fn extract_key(&self, req: &Request) -> Result<String> {
+        let (did,) = req.path_params::<(String,)>()?;
+        Ok(did)
+    }
+    fn check_key(&self, did: &String) -> Result<(), Duration> {
+        self.limiter
+            .check_key(did)
+            .map_err(|e| e.wait_time_from(CLOCK.now()))
+    }
+    fn housekeep(&self) {
+        log::debug!(
+            "limiter size before housekeeping: {} dids",
+            self.limiter.len()
+        );
+        self.limiter.retain_recent();
+    }
+}
+
+#[derive(Debug)]
-struct IpLimiters {
+pub struct IpLimiters {
     per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>,
     ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>,
     ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>,
···
             ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")),
         }
     }
-    pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> {
+}
+
+impl Limiter<IpAddr> for IpLimiters {
+    fn extract_key(&self, req: &Request) -> Result<IpAddr> {
+        Ok(req
+            .remote_addr()
+            .as_socket_addr()
+            .expect("failed to get request's remote addr") // TODO
+            .ip())
+    }
+    fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> {
         let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now());
         match ip {
-            addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf),
+            addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf),
             IpAddr::V6(a) => {
                 // always check all limiters
                 let check_ip = self
···
             }
         }
     }
+    fn housekeep(&self) {
+        log::debug!(
+            "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48",
+            self.per_ip.len(),
+            self.ip6_56.len(),
+            self.ip6_48.len(),
+        );
+        self.per_ip.retain_recent();
+        self.ip6_56.retain_recent();
+        self.ip6_48.retain_recent();
+    }
 }
 
 /// Once the rate limit has been reached, the middleware will respond with
 /// status code 429 (too many requests) and a `Retry-After` header with the amount
 /// of time that needs to pass before another request will be allowed.
-#[derive(Debug)]
-pub struct GovernorMiddleware {
+// #[derive(Debug)]
+pub struct GovernorMiddleware<K> {
     #[allow(dead_code)]
     stop_on_drop: oneshot::Sender<()>,
-    limiters: Arc<IpLimiters>,
+    limiters: Arc<dyn Limiter<K>>,
 }
 
-impl GovernorMiddleware {
+impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> {
     /// Limit request rates
     ///
     /// a little gross but this spawns a tokio task for housekeeping:
     /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping
-    pub fn new(quota: Quota) -> Self {
-        let limiters = Arc::new(IpLimiters::new(quota));
+    pub fn new(limiters: impl Limiter<K>) -> Self {
+        let limiters = Arc::new(limiters);
         let (stop_on_drop, mut stopped) = oneshot::channel();
         tokio::task::spawn({
             let limiters = limiters.clone();
···
                     _ = &mut stopped => break,
                     _ = tokio::time::sleep(Duration::from_secs(60)) => {},
                 };
-                log::debug!(
-                    "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48",
-                    limiters.per_ip.len(),
-                    limiters.ip6_56.len(),
-                    limiters.ip6_48.len(),
-                );
-                limiters.per_ip.retain_recent();
-                limiters.ip6_56.retain_recent();
-                limiters.ip6_48.retain_recent();
+                limiters.housekeep();
             }
         }
     });
···
     }
 }
 
-impl<E: Endpoint> Middleware<E> for GovernorMiddleware {
-    type Output = GovernorMiddlewareImpl<E>;
+impl<E, K> Middleware<E> for GovernorMiddleware<K>
+where
+    E: Endpoint,
+    K: Hash + std::fmt::Debug + Send + Sync + 'static,
+{
+    type Output = GovernorMiddlewareImpl<E, K>;
     fn transform(&self, ep: E) -> Self::Output {
         GovernorMiddlewareImpl {
             ep,
···
     }
 }
 
-pub struct GovernorMiddlewareImpl<E> {
+pub struct GovernorMiddlewareImpl<E, K> {
     ep: E,
-    limiters: Arc<IpLimiters>,
+    limiters: Arc<dyn Limiter<K>>,
 }
 
-impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> {
+impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K>
+where
+    E: Endpoint,
+    K: Hash + std::fmt::Debug + Send + Sync + 'static,
+{
     type Output = E::Output;
 
     async fn call(&self, req: Request) -> Result<Self::Output> {
-        let remote = req
-            .remote_addr()
-            .as_socket_addr()
-            .expect("failed to get request's remote addr") // TODO
-            .ip();
-
-        log::trace!("remote: {remote}");
-
-        match self.limiters.check_key(remote) {
+        let key = self.limiters.extract_key(&req)?;
+
+        match self.limiters.check_key(&key) {
             Ok(_) => {
-                log::debug!("allowing remote {remote}");
+                log::debug!("allowing key {key:?}");
                 self.ep.call(req).await
             }
             Err(d) => {
                 let wait_time = d.as_secs();
 
-                log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s");
+                log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s");
 
                 let res = Response::builder()
                     .status(StatusCode::TOO_MANY_REQUESTS)
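The `Limiter<K>` trait is what lets `GovernorMiddleware` guard different routes by different keys (client IP for everything, DID for op creation, as wired up in src/mirror.rs). To illustrate the contract, a hypothetical third implementation keyed on a header value; `TokenLimiter` and `x-api-token` are invented for this sketch:

```rust
use governor::{
    Quota, RateLimiter,
    clock::{Clock, DefaultClock},
    state::keyed::DefaultKeyedStateStore,
};
use poem::{Request, Result, http::StatusCode};
use std::time::Duration;

/// Hypothetical limiter keyed on a client-supplied token header.
struct TokenLimiter {
    clock: DefaultClock,
    limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>,
}

impl TokenLimiter {
    fn new(quota: Quota) -> Self {
        Self {
            clock: DefaultClock::default(),
            limiter: RateLimiter::keyed(quota),
        }
    }
}

impl Limiter<String> for TokenLimiter {
    fn extract_key(&self, req: &Request) -> Result<String> {
        // reject requests that don't carry the (hypothetical) token header
        req.header("x-api-token")
            .map(str::to_owned)
            .ok_or_else(|| poem::Error::from_status(StatusCode::UNAUTHORIZED))
    }
    fn check_key(&self, token: &String) -> Result<(), Duration> {
        self.limiter
            .check_key(token)
            .map_err(|not_until| not_until.wait_time_from(self.clock.now()))
    }
    fn housekeep(&self) {
        self.limiter.retain_recent();
    }
}
```

Since the trait is object-safe (the middleware stores `Arc<dyn Limiter<K>>`), any such implementation drops straight into `GovernorMiddleware::new` via `.with(...)` on a route.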