···22name = "allegedly"
33description = "public ledger server tools and services (for the PLC)"
44license = "MIT OR Apache-2.0"
55-version = "0.2.1"
55+version = "0.3.3"
66edition = "2024"
77default-run = "allegedly"
88···1616http-body-util = "0.1.3"
1717log = "0.4.28"
1818native-tls = "0.2.14"
1919+opentelemetry = "0.30.0"
2020+opentelemetry-otlp = { version = "0.30.0" }
2121+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
1922poem = { version = "3.1.12", features = ["acme", "compression"] }
2023postgres-native-tls = "0.5.1"
2121-reqwest = { version = "0.12.23", features = ["stream", "json"] }
2424+reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
2225reqwest-middleware = "0.4.2"
2326reqwest-retry = "0.7.0"
2427rustls = "0.23.32"
···2932tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
3033tokio-stream = { version = "0.1.17", features = ["io-util"] }
3134tokio-util = { version = "0.7.16", features = ["compat"] }
3535+tracing = "0.1.41"
3636+tracing-opentelemetry = "0.31.0"
3237tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+37
examples/poll.rs
···11+use allegedly::{ExportPage, poll_upstream};
22+33+#[tokio::main]
44+async fn main() {
55+ // set to `None` to replay from the beginning of the PLC history
66+ let after = Some(chrono::Utc::now());
77+88+ // the PLC server to poll for new ops
99+ let upstream = "https://plc.wtf/export".parse().unwrap();
1010+1111+ // self-rate-limit (plc.directory's limit interval is 600ms)
1212+ let throttle = std::time::Duration::from_millis(300);
1313+1414+ // pages are sent out of the poller via a tokio mpsc channel
1515+ let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1616+1717+ // spawn a tokio task to run the poller
1818+ tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
1919+2020+ // receive pages of plc ops from the poller
2121+ while let Some(ExportPage { ops }) = rx.recv().await {
2222+ println!("received {} plc ops", ops.len());
2323+2424+ for op in ops {
2525+ // in this example we're alerting when changes are found for one
2626+ // specific identity
2727+ if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
2828+ println!(
2929+ "Update found for {}! cid={}\n -> operation: {}",
3030+ op.did,
3131+ op.cid,
3232+ op.operation.get()
3333+ );
3434+ }
3535+ }
3636+ }
3737+}
+25-7
readme.md
···2626 sudo allegedly mirror \
2727 --upstream "https://plc.directory" \
2828 --wrap "http://127.0.0.1:3000" \
2929+ --wrap-pg-cert "/opt/allegedly/postgres-cert.pem" \
2930 --acme-domain "plc.wtf" \
3131+ --acme-domain "alt.plc.wtf" \
3232+ --experimental-acme-domain "experimental.plc.wtf" \
3033 --acme-cache-path ./acme-cache \
3131- --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory"
3434+ --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" \
3535+ --acme-ipv6 \
3636+ --experimental-write-upstream
3737+ ```
3838+3939+- Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream
4040+4141+ ```bash
4242+ sudo allegedly wrap \
4343+ --wrap "http://127.0.0.1:3000" \
4444+ --acme-ipv6 \
4545+ --acme-cache-path ./acme-cache \
4646+ --acme-domain "plc.wtf" \
4747+ --experimental-acme-domain "experimental.plc.wtf" \
4848+ --experimental-write-upstream \
4949+ --upstream "https://plc.wtf" \
3250 ```
33513452···6179- monitoring of the various tasks
6280- health check pings
6381- expose metrics/tracing
6464-- read-only flag for mirror wrapper
8282+- [x] read-only flag for mirror wrapper
6583- bundle: write directly to s3-compatible object storage
6684- helpers for automating periodic `bundle` runs
678568866987### new things
70887171-- experimental: websocket version of /export
7272-- experimental: accept writes by forwarding them upstream
7373-- experimental: serve a tlog
7474-- experimental: embed a log database directly for fast and efficient mirroring
7575-- experimental: support multiple upstreams?
8989+- [ ] experimental: websocket version of /export
9090+- [x] experimental: accept writes by forwarding them upstream
9191+- [ ] experimental: serve a tlog
9292+- [ ] experimental: embed a log database directly for fast and efficient mirroring
9393+- [ ] experimental: support multiple upstreams?
76947795- [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range
7896- [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
···22use tokio::sync::{mpsc, oneshot};
3344mod backfill;
55+mod cached_value;
56mod client;
67mod mirror;
78mod plc_pg;
···1213pub mod bin;
13141415pub use backfill::backfill;
1616+pub use cached_value::{CachedValue, Fetcher};
1517pub use client::{CLIENT, UA};
1616-pub use mirror::{ListenConf, serve};
1818+pub use mirror::{ExperimentalConf, ListenConf, serve};
1719pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
1820pub use poll::{PageBoundaryState, get_page, poll_upstream};
1919-pub use ratelimit::GovernorMiddleware;
2121+pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
2022pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
21232224pub type Dt = chrono::DateTime<chrono::Utc>;
···143145 env!("CARGO_PKG_VERSION"),
144146 )
145147}
146146-147147-pub fn bin_init(name: &str) {
148148- if std::env::var_os("RUST_LOG").is_none() {
149149- unsafe { std::env::set_var("RUST_LOG", "info") };
150150- }
151151- let filter = tracing_subscriber::EnvFilter::from_default_env();
152152- tracing_subscriber::fmt()
153153- .with_writer(std::io::stderr)
154154- .with_env_filter(filter)
155155- .init();
156156-157157- log::info!("{}", logo(name));
158158-}
+287-61
src/mirror.rs
···11-use crate::{GovernorMiddleware, UA, logo};
11+use crate::{
22+ CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo,
33+};
24use futures::TryStreamExt;
35use governor::Quota;
46use poem::{
55- Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get,
66- handler,
77- http::StatusCode,
77+ Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server,
88+ get, handler,
99+ http::{StatusCode, header::USER_AGENT},
810 listener::{Listener, TcpListener, acme::AutoCert},
911 middleware::{AddData, CatchPanic, Compression, Cors, Tracing},
1010- web::{Data, Json},
1212+ web::{Data, Json, Path},
1113};
1214use reqwest::{Client, Url};
1315use std::{net::SocketAddr, path::PathBuf, time::Duration};
14161515-#[derive(Debug, Clone)]
1717+#[derive(Clone)]
1618struct State {
1719 client: Client,
1820 plc: Url,
1921 upstream: Url,
2222+ sync_info: Option<SyncInfo>,
2323+ experimental: ExperimentalConf,
2024}
21252222-#[handler]
2323-fn hello(Data(State { upstream, .. }): Data<&State>) -> String {
2424- format!(
2525- r#"{}
2626+/// server info that only applies in mirror (synchronizing) mode
2727+#[derive(Clone)]
2828+struct SyncInfo {
2929+ latest_at: CachedValue<Dt, GetLatestAt>,
3030+ upstream_status: CachedValue<PlcStatus, CheckUpstream>,
3131+}
26323333+#[handler]
3434+fn hello(
3535+ Data(State {
3636+ sync_info,
3737+ upstream,
3838+ experimental: exp,
3939+ ..
4040+ }): Data<&State>,
4141+ req: &Request,
4242+) -> String {
4343+ // let mode = if sync_info.is_some() { "mirror" } else { "wrap" };
4444+ let pre_info = if sync_info.is_some() {
4545+ format!(
4646+ r#"
2747This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
2848synchronizes a local PLC reference server instance[2] (why?[3]).
2949···32523353 {upstream}
34545555+"#
5656+ )
5757+ } else {
5858+ format!(
5959+ r#"
6060+This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse-
6161+proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy.
6262+6363+6464+Configured upstream (only used if experimental op forwarding is enabled):
6565+6666+ {upstream}
6767+6868+"#
6969+ )
7070+ };
7171+7272+ let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) {
7373+ (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(),
7474+ (_, None, _) => {
7575+ " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
7676+ }
7777+ (_, Some(d), Some(f)) if f == d => {
7878+ " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
7979+ }
8080+ (_, Some(d), _) => format!(
8181+ r#" - POST /* Rejected, but experimental upstream op forwarding is
8282+ available at `POST https://{d}/:did`!"#
8383+ ),
8484+ };
8585+8686+ format!(
8787+ r#"{}
8888+{pre_info}
35893690Available APIs:
3791···4094 - GET /* Proxies to wrapped server; see PLC API docs:
4195 https://web.plc.directory/api/redoc
42964343- - POST /* Always rejected. This is a mirror.
9797+ tip: try `GET /{{did}}` to resolve an identity
44989999+{post_info}
451004646- tip: try `GET /{{did}}` to resolve an identity
102102+Allegedly is a suite of open-source CLI tools for working with PLC logs,
103103+from microcosm:
481044949-Allegedly is a suit of open-source CLI tools for working with PLC logs:
105105+ https://tangled.org/@microcosm.blue/Allegedly
501065151- https://tangled.org/@microcosm.blue/Allegedly
107107+ https://microcosm.blue
521085310954110[1] https://web.plc.directory
···64120 include_bytes!("../favicon.ico").with_content_type("image/x-icon")
65121}
661226767-fn failed_to_reach_wrapped() -> String {
123123+fn failed_to_reach_named(name: &str) -> String {
68124 format!(
69125 r#"{}
701267171-Failed to reach the wrapped reference PLC server. Sorry.
127127+Failed to reach the {name} server. Sorry.
72128"#,
73129 logo("mirror 502 :( ")
74130 )
75131}
761327777-async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) {
133133+fn bad_create_op(reason: &str) -> Response {
134134+ Response::builder()
135135+ .status(StatusCode::BAD_REQUEST)
136136+ .body(format!(
137137+ r#"{}
138138+139139+NooOOOooooo: {reason}
140140+"#,
141141+ logo("mirror 400 >:( ")
142142+ ))
143143+}
144144+145145+type PlcStatus = (bool, serde_json::Value);
146146+147147+async fn plc_status(url: &Url, client: &Client) -> PlcStatus {
78148 use serde_json::json;
7914980150 let mut url = url.clone();
···110180 }
111181}
112182183183+#[derive(Clone)]
184184+struct GetLatestAt(Db);
185185+impl Fetcher<Dt> for GetLatestAt {
186186+ async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> {
187187+ let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!(
188188+ "expected to find at least one thing in the db"
189189+ ))?;
190190+ Ok(now)
191191+ }
192192+}
193193+194194+#[derive(Clone)]
195195+struct CheckUpstream(Url, Client);
196196+impl Fetcher<PlcStatus> for CheckUpstream {
197197+ async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> {
198198+ Ok(plc_status(&self.0, &self.1).await)
199199+ }
200200+}
201201+113202#[handler]
114203async fn health(
115204 Data(State {
116205 plc,
117206 client,
118118- upstream,
207207+ sync_info,
208208+ ..
119209 }): Data<&State>,
120210) -> impl IntoResponse {
121211 let mut overall_status = StatusCode::OK;
···123213 if !ok {
124214 overall_status = StatusCode::BAD_GATEWAY;
125215 }
126126- let (ok, upstream_status) = plc_status(upstream, client).await;
127127- if !ok {
128128- overall_status = StatusCode::BAD_GATEWAY;
216216+ if let Some(SyncInfo {
217217+ latest_at,
218218+ upstream_status,
219219+ }) = sync_info
220220+ {
221221+ // mirror mode
222222+ let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
223223+ if !ok {
224224+ overall_status = StatusCode::BAD_GATEWAY;
225225+ }
226226+ let latest = latest_at.get().await.ok();
227227+ (
228228+ overall_status,
229229+ Json(serde_json::json!({
230230+ "server": "allegedly (mirror)",
231231+ "version": env!("CARGO_PKG_VERSION"),
232232+ "wrapped_plc": wrapped_status,
233233+ "upstream_plc": upstream_status,
234234+ "latest_at": latest,
235235+ })),
236236+ )
237237+ } else {
238238+ // wrap mode
239239+ (
240240+ overall_status,
241241+ Json(serde_json::json!({
242242+ "server": "allegedly (mirror)",
243243+ "version": env!("CARGO_PKG_VERSION"),
244244+ "wrapped_plc": wrapped_status,
245245+ })),
246246+ )
129247 }
130130- (
131131- overall_status,
132132- Json(serde_json::json!({
133133- "server": "allegedly (mirror)",
134134- "version": env!("CARGO_PKG_VERSION"),
135135- "wrapped_plc": wrapped_status,
136136- "upstream_plc": upstream_status,
137137- })),
138138- )
248248+}
249249+250250+fn proxy_response(res: reqwest::Response) -> Response {
251251+ let http_res: poem::http::Response<reqwest::Body> = res.into();
252252+ let (parts, reqw_body) = http_res.into_parts();
253253+254254+ let parts = poem::ResponseParts {
255255+ status: parts.status,
256256+ version: parts.version,
257257+ headers: parts.headers,
258258+ extensions: parts.extensions,
259259+ };
260260+261261+ let body = http_body_util::BodyDataStream::new(reqw_body)
262262+ .map_err(|e| std::io::Error::other(Box::new(e)));
263263+264264+ Response::from_parts(parts, poem::Body::from_bytes_stream(body))
139265}
140266141267#[handler]
142142-async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> {
268268+async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> {
143269 let mut target = state.plc.clone();
144270 target.set_path(req.uri().path());
145145- let upstream_res = state
271271+ target.set_query(req.uri().query());
272272+ let wrapped_res = state
146273 .client
147274 .get(target)
148275 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server
···151278 .await
152279 .map_err(|e| {
153280 log::error!("upstream req fail: {e}");
154154- Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY)
281281+ Error::from_string(
282282+ failed_to_reach_named("wrapped reference PLC"),
283283+ StatusCode::BAD_GATEWAY,
284284+ )
155285 })?;
156286157157- let http_res: poem::http::Response<reqwest::Body> = upstream_res.into();
158158- let (parts, reqw_body) = http_res.into_parts();
287287+ Ok(proxy_response(wrapped_res))
288288+}
159289160160- let parts = poem::ResponseParts {
161161- status: parts.status,
162162- version: parts.version,
163163- headers: parts.headers,
164164- extensions: parts.extensions,
165165- };
290290+#[handler]
291291+async fn forward_create_op_upstream(
292292+ Data(State {
293293+ upstream,
294294+ client,
295295+ experimental,
296296+ ..
297297+ }): Data<&State>,
298298+ Path(did): Path<String>,
299299+ req: &Request,
300300+ body: Body,
301301+) -> Result<Response> {
302302+ if let Some(expected_domain) = &experimental.acme_domain {
303303+ let Some(found_host) = req.uri().host() else {
304304+ return Ok(bad_create_op(&format!(
305305+ "missing `Host` header, expected {expected_domain:?} for experimental requests."
306306+ )));
307307+ };
308308+ if found_host != expected_domain {
309309+ return Ok(bad_create_op(&format!(
310310+ "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}"
311311+ )));
312312+ }
313313+ }
166314167167- let body = http_body_util::BodyDataStream::new(reqw_body)
168168- .map_err(|e| std::io::Error::other(Box::new(e)));
315315+ // adjust proxied headers
316316+ let mut headers: reqwest::header::HeaderMap = req.headers().clone();
317317+ log::trace!("original request headers: {headers:?}");
318318+ headers.insert("Host", upstream.host_str().unwrap().parse().unwrap());
319319+ let client_ua = headers
320320+ .get(USER_AGENT)
321321+ .map(|h| h.to_str().unwrap())
322322+ .unwrap_or("unknown");
323323+ headers.insert(
324324+ USER_AGENT,
325325+ format!("{UA} (forwarding from {client_ua:?})")
326326+ .parse()
327327+ .unwrap(),
328328+ );
329329+ log::trace!("adjusted request headers: {headers:?}");
169330170170- Ok(Response::from_parts(
171171- parts,
172172- poem::Body::from_bytes_stream(body),
173173- ))
331331+ let mut target = upstream.clone();
332332+ target.set_path(&did);
333333+ let upstream_res = client
334334+ .post(target)
335335+ .timeout(Duration::from_secs(15)) // be a little generous
336336+ .headers(headers)
337337+ .body(reqwest::Body::wrap_stream(body.into_bytes_stream()))
338338+ .send()
339339+ .await
340340+ .map_err(|e| {
341341+ log::warn!("upstream write fail: {e}");
342342+ Error::from_string(
343343+ failed_to_reach_named("upstream PLC"),
344344+ StatusCode::BAD_GATEWAY,
345345+ )
346346+ })?;
347347+348348+ Ok(proxy_response(upstream_res))
174349}
175350176351#[handler]
···182357183358Sorry, this server does not accept POST requests.
184359185185-You may wish to try upstream: {upstream}
360360+You may wish to try sending that to our upstream: {upstream}.
361361+362362+If you operate this server, try running with `--experimental-write-upstream`.
186363"#,
187364 logo("mirror (nope)")
188365 ),
···195372 domains: Vec<String>,
196373 cache_path: PathBuf,
197374 directory_url: String,
375375+ ipv6: bool,
198376 },
199377 Bind(SocketAddr),
200378}
201379202202-pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> {
380380+#[derive(Debug, Clone)]
381381+pub struct ExperimentalConf {
382382+ pub acme_domain: Option<String>,
383383+ pub write_upstream: bool,
384384+}
385385+386386+pub async fn serve(
387387+ upstream: Url,
388388+ plc: Url,
389389+ listen: ListenConf,
390390+ experimental: ExperimentalConf,
391391+ db: Option<Db>,
392392+) -> anyhow::Result<&'static str> {
203393 log::info!("starting server...");
204394205395 // not using crate CLIENT: don't want the retries etc
···209399 .build()
210400 .expect("reqwest client to build");
211401402402+ // when `db` is None, we're running in wrap mode. no db access, no upstream sync
403403+ let sync_info = db.map(|db| SyncInfo {
404404+ latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)),
405405+ upstream_status: CachedValue::new(
406406+ CheckUpstream(upstream.clone(), client.clone()),
407407+ Duration::from_secs(6),
408408+ ),
409409+ });
410410+212411 let state = State {
213412 client,
214413 plc,
215414 upstream: upstream.clone(),
415415+ sync_info,
416416+ experimental: experimental.clone(),
216417 };
217418218218- let app = Route::new()
419419+ let mut app = Route::new()
219420 .at("/", get(hello))
220421 .at("/favicon.ico", get(favicon))
221422 .at("/_health", get(health))
222222- .at("/:any", get(proxy).post(nope))
423423+ .at("/export", get(proxy));
424424+425425+ if experimental.write_upstream {
426426+ log::info!("enabling experimental write forwarding to upstream");
427427+428428+ let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap()));
429429+ let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap()));
430430+431431+ let upstream_proxier = forward_create_op_upstream
432432+ .with(GovernorMiddleware::new(did_limiter))
433433+ .with(GovernorMiddleware::new(ip_limiter));
434434+435435+ app = app.at("/did:plc:*", get(proxy).post(upstream_proxier));
436436+ } else {
437437+ app = app.at("/did:plc:*", get(proxy).post(nope));
438438+ }
439439+440440+ let app = app
223441 .with(AddData::new(state))
224442 .with(Cors::new().allow_credentials(false))
225443 .with(Compression::new())
226226- .with(GovernorMiddleware::new(Quota::per_minute(
444444+ .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute(
227445 3000.try_into().expect("ratelimit middleware to build"),
228228- )))
446446+ ))))
229447 .with(CatchPanic::new())
230448 .with(Tracing);
231449···234452 domains,
235453 cache_path,
236454 directory_url,
455455+ ipv6,
237456 } => {
238457 rustls::crypto::aws_lc_rs::default_provider()
239458 .install_default()
···247466 }
248467 let auto_cert = auto_cert.build().expect("acme config to build");
249468250250- let notice_task = tokio::task::spawn(run_insecure_notice());
251251- let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await;
469469+ log::trace!("auto_cert: {auto_cert:?}");
470470+471471+ let notice_task = tokio::task::spawn(run_insecure_notice(ipv6));
472472+ let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" });
473473+ let app_res = run(app, listener.acme(auto_cert)).await;
252474 log::warn!("server task ended, aborting insecure server task...");
253475 notice_task.abort();
254476 app_res?;
···272494}
273495274496/// kick off a tiny little server on a tokio task to tell people to use 443
275275-async fn run_insecure_notice() -> Result<(), std::io::Error> {
497497+async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> {
276498 #[handler]
277499 fn oop_plz_be_secure() -> (StatusCode, String) {
278500 (
···288510 }
289511290512 let app = Route::new()
291291- .at("/", get(oop_plz_be_secure))
292513 .at("/favicon.ico", get(favicon))
514514+ .nest("/", get(oop_plz_be_secure))
293515 .with(Tracing);
294294- Server::new(TcpListener::bind("0.0.0.0:80"))
295295- .name("allegedly (mirror:80 helper)")
296296- .run(app)
297297- .await
516516+ Server::new(TcpListener::bind(if ipv6 {
517517+ "[::]:80"
518518+ } else {
519519+ "0.0.0.0:80"
520520+ }))
521521+ .name("allegedly (mirror:80 helper)")
522522+ .run(app)
523523+ .await
298524}
+53-8
src/poll.rs
···44use thiserror::Error;
55use tokio::sync::mpsc;
6677-// plc.directory ratelimit on /export is 500 per 5 mins
88-const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600);
99-107#[derive(Debug, Error)]
118pub enum GetPageError {
129 #[error(transparent)]
···5451 }
5552}
56535757-/// PLC
5454+/// State for removing duplicate ops between PLC export page boundaries
5855#[derive(Debug, PartialEq)]
5956pub struct PageBoundaryState {
5757+ /// The previous page's last timestamp
5858+ ///
5959+ /// Duplicate ops from /export only occur for the same exact timestamp
6060 pub last_at: Dt,
6161+ /// The previous page's ops at its last timestamp
6162 keys_at: Vec<OpKey>, // expected to ~always be length one
6263}
63646464-/// track keys at final createdAt to deduplicate the start of the next page
6565impl PageBoundaryState {
6666+ /// Initialize the boundary state with a PLC page
6667 pub fn new(page: &ExportPage) -> Option<Self> {
6768 // grab the very last op
6869 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
···78797980 Some(me)
8081 }
8282+ /// Apply the deduplication and update state
8383+ ///
8484+ /// The beginning of the page will be modified to remove duplicates from the
8585+ /// previous page.
8686+ ///
8787+ /// The end of the page is inspected to update the deduplicator state for
8888+ /// the next page.
8189 fn apply_to_next(&mut self, page: &mut ExportPage) {
8290 // walk ops forward, kicking previously-seen ops until created_at advances
8391 let to_remove: Vec<usize> = page
···127135 }
128136}
129137138138+/// Get one PLC export page
139139+///
140140+/// Extracts the final op so it can be used to fetch the following page
130141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
131142 log::trace!("Getting page: {url}");
132143···141152 .split('\n')
142153 .filter_map(|s| {
143154 serde_json::from_str::<Op>(s)
144144- .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
155155+ .inspect_err(|e| {
156156+ if !s.is_empty() {
157157+ log::warn!("failed to parse op: {e} ({s})")
158158+ }
159159+ })
145160 .ok()
146161 })
147162 .collect();
···151166 Ok((ExportPage { ops }, last_op))
152167}
153168169169+/// Poll an upstream PLC server for new ops
170170+///
171171+/// Pages of operations are written to the `dest` channel.
172172+///
173173+/// ```no_run
174174+/// # #[tokio::main]
175175+/// # async fn main() {
176176+/// use allegedly::{ExportPage, Op, poll_upstream};
177177+///
178178+/// let after = Some(chrono::Utc::now());
179179+/// let upstream = "https://plc.wtf/export".parse().unwrap();
180180+/// let throttle = std::time::Duration::from_millis(300);
181181+///
182182+/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
183183+/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
184184+///
185185+/// while let Some(ExportPage { ops }) = rx.recv().await {
186186+/// println!("received {} plc ops", ops.len());
187187+///
188188+/// for Op { did, cid, operation, .. } in ops {
189189+/// // in this example we're alerting when changes are found for one
190190+/// // specific identity
191191+/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
192192+/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
193193+/// }
194194+/// }
195195+/// }
196196+/// # }
197197+/// ```
154198pub async fn poll_upstream(
155199 after: Option<Dt>,
156200 base: Url,
201201+ throttle: Duration,
157202 dest: mpsc::Sender<ExportPage>,
158203) -> anyhow::Result<&'static str> {
159159- log::info!("starting upstream poller after {after:?}");
160160- let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
204204+ log::info!("starting upstream poller at {base} after {after:?}");
205205+ let mut tick = tokio::time::interval(throttle);
161206 let mut prev_last: Option<LastOp> = after.map(Into::into);
162207 let mut boundary_state: Option<PageBoundaryState> = None;
163208 loop {
+88-33
src/ratelimit.rs
···88use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode};
99use std::{
1010 convert::TryInto,
1111+ hash::Hash,
1112 net::{IpAddr, Ipv6Addr},
1213 sync::{Arc, LazyLock},
1314 time::Duration,
···2021type IP6_56 = [u8; 7];
2122type IP6_48 = [u8; 6];
22232424+pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static {
2525+ fn extract_key(&self, req: &Request) -> Result<K>;
2626+ fn check_key(&self, ip: &K) -> Result<(), Duration>;
2727+ fn housekeep(&self);
2828+}
2929+2330fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> {
2431 let period = quota.replenish_interval() / factor;
2532 let burst = quota
···3037}
31383239#[derive(Debug)]
3333-struct IpLimiters {
4040+pub struct CreatePlcOpLimiter {
4141+ limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>,
4242+}
4343+4444+impl CreatePlcOpLimiter {
4545+ pub fn new(quota: Quota) -> Self {
4646+ Self {
4747+ limiter: RateLimiter::keyed(quota),
4848+ }
4949+ }
5050+}
5151+5252+/// this must be used with an endpoint with a single path param for the did
5353+impl Limiter<String> for CreatePlcOpLimiter {
5454+ fn extract_key(&self, req: &Request) -> Result<String> {
5555+ let (did,) = req.path_params::<(String,)>()?;
5656+ Ok(did)
5757+ }
5858+ fn check_key(&self, did: &String) -> Result<(), Duration> {
5959+ self.limiter
6060+ .check_key(did)
6161+ .map_err(|e| e.wait_time_from(CLOCK.now()))
6262+ }
6363+ fn housekeep(&self) {
6464+ log::debug!(
6565+ "limiter size before housekeeping: {} dids",
6666+ self.limiter.len()
6767+ );
6868+ self.limiter.retain_recent();
6969+ }
7070+}
7171+7272+#[derive(Debug)]
7373+pub struct IpLimiters {
3474 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>,
3575 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>,
3676 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>,
···4484 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")),
4585 }
4686 }
4747- pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> {
8787+}
8888+8989+impl Limiter<IpAddr> for IpLimiters {
9090+ fn extract_key(&self, req: &Request) -> Result<IpAddr> {
9191+ Ok(req
9292+ .remote_addr()
9393+ .as_socket_addr()
9494+ .expect("failed to get request's remote addr") // TODO
9595+ .ip())
9696+ }
9797+ fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> {
4898 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now());
4999 match ip {
5050- addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf),
100100+ addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf),
51101 IpAddr::V6(a) => {
52102 // always check all limiters
53103 let check_ip = self
···74124 }
75125 }
76126 }
127127+ fn housekeep(&self) {
128128+ log::debug!(
129129+ "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48",
130130+ self.per_ip.len(),
131131+ self.ip6_56.len(),
132132+ self.ip6_48.len(),
133133+ );
134134+ self.per_ip.retain_recent();
135135+ self.ip6_56.retain_recent();
136136+ self.ip6_48.retain_recent();
137137+ }
77138}
7813979140/// Once the rate limit has been reached, the middleware will respond with
80141/// status code 429 (too many requests) and a `Retry-After` header with the amount
81142/// of time that needs to pass before another request will be allowed.
8282-#[derive(Debug)]
8383-pub struct GovernorMiddleware {
143143+// #[derive(Debug)]
144144+pub struct GovernorMiddleware<K> {
84145 #[allow(dead_code)]
85146 stop_on_drop: oneshot::Sender<()>,
8686- limiters: Arc<IpLimiters>,
147147+ limiters: Arc<dyn Limiter<K>>,
87148}
881498989-impl GovernorMiddleware {
150150+impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> {
90151 /// Limit request rates
91152 ///
92153 /// a little gross but this spawns a tokio task for housekeeping:
93154 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping
9494- pub fn new(quota: Quota) -> Self {
9595- let limiters = Arc::new(IpLimiters::new(quota));
155155+ pub fn new(limiters: impl Limiter<K>) -> Self {
156156+ let limiters = Arc::new(limiters);
96157 let (stop_on_drop, mut stopped) = oneshot::channel();
97158 tokio::task::spawn({
98159 let limiters = limiters.clone();
···102163 _ = &mut stopped => break,
103164 _ = tokio::time::sleep(Duration::from_secs(60)) => {},
104165 };
105105- log::debug!(
106106- "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48",
107107- limiters.per_ip.len(),
108108- limiters.ip6_56.len(),
109109- limiters.ip6_48.len(),
110110- );
111111- limiters.per_ip.retain_recent();
112112- limiters.ip6_56.retain_recent();
113113- limiters.ip6_48.retain_recent();
166166+ limiters.housekeep();
114167 }
115168 }
116169 });
···121174 }
122175}
123176124124-impl<E: Endpoint> Middleware<E> for GovernorMiddleware {
125125- type Output = GovernorMiddlewareImpl<E>;
177177+impl<E, K> Middleware<E> for GovernorMiddleware<K>
178178+where
179179+ E: Endpoint,
180180+ K: Hash + std::fmt::Debug + Send + Sync + 'static,
181181+{
182182+ type Output = GovernorMiddlewareImpl<E, K>;
126183 fn transform(&self, ep: E) -> Self::Output {
127184 GovernorMiddlewareImpl {
128185 ep,
···131188 }
132189}
133190134134-pub struct GovernorMiddlewareImpl<E> {
191191+pub struct GovernorMiddlewareImpl<E, K> {
135192 ep: E,
136136- limiters: Arc<IpLimiters>,
193193+ limiters: Arc<dyn Limiter<K>>,
137194}
138195139139-impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> {
196196+impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K>
197197+where
198198+ E: Endpoint,
199199+ K: Hash + std::fmt::Debug + Send + Sync + 'static,
200200+{
140201 type Output = E::Output;
141202142203 async fn call(&self, req: Request) -> Result<Self::Output> {
143143- let remote = req
144144- .remote_addr()
145145- .as_socket_addr()
146146- .expect("failed to get request's remote addr") // TODO
147147- .ip();
204204+ let key = self.limiters.extract_key(&req)?;
148205149149- log::trace!("remote: {remote}");
150150-151151- match self.limiters.check_key(remote) {
206206+ match self.limiters.check_key(&key) {
152207 Ok(_) => {
153153- log::debug!("allowing remote {remote}");
208208+ log::debug!("allowing key {key:?}");
154209 self.ep.call(req).await
155210 }
156211 Err(d) => {
157212 let wait_time = d.as_secs();
158213159159- log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s");
214214+ log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s");
160215161216 let res = Response::builder()
162217 .status(StatusCode::TOO_MANY_REQUESTS)