Server tools to backfill, tail, mirror, and verify PLC logs

Compare changes

Choose any two refs to compare.

+527 -99
+164 -1
Cargo.lock
··· 28 28 29 29 [[package]] 30 30 name = "allegedly" 31 - version = "0.3.0" 31 + version = "0.3.3" 32 32 dependencies = [ 33 33 "anyhow", 34 34 "async-compression", ··· 39 39 "http-body-util", 40 40 "log", 41 41 "native-tls", 42 + "opentelemetry", 43 + "opentelemetry-otlp", 44 + "opentelemetry_sdk", 42 45 "poem", 43 46 "postgres-native-tls", 44 47 "reqwest", ··· 52 55 "tokio-postgres", 53 56 "tokio-stream", 54 57 "tokio-util", 58 + "tracing", 59 + "tracing-opentelemetry", 55 60 "tracing-subscriber", 56 61 ] 57 62 ··· 1584 1589 ] 1585 1590 1586 1591 [[package]] 1592 + name = "opentelemetry" 1593 + version = "0.30.0" 1594 + source = "registry+https://github.com/rust-lang/crates.io-index" 1595 + checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" 1596 + dependencies = [ 1597 + "futures-core", 1598 + "futures-sink", 1599 + "js-sys", 1600 + "pin-project-lite", 1601 + "thiserror 2.0.16", 1602 + "tracing", 1603 + ] 1604 + 1605 + [[package]] 1606 + name = "opentelemetry-http" 1607 + version = "0.30.0" 1608 + source = "registry+https://github.com/rust-lang/crates.io-index" 1609 + checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" 1610 + dependencies = [ 1611 + "async-trait", 1612 + "bytes", 1613 + "http", 1614 + "opentelemetry", 1615 + "reqwest", 1616 + ] 1617 + 1618 + [[package]] 1619 + name = "opentelemetry-otlp" 1620 + version = "0.30.0" 1621 + source = "registry+https://github.com/rust-lang/crates.io-index" 1622 + checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" 1623 + dependencies = [ 1624 + "http", 1625 + "opentelemetry", 1626 + "opentelemetry-http", 1627 + "opentelemetry-proto", 1628 + "opentelemetry_sdk", 1629 + "prost", 1630 + "reqwest", 1631 + "thiserror 2.0.16", 1632 + "tracing", 1633 + ] 1634 + 1635 + [[package]] 1636 + name = "opentelemetry-proto" 1637 + version = "0.30.0" 1638 + source = "registry+https://github.com/rust-lang/crates.io-index" 1639 + checksum = 
"2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" 1640 + dependencies = [ 1641 + "opentelemetry", 1642 + "opentelemetry_sdk", 1643 + "prost", 1644 + "tonic", 1645 + ] 1646 + 1647 + [[package]] 1648 + name = "opentelemetry_sdk" 1649 + version = "0.30.0" 1650 + source = "registry+https://github.com/rust-lang/crates.io-index" 1651 + checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" 1652 + dependencies = [ 1653 + "futures-channel", 1654 + "futures-executor", 1655 + "futures-util", 1656 + "opentelemetry", 1657 + "percent-encoding", 1658 + "rand 0.9.2", 1659 + "serde_json", 1660 + "thiserror 2.0.16", 1661 + "tokio", 1662 + "tokio-stream", 1663 + ] 1664 + 1665 + [[package]] 1587 1666 name = "parking_lot" 1588 1667 version = "0.11.2" 1589 1668 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1666 1745 ] 1667 1746 1668 1747 [[package]] 1748 + name = "pin-project" 1749 + version = "1.1.10" 1750 + source = "registry+https://github.com/rust-lang/crates.io-index" 1751 + checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" 1752 + dependencies = [ 1753 + "pin-project-internal", 1754 + ] 1755 + 1756 + [[package]] 1757 + name = "pin-project-internal" 1758 + version = "1.1.10" 1759 + source = "registry+https://github.com/rust-lang/crates.io-index" 1760 + checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" 1761 + dependencies = [ 1762 + "proc-macro2", 1763 + "quote", 1764 + "syn", 1765 + ] 1766 + 1767 + [[package]] 1669 1768 name = "pin-project-lite" 1670 1769 version = "0.2.16" 1671 1770 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1837 1936 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 1838 1937 dependencies = [ 1839 1938 "unicode-ident", 1939 + ] 1940 + 1941 + [[package]] 1942 + name = "prost" 1943 + version = "0.13.5" 1944 + source = "registry+https://github.com/rust-lang/crates.io-index" 1945 + checksum = 
"2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 1946 + dependencies = [ 1947 + "bytes", 1948 + "prost-derive", 1949 + ] 1950 + 1951 + [[package]] 1952 + name = "prost-derive" 1953 + version = "0.13.5" 1954 + source = "registry+https://github.com/rust-lang/crates.io-index" 1955 + checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" 1956 + dependencies = [ 1957 + "anyhow", 1958 + "itertools", 1959 + "proc-macro2", 1960 + "quote", 1961 + "syn", 1840 1962 ] 1841 1963 1842 1964 [[package]] ··· 2057 2179 source = "registry+https://github.com/rust-lang/crates.io-index" 2058 2180 checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" 2059 2181 dependencies = [ 2182 + "async-compression", 2060 2183 "base64", 2061 2184 "bytes", 2062 2185 "encoding_rs", 2186 + "futures-channel", 2063 2187 "futures-core", 2064 2188 "futures-util", 2065 2189 "h2", ··· 2802 2926 ] 2803 2927 2804 2928 [[package]] 2929 + name = "tonic" 2930 + version = "0.13.1" 2931 + source = "registry+https://github.com/rust-lang/crates.io-index" 2932 + checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" 2933 + dependencies = [ 2934 + "async-trait", 2935 + "base64", 2936 + "bytes", 2937 + "http", 2938 + "http-body", 2939 + "http-body-util", 2940 + "percent-encoding", 2941 + "pin-project", 2942 + "prost", 2943 + "tokio-stream", 2944 + "tower-layer", 2945 + "tower-service", 2946 + "tracing", 2947 + ] 2948 + 2949 + [[package]] 2805 2950 name = "tower" 2806 2951 version = "0.5.2" 2807 2952 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2887 3032 "log", 2888 3033 "once_cell", 2889 3034 "tracing-core", 3035 + ] 3036 + 3037 + [[package]] 3038 + name = "tracing-opentelemetry" 3039 + version = "0.31.0" 3040 + source = "registry+https://github.com/rust-lang/crates.io-index" 3041 + checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" 3042 + dependencies = [ 3043 + "js-sys", 3044 + 
"once_cell", 3045 + "opentelemetry", 3046 + "opentelemetry_sdk", 3047 + "smallvec", 3048 + "tracing", 3049 + "tracing-core", 3050 + "tracing-log", 3051 + "tracing-subscriber", 3052 + "web-time", 2890 3053 ] 2891 3054 2892 3055 [[package]]
+7 -2
Cargo.toml
··· 2 2 name = "allegedly" 3 3 description = "public ledger server tools and services (for the PLC)" 4 4 license = "MIT OR Apache-2.0" 5 - version = "0.3.0" 5 + version = "0.3.3" 6 6 edition = "2024" 7 7 default-run = "allegedly" 8 8 ··· 16 16 http-body-util = "0.1.3" 17 17 log = "0.4.28" 18 18 native-tls = "0.2.14" 19 + opentelemetry = "0.30.0" 20 + opentelemetry-otlp = { version = "0.30.0" } 21 + opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } 19 22 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 23 postgres-native-tls = "0.5.1" 21 - reqwest = { version = "0.12.23", features = ["stream", "json"] } 24 + reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } 22 25 reqwest-middleware = "0.4.2" 23 26 reqwest-retry = "0.7.0" 24 27 rustls = "0.23.32" ··· 29 32 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 30 33 tokio-stream = { version = "0.1.17", features = ["io-util"] } 31 34 tokio-util = { version = "0.7.16", features = ["compat"] } 35 + tracing = "0.1.41" 36 + tracing-opentelemetry = "0.31.0" 32 37 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+37
examples/poll.rs
··· 1 + use allegedly::{ExportPage, poll_upstream}; 2 + 3 + #[tokio::main] 4 + async fn main() { 5 + // set to `None` to replay from the beginning of the PLC history 6 + let after = Some(chrono::Utc::now()); 7 + 8 + // the PLC server to poll for new ops 9 + let upstream = "https://plc.wtf/export".parse().unwrap(); 10 + 11 + // self-rate-limit (plc.directory's limit interval is 600ms) 12 + let throttle = std::time::Duration::from_millis(300); 13 + 14 + // pages are sent out of the poller via a tokio mpsc channel 15 + let (tx, mut rx) = tokio::sync::mpsc::channel(1); 16 + 17 + // spawn a tokio task to run the poller 18 + tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 19 + 20 + // receive pages of plc ops from the poller 21 + while let Some(ExportPage { ops }) = rx.recv().await { 22 + println!("received {} plc ops", ops.len()); 23 + 24 + for op in ops { 25 + // in this example we're alerting when changes are found for one 26 + // specific identity 27 + if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 28 + println!( 29 + "Update found for {}! cid={}\n -> operation: {}", 30 + op.did, 31 + op.cid, 32 + op.operation.get() 33 + ); 34 + } 35 + } 36 + } 37 + }
+14 -1
readme.md
··· 36 36 --experimental-write-upstream 37 37 ``` 38 38 39 + - Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream 40 + 41 + ```bash 42 + sudo allegedly wrap \ 43 + --wrap "http://127.0.0.1:3000" \ 44 + --acme-ipv6 \ 45 + --acme-cache-path ./acme-cache \ 46 + --acme-domain "plc.wtf" \ 47 + --experimental-acme-domain "experimental.plc.wtf" \ 48 + --experimental-write-upstream \ 49 + --upstream "https://plc.wtf" 50 + ``` 51 + 39 52 40 53 add `--help` to any command for more info about it 41 54 ··· 66 79 - monitoring of the various tasks 67 80 - health check pings 68 81 - expose metrics/tracing 69 - - read-only flag for mirror wrapper 82 + - [x] read-only flag for mirror wrapper 70 83 - bundle: write directly to s3-compatible object storage 71 84 - helpers for automating periodic `bundle` runs 72 85
+34 -6
src/bin/allegedly.rs
··· 1 - use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 1 + use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init}; 2 + use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream}; 2 3 use clap::{CommandFactory, Parser, Subcommand}; 3 - use std::{path::PathBuf, time::Instant}; 4 + use std::{path::PathBuf, time::Duration, time::Instant}; 4 5 use tokio::fs::create_dir_all; 5 6 use tokio::sync::mpsc; 6 7 ··· 48 49 Mirror { 49 50 #[command(flatten)] 50 51 args: mirror::Args, 52 + #[command(flatten)] 53 + instrumentation: InstrumentationArgs, 54 + }, 55 + /// Wrap any did-method-plc server, without syncing upstream (read-only) 56 + Wrap { 57 + #[command(flatten)] 58 + args: mirror::Args, 59 + #[command(flatten)] 60 + instrumentation: InstrumentationArgs, 51 61 }, 52 62 /// Poll an upstream PLC server and log new ops to stdout 53 63 Tail { ··· 57 67 }, 58 68 } 59 69 70 + impl Commands { 71 + fn enable_otel(&self) -> bool { 72 + match self { 73 + Commands::Mirror { 74 + instrumentation, .. 75 + } 76 + | Commands::Wrap { 77 + instrumentation, .. 
78 + } => instrumentation.enable_opentelemetry, 79 + _ => false, 80 + } 81 + } 82 + } 83 + 60 84 #[tokio::main] 61 85 async fn main() -> anyhow::Result<()> { 62 86 let args = Cli::parse(); 63 87 let matches = Cli::command().get_matches(); 64 88 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 65 - bin_init(name); 89 + bin_init(args.command.enable_otel()); 90 + log::info!("{}", logo(name)); 66 91 67 92 let globals = args.globals.clone(); 68 93 ··· 76 101 } => { 77 102 let mut url = globals.upstream; 78 103 url.set_path("/export"); 104 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 79 105 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 80 106 tokio::task::spawn(async move { 81 - poll_upstream(Some(after), url, tx) 107 + poll_upstream(Some(after), url, throttle, tx) 82 108 .await 83 109 .expect("to poll upstream") 84 110 }); ··· 90 116 .await 91 117 .expect("to write bundles to output files"); 92 118 } 93 - Commands::Mirror { args } => mirror::run(globals, args).await?, 119 + Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?, 120 + Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?, 94 121 Commands::Tail { after } => { 95 122 let mut url = globals.upstream; 96 123 url.set_path("/export"); 97 124 let start_at = after.or_else(|| Some(chrono::Utc::now())); 125 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 98 126 let (tx, rx) = mpsc::channel(1); 99 127 tokio::task::spawn(async move { 100 - poll_upstream(start_at, url, tx) 128 + poll_upstream(start_at, url, throttle, tx) 101 129 .await 102 130 .expect("to poll upstream") 103 131 });
+15 -7
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 - bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 2 + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 3 + bin::{GlobalArgs, bin_init}, 4 + full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 4 5 }; 5 6 use clap::Parser; 6 7 use reqwest::Url; 7 - use std::path::PathBuf; 8 + use std::{path::PathBuf, time::Duration}; 8 9 use tokio::{ 9 10 sync::{mpsc, oneshot}, 10 11 task::JoinSet, ··· 53 54 } 54 55 55 56 pub async fn run( 56 - GlobalArgs { upstream }: GlobalArgs, 57 + GlobalArgs { 58 + upstream, 59 + upstream_throttle_ms, 60 + }: GlobalArgs, 57 61 Args { 58 62 http, 59 63 dir, ··· 98 102 } 99 103 let mut upstream = upstream; 100 104 upstream.set_path("/export"); 101 - tasks.spawn(poll_upstream(None, upstream, poll_tx)); 105 + let throttle = Duration::from_millis(upstream_throttle_ms); 106 + tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 102 107 tasks.spawn(full_pages(poll_out, full_tx)); 103 108 tasks.spawn(pages_to_stdout(full_out, None)); 104 109 } else { ··· 128 133 129 134 // and the catch-up source... 130 135 if let Some(last) = found_last_out { 136 + let throttle = Duration::from_millis(upstream_throttle_ms); 131 137 tasks.spawn(async move { 132 138 let mut upstream = upstream; 133 139 upstream.set_path("/export"); 134 - poll_upstream(last.await?, upstream, poll_tx).await 140 + 141 + poll_upstream(last.await?, upstream, throttle, poll_tx).await 135 142 }); 136 143 } 137 144 ··· 193 200 #[tokio::main] 194 201 async fn main() -> anyhow::Result<()> { 195 202 let args = CliArgs::parse(); 196 - bin_init("backfill"); 203 + bin_init(false); 204 + log::info!("{}", logo("backfill")); 197 205 run(args.globals, args.args).await?; 198 206 Ok(()) 199 207 }
+28
src/bin/instrumentation/mod.rs
··· 1 + use opentelemetry::trace::TracerProvider as _; 2 + use opentelemetry_otlp::{Protocol, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::registry::LookupSpan; 7 + 8 + pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer> 9 + where 10 + S: Subscriber + for<'span> LookupSpan<'span>, 11 + { 12 + let exporter = opentelemetry_otlp::SpanExporter::builder() 13 + .with_http() 14 + .with_protocol(Protocol::HttpBinary) 15 + .build() 16 + .expect("to build otel otlp exporter"); 17 + 18 + let provider = SdkTracerProvider::builder() 19 + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( 20 + 1.0, 21 + )))) 22 + .with_id_generator(RandomIdGenerator::default()) 23 + .with_batch_exporter(exporter) 24 + .build(); 25 + 26 + let tracer = provider.tracer("tracing-otel-subscriber"); 27 + tracing_opentelemetry::layer().with_tracer(tracer) 28 + }
+42 -20
src/bin/mirror.rs
··· 1 1 use allegedly::{ 2 - Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve, 2 + Db, ExperimentalConf, ListenConf, 3 + bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 + logo, pages_to_pg, poll_upstream, serve, 3 5 }; 4 6 use clap::Parser; 5 7 use reqwest::Url; 6 - use std::{net::SocketAddr, path::PathBuf}; 8 + use std::{net::SocketAddr, path::PathBuf, time::Duration}; 7 9 use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 8 10 9 11 #[derive(Debug, clap::Args)] ··· 13 15 wrap: Url, 14 16 /// the wrapped did-method-plc server's database (write access required) 15 17 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 16 - wrap_pg: Url, 18 + wrap_pg: Option<Url>, 17 19 /// path to tls cert for the wrapped postgres db, if needed 18 20 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 19 21 wrap_pg_cert: Option<PathBuf>, ··· 60 62 } 61 63 62 64 pub async fn run( 63 - GlobalArgs { upstream }: GlobalArgs, 65 + GlobalArgs { 66 + upstream, 67 + upstream_throttle_ms, 68 + }: GlobalArgs, 64 69 Args { 65 70 wrap, 66 71 wrap_pg, ··· 73 78 experimental_acme_domain, 74 79 experimental_write_upstream, 75 80 }: Args, 81 + sync: bool, 76 82 ) -> anyhow::Result<()> { 77 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 78 - 79 - // TODO: allow starting up with polling backfill from beginning? 80 - log::debug!("getting the latest op from the db..."); 81 - let latest = db 82 - .get_latest() 83 - .await? 84 - .expect("there to be at least one op in the db. 
did you backfill?"); 85 - 86 83 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 87 84 (_, false, Some(cache_path)) => { 88 85 create_dir_all(&cache_path).await?; ··· 109 106 110 107 let mut tasks = JoinSet::new(); 111 108 112 - let (send_page, recv_page) = mpsc::channel(8); 109 + let db = if sync { 110 + let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 111 + "a wrapped reference postgres must be provided to sync" 112 + ))?; 113 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 113 114 114 - let mut poll_url = upstream.clone(); 115 - poll_url.set_path("/export"); 115 + // TODO: allow starting up with polling backfill from beginning? 116 + log::debug!("getting the latest op from the db..."); 117 + let latest = db 118 + .get_latest() 119 + .await? 120 + .expect("there to be at least one op in the db. did you backfill?"); 116 121 117 - tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 118 - tasks.spawn(pages_to_pg(db.clone(), recv_page)); 122 + let (send_page, recv_page) = mpsc::channel(8); 123 + 124 + let mut poll_url = upstream.clone(); 125 + poll_url.set_path("/export"); 126 + let throttle = Duration::from_millis(upstream_throttle_ms); 127 + 128 + tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 129 + tasks.spawn(pages_to_pg(db.clone(), recv_page)); 130 + Some(db) 131 + } else { 132 + None 133 + }; 134 + 119 135 tasks.spawn(serve( 120 136 upstream, 121 137 wrap, ··· 152 168 #[command(flatten)] 153 169 globals: GlobalArgs, 154 170 #[command(flatten)] 171 + instrumentation: InstrumentationArgs, 172 + #[command(flatten)] 155 173 args: Args, 174 + /// Run the mirror in wrap mode, no upstream synchronization (read-only) 175 + #[arg(long, action)] 176 + wrap_mode: bool, 156 177 } 157 178 158 179 #[allow(dead_code)] 159 180 #[tokio::main] 160 181 async fn main() -> anyhow::Result<()> { 161 182 let args = CliArgs::parse(); 162 - bin_init("mirror"); 163 - run(args.globals, args.args).await?; 183 + 
bin_init(args.instrumentation.enable_opentelemetry); 184 + log::info!("{}", logo("mirror")); 185 + run(args.globals, args.args, !args.wrap_mode).await?; 164 186 Ok(()) 165 187 }
+44
src/bin/mod.rs
··· 1 + mod instrumentation; 2 + 1 3 use reqwest::Url; 4 + use tracing_subscriber::layer::SubscriberExt; 2 5 3 6 #[derive(Debug, Clone, clap::Args)] 4 7 pub struct GlobalArgs { ··· 6 9 #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 7 10 #[clap(default_value = "https://plc.directory")] 8 11 pub upstream: Url, 12 + /// Self-rate-limit upstream request interval 13 + /// 14 + /// plc.directory's rate limiting is 500 requests per 5 mins (600ms) 15 + #[arg(long, global = true, env = "ALLEGEDLY_UPSTREAM_THROTTLE_MS")] 16 + #[clap(default_value = "600")] 17 + pub upstream_throttle_ms: u64, 18 + } 19 + 20 + #[derive(Debug, Default, Clone, clap::Args)] 21 + pub struct InstrumentationArgs { 22 + /// Export traces to an OTLP collector 23 + /// 24 + /// Configure the collector via standard env vars: 25 + /// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/" 26 + /// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret" 27 + /// - `OTEL_SERVICE_NAME` eg "my-app" 28 + #[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")] 29 + pub enable_opentelemetry: bool, 30 + } 31 + 32 + pub fn bin_init(enable_otlp: bool) { 33 + let filter = tracing_subscriber::EnvFilter::builder() 34 + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) 35 + .from_env_lossy(); 36 + 37 + let stderr_log = tracing_subscriber::fmt::layer() 38 + .with_writer(std::io::stderr) 39 + .pretty(); 40 + 41 + let otel = if enable_otlp { 42 + Some(instrumentation::otel_layer()) 43 + } else { 44 + None 45 + }; 46 + 47 + let subscriber = tracing_subscriber::Registry::default() 48 + .with(filter) 49 + .with(stderr_log) 50 + .with(otel); 51 + 52 + tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber"); 9 53 } 10 54 11 55 #[allow(dead_code)]
+1
src/client.rs
··· 12 12 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| { 13 13 let inner = Client::builder() 14 14 .user_agent(UA) 15 + .gzip(true) 15 16 .build() 16 17 .expect("reqwest client to build"); 17 18
-13
src/lib.rs
··· 145 145 env!("CARGO_PKG_VERSION"), 146 146 ) 147 147 } 148 - 149 - pub fn bin_init(name: &str) { 150 - if std::env::var_os("RUST_LOG").is_none() { 151 - unsafe { std::env::set_var("RUST_LOG", "info") }; 152 - } 153 - let filter = tracing_subscriber::EnvFilter::from_default_env(); 154 - tracing_subscriber::fmt() 155 - .with_writer(std::io::stderr) 156 - .with_env_filter(filter) 157 - .init(); 158 - 159 - log::info!("{}", logo(name)); 160 - }
+88 -41
src/mirror.rs
··· 19 19 client: Client, 20 20 plc: Url, 21 21 upstream: Url, 22 + sync_info: Option<SyncInfo>, 23 + experimental: ExperimentalConf, 24 + } 25 + 26 + /// server info that only applies in mirror (synchronizing) mode 27 + #[derive(Clone)] 28 + struct SyncInfo { 22 29 latest_at: CachedValue<Dt, GetLatestAt>, 23 30 upstream_status: CachedValue<PlcStatus, CheckUpstream>, 24 - experimental: ExperimentalConf, 25 31 } 26 32 27 33 #[handler] 28 34 fn hello( 29 35 Data(State { 36 + sync_info, 30 37 upstream, 31 38 experimental: exp, 32 39 .. 33 40 }): Data<&State>, 34 41 req: &Request, 35 42 ) -> String { 43 + // let mode = if sync_info.is_some() { "mirror" } else { "wrap" }; 44 + let pre_info = if sync_info.is_some() { 45 + format!( 46 + r#" 47 + This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 48 + synchronizes a local PLC reference server instance[2] (why?[3]). 49 + 50 + 51 + Configured upstream: 52 + 53 + {upstream} 54 + 55 + "# 56 + ) 57 + } else { 58 + format!( 59 + r#" 60 + This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse- 61 + proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy. 62 + 63 + 64 + Configured upstream (only used if experimental op forwarding is enabled): 65 + 66 + {upstream} 67 + 68 + "# 69 + ) 70 + }; 71 + 36 72 let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) { 37 73 (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(), 38 74 (_, None, _) => { ··· 42 78 " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 43 79 } 44 80 (_, Some(d), _) => format!( 45 - r#"\ 46 - - POST /* Rejected, but experimental upstream op forwarding is 47 - available at `POST {d}/:did`!"# 81 + r#" - POST /* Rejected, but experimental upstream op forwarding is 82 + available at `POST https://{d}/:did`!"# 48 83 ), 49 84 }; 50 85 51 86 format!( 52 87 r#"{} 53 - 54 - This is a PLC[1] mirror running Allegedly in mirror mode. 
Mirror mode wraps and 55 - synchronizes a local PLC reference server instance[2] (why?[3]). 56 - 57 - 58 - Configured upstream: 59 - 60 - {upstream} 61 - 88 + {pre_info} 62 89 63 90 Available APIs: 64 91 ··· 177 204 Data(State { 178 205 plc, 179 206 client, 180 - latest_at, 181 - upstream_status, 207 + sync_info, 182 208 .. 183 209 }): Data<&State>, 184 210 ) -> impl IntoResponse { ··· 187 213 if !ok { 188 214 overall_status = StatusCode::BAD_GATEWAY; 189 215 } 190 - let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 191 - if !ok { 192 - overall_status = StatusCode::BAD_GATEWAY; 216 + if let Some(SyncInfo { 217 + latest_at, 218 + upstream_status, 219 + }) = sync_info 220 + { 221 + // mirror mode 222 + let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 223 + if !ok { 224 + overall_status = StatusCode::BAD_GATEWAY; 225 + } 226 + let latest = latest_at.get().await.ok(); 227 + ( 228 + overall_status, 229 + Json(serde_json::json!({ 230 + "server": "allegedly (mirror)", 231 + "version": env!("CARGO_PKG_VERSION"), 232 + "wrapped_plc": wrapped_status, 233 + "upstream_plc": upstream_status, 234 + "latest_at": latest, 235 + })), 236 + ) 237 + } else { 238 + // wrap mode 239 + ( 240 + overall_status, 241 + Json(serde_json::json!({ 242 + "server": "allegedly (mirror)", 243 + "version": env!("CARGO_PKG_VERSION"), 244 + "wrapped_plc": wrapped_status, 245 + })), 246 + ) 193 247 } 194 - let latest = latest_at.get().await.ok(); 195 - ( 196 - overall_status, 197 - Json(serde_json::json!({ 198 - "server": "allegedly (mirror)", 199 - "version": env!("CARGO_PKG_VERSION"), 200 - "wrapped_plc": wrapped_status, 201 - "upstream_plc": upstream_status, 202 - "latest_at": latest, 203 - })), 204 - ) 205 248 } 206 249 207 250 fn proxy_response(res: reqwest::Response) -> Response { ··· 225 268 async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 226 269 let mut target = state.plc.clone(); 227 
270 target.set_path(req.uri().path()); 271 + target.set_query(req.uri().query()); 228 272 let wrapped_res = state 229 273 .client 230 274 .get(target) ··· 344 388 plc: Url, 345 389 listen: ListenConf, 346 390 experimental: ExperimentalConf, 347 - db: Db, 391 + db: Option<Db>, 348 392 ) -> anyhow::Result<&'static str> { 349 393 log::info!("starting server..."); 350 394 ··· 355 399 .build() 356 400 .expect("reqwest client to build"); 357 401 358 - let latest_at = CachedValue::new(GetLatestAt(db), Duration::from_secs(2)); 359 - let upstream_status = CachedValue::new( 360 - CheckUpstream(upstream.clone(), client.clone()), 361 - Duration::from_secs(6), 362 - ); 402 + // when `db` is None, we're running in wrap mode. no db access, no upstream sync 403 + let sync_info = db.map(|db| SyncInfo { 404 + latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)), 405 + upstream_status: CachedValue::new( 406 + CheckUpstream(upstream.clone(), client.clone()), 407 + Duration::from_secs(6), 408 + ), 409 + }); 363 410 364 411 let state = State { 365 412 client, 366 413 plc, 367 414 upstream: upstream.clone(), 368 - latest_at, 369 - upstream_status, 415 + sync_info, 370 416 experimental: experimental.clone(), 371 417 }; 372 418 373 419 let mut app = Route::new() 374 420 .at("/", get(hello)) 375 421 .at("/favicon.ico", get(favicon)) 376 - .at("/_health", get(health)); 422 + .at("/_health", get(health)) 423 + .at("/export", get(proxy)); 377 424 378 425 if experimental.write_upstream { 379 426 log::info!("enabling experimental write forwarding to upstream"); ··· 385 432 .with(GovernorMiddleware::new(did_limiter)) 386 433 .with(GovernorMiddleware::new(ip_limiter)); 387 434 388 - app = app.at("/:any", get(proxy).post(upstream_proxier)); 435 + app = app.at("/did:plc:*", get(proxy).post(upstream_proxier)); 389 436 } else { 390 - app = app.at("/:any", get(proxy).post(nope)); 437 + app = app.at("/did:plc:*", get(proxy).post(nope)); 391 438 } 392 439 393 440 let app = app ··· 463 510 
} 464 511 465 512 let app = Route::new() 466 - .at("/", get(oop_plz_be_secure)) 467 513 .at("/favicon.ico", get(favicon)) 514 + .nest("/", get(oop_plz_be_secure)) 468 515 .with(Tracing); 469 516 Server::new(TcpListener::bind(if ipv6 { 470 517 "[::]:80"
+53 -8
src/poll.rs
··· 4 4 use thiserror::Error; 5 5 use tokio::sync::mpsc; 6 6 7 - // plc.directory ratelimit on /export is 500 per 5 mins 8 - const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 9 - 10 7 #[derive(Debug, Error)] 11 8 pub enum GetPageError { 12 9 #[error(transparent)] ··· 54 51 } 55 52 } 56 53 57 - /// PLC 54 + /// State for removing duplicates ops between PLC export page boundaries 58 55 #[derive(Debug, PartialEq)] 59 56 pub struct PageBoundaryState { 57 + /// The previous page's last timestamp 58 + /// 59 + /// Duplicate ops from /export only occur for the same exact timestamp 60 60 pub last_at: Dt, 61 + /// The previous page's ops at its last timestamp 61 62 keys_at: Vec<OpKey>, // expected to ~always be length one 62 63 } 63 64 64 - /// track keys at final createdAt to deduplicate the start of the next page 65 65 impl PageBoundaryState { 66 + /// Initialize the boundary state with a PLC page 66 67 pub fn new(page: &ExportPage) -> Option<Self> { 67 68 // grab the very last op 68 69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; ··· 78 79 79 80 Some(me) 80 81 } 82 + /// Apply the deduplication and update state 83 + /// 84 + /// The beginning of the page will be modified to remove duplicates from the 85 + /// previous page. 86 + /// 87 + /// The end of the page is inspected to update the deduplicator state for 88 + /// the next page. 
81 89 fn apply_to_next(&mut self, page: &mut ExportPage) { 82 90 // walk ops forward, kicking previously-seen ops until created_at advances 83 91 let to_remove: Vec<usize> = page ··· 127 135 } 128 136 } 129 137 138 + /// Get one PLC export page 139 + /// 140 + /// Extracts the final op so it can be used to fetch the following page 130 141 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 131 142 log::trace!("Getting page: {url}"); 132 143 ··· 141 152 .split('\n') 142 153 .filter_map(|s| { 143 154 serde_json::from_str::<Op>(s) 144 - .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 155 + .inspect_err(|e| { 156 + if !s.is_empty() { 157 + log::warn!("failed to parse op: {e} ({s})") 158 + } 159 + }) 145 160 .ok() 146 161 }) 147 162 .collect(); ··· 151 166 Ok((ExportPage { ops }, last_op)) 152 167 } 153 168 169 + /// Poll an upstream PLC server for new ops 170 + /// 171 + /// Pages of operations are written to the `dest` channel. 172 + /// 173 + /// ```no_run 174 + /// # #[tokio::main] 175 + /// # async fn main() { 176 + /// use allegedly::{ExportPage, Op, poll_upstream}; 177 + /// 178 + /// let after = Some(chrono::Utc::now()); 179 + /// let upstream = "https://plc.wtf/export".parse().unwrap(); 180 + /// let throttle = std::time::Duration::from_millis(300); 181 + /// 182 + /// let (tx, mut rx) = tokio::sync::mpsc::channel(1); 183 + /// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 184 + /// 185 + /// while let Some(ExportPage { ops }) = rx.recv().await { 186 + /// println!("received {} plc ops", ops.len()); 187 + /// 188 + /// for Op { did, cid, operation, .. } in ops { 189 + /// // in this example we're alerting when changes are found for one 190 + /// // specific identity 191 + /// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 192 + /// println!("Update found for {did}! 
cid={cid}\n -> operation: {}", operation.get()); 193 + /// } 194 + /// } 195 + /// } 196 + /// # } 197 + /// ``` 154 198 pub async fn poll_upstream( 155 199 after: Option<Dt>, 156 200 base: Url, 201 + throttle: Duration, 157 202 dest: mpsc::Sender<ExportPage>, 158 203 ) -> anyhow::Result<&'static str> { 159 - log::info!("starting upstream poller after {after:?}"); 160 - let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 204 + log::info!("starting upstream poller at {base} after {after:?}"); 205 + let mut tick = tokio::time::interval(throttle); 161 206 let mut prev_last: Option<LastOp> = after.map(Into::into); 162 207 let mut boundary_state: Option<PageBoundaryState> = None; 163 208 loop {