Server tools to backfill, tail, mirror, and verify PLC logs

Compare changes

Choose any two refs to compare.

+942 -163
+164 -1
Cargo.lock
··· 28 28 29 29 [[package]] 30 30 name = "allegedly" 31 - version = "0.2.0" 31 + version = "0.3.3" 32 32 dependencies = [ 33 33 "anyhow", 34 34 "async-compression", ··· 39 39 "http-body-util", 40 40 "log", 41 41 "native-tls", 42 + "opentelemetry", 43 + "opentelemetry-otlp", 44 + "opentelemetry_sdk", 42 45 "poem", 43 46 "postgres-native-tls", 44 47 "reqwest", ··· 52 55 "tokio-postgres", 53 56 "tokio-stream", 54 57 "tokio-util", 58 + "tracing", 59 + "tracing-opentelemetry", 55 60 "tracing-subscriber", 56 61 ] 57 62 ··· 1584 1589 ] 1585 1590 1586 1591 [[package]] 1592 + name = "opentelemetry" 1593 + version = "0.30.0" 1594 + source = "registry+https://github.com/rust-lang/crates.io-index" 1595 + checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" 1596 + dependencies = [ 1597 + "futures-core", 1598 + "futures-sink", 1599 + "js-sys", 1600 + "pin-project-lite", 1601 + "thiserror 2.0.16", 1602 + "tracing", 1603 + ] 1604 + 1605 + [[package]] 1606 + name = "opentelemetry-http" 1607 + version = "0.30.0" 1608 + source = "registry+https://github.com/rust-lang/crates.io-index" 1609 + checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" 1610 + dependencies = [ 1611 + "async-trait", 1612 + "bytes", 1613 + "http", 1614 + "opentelemetry", 1615 + "reqwest", 1616 + ] 1617 + 1618 + [[package]] 1619 + name = "opentelemetry-otlp" 1620 + version = "0.30.0" 1621 + source = "registry+https://github.com/rust-lang/crates.io-index" 1622 + checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" 1623 + dependencies = [ 1624 + "http", 1625 + "opentelemetry", 1626 + "opentelemetry-http", 1627 + "opentelemetry-proto", 1628 + "opentelemetry_sdk", 1629 + "prost", 1630 + "reqwest", 1631 + "thiserror 2.0.16", 1632 + "tracing", 1633 + ] 1634 + 1635 + [[package]] 1636 + name = "opentelemetry-proto" 1637 + version = "0.30.0" 1638 + source = "registry+https://github.com/rust-lang/crates.io-index" 1639 + checksum = 
"2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" 1640 + dependencies = [ 1641 + "opentelemetry", 1642 + "opentelemetry_sdk", 1643 + "prost", 1644 + "tonic", 1645 + ] 1646 + 1647 + [[package]] 1648 + name = "opentelemetry_sdk" 1649 + version = "0.30.0" 1650 + source = "registry+https://github.com/rust-lang/crates.io-index" 1651 + checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" 1652 + dependencies = [ 1653 + "futures-channel", 1654 + "futures-executor", 1655 + "futures-util", 1656 + "opentelemetry", 1657 + "percent-encoding", 1658 + "rand 0.9.2", 1659 + "serde_json", 1660 + "thiserror 2.0.16", 1661 + "tokio", 1662 + "tokio-stream", 1663 + ] 1664 + 1665 + [[package]] 1587 1666 name = "parking_lot" 1588 1667 version = "0.11.2" 1589 1668 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1666 1745 ] 1667 1746 1668 1747 [[package]] 1748 + name = "pin-project" 1749 + version = "1.1.10" 1750 + source = "registry+https://github.com/rust-lang/crates.io-index" 1751 + checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" 1752 + dependencies = [ 1753 + "pin-project-internal", 1754 + ] 1755 + 1756 + [[package]] 1757 + name = "pin-project-internal" 1758 + version = "1.1.10" 1759 + source = "registry+https://github.com/rust-lang/crates.io-index" 1760 + checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" 1761 + dependencies = [ 1762 + "proc-macro2", 1763 + "quote", 1764 + "syn", 1765 + ] 1766 + 1767 + [[package]] 1669 1768 name = "pin-project-lite" 1670 1769 version = "0.2.16" 1671 1770 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1837 1936 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 1838 1937 dependencies = [ 1839 1938 "unicode-ident", 1939 + ] 1940 + 1941 + [[package]] 1942 + name = "prost" 1943 + version = "0.13.5" 1944 + source = "registry+https://github.com/rust-lang/crates.io-index" 1945 + checksum = 
"2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 1946 + dependencies = [ 1947 + "bytes", 1948 + "prost-derive", 1949 + ] 1950 + 1951 + [[package]] 1952 + name = "prost-derive" 1953 + version = "0.13.5" 1954 + source = "registry+https://github.com/rust-lang/crates.io-index" 1955 + checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" 1956 + dependencies = [ 1957 + "anyhow", 1958 + "itertools", 1959 + "proc-macro2", 1960 + "quote", 1961 + "syn", 1840 1962 ] 1841 1963 1842 1964 [[package]] ··· 2057 2179 source = "registry+https://github.com/rust-lang/crates.io-index" 2058 2180 checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" 2059 2181 dependencies = [ 2182 + "async-compression", 2060 2183 "base64", 2061 2184 "bytes", 2062 2185 "encoding_rs", 2186 + "futures-channel", 2063 2187 "futures-core", 2064 2188 "futures-util", 2065 2189 "h2", ··· 2802 2926 ] 2803 2927 2804 2928 [[package]] 2929 + name = "tonic" 2930 + version = "0.13.1" 2931 + source = "registry+https://github.com/rust-lang/crates.io-index" 2932 + checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" 2933 + dependencies = [ 2934 + "async-trait", 2935 + "base64", 2936 + "bytes", 2937 + "http", 2938 + "http-body", 2939 + "http-body-util", 2940 + "percent-encoding", 2941 + "pin-project", 2942 + "prost", 2943 + "tokio-stream", 2944 + "tower-layer", 2945 + "tower-service", 2946 + "tracing", 2947 + ] 2948 + 2949 + [[package]] 2805 2950 name = "tower" 2806 2951 version = "0.5.2" 2807 2952 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2887 3032 "log", 2888 3033 "once_cell", 2889 3034 "tracing-core", 3035 + ] 3036 + 3037 + [[package]] 3038 + name = "tracing-opentelemetry" 3039 + version = "0.31.0" 3040 + source = "registry+https://github.com/rust-lang/crates.io-index" 3041 + checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" 3042 + dependencies = [ 3043 + "js-sys", 3044 + 
"once_cell", 3045 + "opentelemetry", 3046 + "opentelemetry_sdk", 3047 + "smallvec", 3048 + "tracing", 3049 + "tracing-core", 3050 + "tracing-log", 3051 + "tracing-subscriber", 3052 + "web-time", 2890 3053 ] 2891 3054 2892 3055 [[package]]
+7 -2
Cargo.toml
··· 2 2 name = "allegedly" 3 3 description = "public ledger server tools and services (for the PLC)" 4 4 license = "MIT OR Apache-2.0" 5 - version = "0.2.1" 5 + version = "0.3.3" 6 6 edition = "2024" 7 7 default-run = "allegedly" 8 8 ··· 16 16 http-body-util = "0.1.3" 17 17 log = "0.4.28" 18 18 native-tls = "0.2.14" 19 + opentelemetry = "0.30.0" 20 + opentelemetry-otlp = { version = "0.30.0" } 21 + opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } 19 22 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 23 postgres-native-tls = "0.5.1" 21 - reqwest = { version = "0.12.23", features = ["stream", "json"] } 24 + reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } 22 25 reqwest-middleware = "0.4.2" 23 26 reqwest-retry = "0.7.0" 24 27 rustls = "0.23.32" ··· 29 32 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 30 33 tokio-stream = { version = "0.1.17", features = ["io-util"] } 31 34 tokio-util = { version = "0.7.16", features = ["compat"] } 35 + tracing = "0.1.41" 36 + tracing-opentelemetry = "0.31.0" 32 37 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+37
examples/poll.rs
··· 1 + use allegedly::{ExportPage, poll_upstream}; 2 + 3 + #[tokio::main] 4 + async fn main() { 5 + // set to `None` to replay from the beginning of the PLC history 6 + let after = Some(chrono::Utc::now()); 7 + 8 + // the PLC server to poll for new ops 9 + let upstream = "https://plc.wtf/export".parse().unwrap(); 10 + 11 + // self-rate-limit (plc.directory's limit interval is 600ms) 12 + let throttle = std::time::Duration::from_millis(300); 13 + 14 + // pages are sent out of the poller via a tokio mpsc channel 15 + let (tx, mut rx) = tokio::sync::mpsc::channel(1); 16 + 17 + // spawn a tokio task to run the poller 18 + tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 19 + 20 + // receive pages of plc ops from the poller 21 + while let Some(ExportPage { ops }) = rx.recv().await { 22 + println!("received {} plc ops", ops.len()); 23 + 24 + for op in ops { 25 + // in this example we're alerting when changes are found for one 26 + // specific identity 27 + if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 28 + println!( 29 + "Update found for {}! cid={}\n -> operation: {}", 30 + op.did, 31 + op.cid, 32 + op.operation.get() 33 + ); 34 + } 35 + } 36 + } 37 + }
+25 -7
readme.md
··· 26 26 sudo allegedly mirror \ 27 27 --upstream "https://plc.directory" \ 28 28 --wrap "http://127.0.0.1:3000" \ 29 + --wrap-pg-cert "/opt/allegedly/postgres-cert.pem" \ 29 30 --acme-domain "plc.wtf" \ 31 + --acme-domain "alt.plc.wtf" \ 32 + --experimental-acme-domain "experimental.plc.wtf" \ 30 33 --acme-cache-path ./acme-cache \ 31 - --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" 34 + --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" \ 35 + --acme-ipv6 \ 36 + --experimental-write-upstream 37 + ``` 38 + 39 + - Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream 40 + 41 + ```bash 42 + sudo allegedly wrap \ 43 + --wrap "http://127.0.0.1:3000" \ 44 + --acme-ipv6 \ 45 + --acme-cache-path ./acme-cache \ 46 + --acme-domain "plc.wtf" \ 47 + --experimental-acme-domain "experimental.plc.wtf" \ 48 + --experimental-write-upstream \ 49 + --upstream "https://plc.wtf" \ 32 50 ``` 33 51 34 52 ··· 61 79 - monitoring of the various tasks 62 80 - health check pings 63 81 - expose metrics/tracing 64 - - read-only flag for mirror wrapper 82 + - [x] read-only flag for mirror wrapper 65 83 - bundle: write directly to s3-compatible object storage 66 84 - helpers for automating periodic `bundle` runs 67 85 68 86 69 87 ### new things 70 88 71 - - experimental: websocket version of /export 72 - - experimental: accept writes by forwarding them upstream 73 - - experimental: serve a tlog 74 - - experimental: embed a log database directly for fast and efficient mirroring 75 - - experimental: support multiple upstreams? 89 + - [ ] experimental: websocket version of /export 90 + - [x] experimental: accept writes by forwarding them upstream 91 + - [ ] experimental: serve a tlog 92 + - [ ] experimental: embed a log database directly for fast and efficient mirroring 93 + - [ ] experimental: support multiple upstreams? 
76 94 77 95 - [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range 78 96 - [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
+34 -6
src/bin/allegedly.rs
··· 1 - use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 1 + use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init}; 2 + use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream}; 2 3 use clap::{CommandFactory, Parser, Subcommand}; 3 - use std::{path::PathBuf, time::Instant}; 4 + use std::{path::PathBuf, time::Duration, time::Instant}; 4 5 use tokio::fs::create_dir_all; 5 6 use tokio::sync::mpsc; 6 7 ··· 48 49 Mirror { 49 50 #[command(flatten)] 50 51 args: mirror::Args, 52 + #[command(flatten)] 53 + instrumentation: InstrumentationArgs, 54 + }, 55 + /// Wrap any did-method-plc server, without syncing upstream (read-only) 56 + Wrap { 57 + #[command(flatten)] 58 + args: mirror::Args, 59 + #[command(flatten)] 60 + instrumentation: InstrumentationArgs, 51 61 }, 52 62 /// Poll an upstream PLC server and log new ops to stdout 53 63 Tail { ··· 57 67 }, 58 68 } 59 69 70 + impl Commands { 71 + fn enable_otel(&self) -> bool { 72 + match self { 73 + Commands::Mirror { 74 + instrumentation, .. 75 + } 76 + | Commands::Wrap { 77 + instrumentation, .. 
78 + } => instrumentation.enable_opentelemetry, 79 + _ => false, 80 + } 81 + } 82 + } 83 + 60 84 #[tokio::main] 61 85 async fn main() -> anyhow::Result<()> { 62 86 let args = Cli::parse(); 63 87 let matches = Cli::command().get_matches(); 64 88 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 65 - bin_init(name); 89 + bin_init(args.command.enable_otel()); 90 + log::info!("{}", logo(name)); 66 91 67 92 let globals = args.globals.clone(); 68 93 ··· 76 101 } => { 77 102 let mut url = globals.upstream; 78 103 url.set_path("/export"); 104 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 79 105 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 80 106 tokio::task::spawn(async move { 81 - poll_upstream(Some(after), url, tx) 107 + poll_upstream(Some(after), url, throttle, tx) 82 108 .await 83 109 .expect("to poll upstream") 84 110 }); ··· 90 116 .await 91 117 .expect("to write bundles to output files"); 92 118 } 93 - Commands::Mirror { args } => mirror::run(globals, args).await?, 119 + Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?, 120 + Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?, 94 121 Commands::Tail { after } => { 95 122 let mut url = globals.upstream; 96 123 url.set_path("/export"); 97 124 let start_at = after.or_else(|| Some(chrono::Utc::now())); 125 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 98 126 let (tx, rx) = mpsc::channel(1); 99 127 tokio::task::spawn(async move { 100 - poll_upstream(start_at, url, tx) 128 + poll_upstream(start_at, url, throttle, tx) 101 129 .await 102 130 .expect("to poll upstream") 103 131 });
+15 -7
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 - bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 2 + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 3 + bin::{GlobalArgs, bin_init}, 4 + full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 4 5 }; 5 6 use clap::Parser; 6 7 use reqwest::Url; 7 - use std::path::PathBuf; 8 + use std::{path::PathBuf, time::Duration}; 8 9 use tokio::{ 9 10 sync::{mpsc, oneshot}, 10 11 task::JoinSet, ··· 53 54 } 54 55 55 56 pub async fn run( 56 - GlobalArgs { upstream }: GlobalArgs, 57 + GlobalArgs { 58 + upstream, 59 + upstream_throttle_ms, 60 + }: GlobalArgs, 57 61 Args { 58 62 http, 59 63 dir, ··· 98 102 } 99 103 let mut upstream = upstream; 100 104 upstream.set_path("/export"); 101 - tasks.spawn(poll_upstream(None, upstream, poll_tx)); 105 + let throttle = Duration::from_millis(upstream_throttle_ms); 106 + tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 102 107 tasks.spawn(full_pages(poll_out, full_tx)); 103 108 tasks.spawn(pages_to_stdout(full_out, None)); 104 109 } else { ··· 128 133 129 134 // and the catch-up source... 130 135 if let Some(last) = found_last_out { 136 + let throttle = Duration::from_millis(upstream_throttle_ms); 131 137 tasks.spawn(async move { 132 138 let mut upstream = upstream; 133 139 upstream.set_path("/export"); 134 - poll_upstream(last.await?, upstream, poll_tx).await 140 + 141 + poll_upstream(last.await?, upstream, throttle, poll_tx).await 135 142 }); 136 143 } 137 144 ··· 193 200 #[tokio::main] 194 201 async fn main() -> anyhow::Result<()> { 195 202 let args = CliArgs::parse(); 196 - bin_init("backfill"); 203 + bin_init(false); 204 + log::info!("{}", logo("backfill")); 197 205 run(args.globals, args.args).await?; 198 206 Ok(()) 199 207 }
+28
src/bin/instrumentation/mod.rs
··· 1 + use opentelemetry::trace::TracerProvider as _; 2 + use opentelemetry_otlp::{Protocol, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::registry::LookupSpan; 7 + 8 + pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer> 9 + where 10 + S: Subscriber + for<'span> LookupSpan<'span>, 11 + { 12 + let exporter = opentelemetry_otlp::SpanExporter::builder() 13 + .with_http() 14 + .with_protocol(Protocol::HttpBinary) 15 + .build() 16 + .expect("to build otel otlp exporter"); 17 + 18 + let provider = SdkTracerProvider::builder() 19 + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( 20 + 1.0, 21 + )))) 22 + .with_id_generator(RandomIdGenerator::default()) 23 + .with_batch_exporter(exporter) 24 + .build(); 25 + 26 + let tracer = provider.tracer("tracing-otel-subscriber"); 27 + tracing_opentelemetry::layer().with_tracer(tracer) 28 + }
+82 -23
src/bin/mirror.rs
··· 1 - use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve}; 1 + use allegedly::{ 2 + Db, ExperimentalConf, ListenConf, 3 + bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 + logo, pages_to_pg, poll_upstream, serve, 5 + }; 2 6 use clap::Parser; 3 7 use reqwest::Url; 4 - use std::{net::SocketAddr, path::PathBuf}; 8 + use std::{net::SocketAddr, path::PathBuf, time::Duration}; 5 9 use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 6 10 7 11 #[derive(Debug, clap::Args)] ··· 11 15 wrap: Url, 12 16 /// the wrapped did-method-plc server's database (write access required) 13 17 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 14 - wrap_pg: Url, 18 + wrap_pg: Option<Url>, 15 19 /// path to tls cert for the wrapped postgres db, if needed 16 20 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 17 21 wrap_pg_cert: Option<PathBuf>, ··· 39 43 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 40 44 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 41 45 acme_directory_url: Url, 46 + /// try to listen for ipv6 47 + #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")] 48 + acme_ipv6: bool, 49 + /// only accept experimental requests at this hostname 50 + /// 51 + /// a cert will be provisioned for it from letsencrypt. if you're not using 52 + /// acme (eg., behind a tls-terminating reverse proxy), open a feature request. 53 + #[arg( 54 + long, 55 + requires("acme_domain"), 56 + env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN" 57 + )] 58 + experimental_acme_domain: Option<String>, 59 + /// accept writes! 
by forwarding them upstream 60 + #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")] 61 + experimental_write_upstream: bool, 42 62 } 43 63 44 64 pub async fn run( 45 - GlobalArgs { upstream }: GlobalArgs, 65 + GlobalArgs { 66 + upstream, 67 + upstream_throttle_ms, 68 + }: GlobalArgs, 46 69 Args { 47 70 wrap, 48 71 wrap_pg, ··· 51 74 acme_domain, 52 75 acme_cache_path, 53 76 acme_directory_url, 77 + acme_ipv6, 78 + experimental_acme_domain, 79 + experimental_write_upstream, 54 80 }: Args, 81 + sync: bool, 55 82 ) -> anyhow::Result<()> { 56 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 57 - 58 - // TODO: allow starting up with polling backfill from beginning? 59 - log::debug!("getting the latest op from the db..."); 60 - let latest = db 61 - .get_latest() 62 - .await? 63 - .expect("there to be at least one op in the db. did you backfill?"); 64 - 65 83 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 66 84 (_, false, Some(cache_path)) => { 67 - log::info!("configuring acme for https at {acme_domain:?}..."); 68 85 create_dir_all(&cache_path).await?; 86 + let mut domains = acme_domain.clone(); 87 + if let Some(ref experimental_domain) = experimental_acme_domain { 88 + domains.push(experimental_domain.clone()) 89 + } 90 + log::info!("configuring acme for https at {domains:?}..."); 69 91 ListenConf::Acme { 70 - domains: acme_domain, 92 + domains, 71 93 cache_path, 72 94 directory_url: acme_directory_url.to_string(), 95 + ipv6: acme_ipv6, 73 96 } 74 97 } 75 98 (bind, true, None) => ListenConf::Bind(bind), 76 99 (_, _, _) => unreachable!(), 77 100 }; 78 101 102 + let experimental_conf = ExperimentalConf { 103 + acme_domain: experimental_acme_domain, 104 + write_upstream: experimental_write_upstream, 105 + }; 106 + 79 107 let mut tasks = JoinSet::new(); 80 108 81 - let (send_page, recv_page) = mpsc::channel(8); 109 + let db = if sync { 110 + let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 111 + "a wrapped reference postgres 
must be provided to sync" 112 + ))?; 113 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 82 114 83 - let mut poll_url = upstream.clone(); 84 - poll_url.set_path("/export"); 115 + // TODO: allow starting up with polling backfill from beginning? 116 + log::debug!("getting the latest op from the db..."); 117 + let latest = db 118 + .get_latest() 119 + .await? 120 + .expect("there to be at least one op in the db. did you backfill?"); 85 121 86 - tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 87 - tasks.spawn(pages_to_pg(db.clone(), recv_page)); 88 - tasks.spawn(serve(upstream, wrap, listen_conf)); 122 + let (send_page, recv_page) = mpsc::channel(8); 123 + 124 + let mut poll_url = upstream.clone(); 125 + poll_url.set_path("/export"); 126 + let throttle = Duration::from_millis(upstream_throttle_ms); 127 + 128 + tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 129 + tasks.spawn(pages_to_pg(db.clone(), recv_page)); 130 + Some(db) 131 + } else { 132 + None 133 + }; 134 + 135 + tasks.spawn(serve( 136 + upstream, 137 + wrap, 138 + listen_conf, 139 + experimental_conf, 140 + db.clone(), 141 + )); 89 142 90 143 while let Some(next) = tasks.join_next().await { 91 144 match next { ··· 115 168 #[command(flatten)] 116 169 globals: GlobalArgs, 117 170 #[command(flatten)] 171 + instrumentation: InstrumentationArgs, 172 + #[command(flatten)] 118 173 args: Args, 174 + /// Run the mirror in wrap mode, no upstream synchronization (read-only) 175 + #[arg(long, action)] 176 + wrap_mode: bool, 119 177 } 120 178 121 179 #[allow(dead_code)] 122 180 #[tokio::main] 123 181 async fn main() -> anyhow::Result<()> { 124 182 let args = CliArgs::parse(); 125 - bin_init("mirror"); 126 - run(args.globals, args.args).await?; 183 + bin_init(args.instrumentation.enable_opentelemetry); 184 + log::info!("{}", logo("mirror")); 185 + run(args.globals, args.args, !args.wrap_mode).await?; 127 186 Ok(()) 128 187 }
+44
src/bin/mod.rs
··· 1 + mod instrumentation; 2 + 1 3 use reqwest::Url; 4 + use tracing_subscriber::layer::SubscriberExt; 2 5 3 6 #[derive(Debug, Clone, clap::Args)] 4 7 pub struct GlobalArgs { ··· 6 9 #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 7 10 #[clap(default_value = "https://plc.directory")] 8 11 pub upstream: Url, 12 + /// Self-rate-limit upstream request interval 13 + /// 14 + /// plc.directory's rate limiting is 500 requests per 5 mins (600ms) 15 + #[arg(long, global = true, env = "ALLEGEDLY_UPSTREAM_THROTTLE_MS")] 16 + #[clap(default_value = "600")] 17 + pub upstream_throttle_ms: u64, 18 + } 19 + 20 + #[derive(Debug, Default, Clone, clap::Args)] 21 + pub struct InstrumentationArgs { 22 + /// Export traces to an OTLP collector 23 + /// 24 + /// Configure the colletctor via standard env vars: 25 + /// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/" 26 + /// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret" 27 + /// - `OTEL_SERVICE_NAME` eg "my-app" 28 + #[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")] 29 + pub enable_opentelemetry: bool, 30 + } 31 + 32 + pub fn bin_init(enable_otlp: bool) { 33 + let filter = tracing_subscriber::EnvFilter::builder() 34 + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) 35 + .from_env_lossy(); 36 + 37 + let stderr_log = tracing_subscriber::fmt::layer() 38 + .with_writer(std::io::stderr) 39 + .pretty(); 40 + 41 + let otel = if enable_otlp { 42 + Some(instrumentation::otel_layer()) 43 + } else { 44 + None 45 + }; 46 + 47 + let subscriber = tracing_subscriber::Registry::default() 48 + .with(filter) 49 + .with(stderr_log) 50 + .with(otel); 51 + 52 + tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber"); 9 53 } 10 54 11 55 #[allow(dead_code)]
+73
src/cached_value.rs
··· 1 + use std::error::Error; 2 + use std::sync::Arc; 3 + use std::time::{Duration, Instant}; 4 + use tokio::sync::Mutex; 5 + 6 + pub trait Fetcher<T> { 7 + fn fetch(&self) -> impl Future<Output = Result<T, Box<dyn Error>>>; 8 + } 9 + 10 + #[derive(Debug)] 11 + struct ExpiringValue<T: Clone> { 12 + value: T, 13 + expires: Instant, 14 + } 15 + 16 + impl<T: Clone> ExpiringValue<T> { 17 + fn get(&self, now: Instant) -> Option<T> { 18 + if now <= self.expires { 19 + log::trace!("returning val (fresh for {:?})", self.expires - now); 20 + Some(self.value.clone()) 21 + } else { 22 + log::trace!("hiding expired val"); 23 + None 24 + } 25 + } 26 + } 27 + 28 + // TODO: generic over the fetcher's actual error type 29 + #[derive(Clone)] 30 + pub struct CachedValue<T: Clone, F: Fetcher<T>> { 31 + latest: Arc<Mutex<Option<ExpiringValue<T>>>>, 32 + fetcher: F, 33 + validitiy: Duration, 34 + } 35 + 36 + impl<T: Clone, F: Fetcher<T>> CachedValue<T, F> { 37 + pub fn new(f: F, validitiy: Duration) -> Self { 38 + Self { 39 + latest: Default::default(), 40 + fetcher: f, 41 + validitiy, 42 + } 43 + } 44 + pub async fn get(&self) -> Result<T, Box<dyn Error>> { 45 + let now = Instant::now(); 46 + return self.get_impl(now).await; 47 + } 48 + async fn get_impl(&self, now: Instant) -> Result<T, Box<dyn Error>> { 49 + let mut val = self.latest.lock().await; 50 + if let Some(v) = val.as_ref().and_then(|v| v.get(now)) { 51 + return Ok(v); 52 + } 53 + log::debug!( 54 + "value {}, fetching...", 55 + if val.is_some() { 56 + "expired" 57 + } else { 58 + "not present" 59 + } 60 + ); 61 + let new = self 62 + .fetcher 63 + .fetch() 64 + .await 65 + .inspect_err(|e| log::warn!("value fetch failed, next access will retry: {e}"))?; 66 + log::debug!("fetched ok, saving a copy for cache."); 67 + *val = Some(ExpiringValue { 68 + value: new.clone(), 69 + expires: now + self.validitiy, 70 + }); 71 + Ok(new) 72 + } 73 + }
+1
src/client.rs
··· 12 12 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| { 13 13 let inner = Client::builder() 14 14 .user_agent(UA) 15 + .gzip(true) 15 16 .build() 16 17 .expect("reqwest client to build"); 17 18
+4 -15
src/lib.rs
··· 2 2 use tokio::sync::{mpsc, oneshot}; 3 3 4 4 mod backfill; 5 + mod cached_value; 5 6 mod client; 6 7 mod mirror; 7 8 mod plc_pg; ··· 12 13 pub mod bin; 13 14 14 15 pub use backfill::backfill; 16 + pub use cached_value::{CachedValue, Fetcher}; 15 17 pub use client::{CLIENT, UA}; 16 - pub use mirror::{ListenConf, serve}; 18 + pub use mirror::{ExperimentalConf, ListenConf, serve}; 17 19 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 18 20 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 19 - pub use ratelimit::GovernorMiddleware; 21 + pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 20 22 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 21 23 22 24 pub type Dt = chrono::DateTime<chrono::Utc>; ··· 143 145 env!("CARGO_PKG_VERSION"), 144 146 ) 145 147 } 146 - 147 - pub fn bin_init(name: &str) { 148 - if std::env::var_os("RUST_LOG").is_none() { 149 - unsafe { std::env::set_var("RUST_LOG", "info") }; 150 - } 151 - let filter = tracing_subscriber::EnvFilter::from_default_env(); 152 - tracing_subscriber::fmt() 153 - .with_writer(std::io::stderr) 154 - .with_env_filter(filter) 155 - .init(); 156 - 157 - log::info!("{}", logo(name)); 158 - }
+287 -61
src/mirror.rs
··· 1 - use crate::{GovernorMiddleware, UA, logo}; 1 + use crate::{ 2 + CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo, 3 + }; 2 4 use futures::TryStreamExt; 3 5 use governor::Quota; 4 6 use poem::{ 5 - Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, 6 - handler, 7 - http::StatusCode, 7 + Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, 8 + get, handler, 9 + http::{StatusCode, header::USER_AGENT}, 8 10 listener::{Listener, TcpListener, acme::AutoCert}, 9 11 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 10 - web::{Data, Json}, 12 + web::{Data, Json, Path}, 11 13 }; 12 14 use reqwest::{Client, Url}; 13 15 use std::{net::SocketAddr, path::PathBuf, time::Duration}; 14 16 15 - #[derive(Debug, Clone)] 17 + #[derive(Clone)] 16 18 struct State { 17 19 client: Client, 18 20 plc: Url, 19 21 upstream: Url, 22 + sync_info: Option<SyncInfo>, 23 + experimental: ExperimentalConf, 20 24 } 21 25 22 - #[handler] 23 - fn hello(Data(State { upstream, .. }): Data<&State>) -> String { 24 - format!( 25 - r#"{} 26 + /// server info that only applies in mirror (synchronizing) mode 27 + #[derive(Clone)] 28 + struct SyncInfo { 29 + latest_at: CachedValue<Dt, GetLatestAt>, 30 + upstream_status: CachedValue<PlcStatus, CheckUpstream>, 31 + } 26 32 33 + #[handler] 34 + fn hello( 35 + Data(State { 36 + sync_info, 37 + upstream, 38 + experimental: exp, 39 + .. 40 + }): Data<&State>, 41 + req: &Request, 42 + ) -> String { 43 + // let mode = if sync_info.is_some() { "mirror" } else { "wrap" }; 44 + let pre_info = if sync_info.is_some() { 45 + format!( 46 + r#" 27 47 This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 28 48 synchronizes a local PLC reference server instance[2] (why?[3]). 
29 49 ··· 32 52 33 53 {upstream} 34 54 55 + "# 56 + ) 57 + } else { 58 + format!( 59 + r#" 60 + This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse- 61 + proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy. 62 + 63 + 64 + Configured upstream (only used if experimental op forwarding is enabled): 65 + 66 + {upstream} 67 + 68 + "# 69 + ) 70 + }; 71 + 72 + let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) { 73 + (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(), 74 + (_, None, _) => { 75 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 76 + } 77 + (_, Some(d), Some(f)) if f == d => { 78 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 79 + } 80 + (_, Some(d), _) => format!( 81 + r#" - POST /* Rejected, but experimental upstream op forwarding is 82 + available at `POST https://{d}/:did`!"# 83 + ), 84 + }; 85 + 86 + format!( 87 + r#"{} 88 + {pre_info} 35 89 36 90 Available APIs: 37 91 ··· 40 94 - GET /* Proxies to wrapped server; see PLC API docs: 41 95 https://web.plc.directory/api/redoc 42 96 43 - - POST /* Always rejected. This is a mirror. 
97 + tip: try `GET /{{did}}` to resolve an identity 44 98 99 + {post_info} 45 100 46 - tip: try `GET /{{did}}` to resolve an identity 47 101 102 + Allegedly is a suite of open-source CLI tools from for working with PLC logs, 103 + from microcosm: 48 104 49 - Allegedly is a suit of open-source CLI tools for working with PLC logs: 105 + https://tangled.org/@microcosm.blue/Allegedly 50 106 51 - https://tangled.org/@microcosm.blue/Allegedly 107 + https://microcosm.blue 52 108 53 109 54 110 [1] https://web.plc.directory ··· 64 120 include_bytes!("../favicon.ico").with_content_type("image/x-icon") 65 121 } 66 122 67 - fn failed_to_reach_wrapped() -> String { 123 + fn failed_to_reach_named(name: &str) -> String { 68 124 format!( 69 125 r#"{} 70 126 71 - Failed to reach the wrapped reference PLC server. Sorry. 127 + Failed to reach the {name} server. Sorry. 72 128 "#, 73 129 logo("mirror 502 :( ") 74 130 ) 75 131 } 76 132 77 - async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) { 133 + fn bad_create_op(reason: &str) -> Response { 134 + Response::builder() 135 + .status(StatusCode::BAD_REQUEST) 136 + .body(format!( 137 + r#"{} 138 + 139 + NooOOOooooo: {reason} 140 + "#, 141 + logo("mirror 400 >:( ") 142 + )) 143 + } 144 + 145 + type PlcStatus = (bool, serde_json::Value); 146 + 147 + async fn plc_status(url: &Url, client: &Client) -> PlcStatus { 78 148 use serde_json::json; 79 149 80 150 let mut url = url.clone(); ··· 110 180 } 111 181 } 112 182 183 + #[derive(Clone)] 184 + struct GetLatestAt(Db); 185 + impl Fetcher<Dt> for GetLatestAt { 186 + async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 187 + let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 188 + "expected to find at least one thing in the db" 189 + ))?; 190 + Ok(now) 191 + } 192 + } 193 + 194 + #[derive(Clone)] 195 + struct CheckUpstream(Url, Client); 196 + impl Fetcher<PlcStatus> for CheckUpstream { 197 + async fn fetch(&self) -> Result<PlcStatus, Box<dyn 
std::error::Error>> { 198 + Ok(plc_status(&self.0, &self.1).await) 199 + } 200 + } 201 + 113 202 #[handler] 114 203 async fn health( 115 204 Data(State { 116 205 plc, 117 206 client, 118 - upstream, 207 + sync_info, 208 + .. 119 209 }): Data<&State>, 120 210 ) -> impl IntoResponse { 121 211 let mut overall_status = StatusCode::OK; ··· 123 213 if !ok { 124 214 overall_status = StatusCode::BAD_GATEWAY; 125 215 } 126 - let (ok, upstream_status) = plc_status(upstream, client).await; 127 - if !ok { 128 - overall_status = StatusCode::BAD_GATEWAY; 216 + if let Some(SyncInfo { 217 + latest_at, 218 + upstream_status, 219 + }) = sync_info 220 + { 221 + // mirror mode 222 + let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 223 + if !ok { 224 + overall_status = StatusCode::BAD_GATEWAY; 225 + } 226 + let latest = latest_at.get().await.ok(); 227 + ( 228 + overall_status, 229 + Json(serde_json::json!({ 230 + "server": "allegedly (mirror)", 231 + "version": env!("CARGO_PKG_VERSION"), 232 + "wrapped_plc": wrapped_status, 233 + "upstream_plc": upstream_status, 234 + "latest_at": latest, 235 + })), 236 + ) 237 + } else { 238 + // wrap mode 239 + ( 240 + overall_status, 241 + Json(serde_json::json!({ 242 + "server": "allegedly (mirror)", 243 + "version": env!("CARGO_PKG_VERSION"), 244 + "wrapped_plc": wrapped_status, 245 + })), 246 + ) 129 247 } 130 - ( 131 - overall_status, 132 - Json(serde_json::json!({ 133 - "server": "allegedly (mirror)", 134 - "version": env!("CARGO_PKG_VERSION"), 135 - "wrapped_plc": wrapped_status, 136 - "upstream_plc": upstream_status, 137 - })), 138 - ) 248 + } 249 + 250 + fn proxy_response(res: reqwest::Response) -> Response { 251 + let http_res: poem::http::Response<reqwest::Body> = res.into(); 252 + let (parts, reqw_body) = http_res.into_parts(); 253 + 254 + let parts = poem::ResponseParts { 255 + status: parts.status, 256 + version: parts.version, 257 + headers: parts.headers, 258 + extensions: parts.extensions, 259 
+ }; 260 + 261 + let body = http_body_util::BodyDataStream::new(reqw_body) 262 + .map_err(|e| std::io::Error::other(Box::new(e))); 263 + 264 + Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 139 265 } 140 266 141 267 #[handler] 142 - async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> { 268 + async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 143 269 let mut target = state.plc.clone(); 144 270 target.set_path(req.uri().path()); 145 - let upstream_res = state 271 + target.set_query(req.uri().query()); 272 + let wrapped_res = state 146 273 .client 147 274 .get(target) 148 275 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server ··· 151 278 .await 152 279 .map_err(|e| { 153 280 log::error!("upstream req fail: {e}"); 154 - Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY) 281 + Error::from_string( 282 + failed_to_reach_named("wrapped reference PLC"), 283 + StatusCode::BAD_GATEWAY, 284 + ) 155 285 })?; 156 286 157 - let http_res: poem::http::Response<reqwest::Body> = upstream_res.into(); 158 - let (parts, reqw_body) = http_res.into_parts(); 287 + Ok(proxy_response(wrapped_res)) 288 + } 159 289 160 - let parts = poem::ResponseParts { 161 - status: parts.status, 162 - version: parts.version, 163 - headers: parts.headers, 164 - extensions: parts.extensions, 165 - }; 290 + #[handler] 291 + async fn forward_create_op_upstream( 292 + Data(State { 293 + upstream, 294 + client, 295 + experimental, 296 + .. 297 + }): Data<&State>, 298 + Path(did): Path<String>, 299 + req: &Request, 300 + body: Body, 301 + ) -> Result<Response> { 302 + if let Some(expected_domain) = &experimental.acme_domain { 303 + let Some(found_host) = req.uri().host() else { 304 + return Ok(bad_create_op(&format!( 305 + "missing `Host` header, expected {expected_domain:?} for experimental requests." 
306 + ))); 307 + }; 308 + if found_host != expected_domain { 309 + return Ok(bad_create_op(&format!( 310 + "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}" 311 + ))); 312 + } 313 + } 166 314 167 - let body = http_body_util::BodyDataStream::new(reqw_body) 168 - .map_err(|e| std::io::Error::other(Box::new(e))); 315 + // adjust proxied headers 316 + let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 317 + log::trace!("original request headers: {headers:?}"); 318 + headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); 319 + let client_ua = headers 320 + .get(USER_AGENT) 321 + .map(|h| h.to_str().unwrap()) 322 + .unwrap_or("unknown"); 323 + headers.insert( 324 + USER_AGENT, 325 + format!("{UA} (forwarding from {client_ua:?})") 326 + .parse() 327 + .unwrap(), 328 + ); 329 + log::trace!("adjusted request headers: {headers:?}"); 169 330 170 - Ok(Response::from_parts( 171 - parts, 172 - poem::Body::from_bytes_stream(body), 173 - )) 331 + let mut target = upstream.clone(); 332 + target.set_path(&did); 333 + let upstream_res = client 334 + .post(target) 335 + .timeout(Duration::from_secs(15)) // be a little generous 336 + .headers(headers) 337 + .body(reqwest::Body::wrap_stream(body.into_bytes_stream())) 338 + .send() 339 + .await 340 + .map_err(|e| { 341 + log::warn!("upstream write fail: {e}"); 342 + Error::from_string( 343 + failed_to_reach_named("upstream PLC"), 344 + StatusCode::BAD_GATEWAY, 345 + ) 346 + })?; 347 + 348 + Ok(proxy_response(upstream_res)) 174 349 } 175 350 176 351 #[handler] ··· 182 357 183 358 Sorry, this server does not accept POST requests. 184 359 185 - You may wish to try upstream: {upstream} 360 + You may wish to try sending that to our upstream: {upstream}. 361 + 362 + If you operate this server, try running with `--experimental-write-upstream`. 
186 363 "#, 187 364 logo("mirror (nope)") 188 365 ), ··· 195 372 domains: Vec<String>, 196 373 cache_path: PathBuf, 197 374 directory_url: String, 375 + ipv6: bool, 198 376 }, 199 377 Bind(SocketAddr), 200 378 } 201 379 202 - pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> { 380 + #[derive(Debug, Clone)] 381 + pub struct ExperimentalConf { 382 + pub acme_domain: Option<String>, 383 + pub write_upstream: bool, 384 + } 385 + 386 + pub async fn serve( 387 + upstream: Url, 388 + plc: Url, 389 + listen: ListenConf, 390 + experimental: ExperimentalConf, 391 + db: Option<Db>, 392 + ) -> anyhow::Result<&'static str> { 203 393 log::info!("starting server..."); 204 394 205 395 // not using crate CLIENT: don't want the retries etc ··· 209 399 .build() 210 400 .expect("reqwest client to build"); 211 401 402 + // when `db` is None, we're running in wrap mode. no db access, no upstream sync 403 + let sync_info = db.map(|db| SyncInfo { 404 + latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)), 405 + upstream_status: CachedValue::new( 406 + CheckUpstream(upstream.clone(), client.clone()), 407 + Duration::from_secs(6), 408 + ), 409 + }); 410 + 212 411 let state = State { 213 412 client, 214 413 plc, 215 414 upstream: upstream.clone(), 415 + sync_info, 416 + experimental: experimental.clone(), 216 417 }; 217 418 218 - let app = Route::new() 419 + let mut app = Route::new() 219 420 .at("/", get(hello)) 220 421 .at("/favicon.ico", get(favicon)) 221 422 .at("/_health", get(health)) 222 - .at("/:any", get(proxy).post(nope)) 423 + .at("/export", get(proxy)); 424 + 425 + if experimental.write_upstream { 426 + log::info!("enabling experimental write forwarding to upstream"); 427 + 428 + let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 429 + let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap())); 430 + 431 + let upstream_proxier = forward_create_op_upstream 432 + 
.with(GovernorMiddleware::new(did_limiter)) 433 + .with(GovernorMiddleware::new(ip_limiter)); 434 + 435 + app = app.at("/did:plc:*", get(proxy).post(upstream_proxier)); 436 + } else { 437 + app = app.at("/did:plc:*", get(proxy).post(nope)); 438 + } 439 + 440 + let app = app 223 441 .with(AddData::new(state)) 224 442 .with(Cors::new().allow_credentials(false)) 225 443 .with(Compression::new()) 226 - .with(GovernorMiddleware::new(Quota::per_minute( 444 + .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute( 227 445 3000.try_into().expect("ratelimit middleware to build"), 228 - ))) 446 + )))) 229 447 .with(CatchPanic::new()) 230 448 .with(Tracing); 231 449 ··· 234 452 domains, 235 453 cache_path, 236 454 directory_url, 455 + ipv6, 237 456 } => { 238 457 rustls::crypto::aws_lc_rs::default_provider() 239 458 .install_default() ··· 247 466 } 248 467 let auto_cert = auto_cert.build().expect("acme config to build"); 249 468 250 - let notice_task = tokio::task::spawn(run_insecure_notice()); 251 - let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await; 469 + log::trace!("auto_cert: {auto_cert:?}"); 470 + 471 + let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 472 + let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 473 + let app_res = run(app, listener.acme(auto_cert)).await; 252 474 log::warn!("server task ended, aborting insecure server task..."); 253 475 notice_task.abort(); 254 476 app_res?; ··· 272 494 } 273 495 274 496 /// kick off a tiny little server on a tokio task to tell people to use 443 275 - async fn run_insecure_notice() -> Result<(), std::io::Error> { 497 + async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 276 498 #[handler] 277 499 fn oop_plz_be_secure() -> (StatusCode, String) { 278 500 ( ··· 288 510 } 289 511 290 512 let app = Route::new() 291 - .at("/", get(oop_plz_be_secure)) 292 513 .at("/favicon.ico", get(favicon)) 514 + .nest("/", 
get(oop_plz_be_secure)) 293 515 .with(Tracing); 294 - Server::new(TcpListener::bind("0.0.0.0:80")) 295 - .name("allegedly (mirror:80 helper)") 296 - .run(app) 297 - .await 516 + Server::new(TcpListener::bind(if ipv6 { 517 + "[::]:80" 518 + } else { 519 + "0.0.0.0:80" 520 + })) 521 + .name("allegedly (mirror:80 helper)") 522 + .run(app) 523 + .await 298 524 }
+53 -8
src/poll.rs
··· 4 4 use thiserror::Error; 5 5 use tokio::sync::mpsc; 6 6 7 - // plc.directory ratelimit on /export is 500 per 5 mins 8 - const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 9 - 10 7 #[derive(Debug, Error)] 11 8 pub enum GetPageError { 12 9 #[error(transparent)] ··· 54 51 } 55 52 } 56 53 57 - /// PLC 54 + /// State for removing duplicate ops between PLC export page boundaries 58 55 #[derive(Debug, PartialEq)] 59 56 pub struct PageBoundaryState { 57 + /// The previous page's last timestamp 58 + /// 59 + /// Duplicate ops from /export only occur for the same exact timestamp 60 60 pub last_at: Dt, 61 + /// The previous page's ops at its last timestamp 61 62 keys_at: Vec<OpKey>, // expected to ~always be length one 62 63 } 63 64 64 - /// track keys at final createdAt to deduplicate the start of the next page 65 65 impl PageBoundaryState { 66 + /// Initialize the boundary state with a PLC page 66 67 pub fn new(page: &ExportPage) -> Option<Self> { 67 68 // grab the very last op 68 69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; ··· 78 79 79 80 Some(me) 80 81 } 82 + /// Apply the deduplication and update state 83 + /// 84 + /// The beginning of the page will be modified to remove duplicates from the 85 + /// previous page. 86 + /// 87 + /// The end of the page is inspected to update the deduplicator state for 88 + /// the next page.
81 89 fn apply_to_next(&mut self, page: &mut ExportPage) { 82 90 // walk ops forward, kicking previously-seen ops until created_at advances 83 91 let to_remove: Vec<usize> = page ··· 127 135 } 128 136 } 129 137 138 + /// Get one PLC export page 139 + /// 140 + /// Extracts the final op so it can be used to fetch the following page 130 141 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 131 142 log::trace!("Getting page: {url}"); 132 143 ··· 141 152 .split('\n') 142 153 .filter_map(|s| { 143 154 serde_json::from_str::<Op>(s) 144 - .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 155 + .inspect_err(|e| { 156 + if !s.is_empty() { 157 + log::warn!("failed to parse op: {e} ({s})") 158 + } 159 + }) 145 160 .ok() 146 161 }) 147 162 .collect(); ··· 151 166 Ok((ExportPage { ops }, last_op)) 152 167 } 153 168 169 + /// Poll an upstream PLC server for new ops 170 + /// 171 + /// Pages of operations are written to the `dest` channel. 172 + /// 173 + /// ```no_run 174 + /// # #[tokio::main] 175 + /// # async fn main() { 176 + /// use allegedly::{ExportPage, Op, poll_upstream}; 177 + /// 178 + /// let after = Some(chrono::Utc::now()); 179 + /// let upstream = "https://plc.wtf/export".parse().unwrap(); 180 + /// let throttle = std::time::Duration::from_millis(300); 181 + /// 182 + /// let (tx, mut rx) = tokio::sync::mpsc::channel(1); 183 + /// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 184 + /// 185 + /// while let Some(ExportPage { ops }) = rx.recv().await { 186 + /// println!("received {} plc ops", ops.len()); 187 + /// 188 + /// for Op { did, cid, operation, .. } in ops { 189 + /// // in this example we're alerting when changes are found for one 190 + /// // specific identity 191 + /// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 192 + /// println!("Update found for {did}! 
cid={cid}\n -> operation: {}", operation.get()); 193 + /// } 194 + /// } 195 + /// } 196 + /// # } 197 + /// ``` 154 198 pub async fn poll_upstream( 155 199 after: Option<Dt>, 156 200 base: Url, 201 + throttle: Duration, 157 202 dest: mpsc::Sender<ExportPage>, 158 203 ) -> anyhow::Result<&'static str> { 159 - log::info!("starting upstream poller after {after:?}"); 160 - let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 204 + log::info!("starting upstream poller at {base} after {after:?}"); 205 + let mut tick = tokio::time::interval(throttle); 161 206 let mut prev_last: Option<LastOp> = after.map(Into::into); 162 207 let mut boundary_state: Option<PageBoundaryState> = None; 163 208 loop {
+88 -33
src/ratelimit.rs
··· 8 8 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 9 use std::{ 10 10 convert::TryInto, 11 + hash::Hash, 11 12 net::{IpAddr, Ipv6Addr}, 12 13 sync::{Arc, LazyLock}, 13 14 time::Duration, ··· 20 21 type IP6_56 = [u8; 7]; 21 22 type IP6_48 = [u8; 6]; 22 23 24 + pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static { 25 + fn extract_key(&self, req: &Request) -> Result<K>; 26 + fn check_key(&self, ip: &K) -> Result<(), Duration>; 27 + fn housekeep(&self); 28 + } 29 + 23 30 fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 24 31 let period = quota.replenish_interval() / factor; 25 32 let burst = quota ··· 30 37 } 31 38 32 39 #[derive(Debug)] 33 - struct IpLimiters { 40 + pub struct CreatePlcOpLimiter { 41 + limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>, 42 + } 43 + 44 + impl CreatePlcOpLimiter { 45 + pub fn new(quota: Quota) -> Self { 46 + Self { 47 + limiter: RateLimiter::keyed(quota), 48 + } 49 + } 50 + } 51 + 52 + /// this must be used with an endpoint with a single path param for the did 53 + impl Limiter<String> for CreatePlcOpLimiter { 54 + fn extract_key(&self, req: &Request) -> Result<String> { 55 + let (did,) = req.path_params::<(String,)>()?; 56 + Ok(did) 57 + } 58 + fn check_key(&self, did: &String) -> Result<(), Duration> { 59 + self.limiter 60 + .check_key(did) 61 + .map_err(|e| e.wait_time_from(CLOCK.now())) 62 + } 63 + fn housekeep(&self) { 64 + log::debug!( 65 + "limiter size before housekeeping: {} dids", 66 + self.limiter.len() 67 + ); 68 + self.limiter.retain_recent(); 69 + } 70 + } 71 + 72 + #[derive(Debug)] 73 + pub struct IpLimiters { 34 74 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 35 75 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 36 76 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, ··· 44 84 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale 
quota")), 45 85 } 46 86 } 47 - pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { 87 + } 88 + 89 + impl Limiter<IpAddr> for IpLimiters { 90 + fn extract_key(&self, req: &Request) -> Result<IpAddr> { 91 + Ok(req 92 + .remote_addr() 93 + .as_socket_addr() 94 + .expect("failed to get request's remote addr") // TODO 95 + .ip()) 96 + } 97 + fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> { 48 98 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 49 99 match ip { 50 - addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf), 100 + addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf), 51 101 IpAddr::V6(a) => { 52 102 // always check all limiters 53 103 let check_ip = self ··· 74 124 } 75 125 } 76 126 } 127 + fn housekeep(&self) { 128 + log::debug!( 129 + "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 130 + self.per_ip.len(), 131 + self.ip6_56.len(), 132 + self.ip6_48.len(), 133 + ); 134 + self.per_ip.retain_recent(); 135 + self.ip6_56.retain_recent(); 136 + self.ip6_48.retain_recent(); 137 + } 77 138 } 78 139 79 140 /// Once the rate limit has been reached, the middleware will respond with 80 141 /// status code 429 (too many requests) and a `Retry-After` header with the amount 81 142 /// of time that needs to pass before another request will be allowed. 
82 - #[derive(Debug)] 83 - pub struct GovernorMiddleware { 143 + // #[derive(Debug)] 144 + pub struct GovernorMiddleware<K> { 84 145 #[allow(dead_code)] 85 146 stop_on_drop: oneshot::Sender<()>, 86 - limiters: Arc<IpLimiters>, 147 + limiters: Arc<dyn Limiter<K>>, 87 148 } 88 149 89 - impl GovernorMiddleware { 150 + impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> { 90 151 /// Limit request rates 91 152 /// 92 153 /// a little gross but this spawns a tokio task for housekeeping: 93 154 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 94 - pub fn new(quota: Quota) -> Self { 95 - let limiters = Arc::new(IpLimiters::new(quota)); 155 + pub fn new(limiters: impl Limiter<K>) -> Self { 156 + let limiters = Arc::new(limiters); 96 157 let (stop_on_drop, mut stopped) = oneshot::channel(); 97 158 tokio::task::spawn({ 98 159 let limiters = limiters.clone(); ··· 102 163 _ = &mut stopped => break, 103 164 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 104 165 }; 105 - log::debug!( 106 - "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 107 - limiters.per_ip.len(), 108 - limiters.ip6_56.len(), 109 - limiters.ip6_48.len(), 110 - ); 111 - limiters.per_ip.retain_recent(); 112 - limiters.ip6_56.retain_recent(); 113 - limiters.ip6_48.retain_recent(); 166 + limiters.housekeep(); 114 167 } 115 168 } 116 169 }); ··· 121 174 } 122 175 } 123 176 124 - impl<E: Endpoint> Middleware<E> for GovernorMiddleware { 125 - type Output = GovernorMiddlewareImpl<E>; 177 + impl<E, K> Middleware<E> for GovernorMiddleware<K> 178 + where 179 + E: Endpoint, 180 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 181 + { 182 + type Output = GovernorMiddlewareImpl<E, K>; 126 183 fn transform(&self, ep: E) -> Self::Output { 127 184 GovernorMiddlewareImpl { 128 185 ep, ··· 131 188 } 132 189 } 133 190 134 - pub struct GovernorMiddlewareImpl<E> { 191 + pub struct GovernorMiddlewareImpl<E, K> { 135 192 ep: E, 136 - limiters: 
Arc<IpLimiters>, 193 + limiters: Arc<dyn Limiter<K>>, 137 194 } 138 195 139 - impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { 196 + impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K> 197 + where 198 + E: Endpoint, 199 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 200 + { 140 201 type Output = E::Output; 141 202 142 203 async fn call(&self, req: Request) -> Result<Self::Output> { 143 - let remote = req 144 - .remote_addr() 145 - .as_socket_addr() 146 - .expect("failed to get request's remote addr") // TODO 147 - .ip(); 204 + let key = self.limiters.extract_key(&req)?; 148 205 149 - log::trace!("remote: {remote}"); 150 - 151 - match self.limiters.check_key(remote) { 206 + match self.limiters.check_key(&key) { 152 207 Ok(_) => { 153 - log::debug!("allowing remote {remote}"); 208 + log::debug!("allowing key {key:?}"); 154 209 self.ep.call(req).await 155 210 } 156 211 Err(d) => { 157 212 let wait_time = d.as_secs(); 158 213 159 - log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s"); 214 + log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s"); 160 215 161 216 let res = Response::builder() 162 217 .status(StatusCode::TOO_MANY_REQUESTS)