Server tools to backfill, tail, mirror, and verify PLC logs

+720 -587
+14
Cargo.lock
··· 38 "governor", 39 "http-body-util", 40 "log", 41 "poem", 42 "reqwest", 43 "reqwest-middleware", 44 "reqwest-retry", ··· 1740 version = "1.11.1" 1741 source = "registry+https://github.com/rust-lang/crates.io-index" 1742 checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1743 1744 [[package]] 1745 name = "postgres-protocol"
··· 38 "governor", 39 "http-body-util", 40 "log", 41 + "native-tls", 42 "poem", 43 + "postgres-native-tls", 44 "reqwest", 45 "reqwest-middleware", 46 "reqwest-retry", ··· 1742 version = "1.11.1" 1743 source = "registry+https://github.com/rust-lang/crates.io-index" 1744 checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 1745 + 1746 + [[package]] 1747 + name = "postgres-native-tls" 1748 + version = "0.5.1" 1749 + source = "registry+https://github.com/rust-lang/crates.io-index" 1750 + checksum = "a1f39498473c92f7b6820ae970382c1d83178a3454c618161cb772e8598d9f6f" 1751 + dependencies = [ 1752 + "native-tls", 1753 + "tokio", 1754 + "tokio-native-tls", 1755 + "tokio-postgres", 1756 + ] 1757 1758 [[package]] 1759 name = "postgres-protocol"
+2
Cargo.toml
···
 governor = "0.10.1"
 http-body-util = "0.1.3"
 log = "0.4.28"
+native-tls = "0.2.14"
 poem = { version = "3.1.12", features = ["acme", "compression"] }
+postgres-native-tls = "0.5.1"
 reqwest = { version = "0.12.23", features = ["stream", "json"] }
 reqwest-middleware = "0.4.2"
 reqwest-retry = "0.7.0"
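The two new dependencies exist so the postgres connections can be made over TLS with a custom root certificate (see `src/plc_pg.rs` below). A minimal sketch of how `native-tls` and `postgres-native-tls` wire into `tokio-postgres`; the `pg_uri` and `ca_pem` inputs here are hypothetical:

```rust
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;

// sketch: connect tokio-postgres over TLS, trusting a custom root cert
async fn connect_tls(pg_uri: &str, ca_pem: &[u8]) -> anyhow::Result<tokio_postgres::Client> {
    let cert = Certificate::from_pem(ca_pem)?;
    let connector = TlsConnector::builder().add_root_certificate(cert).build()?;
    let tls = MakeTlsConnector::new(connector);
    let (client, connection) = tokio_postgres::connect(pg_uri, tls).await?;
    // the connection future drives the socket; it resolves when the client drops
    tokio::spawn(connection);
    Ok(client)
}
```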
favicon.ico

This is a binary file and will not be displayed.

+1 -1
readme.md
···
   --upstream "https://plc.directory" \
   --wrap "http://127.0.0.1:3000" \
   --acme-domain "plc.wtf" \
-  --acme-cache-dir ./acme-cache \
+  --acme-cache-path ./acme-cache \
   --acme-directory-url "https://acme-staging-v02.api.letsencrypt.org/directory"
 ```
+12 -5
src/backfill.rs
···
     dest: mpsc::Sender<ExportPage>,
     source_workers: usize,
     until: Option<Dt>,
-) -> anyhow::Result<()> {
+) -> anyhow::Result<&'static str> {
     // queue up the week bundles that should be available
     let weeks = Arc::new(Mutex::new(
         until
···
     while let Some(week) = weeks.lock().await.pop() {
         let when = Into::<Dt>::into(week).to_rfc3339();
         log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago());
-        week_to_pages(source.clone(), week, dest.clone()).await?;
+        week_to_pages(source.clone(), week, dest.clone())
+            .await
+            .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?;
     }
     log::info!("done with the weeks ig");
     Ok(())
···
     // wait for the big backfill to finish
     while let Some(res) = workers.join_next().await {
-        res??;
+        res.inspect_err(|e| log::error!("problem joining source workers: {e}"))?
+            .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?;
     }
-    log::info!("finished fetching backfill in {:?}", t_step.elapsed());
-    Ok(())
+    log::info!(
+        "finished fetching backfill in {:?}. senders remaining: {}",
+        t_step.elapsed(),
+        dest.strong_count()
+    );
+    Ok("backfill")
 }
+40 -226
src/bin/allegedly.rs
-use allegedly::{
-    Db, Dt, ExportPage, FolderSource, HttpSource, ListenConf, PageBoundaryState, backfill,
-    backfill_to_pg, bin_init, pages_to_pg, pages_to_weeks, poll_upstream, serve,
-};
+use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream};
 use clap::{CommandFactory, Parser, Subcommand};
-use reqwest::Url;
-use std::{net::SocketAddr, path::PathBuf, time::Instant};
-use tokio::sync::{mpsc, oneshot};
+use std::{path::PathBuf, time::Instant};
+use tokio::fs::create_dir_all;
+use tokio::sync::mpsc;
+
+mod backfill;
+mod mirror;

 #[derive(Debug, Parser)]
 struct Cli {
-    /// Upstream PLC server
-    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
-    #[clap(default_value = "https://plc.directory")]
-    upstream: Url,
+    #[command(flatten)]
+    globals: GlobalArgs,
+
     #[command(subcommand)]
     command: Commands,
 }
···
 enum Commands {
     /// Use weekly bundled ops to get a complete directory mirror FAST
     Backfill {
-        /// Remote URL prefix to fetch bundles from
-        #[arg(long)]
-        #[clap(default_value = "https://plc.t3.storage.dev/plc.directory/")]
-        http: Url,
-        /// Local folder to fetch bundles from (overrides `http`)
-        #[arg(long)]
-        dir: Option<PathBuf>,
-        /// Parallel bundle fetchers
-        ///
-        /// Default: 4 for http fetches, 1 for local folder
-        #[arg(long)]
-        source_workers: Option<usize>,
-        /// Bulk load into did-method-plc-compatible postgres instead of stdout
-        ///
-        /// Pass a postgres connection url like "postgresql://localhost:5432"
-        #[arg(long)]
-        to_postgres: Option<Url>,
-        /// Delete all operations from the postgres db before starting
-        ///
-        /// only used if `--to-postgres` is present
-        #[arg(long, action)]
-        postgres_reset: bool,
-        /// Stop at the week ending before this date
-        #[arg(long)]
-        until: Option<Dt>,
-        /// After the weekly imports, poll upstream until we're caught up
-        #[arg(long, action)]
-        catch_up: bool,
+        #[command(flatten)]
+        args: backfill::Args,
     },
     /// Scrape a PLC server, collecting ops into weekly bundles
     ///
···
     },
     /// Wrap a did-method-plc server, syncing upstream and blocking op submits
     Mirror {
-        /// the wrapped did-method-plc server
-        #[arg(long, env = "ALLEGEDLY_WRAP")]
-        wrap: Url,
-        /// the wrapped did-method-plc server's database (write access required)
-        #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
-        wrap_pg: Url,
-        /// wrapping server listen address
-        #[arg(short, long, env = "ALLEGEDLY_BIND")]
-        #[clap(default_value = "127.0.0.1:8000")]
-        bind: SocketAddr,
-        /// obtain a certificate from letsencrypt
-        ///
-        /// for now this will force listening on all interfaces at :80 and :443
-        /// (:80 will serve an "https required" error, *will not* redirect)
-        #[arg(
-            long,
-            conflicts_with("bind"),
-            requires("acme_cache_path"),
-            env = "ALLEGEDLY_ACME_DOMAIN"
-        )]
-        acme_domain: Vec<String>,
-        /// which local directory to keep the letsencrypt certs in
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
-        acme_cache_path: Option<PathBuf>,
-        /// which public acme directory to use
-        ///
-        /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
-        #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
-        #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
-        acme_directory_url: Url,
+        #[command(flatten)]
+        args: mirror::Args,
     },
     /// Poll an upstream PLC server and log new ops to stdout
     Tail {
···
     },
 }

-async fn pages_to_stdout(
-    mut rx: mpsc::Receiver<ExportPage>,
-    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
-) -> anyhow::Result<()> {
-    let mut last_at = None;
-    while let Some(page) = rx.recv().await {
-        for op in &page.ops {
-            println!("{op}");
-        }
-        if notify_last_at.is_some()
-            && let Some(s) = PageBoundaryState::new(&page)
-        {
-            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
-        }
-    }
-    if let Some(notify) = notify_last_at {
-        log::trace!("notifying last_at: {last_at:?}");
-        if notify.send(last_at).is_err() {
-            log::error!("receiver for last_at dropped, can't notify");
-        };
-    }
-    Ok(())
-}
-
-/// page forwarder who drops its channels on receipt of a small page
-///
-/// PLC will return up to 1000 ops on a page, and returns full pages until it
-/// has caught up, so this is a (hacky?) way to stop polling once we're up.
-fn full_pages(mut rx: mpsc::Receiver<ExportPage>) -> mpsc::Receiver<ExportPage> {
-    let (tx, fwd) = mpsc::channel(1);
-    tokio::task::spawn(async move {
-        while let Some(page) = rx.recv().await
-            && page.ops.len() > 900
-        {
-            tx.send(page).await.unwrap();
-        }
-    });
-    fwd
-}
-
 #[tokio::main]
-async fn main() {
+async fn main() -> anyhow::Result<()> {
     let args = Cli::parse();
     let matches = Cli::command().get_matches();
     let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???");
     bin_init(name);

+    let globals = args.globals.clone();
+
     let t0 = Instant::now();
     match args.command {
-        Commands::Backfill {
-            http,
-            dir,
-            source_workers,
-            to_postgres,
-            postgres_reset,
-            until,
-            catch_up,
-        } => {
-            let (tx, rx) = mpsc::channel(32); // these are big pages
-            tokio::task::spawn(async move {
-                if let Some(dir) = dir {
-                    log::info!("Reading weekly bundles from local folder {dir:?}");
-                    backfill(FolderSource(dir), tx, source_workers.unwrap_or(1), until)
-                        .await
-                        .unwrap();
-                } else {
-                    log::info!("Fetching weekly bundles from from {http}");
-                    backfill(HttpSource(http), tx, source_workers.unwrap_or(4), until)
-                        .await
-                        .unwrap();
-                }
-            });
-
-            // postgres writer will notify us as soon as the very last op's time is known
-            // so we can start catching up while pg is restoring indexes and stuff
-            let (notify_last_at, rx_last) = if catch_up {
-                let (tx, rx) = oneshot::channel();
-                (Some(tx), Some(rx))
-            } else {
-                (None, None)
-            };
-
-            let to_postgres_url_bulk = to_postgres.clone();
-            let bulk_out_write = tokio::task::spawn(async move {
-                if let Some(ref url) = to_postgres_url_bulk {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    backfill_to_pg(db, postgres_reset, rx, notify_last_at)
-                        .await
-                        .unwrap();
-                } else {
-                    pages_to_stdout(rx, notify_last_at).await.unwrap();
-                }
-            });
-
-            if let Some(rx_last) = rx_last {
-                let mut upstream = args.upstream;
-                upstream.set_path("/export");
-                // wait until the time for `after` is known
-                let last_at = rx_last.await.unwrap();
-                log::info!("beginning catch-up from {last_at:?} while the writer finalizes stuff");
-                let (tx, rx) = mpsc::channel(256); // these are small pages
-                tokio::task::spawn(
-                    async move { poll_upstream(last_at, upstream, tx).await.unwrap() },
-                );
-                bulk_out_write.await.unwrap();
-                log::info!("writing catch-up pages");
-                let full_pages = full_pages(rx);
-                if let Some(url) = to_postgres {
-                    let db = Db::new(url.as_str()).await.unwrap();
-                    pages_to_pg(db, full_pages).await.unwrap();
-                } else {
-                    pages_to_stdout(full_pages, None).await.unwrap();
-                }
-            }
-        }
+        Commands::Backfill { args } => backfill::run(globals, args).await?,
         Commands::Bundle {
             dest,
             after,
             clobber,
         } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
             let (tx, rx) = mpsc::channel(32); // read ahead if gzip stalls for some reason
-            tokio::task::spawn(async move { poll_upstream(Some(after), url, tx).await.unwrap() });
+            tokio::task::spawn(async move {
+                poll_upstream(Some(after), url, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
             log::trace!("ensuring output directory exists");
-            std::fs::create_dir_all(&dest).unwrap();
-            pages_to_weeks(rx, dest, clobber).await.unwrap();
-        }
-        Commands::Mirror {
-            wrap,
-            wrap_pg,
-            bind,
-            acme_domain,
-            acme_cache_path,
-            acme_directory_url,
-        } => {
-            let db = Db::new(wrap_pg.as_str()).await.unwrap();
-            let latest = db
-                .get_latest()
+            create_dir_all(&dest)
                 .await
-                .unwrap()
-                .expect("there to be at least one op in the db. did you backfill?");
-
-            let (tx, rx) = mpsc::channel(2);
-            // upstream poller
-            let mut url = args.upstream.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting poll reader...");
-                url.set_path("/export");
-                tokio::task::spawn(
-                    async move { poll_upstream(Some(latest), url, tx).await.unwrap() },
-                );
-            });
-            // db writer
-            let poll_db = db.clone();
-            tokio::task::spawn(async move {
-                log::info!("starting db writer...");
-                pages_to_pg(poll_db, rx).await.unwrap();
-            });
-
-            let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
-                (_, false, Some(cache_path)) => ListenConf::Acme {
-                    domains: acme_domain,
-                    cache_path,
-                    directory_url: acme_directory_url.to_string(),
-                },
-                (bind, true, None) => ListenConf::Bind(bind),
-                (_, _, _) => unreachable!(),
-            };
-
-            serve(&args.upstream, wrap, listen_conf).await.unwrap();
+                .expect("to ensure output dir exists");
+            pages_to_weeks(rx, dest, clobber)
+                .await
+                .expect("to write bundles to output files");
         }
+        Commands::Mirror { args } => mirror::run(globals, args).await?,
         Commands::Tail { after } => {
-            let mut url = args.upstream;
+            let mut url = globals.upstream;
             url.set_path("/export");
             let start_at = after.or_else(|| Some(chrono::Utc::now()));
             let (tx, rx) = mpsc::channel(1);
-            tokio::task::spawn(async move { poll_upstream(start_at, url, tx).await.unwrap() });
-            pages_to_stdout(rx, None).await.unwrap();
+            tokio::task::spawn(async move {
+                poll_upstream(start_at, url, tx)
+                    .await
+                    .expect("to poll upstream")
+            });
+            pages_to_stdout(rx, None)
+                .await
+                .expect("to write pages to stdout");
         }
     }
     log::info!("whew, {:?}. goodbye!", t0.elapsed());
+    Ok(())
 }
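The refactor hinges on clap's `#[command(flatten)]`: each subcommand's options move into a module-level `Args` struct, and the shared `--upstream` flag lives in one `GlobalArgs` that both the multitool and the standalone binaries reuse. A self-contained sketch of the pattern (field type simplified to `String`; the real code uses `reqwest::Url`):

```rust
use clap::Parser;

#[derive(Debug, Clone, clap::Args)]
struct GlobalArgs {
    /// Upstream PLC server
    #[arg(short, long, global = true, default_value = "https://plc.directory")]
    upstream: String,
}

#[derive(Debug, Parser)]
struct Cli {
    // flattened: GlobalArgs' flags appear directly on this CLI
    #[command(flatten)]
    globals: GlobalArgs,
}

fn main() {
    let cli = Cli::parse();
    println!("upstream = {}", cli.globals.upstream);
}
```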
+199
src/bin/backfill.rs
use allegedly::{
    Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs,
    bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream,
};
use clap::Parser;
use reqwest::Url;
use std::path::PathBuf;
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinSet,
};

pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/";

#[derive(Debug, clap::Args)]
pub struct Args {
    /// Remote URL prefix to fetch bundles from
    #[arg(long)]
    #[clap(default_value = DEFAULT_HTTP)]
    http: Url,
    /// Local folder to fetch bundles from (overrides `http`)
    #[arg(long)]
    dir: Option<PathBuf>,
    /// Don't do weekly bulk-loading at all.
    ///
    /// overrides `http` and `dir`, makes catch_up redundant
    #[arg(long, action)]
    no_bulk: bool,
    /// Parallel bundle fetchers
    ///
    /// Default: 4 for http fetches, 1 for local folder
    #[arg(long)]
    source_workers: Option<usize>,
    /// Bulk load into did-method-plc-compatible postgres instead of stdout
    ///
    /// Pass a postgres connection url like "postgresql://localhost:5432"
    #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")]
    to_postgres: Option<Url>,
    /// Cert for postgres (if needed)
    #[arg(long)]
    postgres_cert: Option<PathBuf>,
    /// Delete all operations from the postgres db before starting
    ///
    /// only used if `--to-postgres` is present
    #[arg(long, action)]
    postgres_reset: bool,
    /// Stop at the week ending before this date
    #[arg(long)]
    until: Option<Dt>,
    /// After the weekly imports, poll upstream until we're caught up
    #[arg(long, action)]
    catch_up: bool,
}

pub async fn run(
    GlobalArgs { upstream }: GlobalArgs,
    Args {
        http,
        dir,
        no_bulk,
        source_workers,
        to_postgres,
        postgres_cert,
        postgres_reset,
        until,
        catch_up,
    }: Args,
) -> anyhow::Result<()> {
    let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new();

    let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages

    // a bulk sink can notify us as soon as the very last op's time is known
    // so we can start catching up while the sink might restore indexes and such
    let (found_last_tx, found_last_out) = if catch_up {
        let (tx, rx) = oneshot::channel();
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };

    let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages
    let (full_tx, full_out) = mpsc::channel(1); // don't need to buffer at this filter

    // set up sources
    if no_bulk {
        // simple mode, just poll upstream from the beginning
        if http != DEFAULT_HTTP.parse()? {
            log::warn!("ignoring non-default bulk http setting since --no-bulk was set");
        }
        if let Some(d) = dir {
            log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set.");
        }
        if let Some(u) = until {
            log::warn!("ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)");
        }
        let mut upstream = upstream;
        upstream.set_path("/export");
        tasks.spawn(poll_upstream(None, upstream, poll_tx));
        tasks.spawn(full_pages(poll_out, full_tx));
        tasks.spawn(pages_to_stdout(full_out, None));
    } else {
        // fun mode

        // set up bulk sources
        if let Some(dir) = dir {
            if http != DEFAULT_HTTP.parse()? {
                anyhow::bail!(
                    "non-default bulk http setting can't be used with bulk dir setting ({dir:?})"
                );
            }
            tasks.spawn(backfill(
                FolderSource(dir),
                bulk_tx,
                source_workers.unwrap_or(1),
                until,
            ));
        } else {
            tasks.spawn(backfill(
                HttpSource(http),
                bulk_tx,
                source_workers.unwrap_or(4),
                until,
            ));
        }

        // and the catch-up source...
        if let Some(last) = found_last_out {
            tasks.spawn(async move {
                let mut upstream = upstream;
                upstream.set_path("/export");
                poll_upstream(last.await?, upstream, poll_tx).await
            });
        }

        // set up sinks
        if let Some(pg_url) = to_postgres {
            log::trace!("connecting to postgres...");
            let db = Db::new(pg_url.as_str(), postgres_cert).await?;
            log::trace!("connected to postgres");

            tasks.spawn(backfill_to_pg(
                db.clone(),
                postgres_reset,
                bulk_out,
                found_last_tx,
            ));
            if catch_up {
                tasks.spawn(pages_to_pg(db, full_out));
            }
        } else {
            tasks.spawn(pages_to_stdout(bulk_out, found_last_tx));
            if catch_up {
                tasks.spawn(pages_to_stdout(full_out, None));
            }
        }
    }

    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}

#[derive(Debug, Parser)]
struct CliArgs {
    #[command(flatten)]
    globals: GlobalArgs,
    #[command(flatten)]
    args: Args,
}

#[allow(dead_code)]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = CliArgs::parse();
    bin_init("backfill");
    run(args.globals, args.args).await?;
    Ok(())
}
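The catch-up handoff above relies on a `oneshot` channel: the bulk sink sends the newest `createdAt` it saw as soon as that is known, so the upstream poller can start while the sink is still finalizing. A stripped-down sketch of just that pattern (timestamps stubbed as integers):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (found_last_tx, found_last_rx) = oneshot::channel::<Option<i64>>();

    let sink = tokio::spawn(async move {
        // ... consume bulk pages, tracking the newest timestamp (stubbed here)
        let _ = found_last_tx.send(Some(1_700_000_000));
        // ... keep building indexes etc. while the poller already runs
    });

    let poller = tokio::spawn(async move {
        let last = found_last_rx.await.expect("sink dropped the sender");
        println!("catching up after {last:?}");
    });

    let _ = tokio::join!(sink, poller);
}
```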
+128
src/bin/mirror.rs
···
use allegedly::{Db, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve};
use clap::Parser;
use reqwest::Url;
use std::{net::SocketAddr, path::PathBuf};
use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet};

#[derive(Debug, clap::Args)]
pub struct Args {
    /// the wrapped did-method-plc server
    #[arg(long, env = "ALLEGEDLY_WRAP")]
    wrap: Url,
    /// the wrapped did-method-plc server's database (write access required)
    #[arg(long, env = "ALLEGEDLY_WRAP_PG")]
    wrap_pg: Url,
    /// path to tls cert for the wrapped postgres db, if needed
    #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")]
    wrap_pg_cert: Option<PathBuf>,
    /// wrapping server listen address
    #[arg(short, long, env = "ALLEGEDLY_BIND")]
    #[clap(default_value = "127.0.0.1:8000")]
    bind: SocketAddr,
    /// obtain a certificate from letsencrypt
    ///
    /// for now this will force listening on all interfaces at :80 and :443
    /// (:80 will serve an "https required" error, *will not* redirect)
    #[arg(
        long,
        conflicts_with("bind"),
        requires("acme_cache_path"),
        env = "ALLEGEDLY_ACME_DOMAIN"
    )]
    acme_domain: Vec<String>,
    /// which local directory to keep the letsencrypt certs in
    #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")]
    acme_cache_path: Option<PathBuf>,
    /// which public acme directory to use
    ///
    /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory"
    #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")]
    #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")]
    acme_directory_url: Url,
}

pub async fn run(
    GlobalArgs { upstream }: GlobalArgs,
    Args {
        wrap,
        wrap_pg,
        wrap_pg_cert,
        bind,
        acme_domain,
        acme_cache_path,
        acme_directory_url,
    }: Args,
) -> anyhow::Result<()> {
    let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?;

    // TODO: allow starting up with polling backfill from beginning?
    log::debug!("getting the latest op from the db...");
    let latest = db
        .get_latest()
        .await?
        .expect("there to be at least one op in the db. did you backfill?");

    let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) {
        (_, false, Some(cache_path)) => {
            log::info!("configuring acme for https at {acme_domain:?}...");
            create_dir_all(&cache_path).await?;
            ListenConf::Acme {
                domains: acme_domain,
                cache_path,
                directory_url: acme_directory_url.to_string(),
            }
        }
        (bind, true, None) => ListenConf::Bind(bind),
        (_, _, _) => unreachable!(),
    };

    let mut tasks = JoinSet::new();

    let (send_page, recv_page) = mpsc::channel(8);

    let mut poll_url = upstream.clone();
    poll_url.set_path("/export");

    tasks.spawn(poll_upstream(Some(latest), poll_url, send_page));
    tasks.spawn(pages_to_pg(db.clone(), recv_page));
    tasks.spawn(serve(upstream, wrap, listen_conf));

    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => {
                log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)");
                return Err(e.into());
            }
            Err(e) => {
                log::error!("a joinset task failed to join: {e}");
                return Err(e.into());
            }
            Ok(Err(e)) => {
                log::error!("a joinset task completed with error: {e}");
                return Err(e);
            }
            Ok(Ok(name)) => {
                log::trace!("a task completed: {name:?}. {} left", tasks.len());
            }
        }
    }

    Ok(())
}

#[derive(Debug, Parser)]
struct CliArgs {
    #[command(flatten)]
    globals: GlobalArgs,
    #[command(flatten)]
    args: Args,
}

#[allow(dead_code)]
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = CliArgs::parse();
    bin_init("mirror");
    run(args.globals, args.args).await?;
    Ok(())
}
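Both new binaries drain their `JoinSet` with the same supervision loop: every task returns `anyhow::Result<&'static str>` so a clean completion reports its name, while panics, join failures, and task-level errors each get distinct handling. A condensed, runnable sketch of that loop:

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut tasks: JoinSet<anyhow::Result<&'static str>> = JoinSet::new();
    tasks.spawn(async { Ok("poller") });
    tasks.spawn(async { Ok("writer") });

    while let Some(next) = tasks.join_next().await {
        match next {
            Err(e) if e.is_panic() => return Err(e.into()), // the task panicked
            Err(e) => return Err(e.into()),                 // join failure (eg. cancelled)
            Ok(Err(e)) => return Err(e),                    // the task ran but failed
            Ok(Ok(name)) => eprintln!("{name} done, {} tasks left", tasks.len()),
        }
    }
    Ok(())
}
```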
+14
src/bin/mod.rs
use reqwest::Url;

#[derive(Debug, Clone, clap::Args)]
pub struct GlobalArgs {
    /// Upstream PLC server
    #[arg(short, long, global = true, env = "ALLEGEDLY_UPSTREAM")]
    #[clap(default_value = "https://plc.directory")]
    pub upstream: Url,
}

#[allow(dead_code)]
fn main() {
    panic!("this is not actually a module")
}
+4 -1
src/client.rs
···
 );

 pub static CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| {
-    let inner = Client::builder().user_agent(UA).build().unwrap();
+    let inner = Client::builder()
+        .user_agent(UA)
+        .build()
+        .expect("reqwest client to build");

     let policy = ExponentialBackoff::builder().build_with_max_retries(12);
···
+79 -10
src/lib.rs
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, oneshot};

 mod backfill;
 mod client;
···
 mod poll;
 mod ratelimit;
 mod weekly;
+
+pub mod bin;

 pub use backfill::backfill;
 pub use client::{CLIENT, UA};
···
 /// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
 #[derive(Debug)]
 pub struct ExportPage {
-    pub ops: Vec<String>,
+    pub ops: Vec<Op>,
 }

 impl ExportPage {
···
 /// A fully-deserialized plc operation
 ///
 /// including the plc's wrapping with timestamp and nullified state
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
-pub struct Op<'a> {
-    pub did: &'a str,
-    pub cid: &'a str,
+pub struct Op {
+    pub did: String,
+    pub cid: String,
     pub created_at: Dt,
     pub nullified: bool,
-    #[serde(borrow)]
-    pub operation: &'a serde_json::value::RawValue,
+    pub operation: Box<serde_json::value::RawValue>,
+}
+
+#[cfg(test)]
+impl PartialEq for Op {
+    fn eq(&self, other: &Self) -> bool {
+        self.did == other.did
+            && self.cid == other.cid
+            && self.created_at == other.created_at
+            && self.nullified == other.nullified
+            && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap()
+                == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap()
+    }
 }

 /// Database primary key for an op
···
     pub cid: String,
 }

-impl From<&Op<'_>> for OpKey {
-    fn from(Op { did, cid, .. }: &Op<'_>) -> Self {
+impl From<&Op> for OpKey {
+    fn from(Op { did, cid, .. }: &Op) -> Self {
         Self {
             did: did.to_string(),
             cid: cid.to_string(),
         }
     }
+}
+
+/// page forwarder who drops its channels on receipt of a small page
+///
+/// PLC will return up to 1000 ops on a page, and returns full pages until it
+/// has caught up, so this is a (hacky?) way to stop polling once we're up.
+pub async fn full_pages(
+    mut rx: mpsc::Receiver<ExportPage>,
+    tx: mpsc::Sender<ExportPage>,
+) -> anyhow::Result<&'static str> {
+    while let Some(page) = rx.recv().await {
+        let n = page.ops.len();
+        if n < 900 {
+            let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
+            let Some(age) = last_age else {
+                log::info!("full_pages done, empty final page");
+                return Ok("full pages (hmm)");
+            };
+            if age <= chrono::TimeDelta::hours(6) {
+                log::info!("full_pages done, final page of {n} ops");
+            } else {
+                log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
+            }
+            return Ok("full pages (cool)");
+        }
+        log::trace!("full_pages: continuing with page of {n} ops");
+        tx.send(page).await?;
+    }
+    Err(anyhow::anyhow!(
+        "full_pages ran out of source material, sender closed"
+    ))
+}
+
+pub async fn pages_to_stdout(
+    mut rx: mpsc::Receiver<ExportPage>,
+    notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
+) -> anyhow::Result<&'static str> {
+    let mut last_at = None;
+    while let Some(page) = rx.recv().await {
+        for op in &page.ops {
+            println!("{}", serde_json::to_string(op)?);
+        }
+        if notify_last_at.is_some()
+            && let Some(s) = PageBoundaryState::new(&page)
+        {
+            last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
+        }
+    }
+    if let Some(notify) = notify_last_at {
+        log::trace!("notifying last_at: {last_at:?}");
+        if notify.send(last_at).is_err() {
+            log::error!("receiver for last_at dropped, can't notify");
+        };
+    }
+    Ok("pages_to_stdout")
 }

 pub fn logo(name: &str) -> String {
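`Op::operation` stays a `RawValue`, so the test-only `PartialEq` compares it semantically by parsing both sides to `serde_json::Value`; comparing the raw text would be too strict, since key order and whitespace can differ between encodings. A tiny demonstration of why (requires serde_json's `raw_value` feature, as the crate already does):

```rust
use serde_json::{Value, value::RawValue};

fn main() -> anyhow::Result<()> {
    let a: Box<RawValue> = serde_json::from_str(r#"{ "a": 1, "b": 2 }"#)?;
    let b: Box<RawValue> = serde_json::from_str(r#"{"b":2,"a":1}"#)?;

    assert_ne!(a.get(), b.get()); // the raw text differs...
    let (va, vb): (Value, Value) =
        (serde_json::from_str(a.get())?, serde_json::from_str(b.get())?);
    assert_eq!(va, vb); // ...but the parsed values are equal
    Ok(())
}
```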
+46 -24
src/mirror.rs
···
     format!(
         r#"{}

-This is a PLC[1] mirror running Allegedly[2] in mirror mode. Allegedly synchronizes and proxies to a downstream PLC reference server instance[3] (why?[4]).
+This is a PLC[1] mirror running Allegedly in mirror mode. Mirror mode wraps and
+synchronizes a local PLC reference server instance[2] (why?[3]).


 Configured upstream:
···

 Available APIs:

-- All PLC GET requests [5].
-- Rejects POSTs. This is a mirror.
+- GET /_health    Health and version info
+
+- GET /*          Proxies to wrapped server; see PLC API docs:
+                  https://web.plc.directory/api/redoc
+
+- POST /*         Always rejected. This is a mirror.
+
+
+tip: try `GET /{{did}}` to resolve an identity
+
+
+Allegedly is a suite of open-source CLI tools for working with PLC logs:

-try `GET /{{did}}` to resolve an identity
+https://tangled.org/@microcosm.blue/Allegedly


 [1] https://web.plc.directory
-[2] https://tangled.org/@microcosm.blue/Allegedly
-[3] https://github.com/did-method-plc/did-method-plc
-[4] https://updates.microcosm.blue/3lz7nwvh4zc2u
-[5] https://web.plc.directory/api/redoc
-
+[2] https://github.com/did-method-plc/did-method-plc
+[3] https://updates.microcosm.blue/3lz7nwvh4zc2u
 "#,
         logo("mirror")
     )
+}
+
+#[handler]
+fn favicon() -> impl IntoResponse {
+    include_bytes!("../favicon.ico").with_content_type("image/x-icon")
 }

 fn failed_to_reach_wrapped() -> String {
···
     Bind(SocketAddr),
 }

-pub async fn serve(upstream: &Url, plc: Url, listen: ListenConf) -> std::io::Result<()> {
+pub async fn serve(upstream: Url, plc: Url, listen: ListenConf) -> anyhow::Result<&'static str> {
+    log::info!("starting server...");
+
     // not using crate CLIENT: don't want the retries etc
     let client = Client::builder()
         .user_agent(UA)
         .timeout(Duration::from_secs(10)) // fallback
         .build()
-        .unwrap();
+        .expect("reqwest client to build");

     let state = State {
         client,
···
     let app = Route::new()
         .at("/", get(hello))
+        .at("/favicon.ico", get(favicon))
         .at("/_health", get(health))
         .at("/:any", get(proxy).post(nope))
         .with(AddData::new(state))
         .with(Cors::new().allow_credentials(false))
         .with(Compression::new())
         .with(GovernorMiddleware::new(Quota::per_minute(
-            3000.try_into().unwrap(),
+            3000.try_into().expect("ratelimit middleware to build"),
         )))
         .with(CatchPanic::new())
         .with(Tracing);
···
            }
            let auto_cert = auto_cert.build().expect("acme config to build");

-            run_insecure_notice();
-            run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await
+            let notice_task = tokio::task::spawn(run_insecure_notice());
+            let app_res = run(app, TcpListener::bind("0.0.0.0:443").acme(auto_cert)).await;
+            log::warn!("server task ended, aborting insecure server task...");
+            notice_task.abort();
+            app_res?;
+            notice_task.await??;
         }
-        ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await,
+        ListenConf::Bind(addr) => run(app, TcpListener::bind(addr)).await?,
     }
+
+    Ok("server (uh oh?)")
 }

 async fn run<A, L>(app: A, listener: L) -> std::io::Result<()>
···
 }

 /// kick off a tiny little server on a tokio task to tell people to use 443
-fn run_insecure_notice() {
+async fn run_insecure_notice() -> Result<(), std::io::Error> {
     #[handler]
     fn oop_plz_be_secure() -> (StatusCode, String) {
         (
···
         )
     }

-    let app = Route::new().at("/", get(oop_plz_be_secure)).with(Tracing);
-    let listener = TcpListener::bind("0.0.0.0:80");
-    tokio::task::spawn(async move {
-        Server::new(listener)
-            .name("allegedly (mirror:80 helper)")
-            .run(app)
-            .await
-    });
+    let app = Route::new()
+        .at("/", get(oop_plz_be_secure))
+        .at("/favicon.ico", get(favicon))
+        .with(Tracing);
+    Server::new(TcpListener::bind("0.0.0.0:80"))
+        .name("allegedly (mirror:80 helper)")
+        .run(app)
+        .await
 }
+69 -45
src/plc_pg.rs
-use crate::{Dt, ExportPage, Op, PageBoundaryState};
+use crate::{Dt, ExportPage, PageBoundaryState};
+use native_tls::{Certificate, TlsConnector};
+use postgres_native_tls::MakeTlsConnector;
+use std::path::PathBuf;
 use std::pin::pin;
 use std::time::Instant;
-use tokio::sync::{mpsc, oneshot};
+use tokio::{
+    sync::{mpsc, oneshot},
+    task::{JoinHandle, spawn},
+};
 use tokio_postgres::{
-    Client, Error as PgError, NoTls,
+    Client, Error as PgError, NoTls, Socket,
     binary_copy::BinaryCopyInWriter,
     connect,
+    tls::MakeTlsConnect,
     types::{Json, Type},
 };

+fn get_tls(cert: PathBuf) -> anyhow::Result<MakeTlsConnector> {
+    let cert = std::fs::read(cert)?;
+    let cert = Certificate::from_pem(&cert)?;
+    let connector = TlsConnector::builder().add_root_certificate(cert).build()?;
+    Ok(MakeTlsConnector::new(connector))
+}
+
+async fn get_client_and_task<T>(
+    uri: &str,
+    connector: T,
+) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError>
+where
+    T: MakeTlsConnect<Socket>,
+    <T as MakeTlsConnect<Socket>>::Stream: Send + 'static,
+{
+    let (client, connection) = connect(uri, connector).await?;
+    Ok((client, spawn(connection)))
+}
+
 /// a little tokio-postgres helper
 ///
 /// it's clone for easiness. it doesn't share any resources underneath after
-/// cloning at all so it's not meant for
-#[derive(Debug, Clone)]
+/// cloning *at all* so it's not meant for eg. handling public web requests
+#[derive(Clone)]
 pub struct Db {
     pg_uri: String,
+    cert: Option<MakeTlsConnector>,
 }

 impl Db {
-    pub async fn new(pg_uri: &str) -> Result<Self, anyhow::Error> {
+    pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> {
         // we're going to interact with did-method-plc's database, so make sure
         // it's what we expect: check for db migrations.
         log::trace!("checking migrations...");
-        let (client, connection) = connect(pg_uri, NoTls).await?;
-        let connection_task = tokio::task::spawn(async move {
-            connection
-                .await
-                .inspect_err(|e| log::error!("connection ended with error: {e}"))
-                .unwrap();
-        });
+
+        let connector = cert.map(get_tls).transpose()?;
+
+        let (client, conn_task) = if let Some(ref connector) = connector {
+            get_client_and_task(pg_uri, connector.clone()).await?
+        } else {
+            get_client_and_task(pg_uri, NoTls).await?
+        };
+
         let migrations: Vec<String> = client
             .query("SELECT name FROM kysely_migration ORDER BY name", &[])
             .await?
···
         );
         drop(client);
         // make sure the connection worker thing doesn't linger
-        connection_task.await?;
+        conn_task.await??;
         log::info!("db connection succeeded and plc migrations appear as expected");

         Ok(Self {
             pg_uri: pg_uri.to_string(),
+            cert: connector,
         })
     }

-    pub async fn connect(&self) -> Result<Client, PgError> {
+    pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> {
         log::trace!("connecting postgres...");
-        let (client, connection) = connect(&self.pg_uri, NoTls).await?;
-
-        // send the connection away to do the actual communication work
-        // apparently the connection will complete when the client drops
-        tokio::task::spawn(async move {
-            connection
-                .await
-                .inspect_err(|e| log::error!("connection ended with error: {e}"))
-                .unwrap();
-        });
-
-        Ok(client)
+        if let Some(ref connector) = self.cert {
+            get_client_and_task(&self.pg_uri, connector.clone()).await
+        } else {
+            get_client_and_task(&self.pg_uri, NoTls).await
+        }
     }

     pub async fn get_latest(&self) -> Result<Option<Dt>, PgError> {
-        let client = self.connect().await?;
+        let (client, task) = self.connect().await?;
         let dt: Option<Dt> = client
             .query_opt(
                 r#"SELECT "createdAt"
···
             )
             .await?
             .map(|row| row.get(0));
+        drop(task);
         Ok(dt)
     }
 }

-pub async fn pages_to_pg(db: Db, mut pages: mpsc::Receiver<ExportPage>) -> Result<(), PgError> {
-    let mut client = db.connect().await?;
+pub async fn pages_to_pg(
+    db: Db,
+    mut pages: mpsc::Receiver<ExportPage>,
+) -> anyhow::Result<&'static str> {
+    log::info!("starting pages_to_pg writer...");
+
+    let (mut client, task) = db.connect().await?;

     let ops_stmt = client
         .prepare(
···
     while let Some(page) = pages.recv().await {
         log::trace!("writing page with {} ops", page.ops.len());
         let tx = client.transaction().await?;
-        for s in page.ops {
-            let Ok(op) = serde_json::from_str::<Op>(&s) else {
-                log::warn!("ignoring unparseable op {s:?}");
-                continue;
-            };
+        for op in page.ops {
             ops_inserted += tx
                 .execute(
                     &ops_stmt,
···
         }
         tx.commit().await?;
     }
+    drop(task);

     log::info!(
         "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}",
         t0.elapsed()
     );
-    Ok(())
+    Ok("pages_to_pg")
 }

 /// Dump rows into an empty operations table quickly
···
     reset: bool,
     mut pages: mpsc::Receiver<ExportPage>,
     notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
-) -> Result<(), PgError> {
-    let mut client = db.connect().await?;
+) -> anyhow::Result<&'static str> {
+    let (mut client, task) = db.connect().await?;

     let t0 = Instant::now();
     let tx = client.transaction().await?;
···
     let mut writer = pin!(BinaryCopyInWriter::new(sync, types));
     let mut last_at = None;
     while let Some(page) = pages.recv().await {
-        for s in &page.ops {
-            let Ok(op) = serde_json::from_str::<Op>(s) else {
-                log::warn!("ignoring unparseable op: {s:?}");
-                continue;
-            };
+        for op in &page.ops {
             writer
                 .as_mut()
                 .write(&[
                     &op.did,
-                    &Json(op.operation),
+                    &Json(op.operation.clone()),
                     &op.cid,
                     &op.nullified,
                     &op.created_at,
···
             last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
         }
     }
+    log::debug!("finished receiving bulk pages");

     if let Some(notify) = notify_last_at {
         log::trace!("notifying last_at: {last_at:?}");
···
     log::trace!("set tables LOGGED: {:?}", t_step.elapsed());

     tx.commit().await?;
+    drop(task);
     log::info!("total backfill time: {:?}", t0.elapsed());

-    Ok(())
+    Ok("backfill_to_pg")
 }
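The bulk path goes through postgres's binary `COPY` protocol via `BinaryCopyInWriter`, which is far faster than row-by-row inserts. A minimal standalone sketch of that API; the `ops(did text, cid text)` table and the row values are hypothetical:

```rust
use tokio_postgres::{Client, binary_copy::BinaryCopyInWriter, types::Type};

async fn bulk_copy(client: &mut Client) -> anyhow::Result<()> {
    let tx = client.transaction().await?;
    // open a binary COPY sink on the transaction
    let sink = tx.copy_in("COPY ops (did, cid) FROM STDIN BINARY").await?;
    let mut writer = std::pin::pin!(BinaryCopyInWriter::new(sink, &[Type::TEXT, Type::TEXT]));

    // write one row; values just need to implement ToSql
    writer.as_mut().write(&[&"did:plc:example", &"cid-example"]).await?;

    let rows = writer.finish().await?;
    tx.commit().await?;
    println!("copied {rows} rows");
    Ok(())
}
```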
+66 -254
src/poll.rs
··· 26 pk: (String, String), // did, cid 27 } 28 29 - impl From<Op<'_>> for LastOp { 30 fn from(op: Op) -> Self { 31 Self { 32 created_at: op.created_at, 33 - pk: (op.did.to_string(), op.cid.to_string()), 34 } 35 } 36 } 37 38 impl From<Dt> for LastOp { 39 fn from(dt: Dt) -> Self { 40 Self { ··· 51 keys_at: Vec<OpKey>, // expected to ~always be length one 52 } 53 54 - // ok so this is silly. 55 - // 56 - // i think i had some idea that deferring parsing to later steps would make it 57 - // easier to do things like sometimes not parsing at all (where the output is 58 - // also json lines), and maybe avoid some memory shuffling. 59 - // but since the input already has to be split into lines, keeping them as line 60 - // strings is probably the worst option: space-inefficient, allows garbage, and 61 - // leads to, well, this impl. 62 - // 63 - // it almost could have been slick if the *original* was just reused, and the 64 - // parsed ops were just kind of on the side referencing into it, but i'm lazy 65 - // and didn't get it there. 66 - // 67 - // should unrefactor to make Op own its data again, parse (and deal with errors) 68 - // upfront, and probably greatly simplify everything downstream. simple. 69 impl PageBoundaryState { 70 pub fn new(page: &ExportPage) -> Option<Self> { 71 - let mut skips = 0; 72 - 73 // grab the very last op 74 - let (last_at, last_key) = loop { 75 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 76 - // there are no ops left? oop. bail. 77 - // last_at and existing keys remain in tact if there was no later op 78 - return None; 79 - }; 80 - if s.is_empty() { 81 - // annoying: ignore any trailing blank lines 82 - skips += 1; 83 - continue; 84 - } 85 - let Ok(op) = serde_json::from_str::<Op>(&s) 86 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 87 - else { 88 - // doubly annoying: skip over trailing garbage?? 89 - skips += 1; 90 - continue; 91 - }; 92 - break (op.created_at, Into::<OpKey>::into(&op)); 93 - }; 94 95 // set initial state 96 let mut me = Self { ··· 99 }; 100 101 // and make sure all keys at this time are captured from the back 102 - me.capture_nth_last_at(page, last_at, skips); 103 104 Some(me) 105 } ··· 108 let to_remove: Vec<usize> = page 109 .ops 110 .iter() 111 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 112 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 113 .enumerate() 114 - .take_while(|(_, opr)| opr.as_ref().map(|op| op.created_at == self.last_at).unwrap_or(false)) 115 - .filter_map(|(i, opr)| { 116 - if self.keys_at.contains(&(&opr.expect("any Errs were filtered by take_while")).into()) { 117 - Some(i) 118 - } else { None } 119 - }) 120 .collect(); 121 122 - // actually remove them. last to first to indices don't shift 123 for dup_idx in to_remove.into_iter().rev() { 124 page.ops.remove(dup_idx); 125 } 126 127 // grab the very last op 128 - let mut skips = 0; 129 - let (last_at, last_key) = loop { 130 - let Some(s) = page.ops.iter().rev().nth(skips).cloned() else { 131 - // there are no ops left? oop. bail. 
132 - // last_at and existing keys remain in tact if there was no later op 133 - return; 134 - }; 135 - if s.is_empty() { 136 - // annoying: trim off any trailing blank lines 137 - skips += 1; 138 - continue; 139 - } 140 - let Ok(op) = serde_json::from_str::<Op>(&s) 141 - .inspect_err(|e| log::warn!("deduplication failed last op parsing ({s:?}: {e}), ignoring for downstream to deal with.")) 142 - else { 143 - // doubly annoying: skip over trailing garbage?? 144 - skips += 1; 145 - continue; 146 - }; 147 - break (op.created_at, Into::<OpKey>::into(&op)); 148 }; 149 150 // reset state (as long as time actually moved forward on this page) ··· 157 self.keys_at.push(last_key); 158 } 159 // and make sure all keys at this time are captured from the back 160 - self.capture_nth_last_at(page, last_at, skips); 161 } 162 163 /// walk backwards from 2nd last and collect keys until created_at changes ··· 166 .iter() 167 .rev() 168 .skip(skips) 169 - .skip(1) // we alredy added the very last one 170 - .map(|s| serde_json::from_str::<Op>(s).inspect_err(|e| 171 - log::warn!("deduplication failed op parsing ({s:?}: {e}), bailing for downstream to deal with."))) 172 - .take_while(|opr| opr.as_ref().map(|op| op.created_at == last_at).unwrap_or(false)) 173 - .for_each(|opr| { 174 - let op = &opr.expect("any Errs were filtered by take_while"); 175 self.keys_at.push(op.into()); 176 }); 177 } ··· 180 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 181 log::trace!("Getting page: {url}"); 182 183 - let ops: Vec<String> = CLIENT 184 .get(url) 185 .send() 186 .await? ··· 190 .trim() 191 .split('\n') 192 .filter_map(|s| { 193 - let s = s.trim(); 194 - if s.is_empty() { None } else { Some(s) } 195 }) 196 - .map(Into::into) 197 .collect(); 198 199 - let last_op = ops 200 - .last() 201 - .filter(|s| !s.is_empty()) 202 - .map(|s| serde_json::from_str::<Op>(s)) 203 - .transpose()? 
204 - .map(Into::into) 205 - .inspect(|at| log::trace!("new last op: {at:?}")); 206 207 Ok((ExportPage { ops }, last_op)) 208 } ··· 211 after: Option<Dt>, 212 base: Url, 213 dest: mpsc::Sender<ExportPage>, 214 - ) -> anyhow::Result<()> { 215 let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 216 let mut prev_last: Option<LastOp> = after.map(Into::into); 217 let mut boundary_state: Option<PageBoundaryState> = None; ··· 252 const FIVES_TS: i64 = 1431648000; 253 const NEXT_TS: i64 = 1431648001; 254 255 - fn valid_op() -> serde_json::Value { 256 - serde_json::json!({ 257 "did": "did", 258 "cid": "cid", 259 "createdAt": "2015-05-15T00:00:00Z", 260 "nullified": false, 261 "operation": {}, 262 - }) 263 } 264 265 - fn next_op() -> serde_json::Value { 266 - serde_json::json!({ 267 "did": "didnext", 268 "cid": "cidnext", 269 "createdAt": "2015-05-15T00:00:01Z", 270 "nullified": false, 271 "operation": {}, 272 - }) 273 } 274 275 fn base_state() -> PageBoundaryState { 276 let page = ExportPage { 277 - ops: vec![valid_op().to_string()], 278 }; 279 - PageBoundaryState::new(&page).unwrap() 280 } 281 282 #[test] ··· 287 } 288 289 #[test] 290 - fn test_boundary_new_empty_op() { 291 - let page = ExportPage { 292 - ops: vec!["".to_string()], 293 - }; 294 - let state = PageBoundaryState::new(&page); 295 - assert!(state.is_none()); 296 - } 297 - 298 - #[test] 299 - fn test_boundary_new_ignores_bad_op() { 300 - let page = ExportPage { 301 - ops: vec!["bad".to_string()], 302 - }; 303 - let state = PageBoundaryState::new(&page); 304 - assert!(state.is_none()); 305 - } 306 - 307 - #[test] 308 - fn test_boundary_new_multiple_bad_end() { 309 - let page = ExportPage { 310 - ops: vec![ 311 - "bad".to_string(), 312 - "".to_string(), 313 - "foo".to_string(), 314 - "".to_string(), 315 - ], 316 - }; 317 - let state = PageBoundaryState::new(&page); 318 - assert!(state.is_none()); 319 - } 320 - 321 - #[test] 322 fn test_boundary_new_one_op() { 323 let page = ExportPage { 324 - ops: vec![valid_op().to_string()], 325 }; 326 let state = PageBoundaryState::new(&page).unwrap(); 327 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); ··· 335 } 336 337 #[test] 338 - fn test_boundary_new_one_op_with_stuff() { 339 - let expect_same_state = |m, ops| { 340 - let this_state = PageBoundaryState::new(&ExportPage { ops }).unwrap(); 341 - assert_eq!(this_state, base_state(), "{}", m); 342 - }; 343 - 344 - expect_same_state("empty before", vec!["".to_string(), valid_op().to_string()]); 345 - 346 - expect_same_state("empty after", vec![valid_op().to_string(), "".to_string()]); 347 - 348 - expect_same_state( 349 - "bad before, empty after", 350 - vec!["bad".to_string(), valid_op().to_string(), "".to_string()], 351 - ); 352 - 353 - expect_same_state( 354 - "bad and empty before and after", 355 - vec![ 356 - "".to_string(), 357 - "bad".to_string(), 358 - valid_op().to_string(), 359 - "".to_string(), 360 - "bad".to_string(), 361 - ], 362 - ); 363 - } 364 - 365 - #[test] 366 fn test_add_new_empty() { 367 let mut state = base_state(); 368 state.apply_to_next(&mut ExportPage { ops: vec![] }); ··· 370 } 371 372 #[test] 373 - fn test_add_new_empty_op() { 374 - let mut state = base_state(); 375 - state.apply_to_next(&mut ExportPage { 376 - ops: vec!["".to_string()], 377 - }); 378 - assert_eq!(state, base_state()); 379 - } 380 - 381 - #[test] 382 - fn test_add_new_ignores_bad_op() { 383 - let mut state = base_state(); 384 - state.apply_to_next(&mut ExportPage { 385 - ops: vec!["bad".to_string()], 386 - }); 387 - 
assert_eq!(state, base_state()); 388 - } 389 - 390 - #[test] 391 - fn test_add_new_multiple_bad() { 392 - let mut page = ExportPage { 393 - ops: vec![ 394 - "bad".to_string(), 395 - "".to_string(), 396 - "foo".to_string(), 397 - "".to_string(), 398 - ], 399 - }; 400 - 401 - let mut state = base_state(); 402 - state.apply_to_next(&mut page); 403 - assert_eq!(state, base_state()); 404 - } 405 - 406 - #[test] 407 fn test_add_new_same_op() { 408 let mut page = ExportPage { 409 - ops: vec![valid_op().to_string()], 410 }; 411 let mut state = base_state(); 412 state.apply_to_next(&mut page); ··· 417 fn test_add_new_same_time() { 418 // make an op with a different OpKey 419 let mut op = valid_op(); 420 - op.as_object_mut() 421 - .unwrap() 422 - .insert("cid".to_string(), "cid2".into()); 423 - let mut page = ExportPage { 424 - ops: vec![op.to_string()], 425 - }; 426 427 let mut state = base_state(); 428 state.apply_to_next(&mut page); ··· 446 fn test_add_new_same_time_dup_before() { 447 // make an op with a different OpKey 448 let mut op = valid_op(); 449 - op.as_object_mut() 450 - .unwrap() 451 - .insert("cid".to_string(), "cid2".into()); 452 let mut page = ExportPage { 453 - ops: vec![valid_op().to_string(), op.to_string()], 454 }; 455 456 let mut state = base_state(); ··· 475 fn test_add_new_same_time_dup_after() { 476 // make an op with a different OpKey 477 let mut op = valid_op(); 478 - op.as_object_mut() 479 - .unwrap() 480 - .insert("cid".to_string(), "cid2".into()); 481 - let mut page = ExportPage { 482 - ops: vec![op.to_string(), valid_op().to_string()], 483 - }; 484 - 485 - let mut state = base_state(); 486 - state.apply_to_next(&mut page); 487 - assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 488 - assert_eq!( 489 - state.keys_at, 490 - vec![ 491 - OpKey { 492 - cid: "cid".to_string(), 493 - did: "did".to_string(), 494 - }, 495 - OpKey { 496 - cid: "cid2".to_string(), 497 - did: "did".to_string(), 498 - }, 499 - ] 500 - ); 501 - } 502 - 503 - #[test] 504 - fn test_add_new_same_time_blank_after() { 505 - // make an op with a different OpKey 506 - let mut op = valid_op(); 507 - op.as_object_mut() 508 - .unwrap() 509 - .insert("cid".to_string(), "cid2".into()); 510 let mut page = ExportPage { 511 - ops: vec![op.to_string(), "".to_string()], 512 }; 513 514 let mut state = base_state(); ··· 532 #[test] 533 fn test_add_new_next_time() { 534 let mut page = ExportPage { 535 - ops: vec![next_op().to_string()], 536 }; 537 let mut state = base_state(); 538 state.apply_to_next(&mut page); ··· 549 #[test] 550 fn test_add_new_next_time_with_dup() { 551 let mut page = ExportPage { 552 - ops: vec![valid_op().to_string(), next_op().to_string()], 553 }; 554 let mut state = base_state(); 555 state.apply_to_next(&mut page); ··· 562 },] 563 ); 564 assert_eq!(page.ops.len(), 1); 565 - assert_eq!(page.ops[0], next_op().to_string()); 566 } 567 568 #[test] 569 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 570 // make an op with a different OpKey 571 let mut op = valid_op(); 572 - op.as_object_mut() 573 - .unwrap() 574 - .insert("cid".to_string(), "cid2".into()); 575 576 let mut page = ExportPage { 577 ops: vec![ 578 - valid_op().to_string(), // should get dropped 579 - op.to_string(), // should be kept 580 - next_op().to_string(), 581 ], 582 }; 583 let mut state = base_state(); ··· 591 },] 592 ); 593 assert_eq!(page.ops.len(), 2); 594 - assert_eq!(page.ops[0], op.to_string()); 595 - assert_eq!(page.ops[1], next_op().to_string()); 596 } 597 598 #[test] 599 fn 
test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 600 // make an op with a different OpKey 601 let mut op = valid_op(); 602 - op.as_object_mut() 603 - .unwrap() 604 - .insert("cid".to_string(), "cid2".into()); 605 606 let mut page = ExportPage { 607 ops: vec![ 608 - op.to_string(), // should be kept 609 - valid_op().to_string(), // should get dropped 610 - next_op().to_string(), 611 ], 612 }; 613 let mut state = base_state(); ··· 621 },] 622 ); 623 assert_eq!(page.ops.len(), 2); 624 - assert_eq!(page.ops[0], op.to_string()); 625 - assert_eq!(page.ops[1], next_op().to_string()); 626 } 627 }
··· 26 pk: (String, String), // did, cid
27 }
28 
29 + impl From<Op> for LastOp {
30 fn from(op: Op) -> Self {
31 Self {
32 created_at: op.created_at,
33 + pk: (op.did, op.cid),
34 }
35 }
36 }
37 
38 + impl From<&Op> for LastOp {
39 + fn from(op: &Op) -> Self {
40 + Self {
41 + created_at: op.created_at,
42 + pk: (op.did.clone(), op.cid.clone()),
43 + }
44 + }
45 + }
46 + 
47 + // bit of a hack
48 impl From<Dt> for LastOp {
49 fn from(dt: Dt) -> Self {
50 Self {
··· 61 keys_at: Vec<OpKey>, // expected to ~always be length one
62 }
63 
64 + /// track keys at final createdAt to deduplicate the start of the next page
65 impl PageBoundaryState {
66 pub fn new(page: &ExportPage) -> Option<Self> {
67 // grab the very last op
68 + let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
69 
70 // set initial state
71 let mut me = Self {
··· 74 };
75 
76 // and make sure all keys at this time are captured from the back
77 + me.capture_nth_last_at(page, last_at, 1);
78 
79 Some(me)
80 }
··· 83 let to_remove: Vec<usize> = page
84 .ops
85 .iter()
86 .enumerate()
87 + .take_while(|(_, op)| op.created_at == self.last_at)
88 + .filter(|(_, op)| self.keys_at.contains(&(*op).into()))
89 + .map(|(i, _)| i)
90 .collect();
91 
92 + // actually remove them. last to first so indices don't shift
93 for dup_idx in to_remove.into_iter().rev() {
94 page.ops.remove(dup_idx);
95 }
96 
97 // grab the very last op
98 + let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
99 + // there are no ops left? oop. bail.
100 + // last_at and existing keys remain intact
101 + return;
102 };
103 
104 // reset state (as long as time actually moved forward on this page)
··· 111 self.keys_at.push(last_key);
112 }
113 // and make sure all keys at this time are captured from the back
114 + self.capture_nth_last_at(page, last_at, 1);
115 }
116 
117 /// walk backwards from 2nd last and collect keys until created_at changes
··· 120 .iter()
121 .rev()
122 .skip(skips)
123 + .take_while(|op| op.created_at == last_at)
124 + .for_each(|op| {
125 self.keys_at.push(op.into());
126 });
127 }
··· 130 pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
131 log::trace!("Getting page: {url}");
132 
133 + let ops: Vec<Op> = CLIENT
134 .get(url)
135 .send()
136 .await?
··· 140 .trim() 141 .split('\n') 142 .filter_map(|s| { 143 + serde_json::from_str::<Op>(s) 144 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 145 + .ok() 146 }) 147 .collect(); 148 149 + let last_op = ops.last().map(Into::into); 150 151 Ok((ExportPage { ops }, last_op)) 152 } ··· 155 after: Option<Dt>, 156 base: Url, 157 dest: mpsc::Sender<ExportPage>, 158 + ) -> anyhow::Result<&'static str> { 159 + log::info!("starting upstream poller after {after:?}"); 160 let mut tick = tokio::time::interval(UPSTREAM_REQUEST_INTERVAL); 161 let mut prev_last: Option<LastOp> = after.map(Into::into); 162 let mut boundary_state: Option<PageBoundaryState> = None; ··· 197 const FIVES_TS: i64 = 1431648000; 198 const NEXT_TS: i64 = 1431648001; 199 200 + fn valid_op() -> Op { 201 + serde_json::from_value(serde_json::json!({ 202 "did": "did", 203 "cid": "cid", 204 "createdAt": "2015-05-15T00:00:00Z", 205 "nullified": false, 206 "operation": {}, 207 + })) 208 + .unwrap() 209 } 210 211 + fn next_op() -> Op { 212 + serde_json::from_value(serde_json::json!({ 213 "did": "didnext", 214 "cid": "cidnext", 215 "createdAt": "2015-05-15T00:00:01Z", 216 "nullified": false, 217 "operation": {}, 218 + })) 219 + .unwrap() 220 } 221 222 fn base_state() -> PageBoundaryState { 223 let page = ExportPage { 224 + ops: vec![valid_op()], 225 }; 226 + PageBoundaryState::new(&page).expect("to have a base page boundary state") 227 } 228 229 #[test] ··· 234 } 235 236 #[test] 237 fn test_boundary_new_one_op() { 238 let page = ExportPage { 239 + ops: vec![valid_op()], 240 }; 241 let state = PageBoundaryState::new(&page).unwrap(); 242 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); ··· 250 } 251 252 #[test] 253 fn test_add_new_empty() { 254 let mut state = base_state(); 255 state.apply_to_next(&mut ExportPage { ops: vec![] }); ··· 257 } 258 259 #[test] 260 fn test_add_new_same_op() { 261 let mut page = ExportPage { 262 + ops: vec![valid_op()], 263 }; 264 let mut state = base_state(); 265 state.apply_to_next(&mut page); ··· 270 fn test_add_new_same_time() { 271 // make an op with a different OpKey 272 let mut op = valid_op(); 273 + op.cid = "cid2".to_string(); 274 + let mut page = ExportPage { ops: vec![op] }; 275 276 let mut state = base_state(); 277 state.apply_to_next(&mut page); ··· 295 fn test_add_new_same_time_dup_before() { 296 // make an op with a different OpKey 297 let mut op = valid_op(); 298 + op.cid = "cid2".to_string(); 299 let mut page = ExportPage { 300 + ops: vec![valid_op(), op], 301 }; 302 303 let mut state = base_state(); ··· 322 fn test_add_new_same_time_dup_after() { 323 // make an op with a different OpKey 324 let mut op = valid_op(); 325 + op.cid = "cid2".to_string(); 326 let mut page = ExportPage { 327 + ops: vec![op, valid_op()], 328 }; 329 330 let mut state = base_state(); ··· 348 #[test] 349 fn test_add_new_next_time() { 350 let mut page = ExportPage { 351 + ops: vec![next_op()], 352 }; 353 let mut state = base_state(); 354 state.apply_to_next(&mut page); ··· 365 #[test] 366 fn test_add_new_next_time_with_dup() { 367 let mut page = ExportPage { 368 + ops: vec![valid_op(), next_op()], 369 }; 370 let mut state = base_state(); 371 state.apply_to_next(&mut page); ··· 378 },] 379 ); 380 assert_eq!(page.ops.len(), 1); 381 + assert_eq!(page.ops[0], next_op()); 382 } 383 384 #[test] 385 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 386 // make an op with a different OpKey 387 let mut op = valid_op(); 388 + op.cid = "cid2".to_string(); 389 390 let mut page = ExportPage { 
391 ops: vec![ 392 + valid_op(), // should get dropped 393 + op.clone(), // should be kept 394 + next_op(), 395 ], 396 }; 397 let mut state = base_state(); ··· 405 },] 406 ); 407 assert_eq!(page.ops.len(), 2); 408 + assert_eq!(page.ops[0], op); 409 + assert_eq!(page.ops[1], next_op()); 410 } 411 412 #[test] 413 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 414 // make an op with a different OpKey 415 let mut op = valid_op(); 416 + op.cid = "cid2".to_string(); 417 418 let mut page = ExportPage { 419 ops: vec![ 420 + op.clone(), // should be kept 421 + valid_op(), // should get dropped 422 + next_op(), 423 ], 424 }; 425 let mut state = base_state(); ··· 433 },] 434 ); 435 assert_eq!(page.ops.len(), 2); 436 + assert_eq!(page.ops[0], op); 437 + assert_eq!(page.ops[1], next_op()); 438 } 439 }
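The boundary handling above is easier to see on toy data: the state remembers every (did, cid) key seen at the final createdAt of a page, and apply_to_next drops matching ops from the leading same-timestamp run of the next page, wherever they sit in that run. A minimal standalone sketch of that scan (MiniOp and dedup_next_page are illustrative stand-ins here, not the crate's types):

```rust
// MiniOp and dedup_next_page are illustrative stand-ins, not the crate's types
#[derive(Clone, Debug, PartialEq)]
struct MiniOp {
    created_at: i64,       // stand-in for the Dt timestamp
    key: (String, String), // (did, cid), the OpKey equivalent
}

/// drop ops already seen at the boundary time from the front of `next`
fn dedup_next_page(last_at: i64, keys_at: &[(String, String)], next: &mut Vec<MiniOp>) {
    // a dup can sit anywhere in the leading run that shares the boundary
    // timestamp, not just at index 0, so scan the whole run
    let to_remove: Vec<usize> = next
        .iter()
        .enumerate()
        .take_while(|(_, op)| op.created_at == last_at)
        .filter(|(_, op)| keys_at.contains(&op.key))
        .map(|(i, _)| i)
        .collect();
    for i in to_remove.into_iter().rev() {
        next.remove(i); // back to front so indices stay valid
    }
}

fn main() {
    let keys_at = vec![("did".to_string(), "cid".to_string())];
    let mut next = vec![
        MiniOp { created_at: 0, key: ("did".into(), "cid2".into()) }, // kept: new key
        MiniOp { created_at: 0, key: ("did".into(), "cid".into()) },  // dropped: dup
        MiniOp { created_at: 1, key: ("didnext".into(), "cidnext".into()) },
    ];
    dedup_next_page(0, &keys_at, &mut next);
    assert_eq!(next.len(), 2);
}
```

Scanning the whole leading run, rather than only index 0, is exactly what the test_add_new_next_time_with_dup_later_and_new_prev_same_time case exercises.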
+15 -7
src/ratelimit.rs
··· 24 let period = quota.replenish_interval() / factor; 25 let burst = quota 26 .burst_size() 27 - .checked_mul(factor.try_into().unwrap()) 28 - .unwrap(); 29 Quota::with_period(period).map(|q| q.allow_burst(burst)) 30 } 31 ··· 40 pub fn new(quota: Quota) -> Self { 41 Self { 42 per_ip: RateLimiter::keyed(quota), 43 - ip6_56: RateLimiter::keyed(scale_quota(quota, 8).unwrap()), 44 - ip6_48: RateLimiter::keyed(scale_quota(quota, 256).unwrap()), 45 } 46 } 47 pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { ··· 56 .map_err(asdf); 57 let check_56 = self 58 .ip6_56 59 - .check_key(a.octets()[..7].try_into().unwrap()) 60 .map_err(asdf); 61 let check_48 = self 62 .ip6_48 63 - .check_key(a.octets()[..6].try_into().unwrap()) 64 .map_err(asdf); 65 check_ip.and(check_56).and(check_48) 66 } ··· 135 let remote = req 136 .remote_addr() 137 .as_socket_addr() 138 - .unwrap_or_else(|| panic!("failed to get request's remote addr")) // TODO 139 .ip(); 140 141 log::trace!("remote: {remote}");
··· 24 let period = quota.replenish_interval() / factor; 25 let burst = quota 26 .burst_size() 27 + .checked_mul(factor.try_into().expect("factor to be non-zero")) 28 + .expect("burst to be able to multiply"); 29 Quota::with_period(period).map(|q| q.allow_burst(burst)) 30 } 31 ··· 40 pub fn new(quota: Quota) -> Self { 41 Self { 42 per_ip: RateLimiter::keyed(quota), 43 + ip6_56: RateLimiter::keyed(scale_quota(quota, 8).expect("to scale quota")), 44 + ip6_48: RateLimiter::keyed(scale_quota(quota, 256).expect("to scale quota")), 45 } 46 } 47 pub fn check_key(&self, ip: IpAddr) -> Result<(), Duration> { ··· 56 .map_err(asdf); 57 let check_56 = self 58 .ip6_56 59 + .check_key( 60 + a.octets()[..7] 61 + .try_into() 62 + .expect("to check ip6 /56 limiter"), 63 + ) 64 .map_err(asdf); 65 let check_48 = self 66 .ip6_48 67 + .check_key( 68 + a.octets()[..6] 69 + .try_into() 70 + .expect("to check ip6 /48 limiter"), 71 + ) 72 .map_err(asdf); 73 check_ip.and(check_56).and(check_48) 74 } ··· 143 let remote = req 144 .remote_addr() 145 .as_socket_addr() 146 + .expect("failed to get request's remote addr") // TODO 147 .ip(); 148 149 log::trace!("remote: {remote}");
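The expect-based rewrites keep the same keying trick: the /56 and /48 limiters key on address prefixes, so the 7- and 6-octet slice conversions can only fail if the octet count is wrong. A standalone sketch of the prefix keying (the addresses are made-up documentation values):

```rust
use std::net::Ipv6Addr;

fn main() {
    // two made-up addresses in the same /56 (and therefore the same /48)
    let a: Ipv6Addr = "2001:db8:1234:5600::1".parse().unwrap();
    let b: Ipv6Addr = "2001:db8:1234:56ff::2".parse().unwrap();

    // a /56 key is the first 7 octets (56 bits), a /48 key the first 6
    let key56 = |ip: &Ipv6Addr| -> [u8; 7] { ip.octets()[..7].try_into().unwrap() };
    let key48 = |ip: &Ipv6Addr| -> [u8; 6] { ip.octets()[..6].try_into().unwrap() };

    // hosts rotating through addresses within one /56 still share these buckets
    assert_eq!(key56(&a), key56(&b));
    assert_eq!(key48(&a), key48(&b));
}
```

scale_quota then widens both the replenish rate and the burst by the given factor (8x for the /56 bucket, 256x for /48), so the coarser buckets tolerate proportionally more traffic before tripping.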
+31 -14
src/weekly.rs
··· 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 98 let FolderSource(dir) = self; 99 let path = dir.join(format!("{}.jsonl.gz", week.0)); 100 - Ok(File::open(path).await?) 101 } 102 } 103 ··· 138 let mut week_t0 = total_t0; 139 140 while let Some(page) = rx.recv().await { 141 - for mut s in page.ops { 142 - let Ok(op) = serde_json::from_str::<Op>(&s) 143 - .inspect_err(|e| log::error!("failed to parse plc op, ignoring: {e}")) 144 - else { 145 - continue; 146 - }; 147 let op_week = op.created_at.into(); 148 if current_week.map(|w| w != op_week).unwrap_or(true) { 149 encoder.shutdown().await?; ··· 168 week_ops = 0; 169 week_t0 = now; 170 } 171 - s.push('\n'); // hack 172 - log::trace!("writing: {s}"); 173 - encoder.write_all(s.as_bytes()).await?; 174 total_ops += 1; 175 week_ops += 1; 176 } ··· 197 dest: mpsc::Sender<ExportPage>, 198 ) -> anyhow::Result<()> { 199 use futures::TryStreamExt; 200 - let decoder = GzipDecoder::new(BufReader::new(source.reader_for(week).await?)); 201 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 202 203 - while let Some(chunk) = chunks.try_next().await? { 204 - let ops: Vec<String> = chunk.into_iter().collect(); 205 let page = ExportPage { ops }; 206 - dest.send(page).await?; 207 } 208 Ok(()) 209 }
··· 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
98 let FolderSource(dir) = self;
99 let path = dir.join(format!("{}.jsonl.gz", week.0));
100 + log::debug!("opening folder source: {path:?}");
101 + let file = File::open(path)
102 + .await
103 + .inspect_err(|e| log::error!("failed to open file: {e}"))?;
104 + Ok(file)
105 }
106 }
··· 142 let mut week_t0 = total_t0;
143 
144 while let Some(page) = rx.recv().await {
145 + for op in page.ops {
146 let op_week = op.created_at.into();
147 if current_week.map(|w| w != op_week).unwrap_or(true) {
148 encoder.shutdown().await?;
··· 167 week_ops = 0;
168 week_t0 = now;
169 }
170 + log::trace!("writing: {op:?}");
171 + encoder
172 + .write_all(format!("{}\n", serde_json::to_string(&op)?).as_bytes()) // newline keeps the jsonl framing that the reader's .lines() expects
173 + .await?;
174 total_ops += 1;
175 week_ops += 1;
176 }
··· 197 dest: mpsc::Sender<ExportPage>,
198 ) -> anyhow::Result<()> {
199 use futures::TryStreamExt;
200 + let reader = source
201 + .reader_for(week)
202 + .await
203 + .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?;
204 + let decoder = GzipDecoder::new(BufReader::new(reader));
205 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000));
206 
207 + while let Some(chunk) = chunks
208 + .try_next()
209 + .await
210 + .inspect_err(|e| log::error!("failed to get next chunk: {e}"))?
211 + {
212 + let ops: Vec<Op> = chunk
213 + .into_iter()
214 + .filter_map(|s| {
215 + serde_json::from_str::<Op>(&s)
216 + .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
217 + .ok()
218 + })
219 + .collect();
220 let page = ExportPage { ops };
221 + dest.send(page)
222 + .await
223 + .inspect_err(|e| log::error!("failed to send page: {e}"))?;
224 }
225 Ok(())
226 }
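The writer only lines up with week_to_pages if every serialized op still ends in a newline, since the read side recovers records with .lines(). A standalone round-trip sketch of the jsonl.gz framing (hardcoded JSON strings stand in for real ops; assumes async-compression with its tokio and gzip features enabled):

```rust
use async_compression::tokio::{bufread::GzipDecoder, write::GzipEncoder};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // encode two records, newline-terminated, into an in-memory gzip stream
    let mut enc = GzipEncoder::new(Vec::new());
    for op in [r#"{"did":"did","cid":"cid"}"#, r#"{"did":"didnext","cid":"cidnext"}"#] {
        enc.write_all(op.as_bytes()).await?;
        enc.write_all(b"\n").await?; // the framing that .lines() depends on
    }
    enc.shutdown().await?; // flush the gzip trailer
    let gz = enc.into_inner();

    // decode and split back into records, the way week_to_pages reads a week file
    let dec = GzipDecoder::new(BufReader::new(&gz[..]));
    let mut lines = BufReader::new(dec).lines();
    let mut n = 0;
    while let Some(line) = lines.next_line().await? {
        println!("{line}");
        n += 1;
    }
    assert_eq!(n, 2); // without the newlines this would come back as one glued line
    Ok(())
}
```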