Server tools to backfill, tail, mirror, and verify PLC logs

Compare changes


+720 -587
+14
Cargo.lock
··· 38 38 "governor", 39 39 "http-body-util", 40 40 "log", 41 + "native-tls", 41 42 "poem", 43 + "postgres-native-tls", 42 44 "reqwest", 43 45 "reqwest-middleware", 44 46 "reqwest-retry", ··· 1740 1742 version = "1.11.1" 1741 1743 source = "registry+https://github.com/rust-lang/crates.io-index" 1742 1744 checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1745 + 1746 + [[package]] 1747 + name = "postgres-native-tls" 1748 + version = "0.5.1" 1749 + source = "registry+https://github.com/rust-lang/crates.io-index" 1750 + checksum = "a1f39498473c92f7b6820ae970382c1d83178a3454c618161cb772e8598d9f6f" 1751 + dependencies = [ 1752 + "native-tls", 1753 + "tokio", 1754 + "tokio-native-tls", 1755 + "tokio-postgres", 1756 + ] 1743 1757 1744 1758 [[package]] 1745 1759 name = "postgres-protocol"
+2
Cargo.toml
··· 15 15 governor = "0.10.1" 16 16 http-body-util = "0.1.3" 17 17 log = "0.4.28" 18 + native-tls = "0.2.14" 18 19 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 + postgres-native-tls = "0.5.1" 19 21 reqwest = { version = "0.12.23", features = ["stream", "json"] } 20 22 reqwest-middleware = "0.4.2" 21 23 reqwest-retry = "0.7.0"
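The two new dependencies cooperate to give tokio-postgres a TLS transport: native-tls builds a connector from a root certificate, and postgres-native-tls adapts it to the `MakeTlsConnect` interface that `tokio_postgres::connect` expects. A minimal sketch of the wiring, with a hypothetical cert path (src/plc_pg.rs below does the real version behind `Db::new`):

```rust
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;

async fn connect_tls(uri: &str) -> anyhow::Result<tokio_postgres::Client> {
    let pem = std::fs::read("./server-ca.pem")?; // hypothetical cert path
    let connector = TlsConnector::builder()
        .add_root_certificate(Certificate::from_pem(&pem)?)
        .build()?;
    let (client, connection) =
        tokio_postgres::connect(uri, MakeTlsConnector::new(connector)).await?;
    tokio::spawn(connection); // the connection future drives the socket I/O
    Ok(client)
}
```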
favicon.ico

This is a binary file and will not be displayed.

+1 -1
readme.md
··· 27 27 --upstream "https://plc.directory" \ 28 28 --wrap "http://127.0.0.1:3000" \ 29 29 --acme-domain "plc.wtf" \ 30 - --acme-cache-dir ./acme-cache \ 30 + --acme-cache-path ./acme-cache \ 31 31 --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory" 32 32 ``` 33 33
+12 -5
src/backfill.rs
··· 13 13 dest: mpsc::Sender<ExportPage>, 14 14 source_workers: usize, 15 15 until: Option<Dt>, 16 - ) -> anyhow::Result<()> { 16 + ) -> anyhow::Result<&'static str> { 17 17 // queue up the week bundles that should be available 18 18 let weeks = Arc::new(Mutex::new( 19 19 until ··· 39 39 while let Some(week) = weeks.lock().await.pop() { 40 40 let when = Into::<Dt>::into(week).to_rfc3339(); 41 41 log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago()); 42 - week_to_pages(source.clone(), week, dest.clone()).await?; 42 + week_to_pages(source.clone(), week, dest.clone()) 43 + .await 44 + .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?; 43 45 } 44 46 log::info!("done with the weeks ig"); 45 47 Ok(()) ··· 50 52 51 53 // wait for the big backfill to finish 52 54 while let Some(res) = workers.join_next().await { 53 - res??; 55 + res.inspect_err(|e| log::error!("problem joining source workers: {e}"))? 56 + .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?; 54 57 } 55 - log::info!("finished fetching backfill in {:?}", t_step.elapsed()); 56 - Ok(()) 58 + log::info!( 59 + "finished fetching backfill in {:?}. senders remaining: {}", 60 + t_step.elapsed(), 61 + dest.strong_count() 62 + ); 63 + Ok("backfill") 57 64 }
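Returning `&'static str` instead of `()` gives each long-running future a name that survives into the callers' `JoinSet` loops (see src/bin/backfill.rs and src/bin/mirror.rs below), so logs can say which task just finished. The pattern, reduced to a sketch with stand-in task bodies:

```rust
use tokio::task::JoinSet;

async fn join_named_tasks() -> anyhow::Result<()> {
    let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();
    tasks.spawn(async { Ok("backfill") });
    tasks.spawn(async { Ok("poller") });
    while let Some(joined) = tasks.join_next().await {
        // first ? surfaces join errors/panics, second ? surfaces task errors
        let name = joined??;
        log::trace!("task {name} finished, {} left", tasks.len());
    }
    Ok(())
}
```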
+40 -226
src/bin/allegedly.rs
··· 1 - use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill, 3 - backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve, 4 - }; 1 + use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 5 2 use clap::{CommandFactory, Parser, Subcommand}; 6 - use reqwest::Url; 7 - use std::{net::SocketAddr, path::PathBuf, time::Instant}; 8 - use tokio::sync::{mpsc, oneshot}; 3 + use std::{path::PathBuf, time::Instant}; 4 + use tokio::fs::create_dir_all; 5 + use tokio::sync::mpsc; 6 + 7 + mod backfill; 8 + mod mirror; 9 9 10 10 #[derive(Debug, Parser)] 11 11 struct Cli { 12 - /// Upstream PLC server 13 - #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 14 - #[clap(default_value = "https://plc.directory")] 15 - upstream: Url, 12 + #[command(flatten)] 13 + globals: GlobalArgs, 14 + 16 15 #[command(subcommand)] 17 16 command: Commands, 18 17 } ··· 21 20 enum Commands { 22 21 /// Use weekly bundled ops to get a complete directory mirror FAST 23 22 Backfill { 24 - /// Remote URL prefix to fetch bundles from 25 - #[arg(long)] 26 - #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")] 27 - http: Url, 28 - /// Local folder to fetch bundles from (overrides `http`) 29 - #[arg(long)] 30 - dir: Option<PathBuf>, 31 - /// Parallel bundle fetchers 32 - /// 33 - /// Default: 4 for http fetches, 1 for local folder 34 - #[arg(long)] 35 - source_workers: Option<usize>, 36 - /// Bulk load into did-method-plc-compatible postgres instead of stdout 37 - /// 38 - /// Pass a postgres connection url like "postgresql://localhost:5432" 39 - #[arg(long)] 40 - to_postgres: Option<Url>, 41 - /// Delete all operations from the postgres db before starting 42 - /// 43 - /// only used if `--to-postgres` is present 44 - #[arg(long, action)] 45 - postgres_reset: bool, 46 - /// Stop at the week ending before this date 47 - #[arg(long)] 48 - until: Option<Dt>, 49 - /// After the weekly imports, poll upstream until we're caught up 50 - #[arg(long, action)] 51 - catch_up: bool, 23 + #[command(flatten)] 24 + args: backfill::Args, 52 25 }, 53 26 /// Scrape a PLC server, collecting ops into weekly bundles 54 27 /// ··· 73 46 }, 74 47 /// Wrap a did-method-plc server, syncing upstream and blocking op submits 75 48 Mirror { 76 - /// the wrapped did-method-plc server 77 - #[arg(long, env = "ALLEGEDLY_WRAP")] 78 - wrap: Url, 79 - /// the wrapped did-method-plc server's database (write access required) 80 - #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 81 - wrap_pg: Url, 82 - /// wrapping server listen address 83 - #[arg(short, long, env = "ALLEGEDLY_BIND")] 84 - #[clap(default_value = "127.0.0.1:8000")] 85 - bind: SocketAddr, 86 - /// obtain a certificate from letsencrypt 87 - /// 88 - /// for now this will force listening on all interfaces at :80 and :443 89 - /// (:80 will serve an "https required" error, *will not* redirect) 90 - #[arg( 91 - long, 92 - conflicts_with("bind"), 93 - requires("acme_cache_path"), 94 - env = "ALLEGEDLY_ACME_DOMAIN" 95 - )] 96 - acme_domain: Vec<String>, 97 - /// which local directory to keep the letsencrypt certs in 98 - #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")] 99 - acme_cache_path: Option<PathBuf>, 100 - /// which public acme directory to use 101 - /// 102 - /// eg. 
letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory" 103 - #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 104 - #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 105 - acme_directory_url: Url, 49 + #[command(flatten)] 50 + args: mirror::Args, 106 51 }, 107 52 /// Poll an upstream PLC server and log new ops to stdout 108 53 Tail { ··· 112 57 }, 113 58 } 114 59 115 - async fn pages_to_stdout( 116 - mut rx: mpsc::Receiver<ExportPage>, 117 - notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 118 - ) -> anyhow::Result<()> { 119 - let mut last_at = None; 120 - while let Some(page) = rx.recv().await { 121 - for op in &page.ops { 122 - println!("{op}"); 123 - } 124 - if notify_last_at.is_some() 125 - && let Some(s) = PageBoundaryState::new(&page) 126 - { 127 - last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 128 - } 129 - } 130 - if let Some(notify) = notify_last_at { 131 - log::trace!("notifying last_at: {last_at:?}"); 132 - if notify.send(last_at).is_err() { 133 - log::error!("receiver for last_at dropped, can't notify"); 134 - }; 135 - } 136 - Ok(()) 137 - } 138 - 139 - /// page forwarder who drops its channels on receipt of a small page 140 - /// 141 - /// PLC will return up to 1000 ops on a page, and returns full pages until it 142 - /// has caught up, so this is a (hacky?) way to stop polling once we're up. 143 - fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> { 144 - let (tx, fwd) = mpsc::channel(1); 145 - tokio::task::spawn(async move { 146 - while let Some(page) = rx.recv().await 147 - && page.ops.len() > 900 148 - { 149 - tx.send(page).await.unwrap(); 150 - } 151 - }); 152 - fwd 153 - } 154 - 155 60 #[tokio::main] 156 - async fn main() { 61 + async fn main() -> anyhow::Result<()> { 157 62 let args = Cli::parse(); 158 63 let matches = Cli::command().get_matches(); 159 64 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 160 65 bin_init(name); 161 66 67 + let globals = args.globals.clone(); 68 + 162 69 let t0 = Instant::now(); 163 70 match args.command { 164 - Commands::Backfill { 165 - http, 166 - dir, 167 - source_workers, 168 - to_postgres, 169 - postgres_reset, 170 - until, 171 - catch_up, 172 - } => { 173 - let (tx, rx) = mpsc::channel(32); // these are big pages 174 - tokio::task::spawn(async move { 175 - if let Some(dir) = dir { 176 - log::info!("Reading weekly bundles from local folder {dir:?}"); 177 - backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until) 178 - .await 179 - .unwrap(); 180 - } else { 181 - log::info!("Fetching weekly bundles from from {http}"); 182 - backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until) 183 - .await 184 - .unwrap(); 185 - } 186 - }); 187 - 188 - // postgres writer will notify us as soon as the very last op's time is known 189 - // so we can start catching up while pg is restoring indexes and stuff 190 - let (notify_last_at, rx_last) = if catch_up { 191 - let (tx, rx) = oneshot::channel(); 192 - (Some(tx), Some(rx)) 193 - } else { 194 - (None, None) 195 - }; 196 - 197 - let to_postgres_url_bulk = to_postgres.clone(); 198 - let bulk_out_write = tokio::task::spawn(async move { 199 - if let Some(ref url) = to_postgres_url_bulk { 200 - let db = Db::new(url.as_str()).await.unwrap(); 201 - backfill_to_pg(db, postgres_reset, rx, notify_last_at) 202 - .await 203 - .unwrap(); 204 - } else { 205 - pages_to_stdout(rx, notify_last_at).await.unwrap(); 206 - } 207 - }); 208 - 209 - if let 
Some(rx_last) = rx_last { 210 - let mut upstream = args.upstream; 211 - upstream.set_path("/export"); 212 - // wait until the time for `after` is known 213 - let last_at = rx_last.await.unwrap(); 214 - log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff"); 215 - let (tx, rx) = mpsc::channel(256); // these are small pages 216 - tokio::task::spawn( 217 - async move { poll_upstream(last_at, upstream, tx).await.unwrap() }, 218 - ); 219 - bulk_out_write.await.unwrap(); 220 - log::info!("writing catch-up pages"); 221 - let full_pages = full_pages(rx); 222 - if let Some(url) = to_postgres { 223 - let db = Db::new(url.as_str()).await.unwrap(); 224 - pages_to_pg(db, full_pages).await.unwrap(); 225 - } else { 226 - pages_to_stdout(full_pages, None).await.unwrap(); 227 - } 228 - } 229 - } 71 + Commands::Backfill { args } => backfill::run(globals, args).await?, 230 72 Commands::Bundle { 231 73 dest, 232 74 after, 233 75 clobber, 234 76 } => { 235 - let mut url = args.upstream; 77 + let mut url = globals.upstream; 236 78 url.set_path("/export"); 237 79 let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason 238 - tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() }); 80 + tokio::task::spawn(async move { 81 + poll_upstream(Some(after), url, tx) 82 + .await 83 + .expect("to poll upstream") 84 + }); 239 85 log::trace!("ensuring output directory exists"); 240 - std::fs::create_dir_all(&dest).unwrap(); 241 - pages_to_weeks(rx, dest, clobber).await.unwrap(); 242 - } 243 - Commands::Mirror { 244 - wrap, 245 - wrap_pg, 246 - bind, 247 - acme_domain, 248 - acme_cache_path, 249 - acme_directory_url, 250 - } => { 251 - let db = Db::new(wrap_pg.as_str()).await.unwrap(); 252 - let latest = db 253 - .get_latest() 86 + create_dir_all(&dest) 254 87 .await 255 - .unwrap() 256 - .expect("there to be at least one op in the db. 
did you backfill?"); 257 - 258 - let (tx, rx) = mpsc::channel(2); 259 - // upstream poller 260 - let mut url = args.upstream.clone(); 261 - tokio::task::spawn(async move { 262 - log::info!("starting poll reader..."); 263 - url.set_path("/export"); 264 - tokio::task::spawn( 265 - async move { poll_upstream(Some(latest), url, tx).await.unwrap() }, 266 - ); 267 - }); 268 - // db writer 269 - let poll_db = db.clone(); 270 - tokio::task::spawn(async move { 271 - log::info!("starting db writer..."); 272 - pages_to_pg(poll_db, rx).await.unwrap(); 273 - }); 274 - 275 - let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 276 - (_, false, Some(cache_path)) => ListenConf::Acme { 277 - domains: acme_domain, 278 - cache_path, 279 - directory_url: acme_directory_url.to_string(), 280 - }, 281 - (bind, true, None) => ListenConf::Bind(bind), 282 - (_, _, _) => unreachable!(), 283 - }; 284 - 285 - serve(&args.upstream, wrap, listen_conf).await.unwrap(); 88 + .expect("to ensure output dir exists"); 89 + pages_to_weeks(rx, dest, clobber) 90 + .await 91 + .expect("to write bundles to output files"); 286 92 } 93 + Commands::Mirror { args } => mirror::run(globals, args).await?, 287 94 Commands::Tail { after } => { 288 - let mut url = args.upstream; 95 + let mut url = globals.upstream; 289 96 url.set_path("/export"); 290 97 let start_at = after.or_else(|| Some(chrono::Utc::now())); 291 98 let (tx, rx) = mpsc::channel(1); 292 - tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() }); 293 - pages_to_stdout(rx, None).await.unwrap(); 99 + tokio::task::spawn(async move { 100 + poll_upstream(start_at, url, tx) 101 + .await 102 + .expect("to poll upstream") 103 + }); 104 + pages_to_stdout(rx, None) 105 + .await 106 + .expect("to write pages to stdout"); 294 107 } 295 108 } 296 109 log::info!("whew, {:?}. goodbye!", t0.elapsed()); 110 + Ok(()) 297 111 }
+199
src/bin/backfill.rs
··· 1 + use allegedly::{ 2 + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 + bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 4 + }; 5 + use clap::Parser; 6 + use reqwest::Url; 7 + use std::path::PathBuf; 8 + use tokio::{ 9 + sync::{mpsc, oneshot}, 10 + task::JoinSet, 11 + }; 12 + 13 + pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/"; 14 + 15 + #[derive(Debug, clap::Args)] 16 + pub struct Args { 17 + /// Remote URL prefix to fetch bundles from 18 + #[arg(long)] 19 + #[clap(default_value = DEFAULT_HTTP)] 20 + http: Url, 21 + /// Local folder to fetch bundles from (overrides `http`) 22 + #[arg(long)] 23 + dir: Option<PathBuf>, 24 + /// Don't do weekly bulk-loading at all. 25 + /// 26 + /// overrides `http` and `dir`, makes catch_up redundant 27 + #[arg(long, action)] 28 + no_bulk: bool, 29 + /// Parallel bundle fetchers 30 + /// 31 + /// Default: 4 for http fetches, 1 for local folder 32 + #[arg(long)] 33 + source_workers: Option<usize>, 34 + /// Bulk load into did-method-plc-compatible postgres instead of stdout 35 + /// 36 + /// Pass a postgres connection url like "postgresql://localhost:5432" 37 + #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")] 38 + to_postgres: Option<Url>, 39 + /// Cert for postgres (if needed) 40 + #[arg(long)] 41 + postgres_cert: Option<PathBuf>, 42 + /// Delete all operations from the postgres db before starting 43 + /// 44 + /// only used if `--to-postgres` is present 45 + #[arg(long, action)] 46 + postgres_reset: bool, 47 + /// Stop at the week ending before this date 48 + #[arg(long)] 49 + until: Option<Dt>, 50 + /// After the weekly imports, poll upstream until we're caught up 51 + #[arg(long, action)] 52 + catch_up: bool, 53 + } 54 + 55 + pub async fn run( 56 + GlobalArgs { upstream }: GlobalArgs, 57 + Args { 58 + http, 59 + dir, 60 + no_bulk, 61 + source_workers, 62 + to_postgres, 63 + postgres_cert, 64 + postgres_reset, 65 + until, 66 + catch_up, 67 + }: Args, 68 + ) -> anyhow::Result<()> { 69 + let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new(); 70 + 71 + let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages 72 + 73 + // a bulk sink can notify us as soon as the very last op's time is known 74 + // so we can start catching up while the sink might restore indexes and such 75 + let (found_last_tx, found_last_out) = if catch_up { 76 + let (tx, rx) = oneshot::channel(); 77 + (Some(tx), Some(rx)) 78 + } else { 79 + (None, None) 80 + }; 81 + 82 + let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages 83 + let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter 84 + 85 + // set up sources 86 + if no_bulk { 87 + // simple mode, just poll upstream from the beginning 88 + if http != DEFAULT_HTTP.parse()? { 89 + log::warn!("ignoring non-default bulk http setting since --no-bulk was set"); 90 + } 91 + if let Some(d) = dir { 92 + log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set."); 93 + } 94 + if let Some(u) = until { 95 + log::warn!( 96 + "ignoring `until` setting ({u:?}) since --no-bulk was set. 
(feature request?)" 97 + ); 98 + } 99 + let mut upstream = upstream; 100 + upstream.set_path("/export"); 101 + tasks.spawn(poll_upstream(None, upstream, poll_tx)); 102 + tasks.spawn(full_pages(poll_out, full_tx)); 103 + tasks.spawn(pages_to_stdout(full_out, None)); 104 + } else { 105 + // fun mode 106 + 107 + // set up bulk sources 108 + if let Some(dir) = dir { 109 + if http != DEFAULT_HTTP.parse()? { 110 + anyhow::bail!( 111 + "non-default bulk http setting can't be used with bulk dir setting ({dir:?})" 112 + ); 113 + } 114 + tasks.spawn(backfill( 115 + FolderSource(dir), 116 + bulk_tx, 117 + source_workers.unwrap_or(1), 118 + until, 119 + )); 120 + } else { 121 + tasks.spawn(backfill( 122 + HttpSource(http), 123 + bulk_tx, 124 + source_workers.unwrap_or(4), 125 + until, 126 + )); 127 + } 128 + 129 + // and the catch-up source... 130 + if let Some(last) = found_last_out { 131 + tasks.spawn(async move { 132 + let mut upstream = upstream; 133 + upstream.set_path("/export"); 134 + poll_upstream(last.await?, upstream, poll_tx).await 135 + }); 136 + } 137 + 138 + // set up sinks 139 + if let Some(pg_url) = to_postgres { 140 + log::trace!("connecting to postgres..."); 141 + let db = Db::new(pg_url.as_str(), postgres_cert).await?; 142 + log::trace!("connected to postgres"); 143 + 144 + tasks.spawn(backfill_to_pg( 145 + db.clone(), 146 + postgres_reset, 147 + bulk_out, 148 + found_last_tx, 149 + )); 150 + if catch_up { 151 + tasks.spawn(pages_to_pg(db, full_out)); 152 + } 153 + } else { 154 + tasks.spawn(pages_to_stdout(bulk_out, found_last_tx)); 155 + if catch_up { 156 + tasks.spawn(pages_to_stdout(full_out, None)); 157 + } 158 + } 159 + } 160 + 161 + while let Some(next) = tasks.join_next().await { 162 + match next { 163 + Err(e) if e.is_panic() => { 164 + log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 165 + return Err(e.into()); 166 + } 167 + Err(e) => { 168 + log::error!("a joinset task failed to join: {e}"); 169 + return Err(e.into()); 170 + } 171 + Ok(Err(e)) => { 172 + log::error!("a joinset task completed with error: {e}"); 173 + return Err(e); 174 + } 175 + Ok(Ok(name)) => { 176 + log::trace!("a task completed: {name:?}. {} left", tasks.len()); 177 + } 178 + } 179 + } 180 + 181 + Ok(()) 182 + } 183 + 184 + #[derive(Debug, Parser)] 185 + struct CliArgs { 186 + #[command(flatten)] 187 + globals: GlobalArgs, 188 + #[command(flatten)] 189 + args: Args, 190 + } 191 + 192 + #[allow(dead_code)] 193 + #[tokio::main] 194 + async fn main() -> anyhow::Result<()> { 195 + let args = CliArgs::parse(); 196 + bin_init("backfill"); 197 + run(args.globals, args.args).await?; 198 + Ok(()) 199 + }
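The catch-up handoff is the subtle part of this wiring: when `--catch-up` is set, the bulk sink reports the newest timestamp it has seen over a oneshot as soon as it knows it, so the poller can start fetching while the sink is still finalizing. A stripped-down sketch of just that handoff (the sink body here is a stand-in):

```rust
use chrono::{DateTime, Utc};
use tokio::sync::oneshot;

async fn catch_up_handoff() -> anyhow::Result<()> {
    let (found_last_tx, found_last_out) = oneshot::channel::<Option<DateTime<Utc>>>();

    // bulk sink: notify the moment the final op's time is known...
    tokio::spawn(async move {
        let last_seen = Some(Utc::now()); // stand-in for PageBoundaryState tracking
        let _ = found_last_tx.send(last_seen);
        // ...then keep working (index restores and such) after notifying
    });

    // catch-up poller: waits only for the notification, not for the sink to exit
    let after = found_last_out.await?;
    log::info!("catching up from {after:?}");
    Ok(())
}
```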
+128
src/bin/mirror.rs
··· 1 + use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve}; 2 + use clap::Parser; 3 + use reqwest::Url; 4 + use std::{net::SocketAddr, path::PathBuf}; 5 + use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 6 + 7 + #[derive(Debug, clap::Args)] 8 + pub struct Args { 9 + /// the wrapped did-method-plc server 10 + #[arg(long, env = "ALLEGEDLY_WRAP")] 11 + wrap: Url, 12 + /// the wrapped did-method-plc server's database (write access required) 13 + #[arg(long, env = "ALLEGEDLY_WRAP_PG")] 14 + wrap_pg: Url, 15 + /// path to tls cert for the wrapped postgres db, if needed 16 + #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 17 + wrap_pg_cert: Option<PathBuf>, 18 + /// wrapping server listen address 19 + #[arg(short, long, env = "ALLEGEDLY_BIND")] 20 + #[clap(default_value = "127.0.0.1:8000")] 21 + bind: SocketAddr, 22 + /// obtain a certificate from letsencrypt 23 + /// 24 + /// for now this will force listening on all interfaces at :80 and :443 25 + /// (:80 will serve an "https required" error, *will not* redirect) 26 + #[arg( 27 + long, 28 + conflicts_with("bind"), 29 + requires("acme_cache_path"), 30 + env = "ALLEGEDLY_ACME_DOMAIN" 31 + )] 32 + acme_domain: Vec<String>, 33 + /// which local directory to keep the letsencrypt certs in 34 + #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")] 35 + acme_cache_path: Option<PathBuf>, 36 + /// which public acme directory to use 37 + /// 38 + /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory" 39 + #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 40 + #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 41 + acme_directory_url: Url, 42 + } 43 + 44 + pub async fn run( 45 + GlobalArgs { upstream }: GlobalArgs, 46 + Args { 47 + wrap, 48 + wrap_pg, 49 + wrap_pg_cert, 50 + bind, 51 + acme_domain, 52 + acme_cache_path, 53 + acme_directory_url, 54 + }: Args, 55 + ) -> anyhow::Result<()> { 56 + let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 57 + 58 + // TODO: allow starting up with polling backfill from beginning? 59 + log::debug!("getting the latest op from the db..."); 60 + let latest = db 61 + .get_latest() 62 + .await? 63 + .expect("there to be at least one op in the db. did you backfill?"); 64 + 65 + let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 66 + (_, false, Some(cache_path)) => { 67 + log::info!("configuring acme for https at {acme_domain:?}..."); 68 + create_dir_all(&cache_path).await?; 69 + ListenConf::Acme { 70 + domains: acme_domain, 71 + cache_path, 72 + directory_url: acme_directory_url.to_string(), 73 + } 74 + } 75 + (bind, true, None) => ListenConf::Bind(bind), 76 + (_, _, _) => unreachable!(), 77 + }; 78 + 79 + let mut tasks = JoinSet::new(); 80 + 81 + let (send_page, recv_page) = mpsc::channel(8); 82 + 83 + let mut poll_url = upstream.clone(); 84 + poll_url.set_path("/export"); 85 + 86 + tasks.spawn(poll_upstream(Some(latest), poll_url, send_page)); 87 + tasks.spawn(pages_to_pg(db.clone(), recv_page)); 88 + tasks.spawn(serve(upstream, wrap, listen_conf)); 89 + 90 + while let Some(next) = tasks.join_next().await { 91 + match next { 92 + Err(e) if e.is_panic() => { 93 + log::error!("a joinset task panicked: {e}. bailing now. 
(should we panic?)"); 94 + return Err(e.into()); 95 + } 96 + Err(e) => { 97 + log::error!("a joinset task failed to join: {e}"); 98 + return Err(e.into()); 99 + } 100 + Ok(Err(e)) => { 101 + log::error!("a joinset task completed with error: {e}"); 102 + return Err(e); 103 + } 104 + Ok(Ok(name)) => { 105 + log::trace!("a task completed: {name:?}. {} left", tasks.len()); 106 + } 107 + } 108 + } 109 + 110 + Ok(()) 111 + } 112 + 113 + #[derive(Debug, Parser)] 114 + struct CliArgs { 115 + #[command(flatten)] 116 + globals: GlobalArgs, 117 + #[command(flatten)] 118 + args: Args, 119 + } 120 + 121 + #[allow(dead_code)] 122 + #[tokio::main] 123 + async fn main() -> anyhow::Result<()> { 124 + let args = CliArgs::parse(); 125 + bin_init("mirror"); 126 + run(args.globals, args.args).await?; 127 + Ok(()) 128 + }
+14
src/bin/mod.rs
··· 1 + use reqwest::Url; 2 + 3 + #[derive(Debug, Clone, clap::Args)] 4 + pub struct GlobalArgs { 5 + /// Upstream PLC server 6 + #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")] 7 + #[clap(default_value = "https://plc.directory")] 8 + pub upstream: Url, 9 + } 10 + 11 + #[allow(dead_code)] 12 + fn main() { 13 + panic!("this is not actually a module") 14 + }
+4 -1
src/client.rs
··· 10 10 ); 11 11 12 12 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| { 13 - let inner = Client::builder().user_agent(UA).build().unwrap(); 13 + let inner = Client::builder() 14 + .user_agent(UA) 15 + .build() 16 + .expect("reqwest client to build"); 14 17 15 18 let policy = ExponentialBackoff::builder().build_with_max_retries(12); 16 19
+79 -10
src/lib.rs
··· 1 - use serde::Deserialize; 1 + use serde::{Deserialize, Serialize}; 2 + use tokio::sync::{mpsc, oneshot}; 2 3 3 4 mod backfill; 4 5 mod client; ··· 7 8 mod poll; 8 9 mod ratelimit; 9 10 mod weekly; 11 + 12 + pub mod bin; 10 13 11 14 pub use backfill::backfill; 12 15 pub use client::{CLIENT, UA}; ··· 23 26 /// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 24 27 #[derive(Debug)] 25 28 pub struct ExportPage { 26 - pub ops: Vec<String>, 29 + pub ops: Vec<Op>, 27 30 } 28 31 29 32 impl ExportPage { ··· 35 38 /// A fully-deserialized plc operation 36 39 /// 37 40 /// including the plc's wrapping with timestmap and nullified state 38 - #[derive(Debug, Deserialize)] 41 + #[derive(Debug, Clone, Deserialize, Serialize)] 39 42 #[serde(rename_all = "camelCase")] 40 - pub struct Op<'a> { 41 - pub did: &'a str, 42 - pub cid: &'a str, 43 + pub struct Op { 44 + pub did: String, 45 + pub cid: String, 43 46 pub created_at: Dt, 44 47 pub nullified: bool, 45 - #[serde(borrow)] 46 - pub operation: &'a serde_json::value::RawValue, 48 + pub operation: Box<serde_json::value::RawValue>, 49 + } 50 + 51 + #[cfg(test)] 52 + impl PartialEq for Op { 53 + fn eq(&self, other: &Self) -> bool { 54 + self.did == other.did 55 + && self.cid == other.cid 56 + && self.created_at == other.created_at 57 + && self.nullified == other.nullified 58 + && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 59 + == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 60 + } 47 61 } 48 62 49 63 /// Database primary key for an op ··· 53 67 pub cid: String, 54 68 } 55 69 56 - impl From<&Op<'_>> for OpKey { 57 - fn from(Op { did, cid, .. }: &Op<'_>) -> Self { 70 + impl From<&Op> for OpKey { 71 + fn from(Op { did, cid, .. }: &Op) -> Self { 58 72 Self { 59 73 did: did.to_string(), 60 74 cid: cid.to_string(), 61 75 } 62 76 } 77 + } 78 + 79 + /// page forwarder who drops its channels on receipt of a small page 80 + /// 81 + /// PLC will return up to 1000 ops on a page, and returns full pages until it 82 + /// has caught up, so this is a (hacky?) way to stop polling once we're up. 
83 + pub async fn full_pages( 84 + mut rx: mpsc::Receiver<ExportPage>, 85 + tx: mpsc::Sender<ExportPage>, 86 + ) -> anyhow::Result<&'static str> { 87 + while let Some(page) = rx.recv().await { 88 + let n = page.ops.len(); 89 + if n < 900 { 90 + let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 91 + let Some(age) = last_age else { 92 + log::info!("full_pages done, empty final page"); 93 + return Ok("full pages (hmm)"); 94 + }; 95 + if age <= chrono::TimeDelta::hours(6) { 96 + log::info!("full_pages done, final page of {n} ops"); 97 + } else { 98 + log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 99 + } 100 + return Ok("full pages (cool)"); 101 + } 102 + log::trace!("full_pages: continuing with page of {n} ops"); 103 + tx.send(page).await?; 104 + } 105 + Err(anyhow::anyhow!( 106 + "full_pages ran out of source material, sender closed" 107 + )) 108 + } 109 + 110 + pub async fn pages_to_stdout( 111 + mut rx: mpsc::Receiver<ExportPage>, 112 + notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 113 + ) -> anyhow::Result<&'static str> { 114 + let mut last_at = None; 115 + while let Some(page) = rx.recv().await { 116 + for op in &page.ops { 117 + println!("{}", serde_json::to_string(op)?); 118 + } 119 + if notify_last_at.is_some() 120 + && let Some(s) = PageBoundaryState::new(&page) 121 + { 122 + last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 123 + } 124 + } 125 + if let Some(notify) = notify_last_at { 126 + log::trace!("notifying last_at: {last_at:?}"); 127 + if notify.send(last_at).is_err() { 128 + log::error!("receiver for last_at dropped, can't notify"); 129 + }; 130 + } 131 + Ok("pages_to_stdout") 63 132 } 64 133 65 134 pub fn logo(name: &str) -> String {
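With `full_pages` now a plain channel-to-channel task returning the same `anyhow::Result<&'static str>` as the other stages, a whole pipeline fits in one `JoinSet`. Roughly how the `--no-bulk` path in src/bin/backfill.rs wires these three exports together:

```rust
use tokio::{sync::mpsc, task::JoinSet};

async fn tail_pipeline(mut upstream: reqwest::Url) -> anyhow::Result<()> {
    upstream.set_path("/export");
    let (poll_tx, poll_out) = mpsc::channel(128);
    let (full_tx, full_out) = mpsc::channel(1);

    let mut tasks = JoinSet::new();
    tasks.spawn(allegedly::poll_upstream(None, upstream, poll_tx));
    tasks.spawn(allegedly::full_pages(poll_out, full_tx)); // hangs up once caught up
    tasks.spawn(allegedly::pages_to_stdout(full_out, None));

    while let Some(joined) = tasks.join_next().await {
        log::trace!("task done: {}", joined??);
    }
    Ok(())
}
```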
+46 -24
src/mirror.rs
··· 24 24 format!( 25 25 r#"{} 26 26 27 - This is a PLC[1] mirror running Allegedly[2] in mirror mode. Allegedly synchronizes and proxies to a downstream PLC reference server instance[3] (why?[4]). 27 + This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and 28 + synchronizes a local PLC reference server instance[2] (why?[3]). 28 29 29 30 30 31 Configured upstream: ··· 34 35 35 36 Available APIs: 36 37 37 - - All PLC GET requests [5]. 38 - - Rejects POSTs. This is a mirror. 38 + - GET /_health Health and version info 39 + 40 + - GET /* Proxies to wrapped server; see PLC API docs: 41 + https://web.plc.directory/api/redoc 42 + 43 + - POST /* Always rejected. This is a mirror. 44 + 45 + 46 + tip: try `GET /{{did}}` to resolve an identity 47 + 48 + 49 + Allegedly is a suite of open-source CLI tools for working with PLC logs: 39 50 40 - try `GET /{{did}}` to resolve an identity 51 + https://tangled.org/@microcosm.blue/Allegedly 41 52 42 53 43 54 [1] https://web.plc.directory 44 - [2] https://tangled.org/@microcosm.blue/Allegedly 45 - [3] https://github.com/did-method-plc/did-method-plc 46 - [4] https://updates.microcosm.blue/3lz7nwvh4zc2u 47 - [5] https://web.plc.directory/api/redoc 48 - 55 + [2] https://github.com/did-method-plc/did-method-plc 56 + [3] https://updates.microcosm.blue/3lz7nwvh4zc2u 49 57 "#, 50 58 logo("mirror") 51 59 ) 60 + } 61 + 62 + #[handler] 63 + fn favicon() -> impl IntoResponse { 64 + include_bytes!("../favicon.ico").with_content_type("image/x-icon") 52 65 } 53 66 54 67 fn failed_to_reach_wrapped() -> String { ··· 186 199 Bind(SocketAddr), 187 200 } 188 201 189 - pub async fn serve(upstream: &Url, plc: Url, listen: ListenConf) -> std::io::Result<()> { 202 + pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> { 203 + log::info!("starting server..."); 204 + 190 205 // not using crate CLIENT: don't want the retries etc 191 206 let client = Client::builder() 192 207 .user_agent(UA) 193 208 .timeout(Duration::from_secs(10)) // fallback 194 209 .build() 195 - .unwrap(); 210 + .expect("reqwest client to build"); 196 211 197 212 let state = State { ··· 202 217 203 218 let app = Route::new() 204 219 .at("/", get(hello)) 220 + .at("/favicon.ico", get(favicon)) 205 221 .at("/_health", get(health)) 206 222 .at("/:any", get(proxy).post(nope)) 207 223 .with(AddData::new(state)) 208 224 .with(Cors::new().allow_credentials(false)) 209 225 .with(Compression::new()) 210 226 .with(GovernorMiddleware::new(Quota::per_minute( 211 - 3000.try_into().unwrap(), 227 + 3000.try_into().expect("ratelimit middleware to build"), 212 228 ))) 213 229 .with(CatchPanic::new()) 214 230 .with(Tracing); ··· 231 247 } 232 248 let auto_cert = auto_cert.build().expect("acme config to build"); 233 249 234 - run_insecure_notice(); 235 - run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await 250 + let notice_task = tokio::task::spawn(run_insecure_notice()); 251 + let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await; 252 + log::warn!("server task ended, aborting insecure server task..."); 253 + notice_task.abort(); 254 + app_res?; 255 + notice_task.await??; 236 256 } 237 - ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await, 257 + ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?, 238 258 } 259 + 260 + Ok("server (uh oh?)") 239 261 } 240 262 241 263 async fn run<A, L>(app: A, listener: L) -> std::io::Result<()> ··· 250 272 } 251 273 252 274 /// kick off a tiny little 
server on a tokio task to tell people to use 443 253 - fn run_insecure_notice() { 275 + async fn run_insecure_notice() -> Result<(), std::io::Error> { 254 276 #[handler] 255 277 fn oop_plz_be_secure() -> (StatusCode, String) { 256 278 ( ··· 265 287 ) 266 288 } 267 289 268 - let app = Route::new().at("/", get(oop_plz_be_secure)).with(Tracing); 269 - let listener = TcpListener::bind("0.0.0.0:80"); 270 - tokio::task::spawn(async move { 271 - Server::new(listener) 272 - .name("allegedly (mirror:80 helper)") 273 - .run(app) 274 - .await 275 - }); 290 + let app = Route::new() 291 + .at("/", get(oop_plz_be_secure)) 292 + .at("/favicon.ico", get(favicon)) 293 + .with(Tracing); 294 + Server::new(TcpListener::bind("0.0.0.0:80")) 295 + .name("allegedly (mirror:80 helper)") 296 + .run(app) 297 + .await 276 298 }
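`run_insecure_notice` changing from a fire-and-forget spawn to a future the caller owns ties the :80 helper's lifetime to the :443 server. A reduced sketch of that shutdown shape; note that awaiting a handle after `abort()` yields a `JoinError` whose `is_cancelled()` is true (this variant tolerates the cancellation rather than propagating it):

```rust
async fn primary_with_helper() -> anyhow::Result<()> {
    // stand-in for run_insecure_notice() listening on :80
    let notice_task = tokio::spawn(async { std::io::Result::Ok(()) });

    // stand-in for the ACME server on :443 running until it ends
    let app_res: std::io::Result<()> = Ok(());

    notice_task.abort();
    app_res?;
    match notice_task.await {
        Ok(res) => res?,                 // helper had already finished on its own
        Err(e) if e.is_cancelled() => {} // the expected result of abort()
        Err(e) => return Err(e.into()),  // the helper panicked
    }
    Ok(())
}
```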
+69 -45
src/plc_pg.rs
··· 1 - use crate::{Dt, ExportPage, Op, PageBoundaryState}; 1 + use crate::{Dt, ExportPage, PageBoundaryState}; 2 + use native_tls::{Certificate, TlsConnector}; 3 + use postgres_native_tls::MakeTlsConnector; 4 + use std::path::PathBuf; 2 5 use std::pin::pin; 3 6 use std::time::Instant; 4 - use tokio::sync::{mpsc, oneshot}; 7 + use tokio::{ 8 + sync::{mpsc, oneshot}, 9 + task::{JoinHandle, spawn}, 10 + }; 5 11 use tokio_postgres::{ 6 - Client, Error as PgError, NoTls, 12 + Client, Error as PgError, NoTls, Socket, 7 13 binary_copy::BinaryCopyInWriter, 8 14 connect, 15 + tls::MakeTlsConnect, 9 16 types::{Json, Type}, 10 17 }; 11 18 19 + fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> { 20 + let cert = std::fs::read(cert)?; 21 + let cert = Certificate::from_pem(&cert)?; 22 + let connector = TlsConnector::builder().add_root_certificate(cert).build()?; 23 + Ok(MakeTlsConnector::new(connector)) 24 + } 25 + 26 + async fn get_client_and_task<T>( 27 + uri: &str, 28 + connector: T, 29 + ) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> 30 + where 31 + T: MakeTlsConnect<Socket>, 32 + <T as MakeTlsConnect<Socket>>::Stream: Send + 'static, 33 + { 34 + let (client, connection) = connect(uri, connector).await?; 35 + Ok((client, spawn(connection))) 36 + } 37 + 12 38 /// a little tokio-postgres helper 13 39 /// 14 40 /// it's clone for easiness. it doesn't share any resources underneath after 15 - /// cloning at all so it's not meant for 16 - #[derive(Debug, Clone)] 41 + /// cloning *at all* so it's not meant for eg. handling public web requests 42 + #[derive(Clone)] 17 43 pub struct Db { 18 44 pg_uri: String, 45 + cert: Option<MakeTlsConnector>, 19 46 } 20 47 21 48 impl Db { 22 - pub async fn new(pg_uri: &str) -> Result<Self, anyhow::Error> { 49 + pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> { 23 50 // we're going to interact with did-method-plc's database, so make sure 24 51 // it's what we expect: check for db migrations. 25 52 log::trace!("checking migrations..."); 26 - let (client, connection) = connect(pg_uri, NoTls).await?; 27 - let connection_task = tokio::task::spawn(async move { 28 - connection 29 - .await 30 - .inspect_err(|e| log::error!("connection ended with error: {e}")) 31 - .unwrap(); 32 - }); 53 + 54 + let connector = cert.map(get_tls).transpose()?; 55 + 56 + let (client, conn_task) = if let Some(ref connector) = connector { 57 + get_client_and_task(pg_uri, connector.clone()).await? 58 + } else { 59 + get_client_and_task(pg_uri, NoTls).await? 60 + }; 61 + 33 62 let migrations: Vec<String> = client 34 63 .query("SELECT name FROM kysely_migration ORDER BY name", &[]) 35 64 .await? 
··· 47 76 ); 48 77 drop(client); 49 78 // make sure the connection worker thing doesn't linger 50 - connection_task.await?; 79 + conn_task.await??; 51 80 log::info!("db connection succeeded and plc migrations appear as expected"); 52 81 53 82 Ok(Self { 54 83 pg_uri: pg_uri.to_string(), 84 + cert: connector, 55 85 }) 56 86 } 57 87 58 - pub async fn connect(&self) -> Result<Client, PgError> { 88 + pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> { 59 89 log::trace!("connecting postgres..."); 60 - let (client, connection) = connect(&self.pg_uri, NoTls).await?; 61 - 62 - // send the connection away to do the actual communication work 63 - // apparently the connection will complete when the client drops 64 - tokio::task::spawn(async move { 65 - connection 66 - .await 67 - .inspect_err(|e| log::error!("connection ended with error: {e}")) 68 - .unwrap(); 69 - }); 70 - 71 - Ok(client) 90 + if let Some(ref connector) = self.cert { 91 + get_client_and_task(&self.pg_uri, connector.clone()).await 92 + } else { 93 + get_client_and_task(&self.pg_uri, NoTls).await 94 + } 72 95 } 73 96 74 97 pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> { 75 - let client = self.connect().await?; 98 + let (client, task) = self.connect().await?; 76 99 let dt: Option<Dt> = client 77 100 .query_opt( 78 101 r#"SELECT "createdAt" ··· 83 106 ) 84 107 .await? 85 108 .map(|row| row.get(0)); 109 + drop(task); 86 110 Ok(dt) 87 111 } 88 112 } 89 113 90 - pub async fn pages_to_pg(db: Db, mut pages: mpsc::Receiver<ExportPage>) -> Result<(), PgError> { 91 - let mut client = db.connect().await?; 114 + pub async fn pages_to_pg( 115 + db: Db, 116 + mut pages: mpsc::Receiver<ExportPage>, 117 + ) -> anyhow::Result<&'static str> { 118 + log::info!("starting pages_to_pg writer..."); 119 + 120 + let (mut client, task) = db.connect().await?; 92 121 93 122 let ops_stmt = client 94 123 .prepare( ··· 108 137 while let Some(page) = pages.recv().await { 109 138 log::trace!("writing page with {} ops", page.ops.len()); 110 139 let tx = client.transaction().await?; 111 - for s in page.ops { 112 - let Ok(op) = serde_json::from_str::<Op>(&s) else { 113 - log::warn!("ignoring unparseable op {s:?}"); 114 - continue; 115 - }; 140 + for op in page.ops { 116 141 ops_inserted += tx 117 142 .execute( 118 143 &ops_stmt, ··· 129 154 } 130 155 tx.commit().await?; 131 156 } 157 + drop(task); 132 158 133 159 log::info!( 134 160 "no more pages. 
inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 135 161 t0.elapsed() 136 162 ); 137 - Ok(()) 163 + Ok("pages_to_pg") 138 164 } 139 165 140 166 /// Dump rows into an empty operations table quickly ··· 155 181 reset: bool, 156 182 mut pages: mpsc::Receiver<ExportPage>, 157 183 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 158 - ) -> Result<(), PgError> { 159 - let mut client = db.connect().await?; 184 + ) -> anyhow::Result<&'static str> { 185 + let (mut client, task) = db.connect().await?; 160 186 161 187 let t0 = Instant::now(); 162 188 let tx = client.transaction().await?; ··· 212 238 let mut writer = pin!(BinaryCopyInWriter::new(sync, types)); 213 239 let mut last_at = None; 214 240 while let Some(page) = pages.recv().await { 215 - for s in &page.ops { 216 - let Ok(op) = serde_json::from_str::<Op>(s) else { 217 - log::warn!("ignoring unparseable op: {s:?}"); 218 - continue; 219 - }; 241 + for op in &page.ops { 220 242 writer 221 243 .as_mut() 222 244 .write(&[ 223 245 &op.did, 224 - &Json(op.operation), 246 + &Json(op.operation.clone()), 225 247 &op.cid, 226 248 &op.nullified, 227 249 &op.created_at, ··· 234 256 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 235 257 } 236 258 } 259 + log::debug!("finished receiving bulk pages"); 237 260 238 261 if let Some(notify) = notify_last_at { 239 262 log::trace!("notifying last_at: {last_at:?}"); ··· 274 297 log::trace!("set tables LOGGED: {:?}", t_step.elapsed()); 275 298 276 299 tx.commit().await?; 300 + drop(task); 277 301 log::info!("total backfill time: {:?}", t0.elapsed()); 278 302 279 - Ok(()) 303 + Ok("backfill_to_pg") 280 304 }
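Handing the connection's `JoinHandle` back alongside the `Client` lets callers surface background I/O errors with `?` instead of unwrapping inside a detached task. Usage, roughly (NoTls for brevity; `get_client_and_task` accepts the TLS connector the same way):

```rust
use tokio_postgres::NoTls;

async fn one_query(pg_uri: &str) -> anyhow::Result<i64> {
    let (client, connection) = tokio_postgres::connect(pg_uri, NoTls).await?;
    let conn_task = tokio::spawn(connection);

    let n: i64 = client.query_one("SELECT 1::bigint", &[]).await?.get(0);

    drop(client); // the connection future resolves once all client handles drop
    conn_task.await??; // joins the task, then surfaces any connection error
    Ok(n)
}
```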
+66 -254
src/poll.rs
··· 26 26 pk: (String, String), // did, cid 27 27 } 28 28 29 - impl From<Op<'_>> for LastOp { 29 + impl From<Op> for LastOp { 30 30 fn from(op: Op) -> Self { 31 31 Self { 32 32 created_at: op.created_at, 33 - pk: (op.did.to_string(), op.cid.to_string()), 33 + pk: (op.did, op.cid), 34 34 } 35 35 } 36 36 } 37 37 38 + impl From<&Op> for LastOp { 39 + fn from(op: &Op) -> Self { 40 + Self { 41 + created_at: op.created_at, 42 + pk: (op.did.clone(), op.cid.clone()), 43 + } 44 + } 45 + } 46 + 47 + // bit of a hack 38 48 impl From<Dt> for LastOp { 39 49 fn from(dt: Dt) -> Self { 40 50 Self { ··· 51 61 keys_at: Vec<OpKey>, // expected to ~always be length one 52 62 } 53 63 54 - // ok so this is silly. 55 - // 56 - // i think i had some idea that deferring parsing to later steps would make it 57 - // easier to do things like sometimes not parsing at all (where the output is 58 - // also json lines), and maybe avoid some memory shuffling. 59 - // but since the input already has to be split into lines, keeping them as line 60 - // strings is probably the worst option: space-inefficient, allows garbage, and 61 - // leads to, well, this impl. 62 - // 63 - // it almost could have been slick if the *original* was just reused, and the 64 - // parsed ops were just kind of on the side referencing into it, but i'm lazy 65 - // and didn't get it there. 66 - // 67 - // should unrefactor to make Op own its data again, parse (and deal with errors) 68 - // upfront, and probably greatly simplify everything downstream. simple. 64 + /// track keys at final createdAt to deduplicate the start of the next page 69 65 impl PageBoundaryState { 70 66 pub fn new(page: &ExportPage) -> Option<Self> { 71 - let mut skips = 0; 72 - 73 67 // grab the very last op 74 - let (last_at, last_key) = loop { 75 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 76 - // there are no ops left? oop. bail. 77 - // last_at and existing keys remain in tact if there was no later op 78 - return None; 79 - }; 80 - if s.is_empty() { 81 - // annoying: ignore any trailing blank lines 82 - skips += 1; 83 - continue; 84 - } 85 - let Ok(op) = serde_json::from_str::<Op>(&s) 86 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 87 - else { 88 - // doubly annoying: skip over trailing garbage?? 89 - skips += 1; 90 - continue; 91 - }; 92 - break (op.created_at, Into::<OpKey>::into(&op)); 93 - }; 68 + let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; 94 69 95 70 // set initial state 96 71 let mut me = Self { ··· 99 74 }; 100 75 101 76 // and make sure all keys at this time are captured from the back 102 - me.capture_nth_last_at(page, last_at, skips); 77 + me.capture_nth_last_at(page, last_at, 1); 103 78 104 79 Some(me) 105 80 } ··· 108 83 let to_remove: Vec<usize> = page 109 84 .ops 110 85 .iter() 111 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 112 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 113 86 .enumerate() 114 - .take_while(|(_, opr)| opr.as_ref().map(|op| op.created_at == self.last_at).unwrap_or(false)) 115 - .filter_map(|(i, opr)| { 116 - if self.keys_at.contains(&(&opr.expect("any Errs were filtered by take_while")).into()) { 117 - Some(i) 118 - } else { None } 119 - }) 87 + .take_while(|(_, op)| op.created_at == self.last_at) 88 + .filter(|(_, op)| self.keys_at.contains(&(*op).into())) 89 + .map(|(i, _)| i) 120 90 .collect(); 121 91 122 - // actually remove them. 
last to first to indices don't shift 92 + // actually remove them. last to first so indices don't shift 123 93 for dup_idx in to_remove.into_iter().rev() { 124 94 page.ops.remove(dup_idx); 125 95 } 126 96 127 97 // grab the very last op 128 - let mut skips = 0; 129 - let (last_at, last_key) = loop { 130 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 131 - // there are no ops left? oop. bail. 132 - // last_at and existing keys remain in tact if there was no later op 133 - return; 134 - }; 135 - if s.is_empty() { 136 - // annoying: trim off any trailing blank lines 137 - skips += 1; 138 - continue; 139 - } 140 - let Ok(op) = serde_json::from_str::<Op>(&s) 141 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 142 - else { 143 - // doubly annoying: skip over trailing garbage?? 144 - skips += 1; 145 - continue; 146 - }; 147 - break (op.created_at, Into::<OpKey>::into(&op)); 98 + let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else { 99 + // there are no ops left? oop. bail. 100 + // last_at and existing keys remain in tact 101 + return; 148 102 }; 149 103 150 104 // reset state (as long as time actually moved forward on this page) ··· 157 111 self.keys_at.push(last_key); 158 112 } 159 113 // and make sure all keys at this time are captured from the back 160 - self.capture_nth_last_at(page, last_at, skips); 114 + self.capture_nth_last_at(page, last_at, 1); 161 115 } 162 116 163 117 /// walk backwards from 2nd last and collect keys until created_at changes ··· 166 120 .iter() 167 121 .rev() 168 122 .skip(skips) 169 - .skip(1) // we alredy added the very last one 170 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 171 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 172 - .take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false)) 173 - .for_each(|opr| { 174 - let op = &opr.expect("any Errs were filtered by take_while"); 123 + .take_while(|op| op.created_at == last_at) 124 + .for_each(|op| { 175 125 self.keys_at.push(op.into()); 176 126 }); 177 127 } ··· 180 130 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 181 131 log::trace!("Getting page: {url}"); 182 132 183 - let ops: Vec<String> = CLIENT 133 + let ops: Vec<Op> = CLIENT 184 134 .get(url) 185 135 .send() 186 136 .await? ··· 190 140 .trim() 191 141 .split('\n') 192 142 .filter_map(|s| { 193 - let s = s.trim(); 194 - if s.is_empty() { None } else { Some(s) } 143 + serde_json::from_str::<Op>(s) 144 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 145 + .ok() 195 146 }) 196 - .map(Into::into) 197 147 .collect(); 198 148 199 - let last_op = ops 200 - .last() 201 - .filter(|s| !s.is_empty()) 202 - .map(|s| serde_json::from_str::<Op>(s)) 203 - .transpose()? 
204 - .map(Into::into) 205 - .inspect(|at| log::trace!("new last op: {at:?}")); 149 + let last_op = ops.last().map(Into::into); 206 150 207 151 Ok((ExportPage { ops }, last_op)) 208 152 } ··· 211 155 after: Option<Dt>, 212 156 base: Url, 213 157 dest: mpsc::Sender<ExportPage>, 214 - ) -> anyhow::Result<()> { 158 + ) -> anyhow::Result<&'static str> { 159 + log::info!("starting upstream poller after {after:?}"); 215 160 let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 216 161 let mut prev_last: Option<LastOp> = after.map(Into::into); 217 162 let mut boundary_state: Option<PageBoundaryState> = None; ··· 252 197 const FIVES_TS: i64 = 1431648000; 253 198 const NEXT_TS: i64 = 1431648001; 254 199 255 - fn valid_op() -> serde_json::Value { 256 - serde_json::json!({ 200 + fn valid_op() -> Op { 201 + serde_json::from_value(serde_json::json!({ 257 202 "did": "did", 258 203 "cid": "cid", 259 204 "createdAt": "2015-05-15T00:00:00Z", 260 205 "nullified": false, 261 206 "operation": {}, 262 - }) 207 + })) 208 + .unwrap() 263 209 } 264 210 265 - fn next_op() -> serde_json::Value { 266 - serde_json::json!({ 211 + fn next_op() -> Op { 212 + serde_json::from_value(serde_json::json!({ 267 213 "did": "didnext", 268 214 "cid": "cidnext", 269 215 "createdAt": "2015-05-15T00:00:01Z", 270 216 "nullified": false, 271 217 "operation": {}, 272 - }) 218 + })) 219 + .unwrap() 273 220 } 274 221 275 222 fn base_state() -> PageBoundaryState { 276 223 let page = ExportPage { 277 - ops: vec![valid_op().to_string()], 224 + ops: vec![valid_op()], 278 225 }; 279 - PageBoundaryState::new(&page).unwrap() 226 + PageBoundaryState::new(&page).expect("to have a base page boundary state") 280 227 } 281 228 282 229 #[test] ··· 287 234 } 288 235 289 236 #[test] 290 - fn test_boundary_new_empty_op() { 291 - let page = ExportPage { 292 - ops: vec!["".to_string()], 293 - }; 294 - let state = PageBoundaryState::new(&page); 295 - assert!(state.is_none()); 296 - } 297 - 298 - #[test] 299 - fn test_boundary_new_ignores_bad_op() { 300 - let page = ExportPage { 301 - ops: vec!["bad".to_string()], 302 - }; 303 - let state = PageBoundaryState::new(&page); 304 - assert!(state.is_none()); 305 - } 306 - 307 - #[test] 308 - fn test_boundary_new_multiple_bad_end() { 309 - let page = ExportPage { 310 - ops: vec![ 311 - "bad".to_string(), 312 - "".to_string(), 313 - "foo".to_string(), 314 - "".to_string(), 315 - ], 316 - }; 317 - let state = PageBoundaryState::new(&page); 318 - assert!(state.is_none()); 319 - } 320 - 321 - #[test] 322 237 fn test_boundary_new_one_op() { 323 238 let page = ExportPage { 324 - ops: vec![valid_op().to_string()], 239 + ops: vec![valid_op()], 325 240 }; 326 241 let state = PageBoundaryState::new(&page).unwrap(); 327 242 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); ··· 335 250 } 336 251 337 252 #[test] 338 - fn test_boundary_new_one_op_with_stuff() { 339 - let expect_same_state = |m, ops| { 340 - let this_state = PageBoundaryState::new(&ExportPage { ops }).unwrap(); 341 - assert_eq!(this_state, base_state(), "{}", m); 342 - }; 343 - 344 - expect_same_state("empty before", vec!["".to_string(), valid_op().to_string()]); 345 - 346 - expect_same_state("empty after", vec![valid_op().to_string(), "".to_string()]); 347 - 348 - expect_same_state( 349 - "bad before, empty after", 350 - vec!["bad".to_string(), valid_op().to_string(), "".to_string()], 351 - ); 352 - 353 - expect_same_state( 354 - "bad and empty before and after", 355 - vec![ 356 - "".to_string(), 357 - "bad".to_string(), 358 - 
valid_op().to_string(), 359 - "".to_string(), 360 - "bad".to_string(), 361 - ], 362 - ); 363 - } 364 - 365 - #[test] 366 253 fn test_add_new_empty() { 367 254 let mut state = base_state(); 368 255 state.apply_to_next(&mut ExportPage { ops: vec![] }); ··· 370 257 } 371 258 372 259 #[test] 373 - fn test_add_new_empty_op() { 374 - let mut state = base_state(); 375 - state.apply_to_next(&mut ExportPage { 376 - ops: vec!["".to_string()], 377 - }); 378 - assert_eq!(state, base_state()); 379 - } 380 - 381 - #[test] 382 - fn test_add_new_ignores_bad_op() { 383 - let mut state = base_state(); 384 - state.apply_to_next(&mut ExportPage { 385 - ops: vec!["bad".to_string()], 386 - }); 387 - assert_eq!(state, base_state()); 388 - } 389 - 390 - #[test] 391 - fn test_add_new_multiple_bad() { 392 - let mut page = ExportPage { 393 - ops: vec![ 394 - "bad".to_string(), 395 - "".to_string(), 396 - "foo".to_string(), 397 - "".to_string(), 398 - ], 399 - }; 400 - 401 - let mut state = base_state(); 402 - state.apply_to_next(&mut page); 403 - assert_eq!(state, base_state()); 404 - } 405 - 406 - #[test] 407 260 fn test_add_new_same_op() { 408 261 let mut page = ExportPage { 409 - ops: vec![valid_op().to_string()], 262 + ops: vec![valid_op()], 410 263 }; 411 264 let mut state = base_state(); 412 265 state.apply_to_next(&mut page); ··· 417 270 fn test_add_new_same_time() { 418 271 // make an op with a different OpKey 419 272 let mut op = valid_op(); 420 - op.as_object_mut() 421 - .unwrap() 422 - .insert("cid".to_string(), "cid2".into()); 423 - let mut page = ExportPage { 424 - ops: vec![op.to_string()], 425 - }; 273 + op.cid = "cid2".to_string(); 274 + let mut page = ExportPage { ops: vec![op] }; 426 275 427 276 let mut state = base_state(); 428 277 state.apply_to_next(&mut page); ··· 446 295 fn test_add_new_same_time_dup_before() { 447 296 // make an op with a different OpKey 448 297 let mut op = valid_op(); 449 - op.as_object_mut() 450 - .unwrap() 451 - .insert("cid".to_string(), "cid2".into()); 298 + op.cid = "cid2".to_string(); 452 299 let mut page = ExportPage { 453 - ops: vec![valid_op().to_string(), op.to_string()], 300 + ops: vec![valid_op(), op], 454 301 }; 455 302 456 303 let mut state = base_state(); ··· 475 322 fn test_add_new_same_time_dup_after() { 476 323 // make an op with a different OpKey 477 324 let mut op = valid_op(); 478 - op.as_object_mut() 479 - .unwrap() 480 - .insert("cid".to_string(), "cid2".into()); 481 - let mut page = ExportPage { 482 - ops: vec![op.to_string(), valid_op().to_string()], 483 - }; 484 - 485 - let mut state = base_state(); 486 - state.apply_to_next(&mut page); 487 - assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 488 - assert_eq!( 489 - state.keys_at, 490 - vec![ 491 - OpKey { 492 - cid: "cid".to_string(), 493 - did: "did".to_string(), 494 - }, 495 - OpKey { 496 - cid: "cid2".to_string(), 497 - did: "did".to_string(), 498 - }, 499 - ] 500 - ); 501 - } 502 - 503 - #[test] 504 - fn test_add_new_same_time_blank_after() { 505 - // make an op with a different OpKey 506 - let mut op = valid_op(); 507 - op.as_object_mut() 508 - .unwrap() 509 - .insert("cid".to_string(), "cid2".into()); 325 + op.cid = "cid2".to_string(); 510 326 let mut page = ExportPage { 511 - ops: vec![op.to_string(), "".to_string()], 327 + ops: vec![op, valid_op()], 512 328 }; 513 329 514 330 let mut state = base_state(); ··· 532 348 #[test] 533 349 fn test_add_new_next_time() { 534 350 let mut page = ExportPage { 535 - ops: vec![next_op().to_string()], 351 + ops: vec![next_op()], 536 352 
}; 537 353 let mut state = base_state(); 538 354 state.apply_to_next(&mut page); ··· 549 365 #[test] 550 366 fn test_add_new_next_time_with_dup() { 551 367 let mut page = ExportPage { 552 - ops: vec![valid_op().to_string(), next_op().to_string()], 368 + ops: vec![valid_op(), next_op()], 553 369 }; 554 370 let mut state = base_state(); 555 371 state.apply_to_next(&mut page); ··· 562 378 },] 563 379 ); 564 380 assert_eq!(page.ops.len(), 1); 565 - assert_eq!(page.ops[0], next_op().to_string()); 381 + assert_eq!(page.ops[0], next_op()); 566 382 } 567 383 568 384 #[test] 569 385 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 570 386 // make an op with a different OpKey 571 387 let mut op = valid_op(); 572 - op.as_object_mut() 573 - .unwrap() 574 - .insert("cid".to_string(), "cid2".into()); 388 + op.cid = "cid2".to_string(); 575 389 576 390 let mut page = ExportPage { 577 391 ops: vec![ 578 - valid_op().to_string(), // should get dropped 579 - op.to_string(), // should be kept 580 - next_op().to_string(), 392 + valid_op(), // should get dropped 393 + op.clone(), // should be kept 394 + next_op(), 581 395 ], 582 396 }; 583 397 let mut state = base_state(); ··· 591 405 },] 592 406 ); 593 407 assert_eq!(page.ops.len(), 2); 594 - assert_eq!(page.ops[0], op.to_string()); 595 - assert_eq!(page.ops[1], next_op().to_string()); 408 + assert_eq!(page.ops[0], op); 409 + assert_eq!(page.ops[1], next_op()); 596 410 } 597 411 598 412 #[test] 599 413 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 600 414 // make an op with a different OpKey 601 415 let mut op = valid_op(); 602 - op.as_object_mut() 603 - .unwrap() 604 - .insert("cid".to_string(), "cid2".into()); 416 + op.cid = "cid2".to_string(); 605 417 606 418 let mut page = ExportPage { 607 419 ops: vec![ 608 - op.to_string(), // should be kept 609 - valid_op().to_string(), // should get dropped 610 - next_op().to_string(), 420 + op.clone(), // should be kept 421 + valid_op(), // should get dropped 422 + next_op(), 611 423 ], 612 424 }; 613 425 let mut state = base_state(); ··· 621 433 },] 622 434 ); 623 435 assert_eq!(page.ops.len(), 2); 624 - assert_eq!(page.ops[0], op.to_string()); 625 - assert_eq!(page.ops[1], next_op().to_string()); 436 + assert_eq!(page.ops[0], op); 437 + assert_eq!(page.ops[1], next_op()); 626 438 } 627 439 }
+15 -7
src/ratelimit.rs
··· 24 24 let period = quota.replenish_interval() / factor; 25 25 let burst = quota 26 26 .burst_size() 27 - .checked_mul(factor.try_into().unwrap()) 28 - .unwrap(); 27 + .checked_mul(factor.try_into().expect("factor to be non-zero")) 28 + .expect("burst to be able to multiply"); 29 29 Quota::with_period(period).map(|q| q.allow_burst(burst)) 30 30 } 31 31 ··· 40 40 pub fn new(quota: Quota) -> Self { 41 41 Self { 42 42 per_ip: RateLimiter::keyed(quota), 43 - ip6_56: RateLimiter::keyed(scale_quota(quota, 8).unwrap()), 44 - ip6_48: RateLimiter::keyed(scale_quota(quota, 256).unwrap()), 43 + ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")), 44 + ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 45 45 } 46 46 } 47 47 pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { ··· 56 56 .map_err(asdf); 57 57 let check_56 = self 58 58 .ip6_56 59 - .check_key(a.octets()[..7].try_into().unwrap()) 59 + .check_key( 60 + a.octets()[..7] 61 + .try_into() 62 + .expect("to check ip6 /56 limiter"), 63 + ) 60 64 .map_err(asdf); 61 65 let check_48 = self 62 66 .ip6_48 63 - .check_key(a.octets()[..6].try_into().unwrap()) 67 + .check_key( 68 + a.octets()[..6] 69 + .try_into() 70 + .expect("to check ip6 /48 limiter"), 71 + ) 64 72 .map_err(asdf); 65 73 check_ip.and(check_56).and(check_48) 66 74 } ··· 135 143 let remote = req 136 144 .remote_addr() 137 145 .as_socket_addr() 138 - .unwrap_or_else(|| panic!("failed to get request's remote addr")) // TODO 146 + .expect("failed to get request's remote addr") // TODO 139 147 .ip(); 140 148 141 149 log::trace!("remote: {remote}");
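For reference, `scale_quota`'s arithmetic with the 3000-per-minute quota the mirror passes in: the per-op replenish interval is 60s / 3000 = 20ms, so the factor of 8 used for the /56 bucket gives a 2.5ms interval and a 24,000-op burst. A small worked sketch:

```rust
use governor::Quota;
use std::num::NonZeroU32;

fn scaled() -> Option<Quota> {
    let per_ip = Quota::per_minute(NonZeroU32::new(3000)?); // burst_size() == 3000
    let factor = 8u32; // the /56 bucket; the /48 bucket uses 256
    let period = per_ip.replenish_interval() / factor; // 20ms -> 2.5ms
    let burst = per_ip.burst_size().checked_mul(NonZeroU32::new(factor)?)?; // 24_000
    Quota::with_period(period).map(|q| q.allow_burst(burst))
}
```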
+31 -14
src/weekly.rs
··· 97 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 98 98 let FolderSource(dir) = self; 99 99 let path = dir.join(format!("{}.jsonl.gz", week.0)); 100 - Ok(File::open(path).await?) 100 + log::debug!("opening folder source: {path:?}"); 101 + let file = File::open(path) 102 + .await 103 + .inspect_err(|e| log::error!("failed to open file: {e}"))?; 104 + Ok(file) 101 105 } 102 106 } 103 107 ··· 138 142 let mut week_t0 = total_t0; 139 143 140 144 while let Some(page) = rx.recv().await { 141 - for mut s in page.ops { 142 - let Ok(op) = serde_json::from_str::<Op>(&s) 143 - .inspect_err(|e| log::error!("failed to parse plc op, ignoring: {e}")) 144 - else { 145 - continue; 146 - }; 145 + for op in page.ops { 147 146 let op_week = op.created_at.into(); 148 147 if current_week.map(|w| w != op_week).unwrap_or(true) { 149 148 encoder.shutdown().await?; ··· 168 167 week_ops = 0; 169 168 week_t0 = now; 170 169 } 171 - s.push('\n'); // hack 172 - log::trace!("writing: {s}"); 173 - encoder.write_all(s.as_bytes()).await?; 170 + log::trace!("writing: {op:?}"); 171 + encoder 172 + .write_all(serde_json::to_string(&op)?.as_bytes()) 173 + .await?; 174 174 total_ops += 1; 175 175 week_ops += 1; 176 176 } ··· 197 197 dest: mpsc::Sender<ExportPage>, 198 198 ) -> anyhow::Result<()> { 199 199 use futures::TryStreamExt; 200 - let decoder = GzipDecoder::new(BufReader::new(source.reader_for(week).await?)); 200 + let reader = source 201 + .reader_for(week) 202 + .await 203 + .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?; 204 + let decoder = GzipDecoder::new(BufReader::new(reader)); 201 205 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 202 206 203 - while let Some(chunk) = chunks.try_next().await? { 204 - let ops: Vec<String> = chunk.into_iter().collect(); 207 + while let Some(chunk) = chunks 208 + .try_next() 209 + .await 210 + .inspect_err(|e| log::error!("failed to get next chunk: {e}"))? 211 + { 212 + let ops: Vec<Op> = chunk 213 + .into_iter() 214 + .filter_map(|s| { 215 + serde_json::from_str::<Op>(&s) 216 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 217 + .ok() 218 + }) 219 + .collect(); 205 220 let page = ExportPage { ops }; 206 - dest.send(page).await?; 221 + dest.send(page) 222 + .await 223 + .inspect_err(|e| log::error!("failed to send page: {e}"))?; 207 224 } 208 225 Ok(()) 209 226 }