Server tools to backfill, tail, mirror, and verify PLC logs


+1606 -694
+178 -1
Cargo.lock
```diff
···
 [[package]]
 name = "allegedly"
-version = "0.2.0"
+version = "0.3.3"
 dependencies = [
  "anyhow",
  "async-compression",
···
  "governor",
  "http-body-util",
  "log",
+ "native-tls",
+ "opentelemetry",
+ "opentelemetry-otlp",
+ "opentelemetry_sdk",
  "poem",
+ "postgres-native-tls",
  "reqwest",
  "reqwest-middleware",
  "reqwest-retry",
···
  "tokio-postgres",
  "tokio-stream",
  "tokio-util",
+ "tracing",
+ "tracing-opentelemetry",
  "tracing-subscriber",
 ]
···
 ]

 [[package]]
+name = "opentelemetry"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+ "js-sys",
+ "pin-project-lite",
+ "thiserror 2.0.16",
+ "tracing",
+]
+
+[[package]]
+name = "opentelemetry-http"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "http",
+ "opentelemetry",
+ "reqwest",
+]
+
+[[package]]
+name = "opentelemetry-otlp"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b"
+dependencies = [
+ "http",
+ "opentelemetry",
+ "opentelemetry-http",
+ "opentelemetry-proto",
+ "opentelemetry_sdk",
+ "prost",
+ "reqwest",
+ "thiserror 2.0.16",
+ "tracing",
+]
+
+[[package]]
+name = "opentelemetry-proto"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc"
+dependencies = [
+ "opentelemetry",
+ "opentelemetry_sdk",
+ "prost",
+ "tonic",
+]
+
+[[package]]
+name = "opentelemetry_sdk"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b"
+dependencies = [
+ "futures-channel",
+ "futures-executor",
+ "futures-util",
+ "opentelemetry",
+ "percent-encoding",
+ "rand 0.9.2",
+ "serde_json",
+ "thiserror 2.0.16",
+ "tokio",
+ "tokio-stream",
+]
+
+[[package]]
 name = "parking_lot"
 version = "0.11.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
+name = "pin-project"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
 name = "pin-project-lite"
 version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483"

 [[package]]
+name = "postgres-native-tls"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1f39498473c92f7b6820ae970382c1d83178a3454c618161cb772e8598d9f6f"
+dependencies = [
+ "native-tls",
+ "tokio",
+ "tokio-native-tls",
+ "tokio-postgres",
+]
+
+[[package]]
 name = "postgres-protocol"
 version = "0.6.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de"
 dependencies = [
  "unicode-ident",
+]
+
+[[package]]
+name = "prost"
+version = "0.13.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.13.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
+dependencies = [
+ "anyhow",
+ "itertools",
+ "proc-macro2",
+ "quote",
+ "syn",
 ]

 [[package]]
···
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb"
 dependencies = [
+ "async-compression",
  "base64",
  "bytes",
  "encoding_rs",
+ "futures-channel",
  "futures-core",
  "futures-util",
  "h2",
···
 ]

 [[package]]
+name = "tonic"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
+dependencies = [
+ "async-trait",
+ "base64",
+ "bytes",
+ "http",
+ "http-body",
+ "http-body-util",
+ "percent-encoding",
+ "pin-project",
+ "prost",
+ "tokio-stream",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
 name = "tower"
 version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
  "log",
  "once_cell",
  "tracing-core",
+]
+
+[[package]]
+name = "tracing-opentelemetry"
+version = "0.31.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c"
+dependencies = [
+ "js-sys",
+ "once_cell",
+ "opentelemetry",
+ "opentelemetry_sdk",
+ "smallvec",
+ "tracing",
+ "tracing-core",
+ "tracing-log",
+ "tracing-subscriber",
+ "web-time",
 ]

 [[package]]
```
+9 -2
Cargo.toml
```diff
···
 name = "allegedly"
 description = "public ledger server tools and services (for the PLC)"
 license = "MIT OR Apache-2.0"
-version = "0.2.0"
+version = "0.3.3"
 edition = "2024"
 default-run = "allegedly"
···
 governor = "0.10.1"
 http-body-util = "0.1.3"
 log = "0.4.28"
+native-tls = "0.2.14"
+opentelemetry = "0.30.0"
+opentelemetry-otlp = { version = "0.30.0" }
+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] }
 poem = { version = "3.1.12", features = ["acme", "compression"] }
-reqwest = { version = "0.12.23", features = ["stream", "json"] }
+postgres-native-tls = "0.5.1"
+reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] }
 reqwest-middleware = "0.4.2"
 reqwest-retry = "0.7.0"
 rustls = "0.23.32"
···
 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] }
 tokio-stream = { version = "0.1.17", features = ["io-util"] }
 tokio-util = { version = "0.7.16", features = ["compat"] }
+tracing = "0.1.41"
+tracing-opentelemetry = "0.31.0"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
```
+37
examples/poll.rs
```rust
use allegedly::{ExportPage, poll_upstream};

#[tokio::main]
async fn main() {
    // set to `None` to replay from the beginning of the PLC history
    let after = Some(chrono::Utc::now());

    // the PLC server to poll for new ops
    let upstream = "https://plc.wtf/export".parse().unwrap();

    // self-rate-limit (plc.directory's limit interval is 600ms)
    let throttle = std::time::Duration::from_millis(300);

    // pages are sent out of the poller via a tokio mpsc channel
    let (tx, mut rx) = tokio::sync::mpsc::channel(1);

    // spawn a tokio task to run the poller
    tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));

    // receive pages of plc ops from the poller
    while let Some(ExportPage { ops }) = rx.recv().await {
        println!("received {} plc ops", ops.len());

        for op in ops {
            // in this example we're alerting when changes are found for one
            // specific identity
            if op.did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
                println!(
                    "Update found for {}! cid={}\n  -> operation: {}",
                    op.did,
                    op.cid,
                    op.operation.get()
                );
            }
        }
    }
}
```
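Since the example starts polling from "now", it only prints ops that land after startup. Assuming a checkout of the repo, cargo's example runner picks it up directly:

```bash
# run from a checkout of the Allegedly repo
cargo run --example poll
```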
favicon.ico

This is a binary file and will not be displayed.

+26 -8
readme.md
````diff
···
 sudo allegedly mirror \
   --upstream "https://plc.directory" \
   --wrap "http://127.0.0.1:3000" \
+  --wrap-pg-cert "/opt/allegedly/postgres-cert.pem" \
   --acme-domain "plc.wtf" \
-  --acme-cache-dir ./acme-cache \
-  --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory"
+  --acme-domain "alt.plc.wtf" \
+  --experimental-acme-domain "experimental.plc.wtf" \
+  --acme-cache-path ./acme-cache \
+  --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" \
+  --acme-ipv6 \
+  --experimental-write-upstream
+```
+
+- Reverse-proxy to any PLC server, terminating TLS and forwarding writes upstream
+
+```bash
+sudo allegedly wrap \
+  --wrap "http://127.0.0.1:3000" \
+  --acme-ipv6 \
+  --acme-cache-path ./acme-cache \
+  --acme-domain "plc.wtf" \
+  --experimental-acme-domain "experimental.plc.wtf" \
+  --experimental-write-upstream \
+  --upstream "https://plc.wtf" \
 ```
···
 - monitoring of the various tasks
 - health check pings
 - expose metrics/tracing
-- read-only flag for mirror wrapper
+- [x] read-only flag for mirror wrapper
 - bundle: write directly to s3-compatible object storage
 - helpers for automating periodic `bundle` runs


 ### new things

-- experimental: websocket version of /export
-- experimental: accept writes by forwarding them upstream
-- experimental: serve a tlog
-- experimental: embed a log database directly for fast and efficient mirroring
-- experimental: support multiple upstreams?
+- [ ] experimental: websocket version of /export
+- [x] experimental: accept writes by forwarding them upstream
+- [ ] experimental: serve a tlog
+- [ ] experimental: embed a log database directly for fast and efficient mirroring
+- [ ] experimental: support multiple upstreams?

 - [ ] new command todo: `zip` or `check` or `diff`: compare two plc logs over some time range
 - [ ] new command to consider: `scatter` or something: broadcast plc writes to multiple upstreams
````
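The wrapper also serves a `/_health` route of its own alongside the proxied PLC routes (see `src/mirror.rs` below). A quick smoke test of a freshly started mirror might look like this, assuming it is serving `plc.wtf`:

```bash
# /_health is answered by Allegedly itself, with wrapped-server status inlined
curl -s https://plc.wtf/_health
# everything else proxies to the wrapped PLC; resolve an identity
curl -s https://plc.wtf/did:plc:hdhoaan3xa3jiuq4fg4mefid
```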
+12 -5
src/backfill.rs
```diff
···
     dest: mpsc::Sender<ExportPage>,
     source_workers: usize,
     until: Option<Dt>,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<&'static str> {
     // queue up the week bundles that should be available
     let weeks = Arc::new(Mutex::new(
         until
···
         while let Some(week) = weeks.lock().await.pop() {
             let when = Into::<Dt>::into(week).to_rfc3339();
             log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
-            week_to_pages(source.clone(), week, dest.clone()).await?;
+            week_to_pages(source.clone(), week, dest.clone())
+                .await
+                .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?;
         }
         log::info!("done with the weeks ig");
         Ok(())
···
     // wait for the big backfill to finish
     while let Some(res) = workers.join_next().await {
-        res??;
+        res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
+            .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
     }
-    log::info!("finished fetching backfill in {:?}", t_step.elapsed());
-    Ok(())
+    log::info!(
+        "finished fetching backfill in {:?}. senders remaining: {}",
+        t_step.elapsed(),
+        dest.strong_count()
+    );
+    Ok("backfill")
 }
```
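The signature change is part of a broader refactor (visible in `src/bin/backfill.rs` below): long-running tasks now resolve to `anyhow::Result<&'static str>`, so a `JoinSet` supervisor can log *which* task finished or failed. A minimal, self-contained sketch of that pattern, separate from the crate's code (task names here are illustrative):

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // each task resolves to Result<&'static str>: the Ok value names the task
    let mut tasks = JoinSet::new();
    tasks.spawn(async { Ok::<_, anyhow::Error>("poller") });
    tasks.spawn(async { Ok("writer") });

    while let Some(joined) = tasks.join_next().await {
        // first ? bails on join errors (panics), second on task errors
        let name = joined??;
        println!("task {name:?} finished, {} left", tasks.len());
    }
    Ok(())
}
```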
+65 -223
src/bin/allegedly.rs
```diff
-use allegedly::{
-    Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill,
-    backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve,
-};
+use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init};
+use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream};
 use clap::{CommandFactory, Parser, Subcommand};
-use reqwest::Url;
-use std::{net::SocketAddr, path::PathBuf, time::Instant};
-use tokio::sync::{mpsc, oneshot};
+use std::{path::PathBuf, time::Duration, time::Instant};
+use tokio::fs::create_dir_all;
+use tokio::sync::mpsc;
+
+mod backfill;
+mod mirror;

 #[derive(Debug, Parser)]
 struct Cli {
-    /// Upstream PLC server
-    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
-    #[clap(default_value = "https://plc.directory")]
-    upstream: Url,
+    #[command(flatten)]
+    globals: GlobalArgs,
+
     #[command(subcommand)]
     command: Commands,
 }
···
 enum Commands {
     /// Use weekly bundled ops to get a complete directory mirror FAST
     Backfill {
-        /// Remote URL prefix to fetch bundles from
-        #[arg(long)]
-        #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
-        http: Url,
-        /// Local folder to fetch bundles from (overrides `http`)
-        #[arg(long)]
-        dir: Option<PathBuf>,
-        /// Parallel bundle fetchers
-        ///
-        /// Default: 4 for http fetches, 1 for local folder
-        #[arg(long)]
-        source_workers: Option<usize>,
-        /// Bulk load into did-method-plc-compatible postgres instead of stdout
-        ///
-        /// Pass a postgres connection url like "postgresql://localhost:5432"
-        #[arg(long)]
-        to_postgres: Option<Url>,
-        /// Delete all operations from the postgres db before starting
-        ///
-        /// only used if `--to-postgres` is present
-        #[arg(long, action)]
-        postgres_reset: bool,
-        /// Stop at the week ending before this date
-        #[arg(long)]
-        until: Option<Dt>,
-        /// After the weekly imports, poll upstream until we're caught up
-        #[arg(long, action)]
-        catch_up: bool,
+        #[command(flatten)]
+        args: backfill::Args,
     },
     /// Scrape a PLC server, collecting ops into weekly bundles
     ///
···
     },
     /// Wrap a did-method-plc server, syncing upstream and blocking op submits
     Mirror {
-        /// the wrapped did-method-plc server
-        #[arg(long, env = "ALLEGEDLY_WRAP")]
-        wrap: Url,
-        /// the wrapped did-method-plc server's database (write access required)
-        #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
-        wrap_pg: Url,
-        /// wrapping server listen address
-        #[arg(short, long, env = "ALLEGEDLY_BIND")]
-        #[clap(default_value = "127.0.0.1:8000")]
-        bind: SocketAddr,
-        /// obtain a certificate from letsencrypt
-        ///
-        /// for now this will force listening on all interfaces at :80 and :443
-        /// (:80 will serve an "https required" error, *will not* redirect)
-        #[arg(
-            long,
-            conflicts_with("bind"),
-            requires("acme_cache_path"),
-            env = "ALLEGEDLY_ACME_DOMAIN"
-        )]
-        acme_domain: Vec<String>,
-        /// which local directory to keep the letsencrypt certs in
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
-        acme_cache_path: Option<PathBuf>,
-        /// which public acme directory to use
-        ///
-        /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
-        #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
-        acme_directory_url: Url,
+        #[command(flatten)]
+        args: mirror::Args,
+        #[command(flatten)]
+        instrumentation: InstrumentationArgs,
+    },
+    /// Wrap any did-method-plc server, without syncing upstream (read-only)
+    Wrap {
+        #[command(flatten)]
+        args: mirror::Args,
+        #[command(flatten)]
+        instrumentation: InstrumentationArgs,
     },
     /// Poll an upstream PLC server and log new ops to stdout
     Tail {
···
     },
 }

-async fn pages_to_stdout(
-    mut rx: mpsc::Receiver<ExportPage>,
-    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
-) -> anyhow::Result<()> {
-    let mut last_at = None;
-    while let Some(page) = rx.recv().await {
-        for op in &page.ops {
-            println!("{op}");
-        }
-        if notify_last_at.is_some()
-            && let Some(s) = PageBoundaryState::new(&page)
-        {
-            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
+impl Commands {
+    fn enable_otel(&self) -> bool {
+        match self {
+            Commands::Mirror {
+                instrumentation, ..
+            }
+            | Commands::Wrap {
+                instrumentation, ..
+            } => instrumentation.enable_opentelemetry,
+            _ => false,
         }
     }
-    if let Some(notify) = notify_last_at {
-        log::trace!("notifying last_at: {last_at:?}");
-        if notify.send(last_at).is_err() {
-            log::error!("receiver for last_at dropped, can't notify");
-        };
-    }
-    Ok(())
-}
-
-/// page forwarder who drops its channels on receipt of a small page
-///
-/// PLC will return up to 1000 ops on a page, and returns full pages until it
-/// has caught up, so this is a (hacky?) way to stop polling once we're up.
-fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> {
-    let (tx, fwd) = mpsc::channel(1);
-    tokio::task::spawn(async move {
-        while let Some(page) = rx.recv().await
-            && page.ops.len() > 900
-        {
-            tx.send(page).await.unwrap();
-        }
-    });
-    fwd
 }

 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     let args = Cli::parse();
     let matches = Cli::command().get_matches();
     let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
-    bin_init(name);
+    bin_init(args.command.enable_otel());
+    log::info!("{}", logo(name));
+
+    let globals = args.globals.clone();

     let t0 = Instant::now();
     match args.command {
-        Commands::Backfill {
-            http,
-            dir,
-            source_workers,
-            to_postgres,
-            postgres_reset,
-            until,
-            catch_up,
-        } => {
-            let (tx, rx) = mpsc::channel(32); // these are big pages
-            tokio::task::spawn(async move {
-                if let Some(dir) = dir {
-                    log::info!("Reading weekly bundles from local folder {dir:?}");
-                    backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
-                        .await
-                        .unwrap();
-                } else {
-                    log::info!("Fetching weekly bundles from from {http}");
-                    backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
-                        .await
-                        .unwrap();
-                }
-            });
-
-            // postgres writer will notify us as soon as the very last op's time is known
-            // so we can start catching up while pg is restoring indexes and stuff
-            let (notify_last_at, rx_last) = if catch_up {
-                let (tx, rx) = oneshot::channel();
-                (Some(tx), Some(rx))
-            } else {
-                (None, None)
-            };
-
-            let to_postgres_url_bulk = to_postgres.clone();
-            let bulk_out_write = tokio::task::spawn(async move {
-                if let Some(ref url) = to_postgres_url_bulk {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    backfill_to_pg(db, postgres_reset, rx, notify_last_at)
-                        .await
-                        .unwrap();
-                } else {
-                    pages_to_stdout(rx, notify_last_at).await.unwrap();
-                }
-            });
-
-            if let Some(rx_last) = rx_last {
-                let mut upstream = args.upstream;
-                upstream.set_path("/export");
-                // wait until the time for `after` is known
-                let last_at = rx_last.await.unwrap();
-                log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
-                let (tx, rx) = mpsc::channel(256); // these are small pages
-                tokio::task::spawn(
-                    async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
-                );
-                bulk_out_write.await.unwrap();
-                log::info!("writing catch-up pages");
-                let full_pages = full_pages(rx);
-                if let Some(url) = to_postgres {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    pages_to_pg(db, full_pages).await.unwrap();
-                } else {
-                    pages_to_stdout(full_pages, None).await.unwrap();
-                }
-            }
-        }
+        Commands::Backfill { args } => backfill::run(globals, args).await?,
         Commands::Bundle {
             dest,
             after,
             clobber,
         } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
+            let throttle = Duration::from_millis(globals.upstream_throttle_ms);
             let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
-            tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() });
+            tokio::task::spawn(async move {
+                poll_upstream(Some(after), url, throttle, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
             log::trace!("ensuring output directory exists");
-            std::fs::create_dir_all(&dest).unwrap();
-            pages_to_weeks(rx, dest, clobber).await.unwrap();
-        }
-        Commands::Mirror {
-            wrap,
-            wrap_pg,
-            bind,
-            acme_domain,
-            acme_cache_path,
-            acme_directory_url,
-        } => {
-            let db = Db::new(wrap_pg.as_str()).await.unwrap();
-            let latest = db
-                .get_latest()
-                .await
-                .unwrap()
-                .expect("there to be at least one op in the db. did you backfill?");
-
-            let (tx, rx) = mpsc::channel(2);
-            // upstream poller
-            let mut url = args.upstream.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting poll reader...");
-                url.set_path("/export");
-                tokio::task::spawn(
-                    async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
-                );
-            });
-            // db writer
-            let poll_db = db.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting db writer...");
-                pages_to_pg(poll_db, rx).await.unwrap();
-            });
-
-            let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
-                (_, false, Some(cache_path)) => ListenConf::Acme {
-                    domains: acme_domain,
-                    cache_path,
-                    directory_url: acme_directory_url.to_string(),
-                },
-                (bind, true, None) => ListenConf::Bind(bind),
-                (_, _, _) => unreachable!(),
-            };
-
-            serve(&args.upstream, wrap, listen_conf).await.unwrap();
+            create_dir_all(&dest)
+                .await
+                .expect("to ensure output dir exists");
+            pages_to_weeks(rx, dest, clobber)
+                .await
+                .expect("to write bundles to output files");
         }
+        Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?,
+        Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?,
         Commands::Tail { after } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
             let start_at = after.or_else(|| Some(chrono::Utc::now()));
+            let throttle = Duration::from_millis(globals.upstream_throttle_ms);
             let (tx, rx) = mpsc::channel(1);
-            tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
-            pages_to_stdout(rx, None).await.unwrap();
+            tokio::task::spawn(async move {
+                poll_upstream(start_at, url, throttle, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
+            pages_to_stdout(rx, None)
+                .await
+                .expect("to write pages to stdout");
         }
     }
     log::info!("whew, {:?}. goodbye!", t0.elapsed());
+    Ok(())
 }
```
+207
src/bin/backfill.rs
```rust
use allegedly::{
    Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg,
    bin::{GlobalArgs, bin_init},
    full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream,
};
use clap::Parser;
use reqwest::Url;
use std::{path::PathBuf, time::Duration};
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};

pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";

#[derive(Debug, clap::Args)]
pub struct Args {
    /// Remote URL prefix to fetch bundles from
    #[arg(long)]
    #[clap(default_value = DEFAULT_HTTP)]
    http: Url,
    /// Local folder to fetch bundles from (overrides `http`)
    #[arg(long)]
    dir: Option<PathBuf>,
    /// Don't do weekly bulk-loading at all.
    ///
    /// overrides `http` and `dir`, makes catch_up redundant
    #[arg(long, action)]
    no_bulk: bool,
    /// Parallel bundle fetchers
    ///
    /// Default: 4 for http fetches, 1 for local folder
    #[arg(long)]
    source_workers: Option<usize>,
    /// Bulk load into did-method-plc-compatible postgres instead of stdout
    ///
    /// Pass a postgres connection url like "postgresql://localhost:5432"
    #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
    to_postgres: Option<Url>,
    /// Cert for postgres (if needed)
    #[arg(long)]
    postgres_cert: Option<PathBuf>,
    /// Delete all operations from the postgres db before starting
    ///
    /// only used if `--to-postgres` is present
    #[arg(long, action)]
    postgres_reset: bool,
    /// Stop at the week ending before this date
    #[arg(long)]
    until: Option<Dt>,
    /// After the weekly imports, poll upstream until we're caught up
    #[arg(long, action)]
    catch_up: bool,
}

pub async fn run(
    GlobalArgs {
        upstream,
        upstream_throttle_ms,
    }: GlobalArgs,
    Args {
        http,
        dir,
        no_bulk,
        source_workers,
        to_postgres,
        postgres_cert,
        postgres_reset,
        until,
        catch_up,
    }: Args,
) -> anyhow::Result<()> {
    let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();

    let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages

    // a bulk sink can notify us as soon as the very last op's time is known
    // so we can start catching up while the sink might restore indexes and such
    let (found_last_tx, found_last_out) = if catch_up {
        let (tx, rx) = oneshot::channel();
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };

    let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
    let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter

    // set up sources
    if no_bulk {
        // simple mode, just poll upstream from the beginning
        if http != DEFAULT_HTTP.parse()? {
            log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
        }
        if let Some(d) = dir {
            log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
        }
        if let Some(u) = until {
            log::warn!(
                "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)"
            );
        }
        let mut upstream = upstream;
        upstream.set_path("/export");
        let throttle = Duration::from_millis(upstream_throttle_ms);
        tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx));
        tasks.spawn(full_pages(poll_out, full_tx));
        tasks.spawn(pages_to_stdout(full_out, None));
    } else {
        // fun mode

        // set up bulk sources
        if let Some(dir) = dir {
            if http != DEFAULT_HTTP.parse()? {
                anyhow::bail!(
                    "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
                );
            }
            tasks.spawn(backfill(
                FolderSource(dir),
                bulk_tx,
                source_workers.unwrap_or(1),
                until,
            ));
        } else {
            tasks.spawn(backfill(
                HttpSource(http),
                bulk_tx,
                source_workers.unwrap_or(4),
                until,
            ));
        }

        // and the catch-up source...
        if let Some(last) = found_last_out {
            let throttle = Duration::from_millis(upstream_throttle_ms);
            tasks.spawn(async move {
                let mut upstream = upstream;
                upstream.set_path("/export");

                poll_upstream(last.await?, upstream, throttle, poll_tx).await
            });
        }

        // set up sinks
        if let Some(pg_url) = to_postgres {
            log::trace!("connecting to postgres...");
            let db = Db::new(pg_url.as_str(), postgres_cert).await?;
            log::trace!("connected to postgres");

            tasks.spawn(backfill_to_pg(
                db.clone(),
                postgres_reset,
                bulk_out,
                found_last_tx,
            ));
            if catch_up {
                tasks.spawn(pages_to_pg(db, full_out));
            }
        } else {
            tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
            if catch_up {
                tasks.spawn(pages_to_stdout(full_out, None));
            }
        }
    }

    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}

#[derive(Debug, Parser)]
struct CliArgs {
    #[command(flatten)]
    globals: GlobalArgs,
    #[command(flatten)]
    args: Args,
}

#[allow(dead_code)]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = CliArgs::parse();
    bin_init(false);
    log::info!("{}", logo("backfill"));
    run(args.globals, args.args).await?;
    Ok(())
}
```
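Put together, a full-speed restore into a reference PLC database might be invoked like this (the connection string is the placeholder from the `--to-postgres` docs above):

```bash
allegedly backfill \
  --to-postgres "postgresql://localhost:5432" \
  --postgres-reset \
  --catch-up
```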
+28
src/bin/instrumentation/mod.rs
```rust
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::{Protocol, WithExportConfig};
use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider};
use tracing::Subscriber;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::registry::LookupSpan;

pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer>
where
    S: Subscriber + for<'span> LookupSpan<'span>,
{
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_protocol(Protocol::HttpBinary)
        .build()
        .expect("to build otel otlp exporter");

    let provider = SdkTracerProvider::builder()
        .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
            1.0,
        ))))
        .with_id_generator(RandomIdGenerator::default())
        .with_batch_exporter(exporter)
        .build();

    let tracer = provider.tracer("tracing-otel-subscriber");
    tracing_opentelemetry::layer().with_tracer(tracer)
}
```
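The exporter is configured through the standard OTLP environment variables (the same example values documented on `InstrumentationArgs` in `src/bin/mod.rs` below; endpoint and header are placeholders for your collector):

```bash
export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.honeycomb.io/"
export OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=supersecret"
export OTEL_SERVICE_NAME="allegedly"
ALLEGEDLY_ENABLE_OTEL=true allegedly wrap --wrap "http://127.0.0.1:3000"
```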
+187
src/bin/mirror.rs
```rust
use allegedly::{
    Db, ExperimentalConf, ListenConf,
    bin::{GlobalArgs, InstrumentationArgs, bin_init},
    logo, pages_to_pg, poll_upstream, serve,
};
use clap::Parser;
use reqwest::Url;
use std::{net::SocketAddr, path::PathBuf, time::Duration};
use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet};

#[derive(Debug, clap::Args)]
pub struct Args {
    /// the wrapped did-method-plc server
    #[arg(long, env = "ALLEGEDLY_WRAP")]
    wrap: Url,
    /// the wrapped did-method-plc server's database (write access required)
    #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
    wrap_pg: Option<Url>,
    /// path to tls cert for the wrapped postgres db, if needed
    #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")]
    wrap_pg_cert: Option<PathBuf>,
    /// wrapping server listen address
    #[arg(short, long, env = "ALLEGEDLY_BIND")]
    #[clap(default_value = "127.0.0.1:8000")]
    bind: SocketAddr,
    /// obtain a certificate from letsencrypt
    ///
    /// for now this will force listening on all interfaces at :80 and :443
    /// (:80 will serve an "https required" error, *will not* redirect)
    #[arg(
        long,
        conflicts_with("bind"),
        requires("acme_cache_path"),
        env = "ALLEGEDLY_ACME_DOMAIN"
    )]
    acme_domain: Vec<String>,
    /// which local directory to keep the letsencrypt certs in
    #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
    acme_cache_path: Option<PathBuf>,
    /// which public acme directory to use
    ///
    /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
    #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
    #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
    acme_directory_url: Url,
    /// try to listen for ipv6
    #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")]
    acme_ipv6: bool,
    /// only accept experimental requests at this hostname
    ///
    /// a cert will be provisioned for it from letsencrypt. if you're not using
    /// acme (eg., behind a tls-terminating reverse proxy), open a feature request.
    #[arg(
        long,
        requires("acme_domain"),
        env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN"
    )]
    experimental_acme_domain: Option<String>,
    /// accept writes! by forwarding them upstream
    #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")]
    experimental_write_upstream: bool,
}

pub async fn run(
    GlobalArgs {
        upstream,
        upstream_throttle_ms,
    }: GlobalArgs,
    Args {
        wrap,
        wrap_pg,
        wrap_pg_cert,
        bind,
        acme_domain,
        acme_cache_path,
        acme_directory_url,
        acme_ipv6,
        experimental_acme_domain,
        experimental_write_upstream,
    }: Args,
    sync: bool,
) -> anyhow::Result<()> {
    let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
        (_, false, Some(cache_path)) => {
            create_dir_all(&cache_path).await?;
            let mut domains = acme_domain.clone();
            if let Some(ref experimental_domain) = experimental_acme_domain {
                domains.push(experimental_domain.clone())
            }
            log::info!("configuring acme for https at {domains:?}...");
            ListenConf::Acme {
                domains,
                cache_path,
                directory_url: acme_directory_url.to_string(),
                ipv6: acme_ipv6,
            }
        }
        (bind, true, None) => ListenConf::Bind(bind),
        (_, _, _) => unreachable!(),
    };

    let experimental_conf = ExperimentalConf {
        acme_domain: experimental_acme_domain,
        write_upstream: experimental_write_upstream,
    };

    let mut tasks = JoinSet::new();

    let db = if sync {
        let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!(
            "a wrapped reference postgres must be provided to sync"
        ))?;
        let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?;

        // TODO: allow starting up with polling backfill from beginning?
        log::debug!("getting the latest op from the db...");
        let latest = db
            .get_latest()
            .await?
            .expect("there to be at least one op in the db. did you backfill?");

        let (send_page, recv_page) = mpsc::channel(8);

        let mut poll_url = upstream.clone();
        poll_url.set_path("/export");
        let throttle = Duration::from_millis(upstream_throttle_ms);

        tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page));
        tasks.spawn(pages_to_pg(db.clone(), recv_page));
        Some(db)
    } else {
        None
    };

    tasks.spawn(serve(
        upstream,
        wrap,
        listen_conf,
        experimental_conf,
        db.clone(),
    ));

    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}

#[derive(Debug, Parser)]
struct CliArgs {
    #[command(flatten)]
    globals: GlobalArgs,
    #[command(flatten)]
    instrumentation: InstrumentationArgs,
    #[command(flatten)]
    args: Args,
    /// Run the mirror in wrap mode, no upstream synchronization (read-only)
    #[arg(long, action)]
    wrap_mode: bool,
}

#[allow(dead_code)]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = CliArgs::parse();
    bin_init(args.instrumentation.enable_opentelemetry);
    log::info!("{}", logo("mirror"));
    run(args.globals, args.args, !args.wrap_mode).await?;
    Ok(())
}
```
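For local development the acme flags can be skipped entirely: with `--bind`, the wrapper listens on plain HTTP. The values here are illustrative, using the flag defaults documented above:

```bash
allegedly mirror \
  --upstream "https://plc.directory" \
  --wrap "http://127.0.0.1:3000" \
  --wrap-pg "postgresql://localhost:5432" \
  --bind "127.0.0.1:8000"
```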
+58
src/bin/mod.rs
```rust
mod instrumentation;

use reqwest::Url;
use tracing_subscriber::layer::SubscriberExt;

#[derive(Debug, Clone, clap::Args)]
pub struct GlobalArgs {
    /// Upstream PLC server
    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
    #[clap(default_value = "https://plc.directory")]
    pub upstream: Url,
    /// Self-rate-limit upstream request interval
    ///
    /// plc.directory's rate limiting is 500 requests per 5 mins (600ms)
    #[arg(long, global = true, env = "ALLEGEDLY_UPSTREAM_THROTTLE_MS")]
    #[clap(default_value = "600")]
    pub upstream_throttle_ms: u64,
}

#[derive(Debug, Default, Clone, clap::Args)]
pub struct InstrumentationArgs {
    /// Export traces to an OTLP collector
    ///
    /// Configure the collector via standard env vars:
    /// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/"
    /// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret"
    /// - `OTEL_SERVICE_NAME` eg "my-app"
    #[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")]
    pub enable_opentelemetry: bool,
}

pub fn bin_init(enable_otlp: bool) {
    let filter = tracing_subscriber::EnvFilter::builder()
        .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
        .from_env_lossy();

    let stderr_log = tracing_subscriber::fmt::layer()
        .with_writer(std::io::stderr)
        .pretty();

    let otel = if enable_otlp {
        Some(instrumentation::otel_layer())
    } else {
        None
    };

    let subscriber = tracing_subscriber::Registry::default()
        .with(filter)
        .with(stderr_log)
        .with(otel);

    tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber");
}

#[allow(dead_code)]
fn main() {
    panic!("this is not actually a module")
}
```
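Since `GlobalArgs` marks both fields `global = true` with env fallbacks, the upstream settings can be supplied once and shared by every subcommand:

```bash
# equivalent to passing --upstream / --upstream-throttle-ms on any subcommand
export ALLEGEDLY_UPSTREAM="https://plc.directory"
export ALLEGEDLY_UPSTREAM_THROTTLE_MS=600
allegedly tail
```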
+73
src/cached_value.rs
```rust
use std::error::Error;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

pub trait Fetcher<T> {
    fn fetch(&self) -> impl Future<Output = Result<T, Box<dyn Error>>>;
}

#[derive(Debug)]
struct ExpiringValue<T: Clone> {
    value: T,
    expires: Instant,
}

impl<T: Clone> ExpiringValue<T> {
    fn get(&self, now: Instant) -> Option<T> {
        if now <= self.expires {
            log::trace!("returning val (fresh for {:?})", self.expires - now);
            Some(self.value.clone())
        } else {
            log::trace!("hiding expired val");
            None
        }
    }
}

// TODO: generic over the fetcher's actual error type
#[derive(Clone)]
pub struct CachedValue<T: Clone, F: Fetcher<T>> {
    latest: Arc<Mutex<Option<ExpiringValue<T>>>>,
    fetcher: F,
    validity: Duration,
}

impl<T: Clone, F: Fetcher<T>> CachedValue<T, F> {
    pub fn new(f: F, validity: Duration) -> Self {
        Self {
            latest: Default::default(),
            fetcher: f,
            validity,
        }
    }
    pub async fn get(&self) -> Result<T, Box<dyn Error>> {
        let now = Instant::now();
        self.get_impl(now).await
    }
    async fn get_impl(&self, now: Instant) -> Result<T, Box<dyn Error>> {
        let mut val = self.latest.lock().await;
        if let Some(v) = val.as_ref().and_then(|v| v.get(now)) {
            return Ok(v);
        }
        log::debug!(
            "value {}, fetching...",
            if val.is_some() {
                "expired"
            } else {
                "not present"
            }
        );
        let new = self
            .fetcher
            .fetch()
            .await
            .inspect_err(|e| log::warn!("value fetch failed, next access will retry: {e}"))?;
        log::debug!("fetched ok, saving a copy for cache.");
        *val = Some(ExpiringValue {
            value: new.clone(),
            expires: now + self.validity,
        });
        Ok(new)
    }
}
```
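A minimal sketch of how a `Fetcher` impl plugs in, using a hypothetical `ClockFetcher` (the crate's real fetchers live in `src/mirror.rs` below):

```rust
use allegedly::{CachedValue, Fetcher};
use std::error::Error;
use std::time::Duration;

// hypothetical fetcher: "fetches" the current unix time in seconds
struct ClockFetcher;

impl Fetcher<u64> for ClockFetcher {
    async fn fetch(&self) -> Result<u64, Box<dyn Error>> {
        Ok(std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)?
            .as_secs())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // the first get() calls fetch(); later get()s within the 5s validity
    // window return the cached copy without touching the fetcher
    let cached = CachedValue::new(ClockFetcher, Duration::from_secs(5));
    let first = cached.get().await?;
    let second = cached.get().await?;
    assert_eq!(first, second);
    Ok(())
}
```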
+5 -1
src/client.rs
```diff
···
 );

 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| {
-    let inner = Client::builder().user_agent(UA).build().unwrap();
+    let inner = Client::builder()
+        .user_agent(UA)
+        .gzip(true)
+        .build()
+        .expect("reqwest client to build");

     let policy = ExponentialBackoff::builder().build_with_max_retries(12);

```
+83 -25
src/lib.rs
```diff
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, oneshot};

 mod backfill;
+mod cached_value;
 mod client;
 mod mirror;
 mod plc_pg;
···
 mod ratelimit;
 mod weekly;

+pub mod bin;
+
 pub use backfill::backfill;
+pub use cached_value::{CachedValue, Fetcher};
 pub use client::{CLIENT, UA};
-pub use mirror::{ListenConf, serve};
+pub use mirror::{ExperimentalConf, ListenConf, serve};
 pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
 pub use poll::{PageBoundaryState, get_page, poll_upstream};
-pub use ratelimit::GovernorMiddleware;
+pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
 pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};

 pub type Dt = chrono::DateTime<chrono::Utc>;
···
 /// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
 #[derive(Debug)]
 pub struct ExportPage {
-    pub ops: Vec<String>,
+    pub ops: Vec<Op>,
 }

 impl ExportPage {
···
 /// A fully-deserialized plc operation
 ///
 /// including the plc's wrapping with timestamp and nullified state
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
-pub struct Op<'a> {
-    pub did: &'a str,
-    pub cid: &'a str,
+pub struct Op {
+    pub did: String,
+    pub cid: String,
     pub created_at: Dt,
     pub nullified: bool,
-    #[serde(borrow)]
-    pub operation: &'a serde_json::value::RawValue,
+    pub operation: Box<serde_json::value::RawValue>,
+}
+
+#[cfg(test)]
+impl PartialEq for Op {
+    fn eq(&self, other: &Self) -> bool {
+        self.did == other.did
+            && self.cid == other.cid
+            && self.created_at == other.created_at
+            && self.nullified == other.nullified
+            && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap()
+                == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap()
+    }
 }

 /// Database primary key for an op
···
     pub cid: String,
 }

-impl From<&Op<'_>> for OpKey {
-    fn from(Op { did, cid, .. }: &Op<'_>) -> Self {
+impl From<&Op> for OpKey {
+    fn from(Op { did, cid, .. }: &Op) -> Self {
         Self {
             did: did.to_string(),
             cid: cid.to_string(),
···
     }
 }

+/// page forwarder who drops its channels on receipt of a small page
+///
+/// PLC will return up to 1000 ops on a page, and returns full pages until it
+/// has caught up, so this is a (hacky?) way to stop polling once we're up.
+pub async fn full_pages(
+    mut rx: mpsc::Receiver<ExportPage>,
+    tx: mpsc::Sender<ExportPage>,
+) -> anyhow::Result<&'static str> {
+    while let Some(page) = rx.recv().await {
+        let n = page.ops.len();
+        if n < 900 {
+            let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
+            let Some(age) = last_age else {
+                log::info!("full_pages done, empty final page");
+                return Ok("full pages (hmm)");
+            };
+            if age <= chrono::TimeDelta::hours(6) {
+                log::info!("full_pages done, final page of {n} ops");
+            } else {
+                log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
+            }
+            return Ok("full pages (cool)");
+        }
+        log::trace!("full_pages: continuing with page of {n} ops");
+        tx.send(page).await?;
+    }
+    Err(anyhow::anyhow!(
+        "full_pages ran out of source material, sender closed"
+    ))
+}
+
+pub async fn pages_to_stdout(
+    mut rx: mpsc::Receiver<ExportPage>,
+    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
+) -> anyhow::Result<&'static str> {
+    let mut last_at = None;
+    while let Some(page) = rx.recv().await {
+        for op in &page.ops {
+            println!("{}", serde_json::to_string(op)?);
+        }
+        if notify_last_at.is_some()
+            && let Some(s) = PageBoundaryState::new(&page)
+        {
+            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
+        }
+    }
+    if let Some(notify) = notify_last_at {
+        log::trace!("notifying last_at: {last_at:?}");
+        if notify.send(last_at).is_err() {
+            log::error!("receiver for last_at dropped, can't notify");
+        };
+    }
+    Ok("pages_to_stdout")
+}
+
 pub fn logo(name: &str) -> String {
     format!(
         r"
···
         env!("CARGO_PKG_VERSION"),
     )
 }
-
-pub fn bin_init(name: &str) {
-    if std::env::var_os("RUST_LOG").is_none() {
-        unsafe { std::env::set_var("RUST_LOG", "info") };
-    }
-    let filter = tracing_subscriber::EnvFilter::from_default_env();
-    tracing_subscriber::fmt()
-        .with_writer(std::io::stderr)
-        .with_env_filter(filter)
-        .init();
-
-    log::info!("{}", logo(name));
-}
```
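The new free functions compose over channels. A sketch of the pipeline that `backfill --no-bulk` wires up (the upstream URL and channel sizes are illustrative):

```rust
use allegedly::{full_pages, pages_to_stdout, poll_upstream};
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let url = "https://plc.directory/export".parse()?;
    let (poll_tx, poll_rx) = mpsc::channel(128);
    let (full_tx, full_rx) = mpsc::channel(1);

    // poller -> full-page filter -> stdout; once a short page signals we've
    // caught up, full_pages drops its channels and the pipeline winds down
    tokio::spawn(poll_upstream(None, url, Duration::from_millis(600), poll_tx));
    tokio::spawn(full_pages(poll_rx, full_tx));
    pages_to_stdout(full_rx, None).await?;
    Ok(())
}
```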
+320 -72
src/mirror.rs
··· 1 - use crate::{GovernorMiddleware, UA, logo}; 2 use futures::TryStreamExt; 3 use governor::Quota; 4 use poem::{ 5 - Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server, get, 6 - handler, 7 - http::StatusCode, 8 listener::{Listener, TcpListener, acme::AutoCert}, 9 middleware::{AddData, CatchPanic, Compression, Cors, Tracing}, 10 - web::{Data, Json}, 11 }; 12 use reqwest::{Client, Url}; 13 use std::{net::SocketAddr, path::PathBuf, time::Duration}; 14 15 - #[derive(Debug, Clone)] 16 struct State { 17 client: Client, 18 plc: Url, 19 upstream: Url, 20 } 21 22 #[handler] 23 - fn hello(Data(State { upstream, .. }): Data<&State>) -> String { 24 - format!( 25 - r#"{} 26 - 27 - This is a PLC[1] mirror running Allegedly[2] in mirror mode. Allegedly synchronizes and proxies to a downstream PLC reference server instance[3] (why?[4]). 28 29 30 Configured upstream: 31 32 {upstream} 33 34 35 Available APIs: 36 37 - - All PLC GET requests [5]. 38 - - Rejects POSTs. This is a mirror. 39 40 - try `GET /{{did}}` to resolve an identity 41 42 43 - [1] https://web.plc.directory 44 - [2] https://tangled.org/@microcosm.blue/Allegedly 45 - [3] https://github.com/did-method-plc/did-method-plc 46 - [4] https://updates.microcosm.blue/3lz7nwvh4zc2u 47 - [5] https://web.plc.directory/api/redoc 48 49 "#, 50 logo("mirror") 51 ) 52 } 53 54 - fn failed_to_reach_wrapped() -> String { 55 format!( 56 r#"{} 57 58 - Failed to reach the wrapped reference PLC server. Sorry. 59 "#, 60 logo("mirror 502 :( ") 61 ) 62 } 63 64 - async fn plc_status(url: &Url, client: &Client) -> (bool, serde_json::Value) { 65 use serde_json::json; 66 67 let mut url = url.clone(); ··· 97 } 98 } 99 100 #[handler] 101 async fn health( 102 Data(State { 103 plc, 104 client, 105 - upstream, 106 }): Data<&State>, 107 ) -> impl IntoResponse { 108 let mut overall_status = StatusCode::OK; ··· 110 if !ok { 111 overall_status = StatusCode::BAD_GATEWAY; 112 } 113 - let (ok, upstream_status) = plc_status(upstream, client).await; 114 - if !ok { 115 - overall_status = StatusCode::BAD_GATEWAY; 116 } 117 - ( 118 - overall_status, 119 - Json(serde_json::json!({ 120 - "server": "allegedly (mirror)", 121 - "version": env!("CARGO_PKG_VERSION"), 122 - "wrapped_plc": wrapped_status, 123 - "upstream_plc": upstream_status, 124 - })), 125 - ) 126 } 127 128 #[handler] 129 - async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<impl IntoResponse> { 130 let mut target = state.plc.clone(); 131 target.set_path(req.uri().path()); 132 - let upstream_res = state 133 .client 134 .get(target) 135 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server ··· 138 .await 139 .map_err(|e| { 140 log::error!("upstream req fail: {e}"); 141 - Error::from_string(failed_to_reach_wrapped(), StatusCode::BAD_GATEWAY) 142 })?; 143 144 - let http_res: poem::http::Response<reqwest::Body> = upstream_res.into(); 145 - let (parts, reqw_body) = http_res.into_parts(); 146 147 - let parts = poem::ResponseParts { 148 - status: parts.status, 149 - version: parts.version, 150 - headers: parts.headers, 151 - extensions: parts.extensions, 152 - }; 153 154 - let body = http_body_util::BodyDataStream::new(reqw_body) 155 - .map_err(|e| std::io::Error::other(Box::new(e))); 156 157 - Ok(Response::from_parts( 158 - parts, 159 - poem::Body::from_bytes_stream(body), 160 - )) 161 } 162 163 #[handler] ··· 169 170 Sorry, this server does not accept POST requests. 
171 172 - You may wish to try upstream: {upstream} 173 "#, 174 logo("mirror (nope)") 175 ), ··· 182 domains: Vec<String>, 183 cache_path: PathBuf, 184 directory_url: String, 185 }, 186 Bind(SocketAddr), 187 } 188 189 - pub async fn serve(upstream: &Url, plc: Url, listen: ListenConf) -> std::io::Result<()> { 190 // not using crate CLIENT: don't want the retries etc 191 let client = Client::builder() 192 .user_agent(UA) 193 .timeout(Duration::from_secs(10)) // fallback 194 .build() 195 - .unwrap(); 196 197 let state = State { 198 client, 199 plc, 200 upstream: upstream.clone(), 201 }; 202 203 - let app = Route::new() 204 .at("/", get(hello)) 205 .at("/_health", get(health)) 206 - .at("/:any", get(proxy).post(nope)) 207 .with(AddData::new(state)) 208 .with(Cors::new().allow_credentials(false)) 209 .with(Compression::new()) 210 - .with(GovernorMiddleware::new(Quota::per_minute( 211 - 3000.try_into().unwrap(), 212 - ))) 213 .with(CatchPanic::new()) 214 .with(Tracing); 215 ··· 218 domains, 219 cache_path, 220 directory_url, 221 } => { 222 rustls::crypto::aws_lc_rs::default_provider() 223 .install_default() ··· 231 } 232 let auto_cert = auto_cert.build().expect("acme config to build"); 233 234 - run_insecure_notice(); 235 - run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await 236 } 237 - ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await, 238 } 239 } 240 241 async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> ··· 250 } 251 252 /// kick off a tiny little server on a tokio task to tell people to use 443 253 - fn run_insecure_notice() { 254 #[handler] 255 fn oop_plz_be_secure() -> (StatusCode, String) { 256 ( ··· 265 ) 266 } 267 268 - let app = Route::new().at("/", get(oop_plz_be_secure)).with(Tracing); 269 - let listener = TcpListener::bind("0.0.0.0:80"); 270 - tokio::task::spawn(async move { 271 - Server::new(listener) 272 - .name("allegedly (mirror:80 helper)") 273 - .run(app) 274 - .await 275 - }); 276 }
···
1 + use crate::{
2 + CachedValue, CreatePlcOpLimiter, Db, Dt, Fetcher, GovernorMiddleware, IpLimiters, UA, logo,
3 + };
4 use futures::TryStreamExt;
5 use governor::Quota;
6 use poem::{
7 + Body, Endpoint, EndpointExt, Error, IntoResponse, Request, Response, Result, Route, Server,
8 + get, handler,
9 + http::{StatusCode, header::USER_AGENT},
10 listener::{Listener, TcpListener, acme::AutoCert},
11 middleware::{AddData, CatchPanic, Compression, Cors, Tracing},
12 + web::{Data, Json, Path},
13 };
14 use reqwest::{Client, Url};
15 use std::{net::SocketAddr, path::PathBuf, time::Duration};
16
17 + #[derive(Clone)]
18 struct State {
19 client: Client,
20 plc: Url,
21 upstream: Url,
22 + sync_info: Option<SyncInfo>,
23 + experimental: ExperimentalConf,
24 + }
25 +
26 + /// server info that only applies in mirror (synchronizing) mode
27 + #[derive(Clone)]
28 + struct SyncInfo {
29 + latest_at: CachedValue<Dt, GetLatestAt>,
30 + upstream_status: CachedValue<PlcStatus, CheckUpstream>,
31 }
32
33 #[handler]
34 + fn hello(
35 + Data(State {
36 + sync_info,
37 + upstream,
38 + experimental: exp,
39 + ..
40 + }): Data<&State>,
41 + req: &Request,
42 + ) -> String {
43 + // let mode = if sync_info.is_some() { "mirror" } else { "wrap" };
44 + let pre_info = if sync_info.is_some() {
45 + format!(
46 + r#"
47 + This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
48 + synchronizes a local PLC reference server instance[2] (why?[3]).
49
50
51 Configured upstream:
52
53 {upstream}
54
55 + "#
56 + )
57 + } else {
58 + format!(
59 + r#"
60 + This is a PLC[1] mirror running Allegedly in wrap mode. Wrap mode reverse-
61 + proxies requests to a PLC server and can terminate TLS, like NGINX or Caddy.
62 +
63 +
64 + Configured upstream (only used if experimental op forwarding is enabled):
65 +
66 + {upstream}
67 +
68 + "#
69 + )
70 + };
71 +
72 + let post_info = match (exp.write_upstream, &exp.acme_domain, req.uri().host()) {
73 + (false, _, _) => " - POST /* Always rejected. This is a mirror.".to_string(),
74 + (_, None, _) => {
75 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
76 + }
77 + (_, Some(d), Some(f)) if f == d => {
78 + " - POST /:did Create a PLC op. Allegedly will forward it upstream.".to_string()
79 + }
80 + (_, Some(d), _) => format!(
81 + r#" - POST /* Rejected, but experimental upstream op forwarding is
82 + available at `POST https://{d}/:did`!"#
83 + ),
84 + };
85 +
86 + format!(
87 + r#"{}
88 + {pre_info}
89
90 Available APIs:
91
92 + - GET /_health Health and version info
93
94 + - GET /* Proxies to wrapped server; see PLC API docs:
95 + https://web.plc.directory/api/redoc
96
97 + tip: try `GET /{{did}}` to resolve an identity
98
99 + {post_info}
100 +
101 +
102 + Allegedly is a suite of open-source CLI tools for working with PLC logs,
103 + from microcosm:
104 +
105 + https://tangled.org/@microcosm.blue/Allegedly
106 +
107 + https://microcosm.blue
108
109 +
110 + [1] https://web.plc.directory
111 + [2] https://github.com/did-method-plc/did-method-plc
112 + [3] https://updates.microcosm.blue/3lz7nwvh4zc2u
113 "#,
114 logo("mirror")
115 )
116 }
117
118 + #[handler]
119 + fn favicon() -> impl IntoResponse {
120 + include_bytes!("../favicon.ico").with_content_type("image/x-icon")
121 + }
122 +
123 + fn failed_to_reach_named(name: &str) -> String {
124 format!(
125 r#"{}
126
127 + Failed to reach the {name} server. Sorry.
128 "#, 129 logo("mirror 502 :( ") 130 ) 131 } 132 133 + fn bad_create_op(reason: &str) -> Response { 134 + Response::builder() 135 + .status(StatusCode::BAD_REQUEST) 136 + .body(format!( 137 + r#"{} 138 + 139 + NooOOOooooo: {reason} 140 + "#, 141 + logo("mirror 400 >:( ") 142 + )) 143 + } 144 + 145 + type PlcStatus = (bool, serde_json::Value); 146 + 147 + async fn plc_status(url: &Url, client: &Client) -> PlcStatus { 148 use serde_json::json; 149 150 let mut url = url.clone(); ··· 180 } 181 } 182 183 + #[derive(Clone)] 184 + struct GetLatestAt(Db); 185 + impl Fetcher<Dt> for GetLatestAt { 186 + async fn fetch(&self) -> Result<Dt, Box<dyn std::error::Error>> { 187 + let now = self.0.get_latest().await?.ok_or(anyhow::anyhow!( 188 + "expected to find at least one thing in the db" 189 + ))?; 190 + Ok(now) 191 + } 192 + } 193 + 194 + #[derive(Clone)] 195 + struct CheckUpstream(Url, Client); 196 + impl Fetcher<PlcStatus> for CheckUpstream { 197 + async fn fetch(&self) -> Result<PlcStatus, Box<dyn std::error::Error>> { 198 + Ok(plc_status(&self.0, &self.1).await) 199 + } 200 + } 201 + 202 #[handler] 203 async fn health( 204 Data(State { 205 plc, 206 client, 207 + sync_info, 208 + .. 209 }): Data<&State>, 210 ) -> impl IntoResponse { 211 let mut overall_status = StatusCode::OK; ··· 213 if !ok { 214 overall_status = StatusCode::BAD_GATEWAY; 215 } 216 + if let Some(SyncInfo { 217 + latest_at, 218 + upstream_status, 219 + }) = sync_info 220 + { 221 + // mirror mode 222 + let (ok, upstream_status) = upstream_status.get().await.expect("plc_status infallible"); 223 + if !ok { 224 + overall_status = StatusCode::BAD_GATEWAY; 225 + } 226 + let latest = latest_at.get().await.ok(); 227 + ( 228 + overall_status, 229 + Json(serde_json::json!({ 230 + "server": "allegedly (mirror)", 231 + "version": env!("CARGO_PKG_VERSION"), 232 + "wrapped_plc": wrapped_status, 233 + "upstream_plc": upstream_status, 234 + "latest_at": latest, 235 + })), 236 + ) 237 + } else { 238 + // wrap mode 239 + ( 240 + overall_status, 241 + Json(serde_json::json!({ 242 + "server": "allegedly (mirror)", 243 + "version": env!("CARGO_PKG_VERSION"), 244 + "wrapped_plc": wrapped_status, 245 + })), 246 + ) 247 } 248 + } 249 + 250 + fn proxy_response(res: reqwest::Response) -> Response { 251 + let http_res: poem::http::Response<reqwest::Body> = res.into(); 252 + let (parts, reqw_body) = http_res.into_parts(); 253 + 254 + let parts = poem::ResponseParts { 255 + status: parts.status, 256 + version: parts.version, 257 + headers: parts.headers, 258 + extensions: parts.extensions, 259 + }; 260 + 261 + let body = http_body_util::BodyDataStream::new(reqw_body) 262 + .map_err(|e| std::io::Error::other(Box::new(e))); 263 + 264 + Response::from_parts(parts, poem::Body::from_bytes_stream(body)) 265 } 266 267 #[handler] 268 + async fn proxy(req: &Request, Data(state): Data<&State>) -> Result<Response> { 269 let mut target = state.plc.clone(); 270 target.set_path(req.uri().path()); 271 + target.set_query(req.uri().query()); 272 + let wrapped_res = state 273 .client 274 .get(target) 275 .timeout(Duration::from_secs(3)) // should be low latency to wrapped server ··· 278 .await 279 .map_err(|e| { 280 log::error!("upstream req fail: {e}"); 281 + Error::from_string( 282 + failed_to_reach_named("wrapped reference PLC"), 283 + StatusCode::BAD_GATEWAY, 284 + ) 285 })?; 286 287 + Ok(proxy_response(wrapped_res)) 288 + } 289 290 + #[handler] 291 + async fn forward_create_op_upstream( 292 + Data(State { 293 + upstream, 294 + client, 295 + experimental, 296 + .. 
297 + }): Data<&State>,
298 + Path(did): Path<String>,
299 + req: &Request,
300 + body: Body,
301 + ) -> Result<Response> {
302 + if let Some(expected_domain) = &experimental.acme_domain {
303 + let Some(found_host) = req.uri().host() else {
304 + return Ok(bad_create_op(&format!(
305 + "missing `Host` header, expected {expected_domain:?} for experimental requests."
306 + )));
307 + };
308 + if found_host != expected_domain {
309 + return Ok(bad_create_op(&format!(
310 + "experimental requests must be made to {expected_domain:?}, but this request's `Host` header was {found_host}"
311 + )));
312 + }
313 + }
314
315 + // adjust proxied headers
316 + let mut headers: reqwest::header::HeaderMap = req.headers().clone();
317 + log::trace!("original request headers: {headers:?}");
318 + headers.insert("Host", upstream.host_str().unwrap().parse().unwrap());
319 + let client_ua = headers
320 + .get(USER_AGENT)
321 + .and_then(|h| h.to_str().ok()) // don't panic on a non-ASCII client UA
322 + .unwrap_or("unknown");
323 + headers.insert(
324 + USER_AGENT,
325 + format!("{UA} (forwarding from {client_ua:?})")
326 + .parse()
327 + .unwrap(),
328 + );
329 + log::trace!("adjusted request headers: {headers:?}");
330 +
331 + let mut target = upstream.clone();
332 + target.set_path(&did);
333 + let upstream_res = client
334 + .post(target)
335 + .timeout(Duration::from_secs(15)) // be a little generous
336 + .headers(headers)
337 + .body(reqwest::Body::wrap_stream(body.into_bytes_stream()))
338 + .send()
339 + .await
340 + .map_err(|e| {
341 + log::warn!("upstream write fail: {e}");
342 + Error::from_string(
343 + failed_to_reach_named("upstream PLC"),
344 + StatusCode::BAD_GATEWAY,
345 + )
346 + })?;
347
348 + Ok(proxy_response(upstream_res))
349 }
350
351 #[handler]
···
357
358 Sorry, this server does not accept POST requests.
359
360 + You may wish to try sending that to our upstream: {upstream}.
361 +
362 + If you operate this server, try running with `--experimental-write-upstream`.
363 "#,
364 logo("mirror (nope)")
365 ),
···
372 domains: Vec<String>,
373 cache_path: PathBuf,
374 directory_url: String,
375 + ipv6: bool,
376 },
377 Bind(SocketAddr),
378 }
379
380 + #[derive(Debug, Clone)]
381 + pub struct ExperimentalConf {
382 + pub acme_domain: Option<String>,
383 + pub write_upstream: bool,
384 + }
385 +
386 + pub async fn serve(
387 + upstream: Url,
388 + plc: Url,
389 + listen: ListenConf,
390 + experimental: ExperimentalConf,
391 + db: Option<Db>,
392 + ) -> anyhow::Result<&'static str> {
393 + log::info!("starting server...");
394 +
395 // not using crate CLIENT: don't want the retries etc
396 let client = Client::builder()
397 .user_agent(UA)
398 .timeout(Duration::from_secs(10)) // fallback
399 .build()
400 + .expect("reqwest client to build");
401 +
402 + // when `db` is None, we're running in wrap mode.
no db access, no upstream sync 403 + let sync_info = db.map(|db| SyncInfo { 404 + latest_at: CachedValue::new(GetLatestAt(db), Duration::from_secs(2)), 405 + upstream_status: CachedValue::new( 406 + CheckUpstream(upstream.clone(), client.clone()), 407 + Duration::from_secs(6), 408 + ), 409 + }); 410 411 let state = State { 412 client, 413 plc, 414 upstream: upstream.clone(), 415 + sync_info, 416 + experimental: experimental.clone(), 417 }; 418 419 + let mut app = Route::new() 420 .at("/", get(hello)) 421 + .at("/favicon.ico", get(favicon)) 422 .at("/_health", get(health)) 423 + .at("/export", get(proxy)); 424 + 425 + if experimental.write_upstream { 426 + log::info!("enabling experimental write forwarding to upstream"); 427 + 428 + let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 429 + let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap())); 430 + 431 + let upstream_proxier = forward_create_op_upstream 432 + .with(GovernorMiddleware::new(did_limiter)) 433 + .with(GovernorMiddleware::new(ip_limiter)); 434 + 435 + app = app.at("/did:plc:*", get(proxy).post(upstream_proxier)); 436 + } else { 437 + app = app.at("/did:plc:*", get(proxy).post(nope)); 438 + } 439 + 440 + let app = app 441 .with(AddData::new(state)) 442 .with(Cors::new().allow_credentials(false)) 443 .with(Compression::new()) 444 + .with(GovernorMiddleware::new(IpLimiters::new(Quota::per_minute( 445 + 3000.try_into().expect("ratelimit middleware to build"), 446 + )))) 447 .with(CatchPanic::new()) 448 .with(Tracing); 449 ··· 452 domains, 453 cache_path, 454 directory_url, 455 + ipv6, 456 } => { 457 rustls::crypto::aws_lc_rs::default_provider() 458 .install_default() ··· 466 } 467 let auto_cert = auto_cert.build().expect("acme config to build"); 468 469 + log::trace!("auto_cert: {auto_cert:?}"); 470 + 471 + let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 472 + let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 473 + let app_res = run(app, listener.acme(auto_cert)).await; 474 + log::warn!("server task ended, aborting insecure server task..."); 475 + notice_task.abort(); 476 + app_res?; 477 + notice_task.await??; 478 } 479 + ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 480 } 481 + 482 + Ok("server (uh oh?)") 483 } 484 485 async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> ··· 494 } 495 496 /// kick off a tiny little server on a tokio task to tell people to use 443 497 + async fn run_insecure_notice(ipv6: bool) -> Result<(), std::io::Error> { 498 #[handler] 499 fn oop_plz_be_secure() -> (StatusCode, String) { 500 ( ··· 509 ) 510 } 511 512 + let app = Route::new() 513 + .at("/favicon.ico", get(favicon)) 514 + .nest("/", get(oop_plz_be_secure)) 515 + .with(Tracing); 516 + Server::new(TcpListener::bind(if ipv6 { 517 + "[::]:80" 518 + } else { 519 + "0.0.0.0:80" 520 + })) 521 + .name("allegedly (mirror:80 helper)") 522 + .run(app) 523 + .await 524 }
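The new `CachedValue`/`Fetcher` pair keeps `/_health` from hitting the database or the upstream PLC on every request: each value refreshes at most once per TTL, however often handlers ask. A minimal sketch of another cached probe with the same `Fetcher` shape the diff uses, assuming `CachedValue`, `Fetcher`, and `Db` are reachable as `allegedly::` re-exports; the `OpCount` type and its query are made up for illustration:

use std::time::Duration;
use allegedly::{CachedValue, Db, Fetcher}; // assumed re-exports

// hypothetical fetcher: an operations row count for a health payload
#[derive(Clone)]
struct OpCount(Db);

impl Fetcher<i64> for OpCount {
    async fn fetch(&self) -> Result<i64, Box<dyn std::error::Error>> {
        let (client, _conn) = self.0.connect().await?;
        let n: i64 = client
            .query_one("SELECT count(*) FROM operations", &[])
            .await?
            .get(0);
        Ok(n)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let db = Db::new("postgres://plc@localhost/plc", None).await?;
    // refreshes at most every 5 seconds, like latest_at's 2s TTL above
    let op_count = CachedValue::new(OpCount(db), Duration::from_secs(5));
    println!("ops: {:?}", op_count.get().await.ok());
    Ok(())
}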
+69 -45
src/plc_pg.rs
··· 1 - use crate::{Dt, ExportPage, Op, PageBoundaryState}; 2 use std::pin::pin; 3 use std::time::Instant; 4 - use tokio::sync::{mpsc, oneshot}; 5 use tokio_postgres::{ 6 - Client, Error as PgError, NoTls, 7 binary_copy::BinaryCopyInWriter, 8 connect, 9 types::{Json, Type}, 10 }; 11 12 /// a little tokio-postgres helper 13 /// 14 /// it's clone for easiness. it doesn't share any resources underneath after 15 - /// cloning at all so it's not meant for 16 - #[derive(Debug, Clone)] 17 pub struct Db { 18 pg_uri: String, 19 } 20 21 impl Db { 22 - pub async fn new(pg_uri: &str) -> Result<Self, anyhow::Error> { 23 // we're going to interact with did-method-plc's database, so make sure 24 // it's what we expect: check for db migrations. 25 log::trace!("checking migrations..."); 26 - let (client, connection) = connect(pg_uri, NoTls).await?; 27 - let connection_task = tokio::task::spawn(async move { 28 - connection 29 - .await 30 - .inspect_err(|e| log::error!("connection ended with error: {e}")) 31 - .unwrap(); 32 - }); 33 let migrations: Vec<String> = client 34 .query("SELECT name FROM kysely_migration ORDER BY name", &[]) 35 .await? ··· 47 ); 48 drop(client); 49 // make sure the connection worker thing doesn't linger 50 - connection_task.await?; 51 log::info!("db connection succeeded and plc migrations appear as expected"); 52 53 Ok(Self { 54 pg_uri: pg_uri.to_string(), 55 }) 56 } 57 58 - pub async fn connect(&self) -> Result<Client, PgError> { 59 log::trace!("connecting postgres..."); 60 - let (client, connection) = connect(&self.pg_uri, NoTls).await?; 61 - 62 - // send the connection away to do the actual communication work 63 - // apparently the connection will complete when the client drops 64 - tokio::task::spawn(async move { 65 - connection 66 - .await 67 - .inspect_err(|e| log::error!("connection ended with error: {e}")) 68 - .unwrap(); 69 - }); 70 - 71 - Ok(client) 72 } 73 74 pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> { 75 - let client = self.connect().await?; 76 let dt: Option<Dt> = client 77 .query_opt( 78 r#"SELECT "createdAt" ··· 83 ) 84 .await? 85 .map(|row| row.get(0)); 86 Ok(dt) 87 } 88 } 89 90 - pub async fn pages_to_pg(db: Db, mut pages: mpsc::Receiver<ExportPage>) -> Result<(), PgError> { 91 - let mut client = db.connect().await?; 92 93 let ops_stmt = client 94 .prepare( ··· 108 while let Some(page) = pages.recv().await { 109 log::trace!("writing page with {} ops", page.ops.len()); 110 let tx = client.transaction().await?; 111 - for s in page.ops { 112 - let Ok(op) = serde_json::from_str::<Op>(&s) else { 113 - log::warn!("ignoring unparseable op {s:?}"); 114 - continue; 115 - }; 116 ops_inserted += tx 117 .execute( 118 &ops_stmt, ··· 129 } 130 tx.commit().await?; 131 } 132 133 log::info!( 134 "no more pages. 
inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 135 t0.elapsed() 136 ); 137 - Ok(()) 138 } 139 140 /// Dump rows into an empty operations table quickly ··· 155 reset: bool, 156 mut pages: mpsc::Receiver<ExportPage>, 157 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 158 - ) -> Result<(), PgError> { 159 - let mut client = db.connect().await?; 160 161 let t0 = Instant::now(); 162 let tx = client.transaction().await?; ··· 212 let mut writer = pin!(BinaryCopyInWriter::new(sync, types)); 213 let mut last_at = None; 214 while let Some(page) = pages.recv().await { 215 - for s in &page.ops { 216 - let Ok(op) = serde_json::from_str::<Op>(s) else { 217 - log::warn!("ignoring unparseable op: {s:?}"); 218 - continue; 219 - }; 220 writer 221 .as_mut() 222 .write(&[ 223 &op.did, 224 - &Json(op.operation), 225 &op.cid, 226 &op.nullified, 227 &op.created_at, ··· 234 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 235 } 236 } 237 238 if let Some(notify) = notify_last_at { 239 log::trace!("notifying last_at: {last_at:?}"); ··· 274 log::trace!("set tables LOGGED: {:?}", t_step.elapsed()); 275 276 tx.commit().await?; 277 log::info!("total backfill time: {:?}", t0.elapsed()); 278 279 - Ok(()) 280 }
··· 1 + use crate::{Dt, ExportPage, PageBoundaryState}; 2 + use native_tls::{Certificate, TlsConnector}; 3 + use postgres_native_tls::MakeTlsConnector; 4 + use std::path::PathBuf; 5 use std::pin::pin; 6 use std::time::Instant; 7 + use tokio::{ 8 + sync::{mpsc, oneshot}, 9 + task::{JoinHandle, spawn}, 10 + }; 11 use tokio_postgres::{ 12 + Client, Error as PgError, NoTls, Socket, 13 binary_copy::BinaryCopyInWriter, 14 connect, 15 + tls::MakeTlsConnect, 16 types::{Json, Type}, 17 }; 18 19 + fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> { 20 + let cert = std::fs::read(cert)?; 21 + let cert = Certificate::from_pem(&cert)?; 22 + let connector = TlsConnector::builder().add_root_certificate(cert).build()?; 23 + Ok(MakeTlsConnector::new(connector)) 24 + } 25 + 26 + async fn get_client_and_task<T>( 27 + uri: &str, 28 + connector: T, 29 + ) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> 30 + where 31 + T: MakeTlsConnect<Socket>, 32 + <T as MakeTlsConnect<Socket>>::Stream: Send + 'static, 33 + { 34 + let (client, connection) = connect(uri, connector).await?; 35 + Ok((client, spawn(connection))) 36 + } 37 + 38 /// a little tokio-postgres helper 39 /// 40 /// it's clone for easiness. it doesn't share any resources underneath after 41 + /// cloning *at all* so it's not meant for eg. handling public web requests 42 + #[derive(Clone)] 43 pub struct Db { 44 pg_uri: String, 45 + cert: Option<MakeTlsConnector>, 46 } 47 48 impl Db { 49 + pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> { 50 // we're going to interact with did-method-plc's database, so make sure 51 // it's what we expect: check for db migrations. 52 log::trace!("checking migrations..."); 53 + 54 + let connector = cert.map(get_tls).transpose()?; 55 + 56 + let (client, conn_task) = if let Some(ref connector) = connector { 57 + get_client_and_task(pg_uri, connector.clone()).await? 58 + } else { 59 + get_client_and_task(pg_uri, NoTls).await? 60 + }; 61 + 62 let migrations: Vec<String> = client 63 .query("SELECT name FROM kysely_migration ORDER BY name", &[]) 64 .await? ··· 76 ); 77 drop(client); 78 // make sure the connection worker thing doesn't linger 79 + conn_task.await??; 80 log::info!("db connection succeeded and plc migrations appear as expected"); 81 82 Ok(Self { 83 pg_uri: pg_uri.to_string(), 84 + cert: connector, 85 }) 86 } 87 88 + pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> { 89 log::trace!("connecting postgres..."); 90 + if let Some(ref connector) = self.cert { 91 + get_client_and_task(&self.pg_uri, connector.clone()).await 92 + } else { 93 + get_client_and_task(&self.pg_uri, NoTls).await 94 + } 95 } 96 97 pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> { 98 + let (client, task) = self.connect().await?; 99 let dt: Option<Dt> = client 100 .query_opt( 101 r#"SELECT "createdAt" ··· 106 ) 107 .await? 
108 .map(|row| row.get(0)); 109 + drop(task); 110 Ok(dt) 111 } 112 } 113 114 + pub async fn pages_to_pg( 115 + db: Db, 116 + mut pages: mpsc::Receiver<ExportPage>, 117 + ) -> anyhow::Result<&'static str> { 118 + log::info!("starting pages_to_pg writer..."); 119 + 120 + let (mut client, task) = db.connect().await?; 121 122 let ops_stmt = client 123 .prepare( ··· 137 while let Some(page) = pages.recv().await { 138 log::trace!("writing page with {} ops", page.ops.len()); 139 let tx = client.transaction().await?; 140 + for op in page.ops { 141 ops_inserted += tx 142 .execute( 143 &ops_stmt, ··· 154 } 155 tx.commit().await?; 156 } 157 + drop(task); 158 159 log::info!( 160 "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 161 t0.elapsed() 162 ); 163 + Ok("pages_to_pg") 164 } 165 166 /// Dump rows into an empty operations table quickly ··· 181 reset: bool, 182 mut pages: mpsc::Receiver<ExportPage>, 183 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 184 + ) -> anyhow::Result<&'static str> { 185 + let (mut client, task) = db.connect().await?; 186 187 let t0 = Instant::now(); 188 let tx = client.transaction().await?; ··· 238 let mut writer = pin!(BinaryCopyInWriter::new(sync, types)); 239 let mut last_at = None; 240 while let Some(page) = pages.recv().await { 241 + for op in &page.ops { 242 writer 243 .as_mut() 244 .write(&[ 245 &op.did, 246 + &Json(op.operation.clone()), 247 &op.cid, 248 &op.nullified, 249 &op.created_at, ··· 256 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 257 } 258 } 259 + log::debug!("finished receiving bulk pages"); 260 261 if let Some(notify) = notify_last_at { 262 log::trace!("notifying last_at: {last_at:?}"); ··· 297 log::trace!("set tables LOGGED: {:?}", t_step.elapsed()); 298 299 tx.commit().await?; 300 + drop(task); 301 log::info!("total backfill time: {:?}", t0.elapsed()); 302 303 + Ok("backfill_to_pg") 304 }
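`Db::new` now takes an optional PEM root certificate for TLS to postgres (falling back to `NoTls`), and `connect()` hands back the connection's `JoinHandle` so callers decide how long it may linger. A sketch of driving the new signatures end-to-end into `pages_to_pg`, assuming these items are re-exported from the crate root; the URI and cert path are placeholders:

use allegedly::{Db, ExportPage, pages_to_pg}; // assumed re-exports
use std::path::PathBuf;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Some(path) enables TLS with that root cert; None keeps NoTls
    let db = Db::new(
        "postgres://plc@db.internal/plc",
        Some(PathBuf::from("/etc/allegedly/db-root.pem")),
    )
    .await?;

    let (tx, rx) = mpsc::channel::<ExportPage>(4);
    let writer = tokio::task::spawn(pages_to_pg(db, rx));

    // ...hand `tx` to poll_upstream or a weekly-file reader here...
    drop(tx); // closing the channel lets the writer drain and return

    let name = writer.await??;
    println!("{name} finished");
    Ok(())
}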
+116 -259
src/poll.rs
··· 4 use thiserror::Error; 5 use tokio::sync::mpsc; 6 7 - // plc.directory ratelimit on /export is 500 per 5 mins 8 - const UPSTREAM_REQUEST_INTERVAL: Duration = Duration::from_millis(600); 9 - 10 #[derive(Debug, Error)] 11 pub enum GetPageError { 12 #[error(transparent)] ··· 26 pk: (String, String), // did, cid 27 } 28 29 - impl From<Op<'_>> for LastOp { 30 fn from(op: Op) -> Self { 31 Self { 32 created_at: op.created_at, 33 - pk: (op.did.to_string(), op.cid.to_string()), 34 } 35 } 36 } 37 38 impl From<Dt> for LastOp { 39 fn from(dt: Dt) -> Self { 40 Self { ··· 44 } 45 } 46 47 - /// PLC 48 #[derive(Debug, PartialEq)] 49 pub struct PageBoundaryState { 50 pub last_at: Dt, 51 keys_at: Vec<OpKey>, // expected to ~always be length one 52 } 53 54 - // ok so this is silly. 55 - // 56 - // i think i had some idea that deferring parsing to later steps would make it 57 - // easier to do things like sometimes not parsing at all (where the output is 58 - // also json lines), and maybe avoid some memory shuffling. 59 - // but since the input already has to be split into lines, keeping them as line 60 - // strings is probably the worst option: space-inefficient, allows garbage, and 61 - // leads to, well, this impl. 62 - // 63 - // it almost could have been slick if the *original* was just reused, and the 64 - // parsed ops were just kind of on the side referencing into it, but i'm lazy 65 - // and didn't get it there. 66 - // 67 - // should unrefactor to make Op own its data again, parse (and deal with errors) 68 - // upfront, and probably greatly simplify everything downstream. simple. 69 impl PageBoundaryState { 70 pub fn new(page: &ExportPage) -> Option<Self> { 71 - let mut skips = 0; 72 - 73 // grab the very last op 74 - let (last_at, last_key) = loop { 75 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 76 - // there are no ops left? oop. bail. 77 - // last_at and existing keys remain in tact if there was no later op 78 - return None; 79 - }; 80 - if s.is_empty() { 81 - // annoying: ignore any trailing blank lines 82 - skips += 1; 83 - continue; 84 - } 85 - let Ok(op) = serde_json::from_str::<Op>(&s) 86 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 87 - else { 88 - // doubly annoying: skip over trailing garbage?? 89 - skips += 1; 90 - continue; 91 - }; 92 - break (op.created_at, Into::<OpKey>::into(&op)); 93 - }; 94 95 // set initial state 96 let mut me = Self { ··· 99 }; 100 101 // and make sure all keys at this time are captured from the back 102 - me.capture_nth_last_at(page, last_at, skips); 103 104 Some(me) 105 } 106 fn apply_to_next(&mut self, page: &mut ExportPage) { 107 // walk ops forward, kicking previously-seen ops until created_at advances 108 let to_remove: Vec<usize> = page 109 .ops 110 .iter() 111 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 112 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 113 .enumerate() 114 - .take_while(|(_, opr)| opr.as_ref().map(|op| op.created_at == self.last_at).unwrap_or(false)) 115 - .filter_map(|(i, opr)| { 116 - if self.keys_at.contains(&(&opr.expect("any Errs were filtered by take_while")).into()) { 117 - Some(i) 118 - } else { None } 119 - }) 120 .collect(); 121 122 - // actually remove them. 
last to first to indices don't shift 123 for dup_idx in to_remove.into_iter().rev() { 124 page.ops.remove(dup_idx); 125 } 126 127 // grab the very last op 128 - let mut skips = 0; 129 - let (last_at, last_key) = loop { 130 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 131 - // there are no ops left? oop. bail. 132 - // last_at and existing keys remain in tact if there was no later op 133 - return; 134 - }; 135 - if s.is_empty() { 136 - // annoying: trim off any trailing blank lines 137 - skips += 1; 138 - continue; 139 - } 140 - let Ok(op) = serde_json::from_str::<Op>(&s) 141 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 142 - else { 143 - // doubly annoying: skip over trailing garbage?? 144 - skips += 1; 145 - continue; 146 - }; 147 - break (op.created_at, Into::<OpKey>::into(&op)); 148 }; 149 150 // reset state (as long as time actually moved forward on this page) ··· 157 self.keys_at.push(last_key); 158 } 159 // and make sure all keys at this time are captured from the back 160 - self.capture_nth_last_at(page, last_at, skips); 161 } 162 163 /// walk backwards from 2nd last and collect keys until created_at changes ··· 166 .iter() 167 .rev() 168 .skip(skips) 169 - .skip(1) // we alredy added the very last one 170 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 171 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 172 - .take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false)) 173 - .for_each(|opr| { 174 - let op = &opr.expect("any Errs were filtered by take_while"); 175 self.keys_at.push(op.into()); 176 }); 177 } 178 } 179 180 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 181 log::trace!("Getting page: {url}"); 182 183 - let ops: Vec<String> = CLIENT 184 .get(url) 185 .send() 186 .await? ··· 190 .trim() 191 .split('\n') 192 .filter_map(|s| { 193 - let s = s.trim(); 194 - if s.is_empty() { None } else { Some(s) } 195 }) 196 - .map(Into::into) 197 .collect(); 198 199 - let last_op = ops 200 - .last() 201 - .filter(|s| !s.is_empty()) 202 - .map(|s| serde_json::from_str::<Op>(s)) 203 - .transpose()? 
204 - .map(Into::into) 205 - .inspect(|at| log::trace!("new last op: {at:?}")); 206 207 Ok((ExportPage { ops }, last_op)) 208 } 209 210 pub async fn poll_upstream( 211 after: Option<Dt>, 212 base: Url, 213 dest: mpsc::Sender<ExportPage>, 214 - ) -> anyhow::Result<()> { 215 - let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 216 let mut prev_last: Option<LastOp> = after.map(Into::into); 217 let mut boundary_state: Option<PageBoundaryState> = None; 218 loop { ··· 252 const FIVES_TS: i64 = 1431648000; 253 const NEXT_TS: i64 = 1431648001; 254 255 - fn valid_op() -> serde_json::Value { 256 - serde_json::json!({ 257 "did": "did", 258 "cid": "cid", 259 "createdAt": "2015-05-15T00:00:00Z", 260 "nullified": false, 261 "operation": {}, 262 - }) 263 } 264 265 - fn next_op() -> serde_json::Value { 266 - serde_json::json!({ 267 "did": "didnext", 268 "cid": "cidnext", 269 "createdAt": "2015-05-15T00:00:01Z", 270 "nullified": false, 271 "operation": {}, 272 - }) 273 } 274 275 fn base_state() -> PageBoundaryState { 276 let page = ExportPage { 277 - ops: vec![valid_op().to_string()], 278 }; 279 - PageBoundaryState::new(&page).unwrap() 280 } 281 282 #[test] ··· 287 } 288 289 #[test] 290 - fn test_boundary_new_empty_op() { 291 - let page = ExportPage { 292 - ops: vec!["".to_string()], 293 - }; 294 - let state = PageBoundaryState::new(&page); 295 - assert!(state.is_none()); 296 - } 297 - 298 - #[test] 299 - fn test_boundary_new_ignores_bad_op() { 300 - let page = ExportPage { 301 - ops: vec!["bad".to_string()], 302 - }; 303 - let state = PageBoundaryState::new(&page); 304 - assert!(state.is_none()); 305 - } 306 - 307 - #[test] 308 - fn test_boundary_new_multiple_bad_end() { 309 - let page = ExportPage { 310 - ops: vec![ 311 - "bad".to_string(), 312 - "".to_string(), 313 - "foo".to_string(), 314 - "".to_string(), 315 - ], 316 - }; 317 - let state = PageBoundaryState::new(&page); 318 - assert!(state.is_none()); 319 - } 320 - 321 - #[test] 322 fn test_boundary_new_one_op() { 323 let page = ExportPage { 324 - ops: vec![valid_op().to_string()], 325 }; 326 let state = PageBoundaryState::new(&page).unwrap(); 327 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); ··· 335 } 336 337 #[test] 338 - fn test_boundary_new_one_op_with_stuff() { 339 - let expect_same_state = |m, ops| { 340 - let this_state = PageBoundaryState::new(&ExportPage { ops }).unwrap(); 341 - assert_eq!(this_state, base_state(), "{}", m); 342 - }; 343 - 344 - expect_same_state("empty before", vec!["".to_string(), valid_op().to_string()]); 345 - 346 - expect_same_state("empty after", vec![valid_op().to_string(), "".to_string()]); 347 - 348 - expect_same_state( 349 - "bad before, empty after", 350 - vec!["bad".to_string(), valid_op().to_string(), "".to_string()], 351 - ); 352 - 353 - expect_same_state( 354 - "bad and empty before and after", 355 - vec![ 356 - "".to_string(), 357 - "bad".to_string(), 358 - valid_op().to_string(), 359 - "".to_string(), 360 - "bad".to_string(), 361 - ], 362 - ); 363 - } 364 - 365 - #[test] 366 fn test_add_new_empty() { 367 let mut state = base_state(); 368 state.apply_to_next(&mut ExportPage { ops: vec![] }); ··· 370 } 371 372 #[test] 373 - fn test_add_new_empty_op() { 374 - let mut state = base_state(); 375 - state.apply_to_next(&mut ExportPage { 376 - ops: vec!["".to_string()], 377 - }); 378 - assert_eq!(state, base_state()); 379 - } 380 - 381 - #[test] 382 - fn test_add_new_ignores_bad_op() { 383 - let mut state = base_state(); 384 - state.apply_to_next(&mut ExportPage { 385 - ops: 
vec!["bad".to_string()], 386 - }); 387 - assert_eq!(state, base_state()); 388 - } 389 - 390 - #[test] 391 - fn test_add_new_multiple_bad() { 392 - let mut page = ExportPage { 393 - ops: vec![ 394 - "bad".to_string(), 395 - "".to_string(), 396 - "foo".to_string(), 397 - "".to_string(), 398 - ], 399 - }; 400 - 401 - let mut state = base_state(); 402 - state.apply_to_next(&mut page); 403 - assert_eq!(state, base_state()); 404 - } 405 - 406 - #[test] 407 fn test_add_new_same_op() { 408 let mut page = ExportPage { 409 - ops: vec![valid_op().to_string()], 410 }; 411 let mut state = base_state(); 412 state.apply_to_next(&mut page); ··· 417 fn test_add_new_same_time() { 418 // make an op with a different OpKey 419 let mut op = valid_op(); 420 - op.as_object_mut() 421 - .unwrap() 422 - .insert("cid".to_string(), "cid2".into()); 423 - let mut page = ExportPage { 424 - ops: vec![op.to_string()], 425 - }; 426 427 let mut state = base_state(); 428 state.apply_to_next(&mut page); ··· 446 fn test_add_new_same_time_dup_before() { 447 // make an op with a different OpKey 448 let mut op = valid_op(); 449 - op.as_object_mut() 450 - .unwrap() 451 - .insert("cid".to_string(), "cid2".into()); 452 let mut page = ExportPage { 453 - ops: vec![valid_op().to_string(), op.to_string()], 454 }; 455 456 let mut state = base_state(); ··· 475 fn test_add_new_same_time_dup_after() { 476 // make an op with a different OpKey 477 let mut op = valid_op(); 478 - op.as_object_mut() 479 - .unwrap() 480 - .insert("cid".to_string(), "cid2".into()); 481 let mut page = ExportPage { 482 - ops: vec![op.to_string(), valid_op().to_string()], 483 - }; 484 - 485 - let mut state = base_state(); 486 - state.apply_to_next(&mut page); 487 - assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 488 - assert_eq!( 489 - state.keys_at, 490 - vec![ 491 - OpKey { 492 - cid: "cid".to_string(), 493 - did: "did".to_string(), 494 - }, 495 - OpKey { 496 - cid: "cid2".to_string(), 497 - did: "did".to_string(), 498 - }, 499 - ] 500 - ); 501 - } 502 - 503 - #[test] 504 - fn test_add_new_same_time_blank_after() { 505 - // make an op with a different OpKey 506 - let mut op = valid_op(); 507 - op.as_object_mut() 508 - .unwrap() 509 - .insert("cid".to_string(), "cid2".into()); 510 - let mut page = ExportPage { 511 - ops: vec![op.to_string(), "".to_string()], 512 }; 513 514 let mut state = base_state(); ··· 532 #[test] 533 fn test_add_new_next_time() { 534 let mut page = ExportPage { 535 - ops: vec![next_op().to_string()], 536 }; 537 let mut state = base_state(); 538 state.apply_to_next(&mut page); ··· 549 #[test] 550 fn test_add_new_next_time_with_dup() { 551 let mut page = ExportPage { 552 - ops: vec![valid_op().to_string(), next_op().to_string()], 553 }; 554 let mut state = base_state(); 555 state.apply_to_next(&mut page); ··· 562 },] 563 ); 564 assert_eq!(page.ops.len(), 1); 565 - assert_eq!(page.ops[0], next_op().to_string()); 566 } 567 568 #[test] 569 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 570 // make an op with a different OpKey 571 let mut op = valid_op(); 572 - op.as_object_mut() 573 - .unwrap() 574 - .insert("cid".to_string(), "cid2".into()); 575 576 let mut page = ExportPage { 577 ops: vec![ 578 - valid_op().to_string(), // should get dropped 579 - op.to_string(), // should be kept 580 - next_op().to_string(), 581 ], 582 }; 583 let mut state = base_state(); ··· 591 },] 592 ); 593 assert_eq!(page.ops.len(), 2); 594 - assert_eq!(page.ops[0], op.to_string()); 595 - assert_eq!(page.ops[1], next_op().to_string()); 596 
} 597 598 #[test] 599 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 600 // make an op with a different OpKey 601 let mut op = valid_op(); 602 - op.as_object_mut() 603 - .unwrap() 604 - .insert("cid".to_string(), "cid2".into()); 605 606 let mut page = ExportPage { 607 ops: vec![ 608 - op.to_string(), // should be kept 609 - valid_op().to_string(), // should get dropped 610 - next_op().to_string(), 611 ], 612 }; 613 let mut state = base_state(); ··· 621 },] 622 ); 623 assert_eq!(page.ops.len(), 2); 624 - assert_eq!(page.ops[0], op.to_string()); 625 - assert_eq!(page.ops[1], next_op().to_string()); 626 } 627 }
···
4 use thiserror::Error;
5 use tokio::sync::mpsc;
6
7 #[derive(Debug, Error)]
8 pub enum GetPageError {
9 #[error(transparent)]
···
23 pk: (String, String), // did, cid
24 }
25
26 + impl From<Op> for LastOp {
27 fn from(op: Op) -> Self {
28 Self {
29 created_at: op.created_at,
30 + pk: (op.did, op.cid),
31 }
32 }
33 }
34
35 + impl From<&Op> for LastOp {
36 + fn from(op: &Op) -> Self {
37 + Self {
38 + created_at: op.created_at,
39 + pk: (op.did.clone(), op.cid.clone()),
40 + }
41 + }
42 + }
43 +
44 + // bit of a hack
45 impl From<Dt> for LastOp {
46 fn from(dt: Dt) -> Self {
47 Self {
···
51 }
52 }
53
54 + /// State for removing duplicate ops between PLC export page boundaries
55 #[derive(Debug, PartialEq)]
56 pub struct PageBoundaryState {
57 + /// The previous page's last timestamp
58 + ///
59 + /// Duplicate ops from /export only occur for the same exact timestamp
60 pub last_at: Dt,
61 + /// The previous page's ops at its last timestamp
62 keys_at: Vec<OpKey>, // expected to ~always be length one
63 }
64
65 impl PageBoundaryState {
66 + /// Initialize the boundary state with a PLC page
67 pub fn new(page: &ExportPage) -> Option<Self> {
68 // grab the very last op
69 + let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
70
71 // set initial state
72 let mut me = Self {
···
75 };
76
77 // and make sure all keys at this time are captured from the back
78 + me.capture_nth_last_at(page, last_at, 1);
79
80 Some(me)
81 }
82 + /// Apply the deduplication and update state
83 + ///
84 + /// The beginning of the page will be modified to remove duplicates from the
85 + /// previous page.
86 + ///
87 + /// The end of the page is inspected to update the deduplicator state for
88 + /// the next page.
89 fn apply_to_next(&mut self, page: &mut ExportPage) {
90 // walk ops forward, kicking previously-seen ops until created_at advances
91 let to_remove: Vec<usize> = page
92 .ops
93 .iter()
94 .enumerate()
95 + .take_while(|(_, op)| op.created_at == self.last_at)
96 + .filter(|(_, op)| self.keys_at.contains(&(*op).into()))
97 + .map(|(i, _)| i)
98 .collect();
99
100 + // actually remove them. last to first so indices don't shift
101 for dup_idx in to_remove.into_iter().rev() {
102 page.ops.remove(dup_idx);
103 }
104
105 // grab the very last op
106 + let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
107 + // there are no ops left? oop. bail.
108 + // last_at and existing keys remain intact
109 + return;
110 };
111
112 // reset state (as long as time actually moved forward on this page)
···
119 self.keys_at.push(last_key);
120 }
121 // and make sure all keys at this time are captured from the back
122 + self.capture_nth_last_at(page, last_at, 1);
123 }
124
125 /// walk backwards from 2nd last and collect keys until created_at changes
···
128 .iter()
129 .rev()
130 .skip(skips)
131 + .take_while(|op| op.created_at == last_at)
132 + .for_each(|op| {
133 self.keys_at.push(op.into());
134 });
135 }
136 }
137
138 + /// Get one PLC export page
139 + ///
140 + /// Extracts the final op so it can be used to fetch the following page
141 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
142 log::trace!("Getting page: {url}");
143
144 + let ops: Vec<Op> = CLIENT
145 .get(url)
146 .send()
147 .await?
··· 151 .trim() 152 .split('\n') 153 .filter_map(|s| { 154 + serde_json::from_str::<Op>(s) 155 + .inspect_err(|e| { 156 + if !s.is_empty() { 157 + log::warn!("failed to parse op: {e} ({s})") 158 + } 159 + }) 160 + .ok() 161 }) 162 .collect(); 163 164 + let last_op = ops.last().map(Into::into); 165 166 Ok((ExportPage { ops }, last_op)) 167 } 168 169 + /// Poll an upstream PLC server for new ops 170 + /// 171 + /// Pages of operations are written to the `dest` channel. 172 + /// 173 + /// ```no_run 174 + /// # #[tokio::main] 175 + /// # async fn main() { 176 + /// use allegedly::{ExportPage, Op, poll_upstream}; 177 + /// 178 + /// let after = Some(chrono::Utc::now()); 179 + /// let upstream = "https://plc.wtf/export".parse().unwrap(); 180 + /// let throttle = std::time::Duration::from_millis(300); 181 + /// 182 + /// let (tx, mut rx) = tokio::sync::mpsc::channel(1); 183 + /// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 184 + /// 185 + /// while let Some(ExportPage { ops }) = rx.recv().await { 186 + /// println!("received {} plc ops", ops.len()); 187 + /// 188 + /// for Op { did, cid, operation, .. } in ops { 189 + /// // in this example we're alerting when changes are found for one 190 + /// // specific identity 191 + /// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 192 + /// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get()); 193 + /// } 194 + /// } 195 + /// } 196 + /// # } 197 + /// ``` 198 pub async fn poll_upstream( 199 after: Option<Dt>, 200 base: Url, 201 + throttle: Duration, 202 dest: mpsc::Sender<ExportPage>, 203 + ) -> anyhow::Result<&'static str> { 204 + log::info!("starting upstream poller at {base} after {after:?}"); 205 + let mut tick = tokio::time::interval(throttle); 206 let mut prev_last: Option<LastOp> = after.map(Into::into); 207 let mut boundary_state: Option<PageBoundaryState> = None; 208 loop { ··· 242 const FIVES_TS: i64 = 1431648000; 243 const NEXT_TS: i64 = 1431648001; 244 245 + fn valid_op() -> Op { 246 + serde_json::from_value(serde_json::json!({ 247 "did": "did", 248 "cid": "cid", 249 "createdAt": "2015-05-15T00:00:00Z", 250 "nullified": false, 251 "operation": {}, 252 + })) 253 + .unwrap() 254 } 255 256 + fn next_op() -> Op { 257 + serde_json::from_value(serde_json::json!({ 258 "did": "didnext", 259 "cid": "cidnext", 260 "createdAt": "2015-05-15T00:00:01Z", 261 "nullified": false, 262 "operation": {}, 263 + })) 264 + .unwrap() 265 } 266 267 fn base_state() -> PageBoundaryState { 268 let page = ExportPage { 269 + ops: vec![valid_op()], 270 }; 271 + PageBoundaryState::new(&page).expect("to have a base page boundary state") 272 } 273 274 #[test] ··· 279 } 280 281 #[test] 282 fn test_boundary_new_one_op() { 283 let page = ExportPage { 284 + ops: vec![valid_op()], 285 }; 286 let state = PageBoundaryState::new(&page).unwrap(); 287 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); ··· 295 } 296 297 #[test] 298 fn test_add_new_empty() { 299 let mut state = base_state(); 300 state.apply_to_next(&mut ExportPage { ops: vec![] }); ··· 302 } 303 304 #[test] 305 fn test_add_new_same_op() { 306 let mut page = ExportPage { 307 + ops: vec![valid_op()], 308 }; 309 let mut state = base_state(); 310 state.apply_to_next(&mut page); ··· 315 fn test_add_new_same_time() { 316 // make an op with a different OpKey 317 let mut op = valid_op(); 318 + op.cid = "cid2".to_string(); 319 + let mut page = ExportPage { ops: vec![op] }; 320 321 let mut state = base_state(); 322 state.apply_to_next(&mut page); ··· 340 fn 
test_add_new_same_time_dup_before() { 341 // make an op with a different OpKey 342 let mut op = valid_op(); 343 + op.cid = "cid2".to_string(); 344 let mut page = ExportPage { 345 + ops: vec![valid_op(), op], 346 }; 347 348 let mut state = base_state(); ··· 367 fn test_add_new_same_time_dup_after() { 368 // make an op with a different OpKey 369 let mut op = valid_op(); 370 + op.cid = "cid2".to_string(); 371 let mut page = ExportPage { 372 + ops: vec![op, valid_op()], 373 }; 374 375 let mut state = base_state(); ··· 393 #[test] 394 fn test_add_new_next_time() { 395 let mut page = ExportPage { 396 + ops: vec![next_op()], 397 }; 398 let mut state = base_state(); 399 state.apply_to_next(&mut page); ··· 410 #[test] 411 fn test_add_new_next_time_with_dup() { 412 let mut page = ExportPage { 413 + ops: vec![valid_op(), next_op()], 414 }; 415 let mut state = base_state(); 416 state.apply_to_next(&mut page); ··· 423 },] 424 ); 425 assert_eq!(page.ops.len(), 1); 426 + assert_eq!(page.ops[0], next_op()); 427 } 428 429 #[test] 430 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 431 // make an op with a different OpKey 432 let mut op = valid_op(); 433 + op.cid = "cid2".to_string(); 434 435 let mut page = ExportPage { 436 ops: vec![ 437 + valid_op(), // should get dropped 438 + op.clone(), // should be kept 439 + next_op(), 440 ], 441 }; 442 let mut state = base_state(); ··· 450 },] 451 ); 452 assert_eq!(page.ops.len(), 2); 453 + assert_eq!(page.ops[0], op); 454 + assert_eq!(page.ops[1], next_op()); 455 } 456 457 #[test] 458 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 459 // make an op with a different OpKey 460 let mut op = valid_op(); 461 + op.cid = "cid2".to_string(); 462 463 let mut page = ExportPage { 464 ops: vec![ 465 + op.clone(), // should be kept 466 + valid_op(), // should get dropped 467 + next_op(), 468 ], 469 }; 470 let mut state = base_state(); ··· 478 },] 479 ); 480 assert_eq!(page.ops.len(), 2); 481 + assert_eq!(page.ops[0], op); 482 + assert_eq!(page.ops[1], next_op()); 483 } 484 }
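Parsing ops up front in `get_page` is what lets `PageBoundaryState` work on plain field comparisons instead of re-parsing JSON lines. The overlap it trims is inherent to `/export` paging: per the doc comments above, requesting the next page with `after` set to the previous page's last `createdAt` returns every op sharing that exact timestamp again. A sketch of observing that by hand, assuming `get_page` and `Op`'s public fields are exported the way the `poll_upstream` doc-test suggests:

use allegedly::get_page; // assumed re-export

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let base = "https://plc.directory/export?count=1000";
    let (first, _last) = get_page(base.parse()?).await?;

    if let Some(tail) = first.ops.last() {
        // naive next-page request: after = last createdAt
        let next = format!("{base}&after={}", tail.created_at.to_rfc3339());
        let (second, _) = get_page(next.parse()?).await?;

        // ops at the boundary timestamp come back again;
        // PageBoundaryState::apply_to_next is what drops them
        let repeats = second
            .ops
            .iter()
            .filter(|op| op.created_at == tail.created_at)
            .count();
        println!("{repeats} boundary op(s) repeated across pages");
    }
    Ok(())
}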
+102 -39
src/ratelimit.rs
··· 8 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 use std::{ 10 convert::TryInto, 11 net::{IpAddr, Ipv6Addr}, 12 sync::{Arc, LazyLock}, 13 time::Duration, ··· 20 type IP6_56 = [u8; 7]; 21 type IP6_48 = [u8; 6]; 22 23 fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 24 let period = quota.replenish_interval() / factor; 25 let burst = quota 26 .burst_size() 27 - .checked_mul(factor.try_into().unwrap()) 28 - .unwrap(); 29 Quota::with_period(period).map(|q| q.allow_burst(burst)) 30 } 31 32 #[derive(Debug)] 33 - struct IpLimiters { 34 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 35 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 36 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, ··· 40 pub fn new(quota: Quota) -> Self { 41 Self { 42 per_ip: RateLimiter::keyed(quota), 43 - ip6_56: RateLimiter::keyed(scale_quota(quota, 8).unwrap()), 44 - ip6_48: RateLimiter::keyed(scale_quota(quota, 256).unwrap()), 45 } 46 } 47 - pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { 48 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 49 match ip { 50 - addr @ IpAddr::V4(_) => self.per_ip.check_key(&addr).map_err(asdf), 51 IpAddr::V6(a) => { 52 // always check all limiters 53 let check_ip = self ··· 56 .map_err(asdf); 57 let check_56 = self 58 .ip6_56 59 - .check_key(a.octets()[..7].try_into().unwrap()) 60 .map_err(asdf); 61 let check_48 = self 62 .ip6_48 63 - .check_key(a.octets()[..6].try_into().unwrap()) 64 .map_err(asdf); 65 check_ip.and(check_56).and(check_48) 66 } 67 } 68 } 69 } 70 71 /// Once the rate limit has been reached, the middleware will respond with 72 /// status code 429 (too many requests) and a `Retry-After` header with the amount 73 /// of time that needs to pass before another request will be allowed. 
74 - #[derive(Debug)] 75 - pub struct GovernorMiddleware { 76 #[allow(dead_code)] 77 stop_on_drop: oneshot::Sender<()>, 78 - limiters: Arc<IpLimiters>, 79 } 80 81 - impl GovernorMiddleware { 82 /// Limit request rates 83 /// 84 /// a little gross but this spawns a tokio task for housekeeping: 85 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 86 - pub fn new(quota: Quota) -> Self { 87 - let limiters = Arc::new(IpLimiters::new(quota)); 88 let (stop_on_drop, mut stopped) = oneshot::channel(); 89 tokio::task::spawn({ 90 let limiters = limiters.clone(); ··· 94 _ = &mut stopped => break, 95 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 96 }; 97 - log::debug!( 98 - "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 99 - limiters.per_ip.len(), 100 - limiters.ip6_56.len(), 101 - limiters.ip6_48.len(), 102 - ); 103 - limiters.per_ip.retain_recent(); 104 - limiters.ip6_56.retain_recent(); 105 - limiters.ip6_48.retain_recent(); 106 } 107 } 108 }); ··· 113 } 114 } 115 116 - impl<E: Endpoint> Middleware<E> for GovernorMiddleware { 117 - type Output = GovernorMiddlewareImpl<E>; 118 fn transform(&self, ep: E) -> Self::Output { 119 GovernorMiddlewareImpl { 120 ep, ··· 123 } 124 } 125 126 - pub struct GovernorMiddlewareImpl<E> { 127 ep: E, 128 - limiters: Arc<IpLimiters>, 129 } 130 131 - impl<E: Endpoint> Endpoint for GovernorMiddlewareImpl<E> { 132 type Output = E::Output; 133 134 async fn call(&self, req: Request) -> Result<Self::Output> { 135 - let remote = req 136 - .remote_addr() 137 - .as_socket_addr() 138 - .unwrap_or_else(|| panic!("failed to get request's remote addr")) // TODO 139 - .ip(); 140 141 - log::trace!("remote: {remote}"); 142 - 143 - match self.limiters.check_key(remote) { 144 Ok(_) => { 145 - log::debug!("allowing remote {remote}"); 146 self.ep.call(req).await 147 } 148 Err(d) => { 149 let wait_time = d.as_secs(); 150 151 - log::debug!("rate limit exceeded for {remote}, quota reset in {wait_time}s"); 152 153 let res = Response::builder() 154 .status(StatusCode::TOO_MANY_REQUESTS)
··· 8 use poem::{Endpoint, Middleware, Request, Response, Result, http::StatusCode}; 9 use std::{ 10 convert::TryInto, 11 + hash::Hash, 12 net::{IpAddr, Ipv6Addr}, 13 sync::{Arc, LazyLock}, 14 time::Duration, ··· 21 type IP6_56 = [u8; 7]; 22 type IP6_48 = [u8; 6]; 23 24 + pub trait Limiter<K: Hash + std::fmt::Debug>: Send + Sync + 'static { 25 + fn extract_key(&self, req: &Request) -> Result<K>; 26 + fn check_key(&self, ip: &K) -> Result<(), Duration>; 27 + fn housekeep(&self); 28 + } 29 + 30 fn scale_quota(quota: Quota, factor: u32) -> Option<Quota> { 31 let period = quota.replenish_interval() / factor; 32 let burst = quota 33 .burst_size() 34 + .checked_mul(factor.try_into().expect("factor to be non-zero")) 35 + .expect("burst to be able to multiply"); 36 Quota::with_period(period).map(|q| q.allow_burst(burst)) 37 } 38 39 #[derive(Debug)] 40 + pub struct CreatePlcOpLimiter { 41 + limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>, 42 + } 43 + 44 + impl CreatePlcOpLimiter { 45 + pub fn new(quota: Quota) -> Self { 46 + Self { 47 + limiter: RateLimiter::keyed(quota), 48 + } 49 + } 50 + } 51 + 52 + /// this must be used with an endpoint with a single path param for the did 53 + impl Limiter<String> for CreatePlcOpLimiter { 54 + fn extract_key(&self, req: &Request) -> Result<String> { 55 + let (did,) = req.path_params::<(String,)>()?; 56 + Ok(did) 57 + } 58 + fn check_key(&self, did: &String) -> Result<(), Duration> { 59 + self.limiter 60 + .check_key(did) 61 + .map_err(|e| e.wait_time_from(CLOCK.now())) 62 + } 63 + fn housekeep(&self) { 64 + log::debug!( 65 + "limiter size before housekeeping: {} dids", 66 + self.limiter.len() 67 + ); 68 + self.limiter.retain_recent(); 69 + } 70 + } 71 + 72 + #[derive(Debug)] 73 + pub struct IpLimiters { 74 per_ip: RateLimiter<IpAddr, DefaultKeyedStateStore<IpAddr>, DefaultClock>, 75 ip6_56: RateLimiter<IP6_56, DefaultKeyedStateStore<IP6_56>, DefaultClock>, 76 ip6_48: RateLimiter<IP6_48, DefaultKeyedStateStore<IP6_48>, DefaultClock>, ··· 80 pub fn new(quota: Quota) -> Self { 81 Self { 82 per_ip: RateLimiter::keyed(quota), 83 + ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")), 84 + ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 85 } 86 } 87 + } 88 + 89 + impl Limiter<IpAddr> for IpLimiters { 90 + fn extract_key(&self, req: &Request) -> Result<IpAddr> { 91 + Ok(req 92 + .remote_addr() 93 + .as_socket_addr() 94 + .expect("failed to get request's remote addr") // TODO 95 + .ip()) 96 + } 97 + fn check_key(&self, ip: &IpAddr) -> Result<(), Duration> { 98 let asdf = |n: NotUntil<_>| n.wait_time_from(CLOCK.now()); 99 match ip { 100 + addr @ IpAddr::V4(_) => self.per_ip.check_key(addr).map_err(asdf), 101 IpAddr::V6(a) => { 102 // always check all limiters 103 let check_ip = self ··· 106 .map_err(asdf); 107 let check_56 = self 108 .ip6_56 109 + .check_key( 110 + a.octets()[..7] 111 + .try_into() 112 + .expect("to check ip6 /56 limiter"), 113 + ) 114 .map_err(asdf); 115 let check_48 = self 116 .ip6_48 117 + .check_key( 118 + a.octets()[..6] 119 + .try_into() 120 + .expect("to check ip6 /48 limiter"), 121 + ) 122 .map_err(asdf); 123 check_ip.and(check_56).and(check_48) 124 } 125 } 126 } 127 + fn housekeep(&self) { 128 + log::debug!( 129 + "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 130 + self.per_ip.len(), 131 + self.ip6_56.len(), 132 + self.ip6_48.len(), 133 + ); 134 + self.per_ip.retain_recent(); 135 + self.ip6_56.retain_recent(); 136 + self.ip6_48.retain_recent(); 137 + 
} 138 } 139 140 /// Once the rate limit has been reached, the middleware will respond with 141 /// status code 429 (too many requests) and a `Retry-After` header with the amount 142 /// of time that needs to pass before another request will be allowed. 143 + // #[derive(Debug)] 144 + pub struct GovernorMiddleware<K> { 145 #[allow(dead_code)] 146 stop_on_drop: oneshot::Sender<()>, 147 + limiters: Arc<dyn Limiter<K>>, 148 } 149 150 + impl<K: Hash + std::fmt::Debug> GovernorMiddleware<K> { 151 /// Limit request rates 152 /// 153 /// a little gross but this spawns a tokio task for housekeeping: 154 /// https://docs.rs/governor/latest/governor/struct.RateLimiter.html#keyed-rate-limiters---housekeeping 155 + pub fn new(limiters: impl Limiter<K>) -> Self { 156 + let limiters = Arc::new(limiters); 157 let (stop_on_drop, mut stopped) = oneshot::channel(); 158 tokio::task::spawn({ 159 let limiters = limiters.clone(); ··· 163 _ = &mut stopped => break, 164 _ = tokio::time::sleep(Duration::from_secs(60)) => {}, 165 }; 166 + limiters.housekeep(); 167 } 168 } 169 }); ··· 174 } 175 } 176 177 + impl<E, K> Middleware<E> for GovernorMiddleware<K> 178 + where 179 + E: Endpoint, 180 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 181 + { 182 + type Output = GovernorMiddlewareImpl<E, K>; 183 fn transform(&self, ep: E) -> Self::Output { 184 GovernorMiddlewareImpl { 185 ep, ··· 188 } 189 } 190 191 + pub struct GovernorMiddlewareImpl<E, K> { 192 ep: E, 193 + limiters: Arc<dyn Limiter<K>>, 194 } 195 196 + impl<E, K> Endpoint for GovernorMiddlewareImpl<E, K> 197 + where 198 + E: Endpoint, 199 + K: Hash + std::fmt::Debug + Send + Sync + 'static, 200 + { 201 type Output = E::Output; 202 203 async fn call(&self, req: Request) -> Result<Self::Output> { 204 + let key = self.limiters.extract_key(&req)?; 205 206 + match self.limiters.check_key(&key) { 207 Ok(_) => { 208 + log::debug!("allowing key {key:?}"); 209 self.ep.call(req).await 210 } 211 Err(d) => { 212 let wait_time = d.as_secs(); 213 214 + log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s"); 215 216 let res = Response::builder() 217 .status(StatusCode::TOO_MANY_REQUESTS)
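Making `GovernorMiddleware` generic over `Limiter<K>` means new keying strategies plug in without touching the middleware or its housekeeping task. A sketch of a third limiter keyed on the client's User-Agent, mirroring the two impls above; `UaLimiter` is hypothetical, and the `Limiter` import assumes the trait is re-exported from the crate root:

use allegedly::Limiter; // assumed re-export
use governor::{
    Quota, RateLimiter,
    clock::{Clock, DefaultClock},
    state::keyed::DefaultKeyedStateStore,
};
use poem::{Request, Result, http::header::USER_AGENT};
use std::time::Duration;

// hypothetical: throttle per client User-Agent string
struct UaLimiter {
    limiter: RateLimiter<String, DefaultKeyedStateStore<String>, DefaultClock>,
    clock: DefaultClock,
}

impl UaLimiter {
    fn new(quota: Quota) -> Self {
        Self {
            limiter: RateLimiter::keyed(quota),
            clock: DefaultClock::default(),
        }
    }
}

impl Limiter<String> for UaLimiter {
    fn extract_key(&self, req: &Request) -> Result<String> {
        Ok(req
            .headers()
            .get(USER_AGENT)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("unknown")
            .to_string())
    }
    fn check_key(&self, ua: &String) -> Result<(), Duration> {
        self.limiter
            .check_key(ua)
            .map_err(|n| n.wait_time_from(self.clock.now()))
    }
    fn housekeep(&self) {
        self.limiter.retain_recent();
    }
}

// wired up like the IP limiter:
// app.with(GovernorMiddleware::new(UaLimiter::new(Quota::per_minute(
//     600.try_into().expect("nonzero"),
// ))))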
+31 -14
src/weekly.rs
··· 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 98 let FolderSource(dir) = self; 99 let path = dir.join(format!("{}.jsonl.gz", week.0)); 100 - Ok(File::open(path).await?) 101 } 102 } 103 ··· 138 let mut week_t0 = total_t0; 139 140 while let Some(page) = rx.recv().await { 141 - for mut s in page.ops { 142 - let Ok(op) = serde_json::from_str::<Op>(&s) 143 - .inspect_err(|e| log::error!("failed to parse plc op, ignoring: {e}")) 144 - else { 145 - continue; 146 - }; 147 let op_week = op.created_at.into(); 148 if current_week.map(|w| w != op_week).unwrap_or(true) { 149 encoder.shutdown().await?; ··· 168 week_ops = 0; 169 week_t0 = now; 170 } 171 - s.push('\n'); // hack 172 - log::trace!("writing: {s}"); 173 - encoder.write_all(s.as_bytes()).await?; 174 total_ops += 1; 175 week_ops += 1; 176 } ··· 197 dest: mpsc::Sender<ExportPage>, 198 ) -> anyhow::Result<()> { 199 use futures::TryStreamExt; 200 - let decoder = GzipDecoder::new(BufReader::new(source.reader_for(week).await?)); 201 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 202 203 - while let Some(chunk) = chunks.try_next().await? { 204 - let ops: Vec<String> = chunk.into_iter().collect(); 205 let page = ExportPage { ops }; 206 - dest.send(page).await?; 207 } 208 Ok(()) 209 }
···
97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
98 let FolderSource(dir) = self;
99 let path = dir.join(format!("{}.jsonl.gz", week.0));
100 + log::debug!("opening folder source: {path:?}");
101 + let file = File::open(path)
102 + .await
103 + .inspect_err(|e| log::error!("failed to open file: {e}"))?;
104 + Ok(file)
105 }
106 }
107
···
142 let mut week_t0 = total_t0;
143
144 while let Some(page) = rx.recv().await {
145 + for op in page.ops {
146 let op_week = op.created_at.into();
147 if current_week.map(|w| w != op_week).unwrap_or(true) {
148 encoder.shutdown().await?;
···
167 week_ops = 0;
168 week_t0 = now;
169 }
170 + log::trace!("writing: {op:?}");
171 + let mut line = serde_json::to_string(&op)?;
172 + line.push('\n'); // keep the file newline-delimited for the JSONL reader below
173 + encoder.write_all(line.as_bytes()).await?;
174 total_ops += 1;
175 week_ops += 1;
176 }
···
197 dest: mpsc::Sender<ExportPage>,
198 ) -> anyhow::Result<()> {
199 use futures::TryStreamExt;
200 + let reader = source
201 + .reader_for(week)
202 + .await
203 + .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?;
204 + let decoder = GzipDecoder::new(BufReader::new(reader));
205 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000));
206
207 + while let Some(chunk) = chunks
208 + .try_next()
209 + .await
210 + .inspect_err(|e| log::error!("failed to get next chunk: {e}"))?
211 + {
212 + let ops: Vec<Op> = chunk
213 + .into_iter()
214 + .filter_map(|s| {
215 + serde_json::from_str::<Op>(&s)
216 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
217 + .ok()
218 + })
219 + .collect();
220 let page = ExportPage { ops };
221 + dest.send(page)
222 + .await
223 + .inspect_err(|e| log::error!("failed to send page: {e}"))?;
224 }
225 Ok(())
226 }
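With the writer emitting one JSON op per line, weekly files stay plain gzipped JSONL and can be inspected without going through `week_to_pages`. A sketch using the same decoder stack as the reader above; the path is a placeholder (filenames follow the `{}.jsonl.gz` pattern keyed by the week), and parsing into `serde_json::Value` sidesteps any assumption about `Op`'s export:

use async_compression::tokio::bufread::GzipDecoder;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // hypothetical weekly file written by the loop above
    let file = File::open("weeks/2810.jsonl.gz").await?;
    let decoder = GzipDecoder::new(BufReader::new(file));
    let mut lines = BufReader::new(decoder).lines();

    let mut ops = 0usize;
    while let Some(line) = lines.next_line().await? {
        // every non-empty line should be one parseable PLC op
        if !line.is_empty() {
            serde_json::from_str::<serde_json::Value>(&line)?;
            ops += 1;
        }
    }
    println!("{ops} ops in week file");
    Ok(())
}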