···22name = "allegedly"
33description = "public ledger server tools and services (for the PLC)"
44license = "MIT OR Apache-2.0"
55-version = "0.3.0"
55+version = "0.3.3"
66edition = "2024"
77default-run = "allegedly"
88···1616http-body-util = "0.1.3"
1717log = "0.4.28"
1818native-tls = "0.2.14"
1919+opentelemetry = "0.30.0"
2020+opentelemetry-otlp = { version = "0.30.0" }
2121+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
1922poem = { version = "3.1.12", features = ["acme", "compression"] }
2023postgres-native-tls = "0.5.1"
2121-reqwest = { version = "0.12.23", features = ["stream", "json"] }
2424+reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
2225reqwest-middleware = "0.4.2"
2326reqwest-retry = "0.7.0"
2427rustls = "0.23.32"
···2932tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
3033tokio-stream = { version = "0.1.17", features = ["io-util"] }
3134tokio-util = { version = "0.7.16", features = ["compat"] }
3535+tracing = "0.1.41"
3636+tracing-opentelemetry = "0.31.0"
3237tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+37
examples/poll.rs
···11+use allegedly::{ExportPage, poll_upstream};
22+33+#[tokio::main]
44+async fn main() {
55+ // set to `None` to replay from the beginning of the PLC history
66+ let after = Some(chrono::Utc::now());
77+88+ // the PLC server to poll for new ops
99+ let upstream = "https://plc.wtf/export".parse().unwrap();
1010+1111+ // self-rate-limit (plc.directory's limit interval is 600ms)
1212+ let throttle = std::time::Duration::from_millis(300);
1313+1414+ // pages are sent out of the poller via a tokio mpsc channel
1515+ let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1616+1717+ // spawn a tokio task to run the poller
1818+ tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
1919+2020+ // receive pages of plc ops from the poller
2121+ while let Some(ExportPage { ops }) = rx.recv().await {
2222+ println!("received {} plc ops", ops.len());
2323+2424+ for op in ops {
2525+ // in this example we're alerting when changes are found for one
2626+ // specific identity
2727+ if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
2828+ println!(
2929+ "Update found for {}! cid={}\n -> operation: {}",
3030+ op.did,
3131+ op.cid,
3232+ op.operation.get()
3333+ );
3434+ }
3535+ }
3636+ }
3737+}
+14-1
readme.md
···3636 --experimental-write-upstream
3737 ```
38383939+- Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream
4040+4141+ ```bash
4242+ sudo allegedly wrap \
4343+ --wrap "http://127.0.0.1:3000" \
4444+ --acme-ipv6 \
4545+ --acme-cache-path ./acme-cache \
4646+ --acme-domain "plc.wtf" \
4747+ --experimental-acme-domain "experimental.plc.wtf" \
4848+ --experimental-write-upstream \
4949+ --upstream "https://plc.wtf" \
5050+ ```
5151+39524053add `--help` to any command for more info about it
4154···6679- monitoring of the various tasks
6780- health check pings
6881- expose metrics/tracing
6969-- read-only flag for mirror wrapper
8282+- [x] read-only flag for mirror wrapper
7083- bundle: write directly to s3-compatible object storage
7184- helpers for automating periodic `bundle` runs
7285
···1919 client: Client,
2020 plc: Url,
2121 upstream: Url,
2222+ sync_info: Option<SyncInfo>,
2323+ experimental: ExperimentalConf,
2424+}
2525+2626+/// server info that only applies in mirror (synchronizing) mode
2727+#[derive(Clone)]
2828+struct SyncInfo {
2229 latest_at: CachedValue<Dt, GetLatestAt>,
2330 upstream_status: CachedValue<PlcStatus, CheckUpstream>,
2424- experimental: ExperimentalConf,
2531}
26322733#[handler]
2834fn hello(
2935 Data(State {
3636+ sync_info,
3037 upstream,
3138 experimental: exp,
3239 ..
3340 }): Data<&State>,
3441 req: &Request,
3542) -> String {
4343+ // let mode = if sync_info.is_some() { "mirror" } else { "wrap" };
4444+ let pre_info = if sync_info.is_some() {
4545+ format!(
4646+ r#"
4747+This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
4848+synchronizes a local PLC reference server instance[2] (why?[3]).
4949+5050+5151+Configured upstream:
5252+5353+ {upstream}
5454+5555+"#
5656+ )
5757+ } else {
5858+ format!(
5959+ r#"
6060+This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse-
6161+proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy.
6262+6363+6464+Configured upstream (only used if experimental op forwarding is enabled):
6565+6666+ {upstream}
6767+6868+"#
6969+ )
7070+ };
7171+3672 let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) {
3773 (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(),
3874 (_, None, _) => {
···4278 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
4379 }
4480 (_, Some(d), _) => format!(
4545- r#"\
4646- - POST /* Rejected, but experimental upstream op forwarding is
4747- available at `POST {d}/:did`!"#
8181+ r#" - POST /* Rejected, but experimental upstream op forwarding is
8282+ available at `POST https://{d}/:did`!"#
4883 ),
4984 };
50855186 format!(
5287 r#"{}
5353-5454-This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
5555-synchronizes a local PLC reference server instance[2] (why?[3]).
5656-5757-5858-Configured upstream:
5959-6060- {upstream}
6161-8888+{pre_info}
62896390Available APIs:
6491···177204 Data(State {
178205 plc,
179206 client,
180180- latest_at,
181181- upstream_status,
207207+ sync_info,
182208 ..
183209 }): Data<&State>,
184210) -> impl IntoResponse {
···187213 if !ok {
188214 overall_status = StatusCode::BAD_GATEWAY;
189215 }
190190- let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
191191- if !ok {
192192- overall_status = StatusCode::BAD_GATEWAY;
216216+ if let Some(SyncInfo {
217217+ latest_at,
218218+ upstream_status,
219219+ }) = sync_info
220220+ {
221221+ // mirror mode
222222+ let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
223223+ if !ok {
224224+ overall_status = StatusCode::BAD_GATEWAY;
225225+ }
226226+ let latest = latest_at.get().await.ok();
227227+ (
228228+ overall_status,
229229+ Json(serde_json::json!({
230230+ "server": "allegedly (mirror)",
231231+ "version": env!("CARGO_PKG_VERSION"),
232232+ "wrapped_plc": wrapped_status,
233233+ "upstream_plc": upstream_status,
234234+ "latest_at": latest,
235235+ })),
236236+ )
237237+ } else {
238238+ // wrap mode
239239+ (
240240+ overall_status,
241241+ Json(serde_json::json!({
242242+ "server": "allegedly (mirror)",
243243+ "version": env!("CARGO_PKG_VERSION"),
244244+ "wrapped_plc": wrapped_status,
245245+ })),
246246+ )
193247 }
194194- let latest = latest_at.get().await.ok();
195195- (
196196- overall_status,
197197- Json(serde_json::json!({
198198- "server": "allegedly (mirror)",
199199- "version": env!("CARGO_PKG_VERSION"),
200200- "wrapped_plc": wrapped_status,
201201- "upstream_plc": upstream_status,
202202- "latest_at": latest,
203203- })),
204204- )
205248}
206249207250fn proxy_response(res: reqwest::Response) -> Response {
···225268async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> {
226269 let mut target = state.plc.clone();
227270 target.set_path(req.uri().path());
271271+ target.set_query(req.uri().query());
228272 let wrapped_res = state
229273 .client
230274 .get(target)
···344388 plc: Url,
345389 listen: ListenConf,
346390 experimental: ExperimentalConf,
347347- db: Db,
391391+ db: Option<Db>,
348392) -> anyhow::Result<&'static str> {
349393 log::info!("starting server...");
350394···355399 .build()
356400 .expect("reqwest client to build");
357401358358- let latest_at = CachedValue::new(GetLatestAt(db), Duration::from_secs(2));
359359- let upstream_status = CachedValue::new(
360360- CheckUpstream(upstream.clone(), client.clone()),
361361- Duration::from_secs(6),
362362- );
402402+ // when `db` is None, we're running in wrap mode. no db access, no upstream sync
403403+ let sync_info = db.map(|db| SyncInfo {
404404+ latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)),
405405+ upstream_status: CachedValue::new(
406406+ CheckUpstream(upstream.clone(), client.clone()),
407407+ Duration::from_secs(6),
408408+ ),
409409+ });
363410364411 let state = State {
365412 client,
366413 plc,
367414 upstream: upstream.clone(),
368368- latest_at,
369369- upstream_status,
415415+ sync_info,
370416 experimental: experimental.clone(),
371417 };
372418373419 let mut app = Route::new()
374420 .at("/", get(hello))
375421 .at("/favicon.ico", get(favicon))
376376- .at("/_health", get(health));
422422+ .at("/_health", get(health))
423423+ .at("/export", get(proxy));
377424378425 if experimental.write_upstream {
379426 log::info!("enabling experimental write forwarding to upstream");
···385432 .with(GovernorMiddleware::new(did_limiter))
386433 .with(GovernorMiddleware::new(ip_limiter));
387434388388- app = app.at("/:any", get(proxy).post(upstream_proxier));
435435+ app = app.at("/did:plc:*", get(proxy).post(upstream_proxier));
389436 } else {
390390- app = app.at("/:any", get(proxy).post(nope));
437437+ app = app.at("/did:plc:*", get(proxy).post(nope));
391438 }
392439393440 let app = app
···463510 }
464511465512 let app = Route::new()
466466- .at("/", get(oop_plz_be_secure))
467513 .at("/favicon.ico", get(favicon))
514514+ .nest("/", get(oop_plz_be_secure))
468515 .with(Tracing);
469516 Server::new(TcpListener::bind(if ipv6 {
470517 "[::]:80"
+53-8
src/poll.rs
···44use thiserror::Error;
55use tokio::sync::mpsc;
6677-// plc.directory ratelimit on /export is 500 per 5 mins
88-const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600);
99-107#[derive(Debug, Error)]
118pub enum GetPageError {
129 #[error(transparent)]
···5451 }
5552}
56535757-/// PLC
5454+/// State for removing duplicates ops between PLC export page boundaries
5855#[derive(Debug, PartialEq)]
5956pub struct PageBoundaryState {
5757+ /// The previous page's last timestamp
5858+ ///
5959+ /// Duplicate ops from /export only occur for the same exact timestamp
6060 pub last_at: Dt,
6161+ /// The previous page's ops at its last timestamp
6162 keys_at: Vec<OpKey>, // expected to ~always be length one
6263}
63646464-/// track keys at final createdAt to deduplicate the start of the next page
6565impl PageBoundaryState {
6666+ /// Initialize the boundary state with a PLC page
6667 pub fn new(page: &ExportPage) -> Option<Self> {
6768 // grab the very last op
6869 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
···78797980 Some(me)
8081 }
8282+ /// Apply the deduplication and update state
8383+ ///
8484+ /// The beginning of the page will be modified to remove duplicates from the
8585+ /// previous page.
8686+ ///
8787+ /// The end of the page is inspected to update the deduplicator state for
8888+ /// the next page.
8189 fn apply_to_next(&mut self, page: &mut ExportPage) {
8290 // walk ops forward, kicking previously-seen ops until created_at advances
8391 let to_remove: Vec<usize> = page
···127135 }
128136}
129137138138+/// Get one PLC export page
139139+///
140140+/// Extracts the final op so it can be used to fetch the following page
130141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
131142 log::trace!("Getting page: {url}");
132143···141152 .split('\n')
142153 .filter_map(|s| {
143154 serde_json::from_str::<Op>(s)
144144- .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
155155+ .inspect_err(|e| {
156156+ if !s.is_empty() {
157157+ log::warn!("failed to parse op: {e} ({s})")
158158+ }
159159+ })
145160 .ok()
146161 })
147162 .collect();
···151166 Ok((ExportPage { ops }, last_op))
152167}
153168169169+/// Poll an upstream PLC server for new ops
170170+///
171171+/// Pages of operations are written to the `dest` channel.
172172+///
173173+/// ```no_run
174174+/// # #[tokio::main]
175175+/// # async fn main() {
176176+/// use allegedly::{ExportPage, Op, poll_upstream};
177177+///
178178+/// let after = Some(chrono::Utc::now());
179179+/// let upstream = "https://plc.wtf/export".parse().unwrap();
180180+/// let throttle = std::time::Duration::from_millis(300);
181181+///
182182+/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
183183+/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
184184+///
185185+/// while let Some(ExportPage { ops }) = rx.recv().await {
186186+/// println!("received {} plc ops", ops.len());
187187+///
188188+/// for Op { did, cid, operation, .. } in ops {
189189+/// // in this example we're alerting when changes are found for one
190190+/// // specific identity
191191+/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
192192+/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
193193+/// }
194194+/// }
195195+/// }
196196+/// # }
197197+/// ```
154198pub async fn poll_upstream(
155199 after: Option<Dt>,
156200 base: Url,
201201+ throttle: Duration,
157202 dest: mpsc::Sender<ExportPage>,
158203) -> anyhow::Result<&'static str> {
159159- log::info!("starting upstream poller after {after:?}");
160160- let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
204204+ log::info!("starting upstream poller at {base} after {after:?}");
205205+ let mut tick = tokio::time::interval(throttle);
161206 let mut prev_last: Option<LastOp> = after.map(Into::into);
162207 let mut boundary_state: Option<PageBoundaryState> = None;
163208 loop {