···2name = "allegedly"
3description = "public ledger server tools and services (for the PLC)"
4license = "MIT OR Apache-2.0"
5-version = "0.3.0"
6edition = "2024"
7default-run = "allegedly"
8···16http-body-util = "0.1.3"
17log = "0.4.28"
18native-tls = "0.2.14"
00019poem = { version = "3.1.12", features = ["acme", "compression"] }
20postgres-native-tls = "0.5.1"
21-reqwest = { version = "0.12.23", features = ["stream", "json"] }
22reqwest-middleware = "0.4.2"
23reqwest-retry = "0.7.0"
24rustls = "0.23.32"
···29tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
30tokio-stream = { version = "0.1.17", features = ["io-util"] }
31tokio-util = { version = "0.7.16", features = ["compat"] }
0032tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
···2name = "allegedly"
3description = "public ledger server tools and services (for the PLC)"
4license = "MIT OR Apache-2.0"
5+version = "0.3.3"
6edition = "2024"
7default-run = "allegedly"
8···16http-body-util = "0.1.3"
17log = "0.4.28"
18native-tls = "0.2.14"
19+opentelemetry = "0.30.0"
20+opentelemetry-otlp = { version = "0.30.0" }
21+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
22poem = { version = "3.1.12", features = ["acme", "compression"] }
23postgres-native-tls = "0.5.1"
24+reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
25reqwest-middleware = "0.4.2"
26reqwest-retry = "0.7.0"
27rustls = "0.23.32"
···32tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
33tokio-stream = { version = "0.1.17", features = ["io-util"] }
34tokio-util = { version = "0.7.16", features = ["compat"] }
35+tracing = "0.1.41"
36+tracing-opentelemetry = "0.31.0"
37tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+37
examples/poll.rs
···0000000000000000000000000000000000000
···1+use allegedly::{ExportPage, poll_upstream};
2+3+#[tokio::main]
4+async fn main() {
5+ // set to `None` to replay from the beginning of the PLC history
6+ let after = Some(chrono::Utc::now());
7+8+ // the PLC server to poll for new ops
9+ let upstream = "https://plc.wtf/export".parse().unwrap();
10+11+ // self-rate-limit (plc.directory's limit interval is 600ms)
12+ let throttle = std::time::Duration::from_millis(300);
13+14+ // pages are sent out of the poller via a tokio mpsc channel
15+ let (tx, mut rx) = tokio::sync::mpsc::channel(1);
16+17+ // spawn a tokio task to run the poller
18+ tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
19+20+ // receive pages of plc ops from the poller
21+ while let Some(ExportPage { ops }) = rx.recv().await {
22+ println!("received {} plc ops", ops.len());
23+24+ for op in ops {
25+ // in this example we're alerting when changes are found for one
26+ // specific identity
27+ if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
28+ println!(
29+ "Update found for {}! cid={}\n -> operation: {}",
30+ op.did,
31+ op.cid,
32+ op.operation.get()
33+ );
34+ }
35+ }
36+ }
37+}
+14-1
readme.md
···36 --experimental-write-upstream
37 ```
3800000000000003940add `--help` to any command for more info about it
41···66- monitoring of the various tasks
67- health check pings
68- expose metrics/tracing
69-- read-only flag for mirror wrapper
70- bundle: write directly to s3-compatible object storage
71- helpers for automating periodic `bundle` runs
72
···36 --experimental-write-upstream
37 ```
3839+- Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream
40+41+ ```bash
42+ sudo allegedly wrap \
43+ --wrap "http://127.0.0.1:3000" \
44+ --acme-ipv6 \
45+ --acme-cache-path ./acme-cache \
46+ --acme-domain "plc.wtf" \
47+ --experimental-acme-domain "experimental.plc.wtf" \
48+ --experimental-write-upstream \
49+ --upstream "https://plc.wtf" \
50+ ```
51+5253add `--help` to any command for more info about it
54···79- monitoring of the various tasks
80- health check pings
81- expose metrics/tracing
82+- [x] read-only flag for mirror wrapper
83- bundle: write directly to s3-compatible object storage
84- helpers for automating periodic `bundle` runs
85
+34-6
src/bin/allegedly.rs
···1-use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream};
02use clap::{CommandFactory, Parser, Subcommand};
3-use std::{path::PathBuf, time::Instant};
4use tokio::fs::create_dir_all;
5use tokio::sync::mpsc;
6···48 Mirror {
49 #[command(flatten)]
50 args: mirror::Args,
00000000051 },
52 /// Poll an upstream PLC server and log new ops to stdout
53 Tail {
···57 },
58}
590000000000000060#[tokio::main]
61async fn main() -> anyhow::Result<()> {
62 let args = Cli::parse();
63 let matches = Cli::command().get_matches();
64 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
65- bin_init(name);
06667 let globals = args.globals.clone();
68···76 } => {
77 let mut url = globals.upstream;
78 url.set_path("/export");
079 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
80 tokio::task::spawn(async move {
81- poll_upstream(Some(after), url, tx)
82 .await
83 .expect("to poll upstream")
84 });
···90 .await
91 .expect("to write bundles to output files");
92 }
93- Commands::Mirror { args } => mirror::run(globals, args).await?,
094 Commands::Tail { after } => {
95 let mut url = globals.upstream;
96 url.set_path("/export");
97 let start_at = after.or_else(|| Some(chrono::Utc::now()));
098 let (tx, rx) = mpsc::channel(1);
99 tokio::task::spawn(async move {
100- poll_upstream(start_at, url, tx)
101 .await
102 .expect("to poll upstream")
103 });
···19 client: Client,
20 plc: Url,
21 upstream: Url,
000000022 latest_at: CachedValue<Dt, GetLatestAt>,
23 upstream_status: CachedValue<PlcStatus, CheckUpstream>,
24- experimental: ExperimentalConf,
25}
2627#[handler]
28fn hello(
29 Data(State {
030 upstream,
31 experimental: exp,
32 ..
33 }): Data<&State>,
34 req: &Request,
35) -> String {
0000000000000000000000000000036 let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) {
37 (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(),
38 (_, None, _) => {
···42 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
43 }
44 (_, Some(d), _) => format!(
45- r#"\
46- - POST /* Rejected, but experimental upstream op forwarding is
47- available at `POST {d}/:did`!"#
48 ),
49 };
5051 format!(
52 r#"{}
53-54-This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
55-synchronizes a local PLC reference server instance[2] (why?[3]).
56-57-58-Configured upstream:
59-60- {upstream}
61-6263Available APIs:
64···177 Data(State {
178 plc,
179 client,
180- latest_at,
181- upstream_status,
182 ..
183 }): Data<&State>,
184) -> impl IntoResponse {
···187 if !ok {
188 overall_status = StatusCode::BAD_GATEWAY;
189 }
190- let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
191- if !ok {
192- overall_status = StatusCode::BAD_GATEWAY;
0000000000000000000000000000193 }
194- let latest = latest_at.get().await.ok();
195- (
196- overall_status,
197- Json(serde_json::json!({
198- "server": "allegedly (mirror)",
199- "version": env!("CARGO_PKG_VERSION"),
200- "wrapped_plc": wrapped_status,
201- "upstream_plc": upstream_status,
202- "latest_at": latest,
203- })),
204- )
205}
206207fn proxy_response(res: reqwest::Response) -> Response {
···225async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> {
226 let mut target = state.plc.clone();
227 target.set_path(req.uri().path());
0228 let wrapped_res = state
229 .client
230 .get(target)
···344 plc: Url,
345 listen: ListenConf,
346 experimental: ExperimentalConf,
347- db: Db,
348) -> anyhow::Result<&'static str> {
349 log::info!("starting server...");
350···355 .build()
356 .expect("reqwest client to build");
357358- let latest_at = CachedValue::new(GetLatestAt(db), Duration::from_secs(2));
359- let upstream_status = CachedValue::new(
360- CheckUpstream(upstream.clone(), client.clone()),
361- Duration::from_secs(6),
362- );
000363364 let state = State {
365 client,
366 plc,
367 upstream: upstream.clone(),
368- latest_at,
369- upstream_status,
370 experimental: experimental.clone(),
371 };
372373 let mut app = Route::new()
374 .at("/", get(hello))
375 .at("/favicon.ico", get(favicon))
376- .at("/_health", get(health));
0377378 if experimental.write_upstream {
379 log::info!("enabling experimental write forwarding to upstream");
···385 .with(GovernorMiddleware::new(did_limiter))
386 .with(GovernorMiddleware::new(ip_limiter));
387388- app = app.at("/:any", get(proxy).post(upstream_proxier));
389 } else {
390- app = app.at("/:any", get(proxy).post(nope));
391 }
392393 let app = app
···463 }
464465 let app = Route::new()
466- .at("/", get(oop_plz_be_secure))
467 .at("/favicon.ico", get(favicon))
0468 .with(Tracing);
469 Server::new(TcpListener::bind(if ipv6 {
470 "[::]:80"
···19 client: Client,
20 plc: Url,
21 upstream: Url,
22+ sync_info: Option<SyncInfo>,
23+ experimental: ExperimentalConf,
24+}
25+26+/// server info that only applies in mirror (synchronizing) mode
27+#[derive(Clone)]
28+struct SyncInfo {
29 latest_at: CachedValue<Dt, GetLatestAt>,
30 upstream_status: CachedValue<PlcStatus, CheckUpstream>,
031}
3233#[handler]
34fn hello(
35 Data(State {
36+ sync_info,
37 upstream,
38 experimental: exp,
39 ..
40 }): Data<&State>,
41 req: &Request,
42) -> String {
43+ // let mode = if sync_info.is_some() { "mirror" } else { "wrap" };
44+ let pre_info = if sync_info.is_some() {
45+ format!(
46+ r#"
47+This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
48+synchronizes a local PLC reference server instance[2] (why?[3]).
49+50+51+Configured upstream:
52+53+ {upstream}
54+55+"#
56+ )
57+ } else {
58+ format!(
59+ r#"
60+This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse-
61+proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy.
62+63+64+Configured upstream (only used if experimental op forwarding is enabled):
65+66+ {upstream}
67+68+"#
69+ )
70+ };
71+72 let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) {
73 (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(),
74 (_, None, _) => {
···78 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
79 }
80 (_, Some(d), _) => format!(
81+ r#" - POST /* Rejected, but experimental upstream op forwarding is
82+ available at `POST https://{d}/:did`!"#
083 ),
84 };
8586 format!(
87 r#"{}
88+{pre_info}
000000008990Available APIs:
91···204 Data(State {
205 plc,
206 client,
207+ sync_info,
0208 ..
209 }): Data<&State>,
210) -> impl IntoResponse {
···213 if !ok {
214 overall_status = StatusCode::BAD_GATEWAY;
215 }
216+ if let Some(SyncInfo {
217+ latest_at,
218+ upstream_status,
219+ }) = sync_info
220+ {
221+ // mirror mode
222+ let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible");
223+ if !ok {
224+ overall_status = StatusCode::BAD_GATEWAY;
225+ }
226+ let latest = latest_at.get().await.ok();
227+ (
228+ overall_status,
229+ Json(serde_json::json!({
230+ "server": "allegedly (mirror)",
231+ "version": env!("CARGO_PKG_VERSION"),
232+ "wrapped_plc": wrapped_status,
233+ "upstream_plc": upstream_status,
234+ "latest_at": latest,
235+ })),
236+ )
237+ } else {
238+ // wrap mode
239+ (
240+ overall_status,
241+ Json(serde_json::json!({
242+ "server": "allegedly (mirror)",
243+ "version": env!("CARGO_PKG_VERSION"),
244+ "wrapped_plc": wrapped_status,
245+ })),
246+ )
247 }
00000000000248}
249250fn proxy_response(res: reqwest::Response) -> Response {
···268async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> {
269 let mut target = state.plc.clone();
270 target.set_path(req.uri().path());
271+ target.set_query(req.uri().query());
272 let wrapped_res = state
273 .client
274 .get(target)
···388 plc: Url,
389 listen: ListenConf,
390 experimental: ExperimentalConf,
391+ db: Option<Db>,
392) -> anyhow::Result<&'static str> {
393 log::info!("starting server...");
394···399 .build()
400 .expect("reqwest client to build");
401402+ // when `db` is None, we're running in wrap mode. no db access, no upstream sync
403+ let sync_info = db.map(|db| SyncInfo {
404+ latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)),
405+ upstream_status: CachedValue::new(
406+ CheckUpstream(upstream.clone(), client.clone()),
407+ Duration::from_secs(6),
408+ ),
409+ });
410411 let state = State {
412 client,
413 plc,
414 upstream: upstream.clone(),
415+ sync_info,
0416 experimental: experimental.clone(),
417 };
418419 let mut app = Route::new()
420 .at("/", get(hello))
421 .at("/favicon.ico", get(favicon))
422+ .at("/_health", get(health))
423+ .at("/export", get(proxy));
424425 if experimental.write_upstream {
426 log::info!("enabling experimental write forwarding to upstream");
···432 .with(GovernorMiddleware::new(did_limiter))
433 .with(GovernorMiddleware::new(ip_limiter));
434435+ app = app.at("/did:plc:*", get(proxy).post(upstream_proxier));
436 } else {
437+ app = app.at("/did:plc:*", get(proxy).post(nope));
438 }
439440 let app = app
···510 }
511512 let app = Route::new()
0513 .at("/favicon.ico", get(favicon))
514+ .nest("/", get(oop_plz_be_secure))
515 .with(Tracing);
516 Server::new(TcpListener::bind(if ipv6 {
517 "[::]:80"
+53-8
src/poll.rs
···4use thiserror::Error;
5use tokio::sync::mpsc;
67-// plc.directory ratelimit on /export is 500 per 5 mins
8-const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600);
9-10#[derive(Debug, Error)]
11pub enum GetPageError {
12 #[error(transparent)]
···54 }
55}
5657-/// PLC
58#[derive(Debug, PartialEq)]
59pub struct PageBoundaryState {
00060 pub last_at: Dt,
061 keys_at: Vec<OpKey>, // expected to ~always be length one
62}
6364-/// track keys at final createdAt to deduplicate the start of the next page
65impl PageBoundaryState {
066 pub fn new(page: &ExportPage) -> Option<Self> {
67 // grab the very last op
68 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
···7879 Some(me)
80 }
000000081 fn apply_to_next(&mut self, page: &mut ExportPage) {
82 // walk ops forward, kicking previously-seen ops until created_at advances
83 let to_remove: Vec<usize> = page
···127 }
128}
129000130pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
131 log::trace!("Getting page: {url}");
132···141 .split('\n')
142 .filter_map(|s| {
143 serde_json::from_str::<Op>(s)
144- .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
0000145 .ok()
146 })
147 .collect();
···151 Ok((ExportPage { ops }, last_op))
152}
15300000000000000000000000000000154pub async fn poll_upstream(
155 after: Option<Dt>,
156 base: Url,
0157 dest: mpsc::Sender<ExportPage>,
158) -> anyhow::Result<&'static str> {
159- log::info!("starting upstream poller after {after:?}");
160- let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL);
161 let mut prev_last: Option<LastOp> = after.map(Into::into);
162 let mut boundary_state: Option<PageBoundaryState> = None;
163 loop {
···4use thiserror::Error;
5use tokio::sync::mpsc;
60007#[derive(Debug, Error)]
8pub enum GetPageError {
9 #[error(transparent)]
···51 }
52}
5354+/// State for removing duplicates ops between PLC export page boundaries
55#[derive(Debug, PartialEq)]
56pub struct PageBoundaryState {
57+ /// The previous page's last timestamp
58+ ///
59+ /// Duplicate ops from /export only occur for the same exact timestamp
60 pub last_at: Dt,
61+ /// The previous page's ops at its last timestamp
62 keys_at: Vec<OpKey>, // expected to ~always be length one
63}
64065impl PageBoundaryState {
66+ /// Initialize the boundary state with a PLC page
67 pub fn new(page: &ExportPage) -> Option<Self> {
68 // grab the very last op
69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
···7980 Some(me)
81 }
82+ /// Apply the deduplication and update state
83+ ///
84+ /// The beginning of the page will be modified to remove duplicates from the
85+ /// previous page.
86+ ///
87+ /// The end of the page is inspected to update the deduplicator state for
88+ /// the next page.
89 fn apply_to_next(&mut self, page: &mut ExportPage) {
90 // walk ops forward, kicking previously-seen ops until created_at advances
91 let to_remove: Vec<usize> = page
···135 }
136}
137138+/// Get one PLC export page
139+///
140+/// Extracts the final op so it can be used to fetch the following page
141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
142 log::trace!("Getting page: {url}");
143···152 .split('\n')
153 .filter_map(|s| {
154 serde_json::from_str::<Op>(s)
155+ .inspect_err(|e| {
156+ if !s.is_empty() {
157+ log::warn!("failed to parse op: {e} ({s})")
158+ }
159+ })
160 .ok()
161 })
162 .collect();
···166 Ok((ExportPage { ops }, last_op))
167}
168169+/// Poll an upstream PLC server for new ops
170+///
171+/// Pages of operations are written to the `dest` channel.
172+///
173+/// ```no_run
174+/// # #[tokio::main]
175+/// # async fn main() {
176+/// use allegedly::{ExportPage, Op, poll_upstream};
177+///
178+/// let after = Some(chrono::Utc::now());
179+/// let upstream = "https://plc.wtf/export".parse().unwrap();
180+/// let throttle = std::time::Duration::from_millis(300);
181+///
182+/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
183+/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
184+///
185+/// while let Some(ExportPage { ops }) = rx.recv().await {
186+/// println!("received {} plc ops", ops.len());
187+///
188+/// for Op { did, cid, operation, .. } in ops {
189+/// // in this example we're alerting when changes are found for one
190+/// // specific identity
191+/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
192+/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
193+/// }
194+/// }
195+/// }
196+/// # }
197+/// ```
198pub async fn poll_upstream(
199 after: Option<Dt>,
200 base: Url,
201+ throttle: Duration,
202 dest: mpsc::Sender<ExportPage>,
203) -> anyhow::Result<&'static str> {
204+ log::info!("starting upstream poller at {base} after {after:?}");
205+ let mut tick = tokio::time::interval(throttle);
206 let mut prev_last: Option<LastOp> = after.map(Into::into);
207 let mut boundary_state: Option<PageBoundaryState> = None;
208 loop {