Server tools to backfill, tail, mirror, and verify PLC logs
at main 224 lines 7.3 kB view raw
1use allegedly::{ 2 Db, Dt, ExportPage, FjallDb, FolderSource, HttpSource, SeqPage, backfill, backfill_to_fjall, 3 backfill_to_pg, 4 bin::{GlobalArgs, bin_init}, 5 full_pages, full_pages_seq, logo, pages_to_pg, pages_to_stdout, poll_upstream, 6 poll_upstream_seq, 7}; 8use clap::Parser; 9use reqwest::Url; 10use std::{path::PathBuf, time::Duration}; 11use tokio::{ 12 sync::{mpsc, oneshot}, 13 task::JoinSet, 14}; 15 16pub const DEFAULT_HTTP: &str = "https://plc.t3.storage.dev/plc.directory/"; 17 18#[derive(Debug, clap::Args)] 19pub struct Args { 20 /// Remote URL prefix to fetch bundles from 21 #[arg(long)] 22 #[clap(default_value = DEFAULT_HTTP)] 23 http: Url, 24 /// Local folder to fetch bundles from (overrides `http`) 25 #[arg(long)] 26 dir: Option<PathBuf>, 27 /// Don't do weekly bulk-loading at all. 28 /// 29 /// overrides `http` and `dir`, makes catch_up redundant 30 #[arg(long, action)] 31 no_bulk: bool, 32 /// Parallel bundle fetchers 33 /// 34 /// Default: 4 for http fetches, 1 for local folder 35 #[arg(long)] 36 source_workers: Option<usize>, 37 /// Bulk load into did-method-plc-compatible postgres instead of stdout 38 /// 39 /// Pass a postgres connection url like "postgresql://localhost:5432" 40 #[arg(long, env = "ALLEGEDLY_TO_POSTGRES")] 41 to_postgres: Option<Url>, 42 /// Cert for postgres (if needed) 43 #[arg(long)] 44 postgres_cert: Option<PathBuf>, 45 /// Delete all operations from the db before starting 46 /// 47 /// only used if `--to-postgres` or `--to-fjall` is present 48 #[arg(long, action)] 49 reset: bool, 50 /// Load into a local fjall embedded database 51 /// (doesnt support bulk yet unless loading from another fjall db) 52 /// 53 /// Pass a directory path for the fjall database 54 #[arg(long, conflicts_with_all = ["to_postgres", "postgres_cert"])] 55 to_fjall: Option<PathBuf>, 56 /// Stop at the week ending before this date 57 #[arg(long)] 58 until: Option<Dt>, 59 /// After the weekly imports, poll upstream until we're caught up 60 #[arg(long, action)] 61 catch_up: bool, 62} 63 64pub async fn run( 65 GlobalArgs { 66 upstream, 67 upstream_throttle_ms, 68 }: GlobalArgs, 69 Args { 70 http, 71 dir, 72 no_bulk, 73 source_workers, 74 to_postgres, 75 postgres_cert, 76 reset, 77 to_fjall, 78 until, 79 catch_up, 80 }: Args, 81) -> anyhow::Result<()> { 82 let mut tasks = JoinSet::<anyhow::Result<&'static str>>::new(); 83 84 let (bulk_tx, bulk_out) = mpsc::channel(32); // bulk uses big pages 85 86 // a bulk sink can notify us as soon as the very last op's time is known 87 // so we can start catching up while the sink might restore indexes and such 88 let (found_last_tx, found_last_out) = if catch_up { 89 let (tx, rx) = oneshot::channel(); 90 (Some(tx), Some(rx)) 91 } else { 92 (None, None) 93 }; 94 95 let (poll_tx, poll_out) = mpsc::channel::<ExportPage>(128); // normal/small pages 96 let (full_tx, full_out) = mpsc::channel::<ExportPage>(1); // don't need to buffer at this filter 97 98 // set up sources 99 if no_bulk { 100 // simple mode, just poll upstream from teh beginning 101 if http != DEFAULT_HTTP.parse()? { 102 log::warn!("ignoring non-default bulk http setting since --no-bulk was set"); 103 } 104 if let Some(d) = dir { 105 log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set."); 106 } 107 if let Some(u) = until { 108 log::warn!( 109 "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)" 110 ); 111 } 112 let mut upstream = upstream; 113 upstream.set_path("/export"); 114 let throttle = Duration::from_millis(upstream_throttle_ms); 115 if let Some(fjall_path) = to_fjall { 116 log::trace!("opening fjall db at {fjall_path:?}..."); 117 let db = FjallDb::open(&fjall_path)?; 118 log::trace!("opened fjall db"); 119 120 let (poll_tx, poll_out) = mpsc::channel::<SeqPage>(128); // normal/small pages 121 let (full_tx, full_out) = mpsc::channel::<SeqPage>(1); // don't need to buffer at this filter 122 123 tasks.spawn(poll_upstream_seq(None, upstream, throttle, poll_tx)); 124 tasks.spawn(full_pages_seq(poll_out, full_tx)); 125 tasks.spawn(backfill_to_fjall(db, reset, full_out, None)); 126 } else { 127 tasks.spawn(poll_upstream(None, upstream, throttle, poll_tx)); 128 tasks.spawn(full_pages(poll_out, full_tx)); 129 tasks.spawn(pages_to_stdout(full_out, None)); 130 } 131 } else { 132 // fun mode 133 134 // set up bulk sources 135 if let Some(dir) = dir { 136 if http != DEFAULT_HTTP.parse()? { 137 anyhow::bail!( 138 "non-default bulk http setting can't be used with bulk dir setting ({dir:?})" 139 ); 140 } 141 tasks.spawn(backfill( 142 FolderSource(dir), 143 bulk_tx, 144 source_workers.unwrap_or(1), 145 until, 146 )); 147 } else { 148 tasks.spawn(backfill( 149 HttpSource(http), 150 bulk_tx, 151 source_workers.unwrap_or(4), 152 until, 153 )); 154 } 155 156 // and the catch-up source... 157 if let Some(last) = found_last_out { 158 let throttle = Duration::from_millis(upstream_throttle_ms); 159 tasks.spawn(async move { 160 let mut upstream = upstream; 161 upstream.set_path("/export"); 162 163 poll_upstream(last.await?, upstream, throttle, poll_tx).await 164 }); 165 } 166 167 // set up sinks 168 if let Some(pg_url) = to_postgres { 169 log::trace!("connecting to postgres..."); 170 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 171 log::trace!("connected to postgres"); 172 173 tasks.spawn(backfill_to_pg(db.clone(), reset, bulk_out, found_last_tx)); 174 if catch_up { 175 tasks.spawn(pages_to_pg(db, full_out)); 176 } 177 } else { 178 tasks.spawn(pages_to_stdout(bulk_out, found_last_tx)); 179 if catch_up { 180 tasks.spawn(pages_to_stdout(full_out, None)); 181 } 182 } 183 } 184 185 while let Some(next) = tasks.join_next().await { 186 match next { 187 Err(e) if e.is_panic() => { 188 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 189 return Err(e.into()); 190 } 191 Err(e) => { 192 log::error!("a joinset task failed to join: {e}"); 193 return Err(e.into()); 194 } 195 Ok(Err(e)) => { 196 log::error!("a joinset task completed with error: {e}"); 197 return Err(e); 198 } 199 Ok(Ok(name)) => { 200 log::trace!("a task completed: {name:?}. {} left", tasks.len()); 201 } 202 } 203 } 204 205 Ok(()) 206} 207 208#[derive(Debug, Parser)] 209struct CliArgs { 210 #[command(flatten)] 211 globals: GlobalArgs, 212 #[command(flatten)] 213 args: Args, 214} 215 216#[allow(dead_code)] 217#[tokio::main] 218async fn main() -> anyhow::Result<()> { 219 let args = CliArgs::parse(); 220 bin_init(false); 221 log::info!("{}", logo("backfill")); 222 run(args.globals, args.args).await?; 223 Ok(()) 224}