Server tools to backfill, tail, mirror, and verify PLC logs

Compare changes

Choose any two refs to compare.

+942 -163
+164 -1
Cargo.lock
··· 28 29 [[package]] 30 name = "allegedly" 31 - version = "0.2.0" 32 dependencies = [ 33 "anyhow", 34 "async-compression", ··· 39 "http-body-util", 40 "log", 41 "native-tls", 42 "poem", 43 "postgres-native-tls", 44 "reqwest", ··· 52 "tokio-postgres", 53 "tokio-stream", 54 "tokio-util", 55 "tracing-subscriber", 56 ] 57 ··· 1584 ] 1585 1586 [[package]] 1587 name = "parking_lot" 1588 version = "0.11.2" 1589 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1666 ] 1667 1668 [[package]] 1669 name = "pin-project-lite" 1670 version = "0.2.16" 1671 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1837 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 1838 dependencies = [ 1839 "unicode-ident", 1840 ] 1841 1842 [[package]] ··· 2057 source = "registry+https://github.com/rust-lang/crates.io-index" 2058 checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" 2059 dependencies = [ 2060 "base64", 2061 "bytes", 2062 "encoding_rs", 2063 "futures-core", 2064 "futures-util", 2065 "h2", ··· 2802 ] 2803 2804 [[package]] 2805 name = "tower" 2806 version = "0.5.2" 2807 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2887 "log", 2888 "once_cell", 2889 "tracing-core", 2890 ] 2891 2892 [[package]]
··· 28 29 [[package]] 30 name = "allegedly" 31 + version = "0.3.3" 32 dependencies = [ 33 "anyhow", 34 "async-compression", ··· 39 "http-body-util", 40 "log", 41 "native-tls", 42 + "opentelemetry", 43 + "opentelemetry-otlp", 44 + "opentelemetry_sdk", 45 "poem", 46 "postgres-native-tls", 47 "reqwest", ··· 55 "tokio-postgres", 56 "tokio-stream", 57 "tokio-util", 58 + "tracing", 59 + "tracing-opentelemetry", 60 "tracing-subscriber", 61 ] 62 ··· 1589 ] 1590 1591 [[package]] 1592 + name = "opentelemetry" 1593 + version = "0.30.0" 1594 + source = "registry+https://github.com/rust-lang/crates.io-index" 1595 + checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" 1596 + dependencies = [ 1597 + "futures-core", 1598 + "futures-sink", 1599 + "js-sys", 1600 + "pin-project-lite", 1601 + "thiserror 2.0.16", 1602 + "tracing", 1603 + ] 1604 + 1605 + [[package]] 1606 + name = "opentelemetry-http" 1607 + version = "0.30.0" 1608 + source = "registry+https://github.com/rust-lang/crates.io-index" 1609 + checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" 1610 + dependencies = [ 1611 + "async-trait", 1612 + "bytes", 1613 + "http", 1614 + "opentelemetry", 1615 + "reqwest", 1616 + ] 1617 + 1618 + [[package]] 1619 + name = "opentelemetry-otlp" 1620 + version = "0.30.0" 1621 + source = "registry+https://github.com/rust-lang/crates.io-index" 1622 + checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" 1623 + dependencies = [ 1624 + "http", 1625 + "opentelemetry", 1626 + "opentelemetry-http", 1627 + "opentelemetry-proto", 1628 + "opentelemetry_sdk", 1629 + "prost", 1630 + "reqwest", 1631 + "thiserror 2.0.16", 1632 + "tracing", 1633 + ] 1634 + 1635 + [[package]] 1636 + name = "opentelemetry-proto" 1637 + version = "0.30.0" 1638 + source = "registry+https://github.com/rust-lang/crates.io-index" 1639 + checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" 1640 + dependencies = [ 1641 + 
"opentelemetry", 1642 + "opentelemetry_sdk", 1643 + "prost", 1644 + "tonic", 1645 + ] 1646 + 1647 + [[package]] 1648 + name = "opentelemetry_sdk" 1649 + version = "0.30.0" 1650 + source = "registry+https://github.com/rust-lang/crates.io-index" 1651 + checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" 1652 + dependencies = [ 1653 + "futures-channel", 1654 + "futures-executor", 1655 + "futures-util", 1656 + "opentelemetry", 1657 + "percent-encoding", 1658 + "rand 0.9.2", 1659 + "serde_json", 1660 + "thiserror 2.0.16", 1661 + "tokio", 1662 + "tokio-stream", 1663 + ] 1664 + 1665 + [[package]] 1666 name = "parking_lot" 1667 version = "0.11.2" 1668 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1745 ] 1746 1747 [[package]] 1748 + name = "pin-project" 1749 + version = "1.1.10" 1750 + source = "registry+https://github.com/rust-lang/crates.io-index" 1751 + checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" 1752 + dependencies = [ 1753 + "pin-project-internal", 1754 + ] 1755 + 1756 + [[package]] 1757 + name = "pin-project-internal" 1758 + version = "1.1.10" 1759 + source = "registry+https://github.com/rust-lang/crates.io-index" 1760 + checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" 1761 + dependencies = [ 1762 + "proc-macro2", 1763 + "quote", 1764 + "syn", 1765 + ] 1766 + 1767 + [[package]] 1768 name = "pin-project-lite" 1769 version = "0.2.16" 1770 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1936 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 1937 dependencies = [ 1938 "unicode-ident", 1939 + ] 1940 + 1941 + [[package]] 1942 + name = "prost" 1943 + version = "0.13.5" 1944 + source = "registry+https://github.com/rust-lang/crates.io-index" 1945 + checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 1946 + dependencies = [ 1947 + "bytes", 1948 + "prost-derive", 1949 + ] 1950 + 1951 + 
[[package]] 1952 + name = "prost-derive" 1953 + version = "0.13.5" 1954 + source = "registry+https://github.com/rust-lang/crates.io-index" 1955 + checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" 1956 + dependencies = [ 1957 + "anyhow", 1958 + "itertools", 1959 + "proc-macro2", 1960 + "quote", 1961 + "syn", 1962 ] 1963 1964 [[package]] ··· 2179 source = "registry+https://github.com/rust-lang/crates.io-index" 2180 checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" 2181 dependencies = [ 2182 + "async-compression", 2183 "base64", 2184 "bytes", 2185 "encoding_rs", 2186 + "futures-channel", 2187 "futures-core", 2188 "futures-util", 2189 "h2", ··· 2926 ] 2927 2928 [[package]] 2929 + name = "tonic" 2930 + version = "0.13.1" 2931 + source = "registry+https://github.com/rust-lang/crates.io-index" 2932 + checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" 2933 + dependencies = [ 2934 + "async-trait", 2935 + "base64", 2936 + "bytes", 2937 + "http", 2938 + "http-body", 2939 + "http-body-util", 2940 + "percent-encoding", 2941 + "pin-project", 2942 + "prost", 2943 + "tokio-stream", 2944 + "tower-layer", 2945 + "tower-service", 2946 + "tracing", 2947 + ] 2948 + 2949 + [[package]] 2950 name = "tower" 2951 version = "0.5.2" 2952 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3032 "log", 3033 "once_cell", 3034 "tracing-core", 3035 + ] 3036 + 3037 + [[package]] 3038 + name = "tracing-opentelemetry" 3039 + version = "0.31.0" 3040 + source = "registry+https://github.com/rust-lang/crates.io-index" 3041 + checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" 3042 + dependencies = [ 3043 + "js-sys", 3044 + "once_cell", 3045 + "opentelemetry", 3046 + "opentelemetry_sdk", 3047 + "smallvec", 3048 + "tracing", 3049 + "tracing-core", 3050 + "tracing-log", 3051 + "tracing-subscriber", 3052 + "web-time", 3053 ] 3054 3055 [[package]]
+7 -2
Cargo.toml
··· 2 name = "allegedly" 3 description = "public ledger server tools and services (for the PLC)" 4 license = "MIT OR Apache-2.0" 5 - version = "0.2.1" 6 edition = "2024" 7 default-run = "allegedly" 8 ··· 16 http-body-util = "0.1.3" 17 log = "0.4.28" 18 native-tls = "0.2.14" 19 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 postgres-native-tls = "0.5.1" 21 - reqwest = { version = "0.12.23", features = ["stream", "json"] } 22 reqwest-middleware = "0.4.2" 23 reqwest-retry = "0.7.0" 24 rustls = "0.23.32" ··· 29 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 30 tokio-stream = { version = "0.1.17", features = ["io-util"] } 31 tokio-util = { version = "0.7.16", features = ["compat"] } 32 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
··· 2 name = "allegedly" 3 description = "public ledger server tools and services (for the PLC)" 4 license = "MIT OR Apache-2.0" 5 + version = "0.3.3" 6 edition = "2024" 7 default-run = "allegedly" 8 ··· 16 http-body-util = "0.1.3" 17 log = "0.4.28" 18 native-tls = "0.2.14" 19 + opentelemetry = "0.30.0" 20 + opentelemetry-otlp = { version = "0.30.0" } 21 + opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } 22 poem = { version = "3.1.12", features = ["acme", "compression"] } 23 postgres-native-tls = "0.5.1" 24 + reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } 25 reqwest-middleware = "0.4.2" 26 reqwest-retry = "0.7.0" 27 rustls = "0.23.32" ··· 32 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 33 tokio-stream = { version = "0.1.17", features = ["io-util"] } 34 tokio-util = { version = "0.7.16", features = ["compat"] } 35 + tracing = "0.1.41" 36 + tracing-opentelemetry = "0.31.0" 37 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+37
examples/poll.rs
···
··· 1 + use allegedly::{ExportPage, poll_upstream}; 2 + 3 + #[tokio::main] 4 + async fn main() { 5 + // set to `None` to replay from the beginning of the PLC history 6 + let after = Some(chrono::Utc::now()); 7 + 8 + // the PLC server to poll for new ops 9 + let upstream = "https://plc.wtf/export".parse().unwrap(); 10 + 11 + // self-rate-limit (plc.directory's limit interval is 600ms) 12 + let throttle = std::time::Duration::from_millis(300); 13 + 14 + // pages are sent out of the poller via a tokio mpsc channel 15 + let (tx, mut rx) = tokio::sync::mpsc::channel(1); 16 + 17 + // spawn a tokio task to run the poller 18 + tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 19 + 20 + // receive pages of plc ops from the poller 21 + while let Some(ExportPage { ops }) = rx.recv().await { 22 + println!("received {} plc ops", ops.len()); 23 + 24 + for op in ops { 25 + // in this example we're alerting when changes are found for one 26 + // specific identity 27 + if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 28 + println!( 29 + "Update found for {}! cid={}\n -> operation: {}", 30 + op.did, 31 + op.cid, 32 + op.operation.get() 33 + ); 34 + } 35 + } 36 + } 37 + }
+25 -7
readme.md
··· 26 sudo allegedly mirror \ 27 --upstream "https://plc.directory" \ 28 --wrap "http://127.0.0.1:3000" \ 29 --acme-domain "plc.wtf" \ 30 --acme-cache-path ./acme-cache \ 31 - --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" 32 ``` 33 34 ··· 61 - monitoring of the various tasks 62 - health check pings 63 - expose metrics/tracing 64 - - read-only flag for mirror wrapper 65 - bundle: write directly to s3-compatible object storage 66 - helpers for automating periodic `bundle` runs 67 68 69 ### new things 70 71 - - experimental: websocket version of /export 72 - - experimental: accept writes by forwarding them upstream 73 - - experimental: serve a tlog 74 - - experimental: embed a log database directly for fast and efficient mirroring 75 - - experimental: support multiple upstreams? 76 77 - [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range 78 - [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
··· 26 sudo allegedly mirror \ 27 --upstream "https://plc.directory" \ 28 --wrap "http://127.0.0.1:3000" \ 29 + --wrap-pg-cert "/opt/allegedly/postgres-cert.pem" \ 30 --acme-domain "plc.wtf" \ 31 + --acme-domain "alt.plc.wtf" \ 32 + --experimental-acme-domain "experimental.plc.wtf" \ 33 --acme-cache-path ./acme-cache \ 34 + --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" \ 35 + --acme-ipv6 \ 36 + --experimental-write-upstream 37 + ``` 38 + 39 + - Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream 40 + 41 + ```bash 42 + sudo allegedly wrap \ 43 + --wrap "http://127.0.0.1:3000" \ 44 + --acme-ipv6 \ 45 + --acme-cache-path ./acme-cache \ 46 + --acme-domain "plc.wtf" \ 47 + --experimental-acme-domain "experimental.plc.wtf" \ 48 + --experimental-write-upstream \ 49 + --upstream "https://plc.wtf" \ 50 ``` 51 52 ··· 79 - monitoring of the various tasks 80 - health check pings 81 - expose metrics/tracing 82 + - [x] read-only flag for mirror wrapper 83 - bundle: write directly to s3-compatible object storage 84 - helpers for automating periodic `bundle` runs 85 86 87 ### new things 88 89 + - [ ] experimental: websocket version of /export 90 + - [x] experimental: accept writes by forwarding them upstream 91 + - [ ] experimental: serve a tlog 92 + - [ ] experimental: embed a log database directly for fast and efficient mirroring 93 + - [ ] experimental: support multiple upstreams? 94 95 - [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range 96 - [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
+34 -6
src/bin/allegedly.rs
··· 1 - use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 2 use clap::{CommandFactory, Parser, Subcommand}; 3 - use std::{path::PathBuf, time::Instant}; 4 use tokio::fs::create_dir_all; 5 use tokio::sync::mpsc; 6 ··· 48 Mirror { 49 #[command(flatten)] 50 args: mirror::Args, 51 }, 52 /// Poll an upstream PLC server and log new ops to stdout 53 Tail { ··· 57 }, 58 } 59 60 #[tokio::main] 61 async fn main() -> anyhow::Result<()> { 62 let args = Cli::parse(); 63 let matches = Cli::command().get_matches(); 64 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 65 - bin_init(name); 66 67 let globals = args.globals.clone(); 68 ··· 76 } => { 77 let mut url = globals.upstream; 78 url.set_path("/export"); 79 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 80 tokio::task::spawn(async move { 81 - poll_upstream(Some(after), url, tx) 82 .await 83 .expect("to poll upstream") 84 }); ··· 90 .await 91 .expect("to write bundles to output files"); 92 } 93 - Commands::Mirror { args } => mirror::run(globals, args).await?, 94 Commands::Tail { after } => { 95 let mut url = globals.upstream; 96 url.set_path("/export"); 97 let start_at = after.or_else(|| Some(chrono::Utc::now())); 98 let (tx, rx) = mpsc::channel(1); 99 tokio::task::spawn(async move { 100 - poll_upstream(start_at, url, tx) 101 .await 102 .expect("to poll upstream") 103 });
··· 1 + use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init}; 2 + use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream}; 3 use clap::{CommandFactory, Parser, Subcommand}; 4 + use std::{path::PathBuf, time::Duration, time::Instant}; 5 use tokio::fs::create_dir_all; 6 use tokio::sync::mpsc; 7 ··· 49 Mirror { 50 #[command(flatten)] 51 args: mirror::Args, 52 + #[command(flatten)] 53 + instrumentation: InstrumentationArgs, 54 + }, 55 + /// Wrap any did-method-plc server, without syncing upstream (read-only) 56 + Wrap { 57 + #[command(flatten)] 58 + args: mirror::Args, 59 + #[command(flatten)] 60 + instrumentation: InstrumentationArgs, 61 }, 62 /// Poll an upstream PLC server and log new ops to stdout 63 Tail { ··· 67 }, 68 } 69 70 + impl Commands { 71 + fn enable_otel(&self) -> bool { 72 + match self { 73 + Commands::Mirror { 74 + instrumentation, .. 75 + } 76 + | Commands::Wrap { 77 + instrumentation, .. 78 + } => instrumentation.enable_opentelemetry, 79 + _ => false, 80 + } 81 + } 82 + } 83 + 84 #[tokio::main] 85 async fn main() -> anyhow::Result<()> { 86 let args = Cli::parse(); 87 let matches = Cli::command().get_matches(); 88 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 89 + bin_init(args.command.enable_otel()); 90 + log::info!("{}", logo(name)); 91 92 let globals = args.globals.clone(); 93 ··· 101 } => { 102 let mut url = globals.upstream; 103 url.set_path("/export"); 104 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 105 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 106 tokio::task::spawn(async move { 107 + poll_upstream(Some(after), url, throttle, tx) 108 .await 109 .expect("to poll upstream") 110 }); ··· 116 .await 117 .expect("to write bundles to output files"); 118 } 119 + Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?, 120 + Commands::Wrap { args, .. 
} => mirror::run(globals, args, false).await?, 121 Commands::Tail { after } => { 122 let mut url = globals.upstream; 123 url.set_path("/export"); 124 let start_at = after.or_else(|| Some(chrono::Utc::now())); 125 + let throttle = Duration::from_millis(globals.upstream_throttle_ms); 126 let (tx, rx) = mpsc::channel(1); 127 tokio::task::spawn(async move { 128 + poll_upstream(start_at, url, throttle, tx) 129 .await 130 .expect("to poll upstream") 131 });
+15 -7
src/bin/backfill.rs
··· 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 - bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 4 }; 5 use clap::Parser; 6 use reqwest::Url; 7 - use std::path::PathBuf; 8 use tokio::{ 9 sync::{mpsc, oneshot}, 10 task::JoinSet, ··· 53 } 54 55 pub async fn run( 56 - GlobalArgs { upstream }: GlobalArgs, 57 Args { 58 http, 59 dir, ··· 98 } 99 let mut upstream = upstream; 100 upstream.set_path("/export"); 101 - tasks.spawn(poll_upstream(None, upstream, poll_tx)); 102 tasks.spawn(full_pages(poll_out, full_tx)); 103 tasks.spawn(pages_to_stdout(full_out, None)); 104 } else { ··· 128 129 // and the catch-up source... 130 if let Some(last) = found_last_out { 131 tasks.spawn(async move { 132 let mut upstream = upstream; 133 upstream.set_path("/export"); 134 - poll_upstream(last.await?, upstream, poll_tx).await 135 }); 136 } 137 ··· 193 #[tokio::main] 194 async fn main() -> anyhow::Result<()> { 195 let args = CliArgs::parse(); 196 - bin_init("backfill"); 197 run(args.globals, args.args).await?; 198 Ok(()) 199 }
··· 1 use allegedly::{ 2 + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 3 + bin::{GlobalArgs, bin_init}, 4 + full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 5 }; 6 use clap::Parser; 7 use reqwest::Url; 8 + use std::{path::PathBuf, time::Duration}; 9 use tokio::{ 10 sync::{mpsc, oneshot}, 11 task::JoinSet, ··· 54 } 55 56 pub async fn run( 57 + GlobalArgs { 58 + upstream, 59 + upstream_throttle_ms, 60 + }: GlobalArgs, 61 Args { 62 http, 63 dir, ··· 102 } 103 let mut upstream = upstream; 104 upstream.set_path("/export"); 105 + let throttle = Duration::from_millis(upstream_throttle_ms); 106 + tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 107 tasks.spawn(full_pages(poll_out, full_tx)); 108 tasks.spawn(pages_to_stdout(full_out, None)); 109 } else { ··· 133 134 // and the catch-up source... 135 if let Some(last) = found_last_out { 136 + let throttle = Duration::from_millis(upstream_throttle_ms); 137 tasks.spawn(async move { 138 let mut upstream = upstream; 139 upstream.set_path("/export"); 140 + 141 + poll_upstream(last.await?, upstream, throttle, poll_tx).await 142 }); 143 } 144 ··· 200 #[tokio::main] 201 async fn main() -> anyhow::Result<()> { 202 let args = CliArgs::parse(); 203 + bin_init(false); 204 + log::info!("{}", logo("backfill")); 205 run(args.globals, args.args).await?; 206 Ok(()) 207 }
+28
src/bin/instrumentation/mod.rs
···
··· 1 + use opentelemetry::trace::TracerProvider as _; 2 + use opentelemetry_otlp::{Protocol, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::registry::LookupSpan; 7 + 8 + pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer> 9 + where 10 + S: Subscriber + for<'span> LookupSpan<'span>, 11 + { 12 + let exporter = opentelemetry_otlp::SpanExporter::builder() 13 + .with_http() 14 + .with_protocol(Protocol::HttpBinary) 15 + .build() 16 + .expect("to build otel otlp exporter"); 17 + 18 + let provider = SdkTracerProvider::builder() 19 + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( 20 + 1.0, 21 + )))) 22 + .with_id_generator(RandomIdGenerator::default()) 23 + .with_batch_exporter(exporter) 24 + .build(); 25 + 26 + let tracer = provider.tracer("tracing-otel-subscriber"); 27 + tracing_opentelemetry::layer().with_tracer(tracer) 28 + }
+82 -23
src/bin/mirror.rs
··· 1 - use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve}; 2 use clap::Parser; 3 use reqwest::Url; 4 - use std::{net::SocketAddr, path::PathBuf}; 5 use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 6 7 #[derive(Debug, clap::Args)] ··· 11 wrap: Url, 12 /// the wrapped did-method-plc server's database (write access required) 13 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 14 - wrap_pg: Url, 15 /// path to tls cert for the wrapped postgres db, if needed 16 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 17 wrap_pg_cert: Option<PathBuf>, ··· 39 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 40 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 41 acme_directory_url: Url, 42 } 43 44 pub async fn run( 45 - GlobalArgs { upstream }: GlobalArgs, 46 Args { 47 wrap, 48 wrap_pg, ··· 51 acme_domain, 52 acme_cache_path, 53 acme_directory_url, 54 }: Args, 55 ) -> anyhow::Result<()> { 56 - let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 57 - 58 - // TODO: allow starting up with polling backfill from beginning? 59 - log::debug!("getting the latest op from the db..."); 60 - let latest = db 61 - .get_latest() 62 - .await? 63 - .expect("there to be at least one op in the db. 
did you backfill?"); 64 - 65 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 66 (_, false, Some(cache_path)) => { 67 - log::info!("configuring acme for https at {acme_domain:?}..."); 68 create_dir_all(&cache_path).await?; 69 ListenConf::Acme { 70 - domains: acme_domain, 71 cache_path, 72 directory_url: acme_directory_url.to_string(), 73 } 74 } 75 (bind, true, None) => ListenConf::Bind(bind), 76 (_, _, _) => unreachable!(), 77 }; 78 79 let mut tasks = JoinSet::new(); 80 81 - let (send_page, recv_page) = mpsc::channel(8); 82 83 - let mut poll_url = upstream.clone(); 84 - poll_url.set_path("/export"); 85 86 - tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 87 - tasks.spawn(pages_to_pg(db.clone(), recv_page)); 88 - tasks.spawn(serve(upstream, wrap, listen_conf)); 89 90 while let Some(next) = tasks.join_next().await { 91 match next { ··· 115 #[command(flatten)] 116 globals: GlobalArgs, 117 #[command(flatten)] 118 args: Args, 119 } 120 121 #[allow(dead_code)] 122 #[tokio::main] 123 async fn main() -> anyhow::Result<()> { 124 let args = CliArgs::parse(); 125 - bin_init("mirror"); 126 - run(args.globals, args.args).await?; 127 Ok(()) 128 }
··· 1 + use allegedly::{ 2 + Db, ExperimentalConf, ListenConf, 3 + bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 + logo, pages_to_pg, poll_upstream, serve, 5 + }; 6 use clap::Parser; 7 use reqwest::Url; 8 + use std::{net::SocketAddr, path::PathBuf, time::Duration}; 9 use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 10 11 #[derive(Debug, clap::Args)] ··· 15 wrap: Url, 16 /// the wrapped did-method-plc server's database (write access required) 17 #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 18 + wrap_pg: Option<Url>, 19 /// path to tls cert for the wrapped postgres db, if needed 20 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 21 wrap_pg_cert: Option<PathBuf>, ··· 43 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 44 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 45 acme_directory_url: Url, 46 + /// try to listen for ipv6 47 + #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")] 48 + acme_ipv6: bool, 49 + /// only accept experimental requests at this hostname 50 + /// 51 + /// a cert will be provisioned for it from letsencrypt. if you're not using 52 + /// acme (eg., behind a tls-terminating reverse proxy), open a feature request. 53 + #[arg( 54 + long, 55 + requires("acme_domain"), 56 + env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN" 57 + )] 58 + experimental_acme_domain: Option<String>, 59 + /// accept writes! 
by forwarding them upstream 60 + #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")] 61 + experimental_write_upstream: bool, 62 } 63 64 pub async fn run( 65 + GlobalArgs { 66 + upstream, 67 + upstream_throttle_ms, 68 + }: GlobalArgs, 69 Args { 70 wrap, 71 wrap_pg, ··· 74 acme_domain, 75 acme_cache_path, 76 acme_directory_url, 77 + acme_ipv6, 78 + experimental_acme_domain, 79 + experimental_write_upstream, 80 }: Args, 81 + sync: bool, 82 ) -> anyhow::Result<()> { 83 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 84 (_, false, Some(cache_path)) => { 85 create_dir_all(&cache_path).await?; 86 + let mut domains = acme_domain.clone(); 87 + if let Some(ref experimental_domain) = experimental_acme_domain { 88 + domains.push(experimental_domain.clone()) 89 + } 90 + log::info!("configuring acme for https at {domains:?}..."); 91 ListenConf::Acme { 92 + domains, 93 cache_path, 94 directory_url: acme_directory_url.to_string(), 95 + ipv6: acme_ipv6, 96 } 97 } 98 (bind, true, None) => ListenConf::Bind(bind), 99 (_, _, _) => unreachable!(), 100 }; 101 102 + let experimental_conf = ExperimentalConf { 103 + acme_domain: experimental_acme_domain, 104 + write_upstream: experimental_write_upstream, 105 + }; 106 + 107 let mut tasks = JoinSet::new(); 108 109 + let db = if sync { 110 + let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 111 + "a wrapped reference postgres must be provided to sync" 112 + ))?; 113 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 114 115 + // TODO: allow starting up with polling backfill from beginning? 116 + log::debug!("getting the latest op from the db..."); 117 + let latest = db 118 + .get_latest() 119 + .await? 120 + .expect("there to be at least one op in the db. 
did you backfill?"); 121 122 + let (send_page, recv_page) = mpsc::channel(8); 123 + 124 + let mut poll_url = upstream.clone(); 125 + poll_url.set_path("/export"); 126 + let throttle = Duration::from_millis(upstream_throttle_ms); 127 + 128 + tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 129 + tasks.spawn(pages_to_pg(db.clone(), recv_page)); 130 + Some(db) 131 + } else { 132 + None 133 + }; 134 + 135 + tasks.spawn(serve( 136 + upstream, 137 + wrap, 138 + listen_conf, 139 + experimental_conf, 140 + db.clone(), 141 + )); 142 143 while let Some(next) = tasks.join_next().await { 144 match next { ··· 168 #[command(flatten)] 169 globals: GlobalArgs, 170 #[command(flatten)] 171 + instrumentation: InstrumentationArgs, 172 + #[command(flatten)] 173 args: Args, 174 + /// Run the mirror in wrap mode, no upstream synchronization (read-only) 175 + #[arg(long, action)] 176 + wrap_mode: bool, 177 } 178 179 #[allow(dead_code)] 180 #[tokio::main] 181 async fn main() -> anyhow::Result<()> { 182 let args = CliArgs::parse(); 183 + bin_init(args.instrumentation.enable_opentelemetry); 184 + log::info!("{}", logo("mirror")); 185 + run(args.globals, args.args, !args.wrap_mode).await?; 186 Ok(()) 187 }
+44
src/bin/mod.rs
··· 1 use reqwest::Url; 2 3 #[derive(Debug, Clone, clap::Args)] 4 pub struct GlobalArgs { ··· 6 #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 7 #[clap(default_value = "https://plc.directory")] 8 pub upstream: Url, 9 } 10 11 #[allow(dead_code)]
··· 1 + mod instrumentation; 2 + 3 use reqwest::Url; 4 + use tracing_subscriber::layer::SubscriberExt; 5 6 #[derive(Debug, Clone, clap::Args)] 7 pub struct GlobalArgs { ··· 9 #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 10 #[clap(default_value = "https://plc.directory")] 11 pub upstream: Url, 12 + /// Self-rate-limit upstream request interval 13 + /// 14 + /// plc.directory's rate limiting is 500 requests per 5 mins (600ms) 15 + #[arg(long, global = true, env = "ALLEGEDLY_UPSTREAM_THROTTLE_MS")] 16 + #[clap(default_value = "600")] 17 + pub upstream_throttle_ms: u64, 18 + } 19 + 20 + #[derive(Debug, Default, Clone, clap::Args)] 21 + pub struct InstrumentationArgs { 22 + /// Export traces to an OTLP collector 23 + /// 24 + /// Configure the collector via standard env vars: 25 + /// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/" 26 + /// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret" 27 + /// - `OTEL_SERVICE_NAME` eg "my-app" 28 + #[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")] 29 + pub enable_opentelemetry: bool, 30 + } 31 + 32 + pub fn bin_init(enable_otlp: bool) { 33 + let filter = tracing_subscriber::EnvFilter::builder() 34 + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) 35 + .from_env_lossy(); 36 + 37 + let stderr_log = tracing_subscriber::fmt::layer() 38 + .with_writer(std::io::stderr) 39 + .pretty(); 40 + 41 + let otel = if enable_otlp { 42 + Some(instrumentation::otel_layer()) 43 + } else { 44 + None 45 + }; 46 + 47 + let subscriber = tracing_subscriber::Registry::default() 48 + .with(filter) 49 + .with(stderr_log) 50 + .with(otel); 51 + 52 + tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber"); 53 } 54 55 #[allow(dead_code)]
+73
src/cached_value.rs
···
··· 1 + use std::error::Error; 2 + use std::sync::Arc; 3 + use std::time::{Duration, Instant}; 4 + use tokio::sync::Mutex; 5 + 6 + pub trait Fetcher<T> { 7 + fn fetch(&self) -> impl Future<Output = Result<T, Box<dyn Error>>>; 8 + } 9 + 10 + #[derive(Debug)] 11 + struct ExpiringValue<T: Clone> { 12 + value: T, 13 + expires: Instant, 14 + } 15 + 16 + impl<T: Clone> ExpiringValue<T> { 17 + fn get(&self, now: Instant) -> Option<T> { 18 + if now <= self.expires { 19 + log::trace!("returning val (fresh for {:?})", self.expires - now); 20 + Some(self.value.clone()) 21 + } else { 22 + log::trace!("hiding expired val"); 23 + None 24 + } 25 + } 26 + } 27 + 28 + // TODO: generic over the fetcher's actual error type 29 + #[derive(Clone)] 30 + pub struct CachedValue<T: Clone, F: Fetcher<T>> { 31 + latest: Arc<Mutex<Option<ExpiringValue<T>>>>, 32 + fetcher: F, 33 + validitiy: Duration, 34 + } 35 + 36 + impl<T: Clone, F: Fetcher<T>> CachedValue<T, F> { 37 + pub fn new(f: F, validitiy: Duration) -> Self { 38 + Self { 39 + latest: Default::default(), 40 + fetcher: f, 41 + validitiy, 42 + } 43 + } 44 + pub async fn get(&self) -> Result<T, Box<dyn Error>> { 45 + let now = Instant::now(); 46 + return self.get_impl(now).await; 47 + } 48 + async fn get_impl(&self, now: Instant) -> Result<T, Box<dyn Error>> { 49 + let mut val = self.latest.lock().await; 50 + if let Some(v) = val.as_ref().and_then(|v| v.get(now)) { 51 + return Ok(v); 52 + } 53 + log::debug!( 54 + "value {}, fetching...", 55 + if val.is_some() { 56 + "expired" 57 + } else { 58 + "not present" 59 + } 60 + ); 61 + let new = self 62 + .fetcher 63 + .fetch() 64 + .await 65 + .inspect_err(|e| log::warn!("value fetch failed, next access will retry: {e}"))?; 66 + log::debug!("fetched ok, saving a copy for cache."); 67 + *val = Some(ExpiringValue { 68 + value: new.clone(), 69 + expires: now + self.validitiy, 70 + }); 71 + Ok(new) 72 + } 73 + }
+1
src/client.rs
··· 12 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| { 13 let inner = Client::builder() 14 .user_agent(UA) 15 .build() 16 .expect("reqwest client to build"); 17
··· 12 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| { 13 let inner = Client::builder() 14 .user_agent(UA) 15 + .gzip(true) 16 .build() 17 .expect("reqwest client to build"); 18
+4 -15
src/lib.rs
··· 2 use tokio::sync::{mpsc, oneshot}; 3 4 mod backfill; 5 mod client; 6 mod mirror; 7 mod plc_pg; ··· 12 pub mod bin; 13 14 pub use backfill::backfill; 15 pub use client::{CLIENT, UA}; 16 - pub use mirror::{ListenConf, serve}; 17 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 18 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 19 - pub use ratelimit::GovernorMiddleware; 20 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 21 22 pub type Dt = chrono::DateTime<chrono::Utc>; ··· 143 env!("CARGO_PKG_VERSION"), 144 ) 145 } 146 - 147 - pub fn bin_init(name: &str) { 148 - if std::env::var_os("RUST_LOG").is_none() { 149 - unsafe { std::env::set_var("RUST_LOG", "info") }; 150 - } 151 - let filter = tracing_subscriber::EnvFilter::from_default_env(); 152 - tracing_subscriber::fmt() 153 - .with_writer(std::io::stderr) 154 - .with_env_filter(filter) 155 - .init(); 156 - 157 - log::info!("{}", logo(name)); 158 - }
··· 2 use tokio::sync::{mpsc, oneshot}; 3 4 mod backfill; 5 + mod cached_value; 6 mod client; 7 mod mirror; 8 mod plc_pg; ··· 13 pub mod bin; 14 15 pub use backfill::backfill; 16 + pub use cached_value::{CachedValue, Fetcher}; 17 pub use client::{CLIENT, UA}; 18 + pub use mirror::{ExperimentalConf, ListenConf, serve}; 19 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 20 pub use poll::{PageBoundaryState, get_page, poll_upstream}; 21 + pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 22 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 23 24 pub type Dt = chrono::DateTime<chrono::Utc>; ··· 145 env!("CARGO_PKG_VERSION"), 146 ) 147 }
+287 -61
src/mirror.rs
··· 1 - use crate::{GovernorMiddleware, UA, logo}; 2 use futures::TryStreamExt; 3 use governor::Quota; 4 use poem::{ 5 - Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, 6 - handler, 7 - http::StatusCode, 8 listener::{Listener, TcpListener, acme::AutoCert}, 9 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 10 - web::{Data, Json}, 11 }; 12 use reqwest::{Client, Url}; 13 use std::{net::SocketAddr, path::PathBuf, time::Duration}; 14 15 - #[derive(Debug, Clone)] 16 struct State { 17 client: Client, 18 plc: Url, 19 upstream: Url, 20 } 21 22 - #[handler] 23 - fn hello(Data(State { upstream, .. }): Data<&State>) -> String { 24 - format!( 25 - r#"{} 26 27 This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 28 synchronizes a local PLC reference server instance[2] (why?[3]). 29 ··· 32 33 {upstream} 34 35 36 Available APIs: 37 ··· 40 - GET /* Proxies to wrapped server; see PLC API docs: 41 https://web.plc.directory/api/redoc 42 43 - - POST /* Always rejected. This is a mirror. 44 45 46 - tip: try `GET /{{did}}` to resolve an identity 47 48 49 - Allegedly is a suit of open-source CLI tools for working with PLC logs: 50 51 - https://tangled.org/@microcosm.blue/Allegedly 52 53 54 [1] https://web.plc.directory ··· 64 include_bytes!("../favicon.ico").with_content_type("image/x-icon") 65 } 66 67 - fn failed_to_reach_wrapped() -> String { 68 format!( 69 r#"{} 70 71 - Failed to reach the wrapped reference PLC server. Sorry. 
72 "#, 73 logo("mirror 502 :( ") 74 ) 75 } 76 77 - async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) { 78 use serde_json::json; 79 80 let mut url = url.clone(); ··· 110 } 111 } 112 113 #[handler] 114 async fn health( 115 Data(State { 116 plc, 117 client, 118 - upstream, 119 }): Data<&State>, 120 ) -> impl IntoResponse { 121 let mut overall_status = StatusCode::OK; ··· 123 if !ok { 124 overall_status = StatusCode::BAD_GATEWAY; 125 } 126 - let (ok, upstream_status) = plc_status(upstream, client).await; 127 - if !ok { 128 - overall_status = StatusCode::BAD_GATEWAY; 129 } 130 - ( 131 - overall_status, 132 - Json(serde_json::json!({ 133 - "server": "allegedly (mirror)", 134 - "version": env!("CARGO_PKG_VERSION"), 135 - "wrapped_plc": wrapped_status, 136 - "upstream_plc": upstream_status, 137 - })), 138 - ) 139 } 140 141 #[handler] 142 - async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> { 143 let mut target = state.plc.clone(); 144 target.set_path(req.uri().path()); 145 - let upstream_res = state 146 .client 147 .get(target) 148 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server ··· 151 .await 152 .map_err(|e| { 153 log::error!("upstream req fail: {e}"); 154 - Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY) 155 })?; 156 157 - let http_res: poem::http::Response<reqwest::Body> = upstream_res.into(); 158 - let (parts, reqw_body) = http_res.into_parts(); 159 160 - let parts = poem::ResponseParts { 161 - status: parts.status, 162 - version: parts.version, 163 - headers: parts.headers, 164 - extensions: parts.extensions, 165 - }; 166 167 - let body = http_body_util::BodyDataStream::new(reqw_body) 168 - .map_err(|e| std::io::Error::other(Box::new(e))); 169 170 - Ok(Response::from_parts( 171 - parts, 172 - poem::Body::from_bytes_stream(body), 173 - )) 174 } 175 176 #[handler] ··· 182 183 Sorry, this server does not accept POST requests. 
184 185 - You may wish to try upstream: {upstream} 186 "#, 187 logo("mirror (nope)") 188 ), ··· 195 domains: Vec<String>, 196 cache_path: PathBuf, 197 directory_url: String, 198 }, 199 Bind(SocketAddr), 200 } 201 202 - pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> { 203 log::info!("starting server..."); 204 205 // not using crate CLIENT: don't want the retries etc ··· 209 .build() 210 .expect("reqwest client to build"); 211 212 let state = State { 213 client, 214 plc, 215 upstream: upstream.clone(), 216 }; 217 218 - let app = Route::new() 219 .at("/", get(hello)) 220 .at("/favicon.ico", get(favicon)) 221 .at("/_health", get(health)) 222 - .at("/:any", get(proxy).post(nope)) 223 .with(AddData::new(state)) 224 .with(Cors::new().allow_credentials(false)) 225 .with(Compression::new()) 226 - .with(GovernorMiddleware::new(Quota::per_minute( 227 3000.try_into().expect("ratelimit middleware to build"), 228 - ))) 229 .with(CatchPanic::new()) 230 .with(Tracing); 231 ··· 234 domains, 235 cache_path, 236 directory_url, 237 } => { 238 rustls::crypto::aws_lc_rs::default_provider() 239 .install_default() ··· 247 } 248 let auto_cert = auto_cert.build().expect("acme config to build"); 249 250 - let notice_task = tokio::task::spawn(run_insecure_notice()); 251 - let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await; 252 log::warn!("server task ended, aborting insecure server task..."); 253 notice_task.abort(); 254 app_res?; ··· 272 } 273 274 /// kick off a tiny little server on a tokio task to tell people to use 443 275 - async fn run_insecure_notice() -> Result<(), std::io::Error> { 276 #[handler] 277 fn oop_plz_be_secure() -> (StatusCode, String) { 278 ( ··· 288 } 289 290 let app = Route::new() 291 - .at("/", get(oop_plz_be_secure)) 292 .at("/favicon.ico", get(favicon)) 293 .with(Tracing); 294 - Server::new(TcpListener::bind("0.0.0.0:80")) 295 - .name("allegedly (mirror:80 helper)") 296 - .run(app) 297 - 
.await 298 }
··· 1 + use crate::{ 2 + CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo, 3 + }; 4 use futures::TryStreamExt; 5 use governor::Quota; 6 use poem::{ 7 + Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, 8 + get, handler, 9 + http::{StatusCode, header::USER_AGENT}, 10 listener::{Listener, TcpListener, acme::AutoCert}, 11 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 12 + web::{Data, Json, Path}, 13 }; 14 use reqwest::{Client, Url}; 15 use std::{net::SocketAddr, path::PathBuf, time::Duration}; 16 17 + #[derive(Clone)] 18 struct State { 19 client: Client, 20 plc: Url, 21 upstream: Url, 22 + sync_info: Option<SyncInfo>, 23 + experimental: ExperimentalConf, 24 } 25 26 + /// server info that only applies in mirror (synchronizing) mode 27 + #[derive(Clone)] 28 + struct SyncInfo { 29 + latest_at: CachedValue<Dt, GetLatestAt>, 30 + upstream_status: CachedValue<PlcStatus, CheckUpstream>, 31 + } 32 33 + #[handler] 34 + fn hello( 35 + Data(State { 36 + sync_info, 37 + upstream, 38 + experimental: exp, 39 + .. 40 + }): Data<&State>, 41 + req: &Request, 42 + ) -> String { 43 + // let mode = if sync_info.is_some() { "mirror" } else { "wrap" }; 44 + let pre_info = if sync_info.is_some() { 45 + format!( 46 + r#" 47 This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 48 synchronizes a local PLC reference server instance[2] (why?[3]). 49 ··· 52 53 {upstream} 54 55 + "# 56 + ) 57 + } else { 58 + format!( 59 + r#" 60 + This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse- 61 + proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy. 62 + 63 + 64 + Configured upstream (only used if experimental op forwarding is enabled): 65 + 66 + {upstream} 67 + 68 + "# 69 + ) 70 + }; 71 + 72 + let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) { 73 + (false, _, _) => " - POST /* Always rejected. 
This is a mirror.".to_string(), 74 + (_, None, _) => { 75 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 76 + } 77 + (_, Some(d), Some(f)) if f == d => { 78 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string() 79 + } 80 + (_, Some(d), _) => format!( 81 + r#" - POST /* Rejected, but experimental upstream op forwarding is 82 + available at `POST https://{d}/:did`!"# 83 + ), 84 + }; 85 + 86 + format!( 87 + r#"{} 88 + {pre_info} 89 90 Available APIs: 91 ··· 94 - GET /* Proxies to wrapped server; see PLC API docs: 95 https://web.plc.directory/api/redoc 96 97 + tip: try `GET /{{did}}` to resolve an identity 98 99 + {post_info} 100 101 102 + Allegedly is a suite of open-source CLI tools from for working with PLC logs, 103 + from microcosm: 104 105 + https://tangled.org/@microcosm.blue/Allegedly 106 107 + https://microcosm.blue 108 109 110 [1] https://web.plc.directory ··· 120 include_bytes!("../favicon.ico").with_content_type("image/x-icon") 121 } 122 123 + fn failed_to_reach_named(name: &str) -> String { 124 format!( 125 r#"{} 126 127 + Failed to reach the {name} server. Sorry. 
128 "#, 129 logo("mirror 502 :( ") 130 ) 131 } 132 133 + fn bad_create_op(reason: &str) -> Response { 134 + Response::builder() 135 + .status(StatusCode::BAD_REQUEST) 136 + .body(format!( 137 + r#"{} 138 + 139 + NooOOOooooo: {reason} 140 + "#, 141 + logo("mirror 400 >:( ") 142 + )) 143 + } 144 + 145 + type PlcStatus = (bool, serde_json::Value); 146 + 147 + async fn plc_status(url: &Url, client: &Client) -> PlcStatus { 148 use serde_json::json; 149 150 let mut url = url.clone(); ··· 180 } 181 } 182 183 + #[derive(Clone)] 184 + struct GetLatestAt(Db); 185 + impl Fetcher<Dt> for GetLatestAt { 186 + async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 187 + let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 188 + "expected to find at least one thing in the db" 189 + ))?; 190 + Ok(now) 191 + } 192 + } 193 + 194 + #[derive(Clone)] 195 + struct CheckUpstream(Url, Client); 196 + impl Fetcher<PlcStatus> for CheckUpstream { 197 + async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> { 198 + Ok(plc_status(&self.0, &self.1).await) 199 + } 200 + } 201 + 202 #[handler] 203 async fn health( 204 Data(State { 205 plc, 206 client, 207 + sync_info, 208 + .. 
209 }): Data<&State>, 210 ) -> impl IntoResponse { 211 let mut overall_status = StatusCode::OK; ··· 213 if !ok { 214 overall_status = StatusCode::BAD_GATEWAY; 215 } 216 + if let Some(SyncInfo { 217 + latest_at, 218 + upstream_status, 219 + }) = sync_info 220 + { 221 + // mirror mode 222 + let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 223 + if !ok { 224 + overall_status = StatusCode::BAD_GATEWAY; 225 + } 226 + let latest = latest_at.get().await.ok(); 227 + ( 228 + overall_status, 229 + Json(serde_json::json!({ 230 + "server": "allegedly (mirror)", 231 + "version": env!("CARGO_PKG_VERSION"), 232 + "wrapped_plc": wrapped_status, 233 + "upstream_plc": upstream_status, 234 + "latest_at": latest, 235 + })), 236 + ) 237 + } else { 238 + // wrap mode 239 + ( 240 + overall_status, 241 + Json(serde_json::json!({ 242 + "server": "allegedly (mirror)", 243 + "version": env!("CARGO_PKG_VERSION"), 244 + "wrapped_plc": wrapped_status, 245 + })), 246 + ) 247 } 248 + } 249 + 250 + fn proxy_response(res: reqwest::Response) -> Response { 251 + let http_res: poem::http::Response<reqwest::Body> = res.into(); 252 + let (parts, reqw_body) = http_res.into_parts(); 253 + 254 + let parts = poem::ResponseParts { 255 + status: parts.status, 256 + version: parts.version, 257 + headers: parts.headers, 258 + extensions: parts.extensions, 259 + }; 260 + 261 + let body = http_body_util::BodyDataStream::new(reqw_body) 262 + .map_err(|e| std::io::Error::other(Box::new(e))); 263 + 264 + Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 265 } 266 267 #[handler] 268 + async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 269 let mut target = state.plc.clone(); 270 target.set_path(req.uri().path()); 271 + target.set_query(req.uri().query()); 272 + let wrapped_res = state 273 .client 274 .get(target) 275 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server ··· 278 .await 279 .map_err(|e| { 280 
log::error!("upstream req fail: {e}"); 281 + Error::from_string( 282 + failed_to_reach_named("wrapped reference PLC"), 283 + StatusCode::BAD_GATEWAY, 284 + ) 285 })?; 286 287 + Ok(proxy_response(wrapped_res)) 288 + } 289 290 + #[handler] 291 + async fn forward_create_op_upstream( 292 + Data(State { 293 + upstream, 294 + client, 295 + experimental, 296 + .. 297 + }): Data<&State>, 298 + Path(did): Path<String>, 299 + req: &Request, 300 + body: Body, 301 + ) -> Result<Response> { 302 + if let Some(expected_domain) = &experimental.acme_domain { 303 + let Some(found_host) = req.uri().host() else { 304 + return Ok(bad_create_op(&format!( 305 + "missing `Host` header, expected {expected_domain:?} for experimental requests." 306 + ))); 307 + }; 308 + if found_host != expected_domain { 309 + return Ok(bad_create_op(&format!( 310 + "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}" 311 + ))); 312 + } 313 + } 314 315 + // adjust proxied headers 316 + let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 317 + log::trace!("original request headers: {headers:?}"); 318 + headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); 319 + let client_ua = headers 320 + .get(USER_AGENT) 321 + .map(|h| h.to_str().unwrap()) 322 + .unwrap_or("unknown"); 323 + headers.insert( 324 + USER_AGENT, 325 + format!("{UA} (forwarding from {client_ua:?})") 326 + .parse() 327 + .unwrap(), 328 + ); 329 + log::trace!("adjusted request headers: {headers:?}"); 330 331 + let mut target = upstream.clone(); 332 + target.set_path(&did); 333 + let upstream_res = client 334 + .post(target) 335 + .timeout(Duration::from_secs(15)) // be a little generous 336 + .headers(headers) 337 + .body(reqwest::Body::wrap_stream(body.into_bytes_stream())) 338 + .send() 339 + .await 340 + .map_err(|e| { 341 + log::warn!("upstream write fail: {e}"); 342 + Error::from_string( 343 + failed_to_reach_named("upstream PLC"), 344 + 
StatusCode::BAD_GATEWAY, 345 + ) 346 + })?; 347 + 348 + Ok(proxy_response(upstream_res)) 349 } 350 351 #[handler] ··· 357 358 Sorry, this server does not accept POST requests. 359 360 + You may wish to try sending that to our upstream: {upstream}. 361 + 362 + If you operate this server, try running with `--experimental-write-upstream`. 363 "#, 364 logo("mirror (nope)") 365 ), ··· 372 domains: Vec<String>, 373 cache_path: PathBuf, 374 directory_url: String, 375 + ipv6: bool, 376 }, 377 Bind(SocketAddr), 378 } 379 380 + #[derive(Debug, Clone)] 381 + pub struct ExperimentalConf { 382 + pub acme_domain: Option<String>, 383 + pub write_upstream: bool, 384 + } 385 + 386 + pub async fn serve( 387 + upstream: Url, 388 + plc: Url, 389 + listen: ListenConf, 390 + experimental: ExperimentalConf, 391 + db: Option<Db>, 392 + ) -> anyhow::Result<&'static str> { 393 log::info!("starting server..."); 394 395 // not using crate CLIENT: don't want the retries etc ··· 399 .build() 400 .expect("reqwest client to build"); 401 402 + // when `db` is None, we're running in wrap mode. 
no db access, no upstream sync 403 + let sync_info = db.map(|db| SyncInfo { 404 + latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)), 405 + upstream_status: CachedValue::new( 406 + CheckUpstream(upstream.clone(), client.clone()), 407 + Duration::from_secs(6), 408 + ), 409 + }); 410 + 411 let state = State { 412 client, 413 plc, 414 upstream: upstream.clone(), 415 + sync_info, 416 + experimental: experimental.clone(), 417 }; 418 419 + let mut app = Route::new() 420 .at("/", get(hello)) 421 .at("/favicon.ico", get(favicon)) 422 .at("/_health", get(health)) 423 + .at("/export", get(proxy)); 424 + 425 + if experimental.write_upstream { 426 + log::info!("enabling experimental write forwarding to upstream"); 427 + 428 + let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 429 + let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap())); 430 + 431 + let upstream_proxier = forward_create_op_upstream 432 + .with(GovernorMiddleware::new(did_limiter)) 433 + .with(GovernorMiddleware::new(ip_limiter)); 434 + 435 + app = app.at("/did:plc:*", get(proxy).post(upstream_proxier)); 436 + } else { 437 + app = app.at("/did:plc:*", get(proxy).post(nope)); 438 + } 439 + 440 + let app = app 441 .with(AddData::new(state)) 442 .with(Cors::new().allow_credentials(false)) 443 .with(Compression::new()) 444 + .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute( 445 3000.try_into().expect("ratelimit middleware to build"), 446 + )))) 447 .with(CatchPanic::new()) 448 .with(Tracing); 449 ··· 452 domains, 453 cache_path, 454 directory_url, 455 + ipv6, 456 } => { 457 rustls::crypto::aws_lc_rs::default_provider() 458 .install_default() ··· 466 } 467 let auto_cert = auto_cert.build().expect("acme config to build"); 468 469 + log::trace!("auto_cert: {auto_cert:?}"); 470 + 471 + let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 472 + let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 
473 + let app_res = run(app, listener.acme(auto_cert)).await; 474 log::warn!("server task ended, aborting insecure server task..."); 475 notice_task.abort(); 476 app_res?; ··· 494 } 495 496 /// kick off a tiny little server on a tokio task to tell people to use 443 497 + async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 498 #[handler] 499 fn oop_plz_be_secure() -> (StatusCode, String) { 500 ( ··· 510 } 511 512 let app = Route::new() 513 .at("/favicon.ico", get(favicon)) 514 + .nest("/", get(oop_plz_be_secure)) 515 .with(Tracing); 516 + Server::new(TcpListener::bind(if ipv6 { 517 + "[::]:80" 518 + } else { 519 + "0.0.0.0:80" 520 + })) 521 + .name("allegedly (mirror:80 helper)") 522 + .run(app) 523 + .await 524 }
+53 -8
src/poll.rs
··· 4 use thiserror::Error; 5 use tokio::sync::mpsc; 6 7 - // plc.directory ratelimit on /export is 500 per 5 mins 8 - const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 9 - 10 #[derive(Debug, Error)] 11 pub enum GetPageError { 12 #[error(transparent)] ··· 54 } 55 } 56 57 - /// PLC 58 #[derive(Debug, PartialEq)] 59 pub struct PageBoundaryState { 60 pub last_at: Dt, 61 keys_at: Vec<OpKey>, // expected to ~always be length one 62 } 63 64 - /// track keys at final createdAt to deduplicate the start of the next page 65 impl PageBoundaryState { 66 pub fn new(page: &ExportPage) -> Option<Self> { 67 // grab the very last op 68 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; ··· 78 79 Some(me) 80 } 81 fn apply_to_next(&mut self, page: &mut ExportPage) { 82 // walk ops forward, kicking previously-seen ops until created_at advances 83 let to_remove: Vec<usize> = page ··· 127 } 128 } 129 130 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 131 log::trace!("Getting page: {url}"); 132 ··· 141 .split('\n') 142 .filter_map(|s| { 143 serde_json::from_str::<Op>(s) 144 - .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 145 .ok() 146 }) 147 .collect(); ··· 151 Ok((ExportPage { ops }, last_op)) 152 } 153 154 pub async fn poll_upstream( 155 after: Option<Dt>, 156 base: Url, 157 dest: mpsc::Sender<ExportPage>, 158 ) -> anyhow::Result<&'static str> { 159 - log::info!("starting upstream poller after {after:?}"); 160 - let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 161 let mut prev_last: Option<LastOp> = after.map(Into::into); 162 let mut boundary_state: Option<PageBoundaryState> = None; 163 loop {
··· 4 use thiserror::Error; 5 use tokio::sync::mpsc; 6 7 #[derive(Debug, Error)] 8 pub enum GetPageError { 9 #[error(transparent)] ··· 51 } 52 } 53 54 + /// State for removing duplicates ops between PLC export page boundaries 55 #[derive(Debug, PartialEq)] 56 pub struct PageBoundaryState { 57 + /// The previous page's last timestamp 58 + /// 59 + /// Duplicate ops from /export only occur for the same exact timestamp 60 pub last_at: Dt, 61 + /// The previous page's ops at its last timestamp 62 keys_at: Vec<OpKey>, // expected to ~always be length one 63 } 64 65 impl PageBoundaryState { 66 + /// Initialize the boundary state with a PLC page 67 pub fn new(page: &ExportPage) -> Option<Self> { 68 // grab the very last op 69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; ··· 79 80 Some(me) 81 } 82 + /// Apply the deduplication and update state 83 + /// 84 + /// The beginning of the page will be modified to remove duplicates from the 85 + /// previous page. 86 + /// 87 + /// The end of the page is inspected to update the deduplicator state for 88 + /// the next page. 89 fn apply_to_next(&mut self, page: &mut ExportPage) { 90 // walk ops forward, kicking previously-seen ops until created_at advances 91 let to_remove: Vec<usize> = page ··· 135 } 136 } 137 138 + /// Get one PLC export page 139 + /// 140 + /// Extracts the final op so it can be used to fetch the following page 141 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 142 log::trace!("Getting page: {url}"); 143 ··· 152 .split('\n') 153 .filter_map(|s| { 154 serde_json::from_str::<Op>(s) 155 + .inspect_err(|e| { 156 + if !s.is_empty() { 157 + log::warn!("failed to parse op: {e} ({s})") 158 + } 159 + }) 160 .ok() 161 }) 162 .collect(); ··· 166 Ok((ExportPage { ops }, last_op)) 167 } 168 169 + /// Poll an upstream PLC server for new ops 170 + /// 171 + /// Pages of operations are written to the `dest` channel. 
172 + /// 173 + /// ```no_run 174 + /// # #[tokio::main] 175 + /// # async fn main() { 176 + /// use allegedly::{ExportPage, Op, poll_upstream}; 177 + /// 178 + /// let after = Some(chrono::Utc::now()); 179 + /// let upstream = "https://plc.wtf/export".parse().unwrap(); 180 + /// let throttle = std::time::Duration::from_millis(300); 181 + /// 182 + /// let (tx, mut rx) = tokio::sync::mpsc::channel(1); 183 + /// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 184 + /// 185 + /// while let Some(ExportPage { ops }) = rx.recv().await { 186 + /// println!("received {} plc ops", ops.len()); 187 + /// 188 + /// for Op { did, cid, operation, .. } in ops { 189 + /// // in this example we're alerting when changes are found for one 190 + /// // specific identity 191 + /// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 192 + /// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get()); 193 + /// } 194 + /// } 195 + /// } 196 + /// # } 197 + /// ``` 198 pub async fn poll_upstream( 199 after: Option<Dt>, 200 base: Url, 201 + throttle: Duration, 202 dest: mpsc::Sender<ExportPage>, 203 ) -> anyhow::Result<&'static str> { 204 + log::info!("starting upstream poller at {base} after {after:?}"); 205 + let mut tick = tokio::time::interval(throttle); 206 let mut prev_last: Option<LastOp> = after.map(Into::into); 207 let mut boundary_state: Option<PageBoundaryState> = None; 208 loop {
+88 -33
src/ratelimit.rs
··· 8 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 use std::{ 10 convert::TryInto, 11 net::{IpAddr, Ipv6Addr}, 12 sync::{Arc, LazyLock}, 13 time::Duration, ··· 20 type IP6_56 = [u8; 7]; 21 type IP6_48 = [u8; 6]; 22 23 fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 24 let period = quota.replenish_interval() / factor; 25 let burst = quota ··· 30 } 31 32 #[derive(Debug)] 33 - struct IpLimiters { 34 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 35 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 36 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, ··· 44 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 45 } 46 } 47 - pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { 48 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 49 match ip { 50 - addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf), 51 IpAddr::V6(a) => { 52 // always check all limiters 53 let check_ip = self ··· 74 } 75 } 76 } 77 } 78 79 /// Once the rate limit has been reached, the middleware will respond with 80 /// status code 429 (too many requests) and a `Retry-After` header with the amount 81 /// of time that needs to pass before another request will be allowed. 
82 - #[derive(Debug)] 83 - pub struct GovernorMiddleware { 84 #[allow(dead_code)] 85 stop_on_drop: oneshot::Sender<()>, 86 - limiters: Arc<IpLimiters>, 87 } 88 89 - impl GovernorMiddleware { 90 /// Limit request rates 91 /// 92 /// a little gross but this spawns a tokio task for housekeeping: 93 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 94 - pub fn new(quota: Quota) -> Self { 95 - let limiters = Arc::new(IpLimiters::new(quota)); 96 let (stop_on_drop, mut stopped) = oneshot::channel(); 97 tokio::task::spawn({ 98 let limiters = limiters.clone(); ··· 102 _ = &mut stopped => break, 103 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 104 }; 105 - log::debug!( 106 - "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 107 - limiters.per_ip.len(), 108 - limiters.ip6_56.len(), 109 - limiters.ip6_48.len(), 110 - ); 111 - limiters.per_ip.retain_recent(); 112 - limiters.ip6_56.retain_recent(); 113 - limiters.ip6_48.retain_recent(); 114 } 115 } 116 }); ··· 121 } 122 } 123 124 - impl<E: Endpoint> Middleware<E> for GovernorMiddleware { 125 - type Output = GovernorMiddlewareImpl<E>; 126 fn transform(&self, ep: E) -> Self::Output { 127 GovernorMiddlewareImpl { 128 ep, ··· 131 } 132 } 133 134 - pub struct GovernorMiddlewareImpl<E> { 135 ep: E, 136 - limiters: Arc<IpLimiters>, 137 } 138 139 - impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { 140 type Output = E::Output; 141 142 async fn call(&self, req: Request) -> Result<Self::Output> { 143 - let remote = req 144 - .remote_addr() 145 - .as_socket_addr() 146 - .expect("failed to get request's remote addr") // TODO 147 - .ip(); 148 149 - log::trace!("remote: {remote}"); 150 - 151 - match self.limiters.check_key(remote) { 152 Ok(_) => { 153 - log::debug!("allowing remote {remote}"); 154 self.ep.call(req).await 155 } 156 Err(d) => { 157 let wait_time = d.as_secs(); 158 159 - log::debug!("rate limit exceeded for {remote}, quota reset in 
{wait_time}s"); 160 161 let res = Response::builder() 162 .status(StatusCode::TOO_MANY_REQUESTS)
··· 8 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 use std::{ 10 convert::TryInto, 11 + hash::Hash, 12 net::{IpAddr, Ipv6Addr}, 13 sync::{Arc, LazyLock}, 14 time::Duration, ··· 21 type IP6_56 = [u8; 7]; 22 type IP6_48 = [u8; 6]; 23 24 + pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static { 25 + fn extract_key(&self, req: &Request) -> Result<K>; 26 + fn check_key(&self, ip: &K) -> Result<(), Duration>; 27 + fn housekeep(&self); 28 + } 29 + 30 fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 31 let period = quota.replenish_interval() / factor; 32 let burst = quota ··· 37 } 38 39 #[derive(Debug)] 40 + pub struct CreatePlcOpLimiter { 41 + limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>, 42 + } 43 + 44 + impl CreatePlcOpLimiter { 45 + pub fn new(quota: Quota) -> Self { 46 + Self { 47 + limiter: RateLimiter::keyed(quota), 48 + } 49 + } 50 + } 51 + 52 + /// this must be used with an endpoint with a single path param for the did 53 + impl Limiter<String> for CreatePlcOpLimiter { 54 + fn extract_key(&self, req: &Request) -> Result<String> { 55 + let (did,) = req.path_params::<(String,)>()?; 56 + Ok(did) 57 + } 58 + fn check_key(&self, did: &String) -> Result<(), Duration> { 59 + self.limiter 60 + .check_key(did) 61 + .map_err(|e| e.wait_time_from(CLOCK.now())) 62 + } 63 + fn housekeep(&self) { 64 + log::debug!( 65 + "limiter size before housekeeping: {} dids", 66 + self.limiter.len() 67 + ); 68 + self.limiter.retain_recent(); 69 + } 70 + } 71 + 72 + #[derive(Debug)] 73 + pub struct IpLimiters { 74 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 75 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 76 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, ··· 84 ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 85 } 86 } 87 + } 88 + 89 + impl Limiter<IpAddr> for IpLimiters { 90 + fn 
extract_key(&self, req: &Request) -> Result<IpAddr> { 91 + Ok(req 92 + .remote_addr() 93 + .as_socket_addr() 94 + .expect("failed to get request's remote addr") // TODO 95 + .ip()) 96 + } 97 + fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> { 98 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 99 match ip { 100 + addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf), 101 IpAddr::V6(a) => { 102 // always check all limiters 103 let check_ip = self ··· 124 } 125 } 126 } 127 + fn housekeep(&self) { 128 + log::debug!( 129 + "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 130 + self.per_ip.len(), 131 + self.ip6_56.len(), 132 + self.ip6_48.len(), 133 + ); 134 + self.per_ip.retain_recent(); 135 + self.ip6_56.retain_recent(); 136 + self.ip6_48.retain_recent(); 137 + } 138 } 139 140 /// Once the rate limit has been reached, the middleware will respond with 141 /// status code 429 (too many requests) and a `Retry-After` header with the amount 142 /// of time that needs to pass before another request will be allowed. 
143 + // #[derive(Debug)] 144 + pub struct GovernorMiddleware<K> { 145 #[allow(dead_code)] 146 stop_on_drop: oneshot::Sender<()>, 147 + limiters: Arc<dyn Limiter<K>>, 148 } 149 150 + impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> { 151 /// Limit request rates 152 /// 153 /// a little gross but this spawns a tokio task for housekeeping: 154 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 155 + pub fn new(limiters: impl Limiter<K>) -> Self { 156 + let limiters = Arc::new(limiters); 157 let (stop_on_drop, mut stopped) = oneshot::channel(); 158 tokio::task::spawn({ 159 let limiters = limiters.clone(); ··· 163 _ = &mut stopped => break, 164 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 165 }; 166 + limiters.housekeep(); 167 } 168 } 169 }); ··· 174 } 175 } 176 177 + impl<E, K> Middleware<E> for GovernorMiddleware<K> 178 + where 179 + E: Endpoint, 180 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 181 + { 182 + type Output = GovernorMiddlewareImpl<E, K>; 183 fn transform(&self, ep: E) -> Self::Output { 184 GovernorMiddlewareImpl { 185 ep, ··· 188 } 189 } 190 191 + pub struct GovernorMiddlewareImpl<E, K> { 192 ep: E, 193 + limiters: Arc<dyn Limiter<K>>, 194 } 195 196 + impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K> 197 + where 198 + E: Endpoint, 199 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 200 + { 201 type Output = E::Output; 202 203 async fn call(&self, req: Request) -> Result<Self::Output> { 204 + let key = self.limiters.extract_key(&req)?; 205 206 + match self.limiters.check_key(&key) { 207 Ok(_) => { 208 + log::debug!("allowing key {key:?}"); 209 self.ep.call(req).await 210 } 211 Err(d) => { 212 let wait_time = d.as_secs(); 213 214 + log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s"); 215 216 let res = Response::builder() 217 .status(StatusCode::TOO_MANY_REQUESTS)