//! Server tools to backfill, tail, mirror, and verify PLC logs
1use allegedly::{ 2 Db, ExperimentalConf, FjallDb, ListenConf, 3 bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 logo, pages_to_pg, poll_upstream, poll_upstream_seq, seq_pages_to_fjall, serve, serve_fjall, 5 tail_upstream_stream, 6}; 7use clap::Parser; 8use reqwest::Url; 9use std::{net::SocketAddr, path::PathBuf, time::Duration}; 10use tokio::{fs::create_dir_all, sync::mpsc, task::JoinSet}; 11 12#[derive(Debug, clap::Args)] 13pub struct Args { 14 /// the wrapped did-method-plc server (not needed when using --wrap-fjall) 15 #[arg(long, env = "ALLEGEDLY_WRAP")] 16 wrap: Option<Url>, 17 /// the wrapped did-method-plc server's database (write access required) 18 #[arg(long, env = "ALLEGEDLY_WRAP_PG", conflicts_with = "wrap_fjall")] 19 wrap_pg: Option<Url>, 20 /// path to tls cert for the wrapped postgres db, if needed 21 #[arg(long, env = "ALLEGEDLY_WRAP_PG_CERT")] 22 wrap_pg_cert: Option<PathBuf>, 23 /// path to a local fjall database directory (alternative to postgres) 24 #[arg(long, env = "ALLEGEDLY_WRAP_FJALL", conflicts_with_all = ["wrap_pg", "wrap_pg_cert"])] 25 wrap_fjall: Option<PathBuf>, 26 /// compact the fjall db on startup 27 #[arg( 28 long, 29 env = "ALLEGEDLY_FJALL_COMPACT", 30 conflicts_with_all = ["wrap_pg", "wrap_pg_cert"] 31 )] 32 compact_fjall: bool, 33 /// wrapping server listen address 34 #[arg(short, long, env = "ALLEGEDLY_BIND")] 35 #[clap(default_value = "127.0.0.1:8000")] 36 bind: SocketAddr, 37 /// obtain a certificate from letsencrypt 38 /// 39 /// for now this will force listening on all interfaces at :80 and :443 40 /// (:80 will serve an "https required" error, *will not* redirect) 41 #[arg( 42 long, 43 conflicts_with("bind"), 44 requires("acme_cache_path"), 45 env = "ALLEGEDLY_ACME_DOMAIN" 46 )] 47 acme_domain: Vec<String>, 48 /// which local directory to keep the letsencrypt certs in 49 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_CACHE_PATH")] 50 acme_cache_path: Option<PathBuf>, 51 /// which public acme directory to 
use 52 /// 53 /// eg. letsencrypt staging: "https://acme-staging-v02.api.letsencrypt.org/directory" 54 #[arg(long, requires("acme_domain"), env = "ALLEGEDLY_ACME_DIRECTORY_URL")] 55 #[clap(default_value = "https://acme-v02.api.letsencrypt.org/directory")] 56 acme_directory_url: Url, 57 /// try to listen for ipv6 58 #[arg(long, action, requires("acme_domain"), env = "ALLEGEDLY_ACME_IPV6")] 59 acme_ipv6: bool, 60 /// only accept experimental requests at this hostname 61 /// 62 /// a cert will be provisioned for it from letsencrypt. if you're not using 63 /// acme (eg., behind a tls-terminating reverse proxy), open a feature request. 64 #[arg( 65 long, 66 requires("acme_domain"), 67 env = "ALLEGEDLY_EXPERIMENTAL_ACME_DOMAIN" 68 )] 69 experimental_acme_domain: Option<String>, 70 /// accept writes! by forwarding them upstream 71 #[arg(long, action, env = "ALLEGEDLY_EXPERIMENTAL_WRITE_UPSTREAM")] 72 experimental_write_upstream: bool, 73 /// switch from polling to /export/stream once the latest op is within 74 /// this many days of now (plc.directory only supports ~1 week of backfill) 75 #[arg(long, env = "ALLEGEDLY_STREAM_CUTOVER_DAYS", default_value = "5")] 76 stream_cutover_days: u32, 77} 78 79pub async fn run( 80 GlobalArgs { 81 upstream, 82 upstream_throttle_ms, 83 }: GlobalArgs, 84 Args { 85 wrap, 86 wrap_pg, 87 wrap_pg_cert, 88 wrap_fjall, 89 compact_fjall, 90 bind, 91 acme_domain, 92 acme_cache_path, 93 acme_directory_url, 94 acme_ipv6, 95 experimental_acme_domain, 96 experimental_write_upstream, 97 stream_cutover_days, 98 }: Args, 99 sync: bool, 100) -> anyhow::Result<()> { 101 let listen_conf = match (bind, acme_domain.is_empty(), acme_cache_path) { 102 (_, false, Some(cache_path)) => { 103 create_dir_all(&cache_path).await?; 104 let mut domains = acme_domain.clone(); 105 if let Some(ref experimental_domain) = experimental_acme_domain { 106 domains.push(experimental_domain.clone()) 107 } 108 log::info!("configuring acme for https at {domains:?}..."); 109 
ListenConf::Acme { 110 domains, 111 cache_path, 112 directory_url: acme_directory_url.to_string(), 113 ipv6: acme_ipv6, 114 } 115 } 116 (bind, true, None) => ListenConf::Bind(bind), 117 (_, _, _) => unreachable!(), 118 }; 119 120 let experimental_conf = ExperimentalConf { 121 acme_domain: experimental_acme_domain, 122 write_upstream: experimental_write_upstream, 123 }; 124 125 let mut tasks = JoinSet::new(); 126 127 if let Some(fjall_path) = wrap_fjall { 128 let db = FjallDb::open(&fjall_path)?; 129 if compact_fjall { 130 log::info!("compacting fjall..."); 131 db.compact()?; 132 } 133 134 log::debug!("getting the latest seq from fjall..."); 135 let latest_seq = db 136 .get_latest()? 137 .map(|(seq, _)| seq) 138 .expect("there to be at least one op in the db. did you backfill?"); 139 log::info!("starting seq polling from seq {latest_seq}..."); 140 141 let (send_page, recv_page) = mpsc::channel::<allegedly::SeqPage>(8); 142 143 let mut export_url = upstream.clone(); 144 export_url.set_path("/export"); 145 let mut stream_url = upstream.clone(); 146 stream_url.set_path("/export/stream"); 147 let throttle = Duration::from_millis(upstream_throttle_ms); 148 let cutover_age = Duration::from_secs(stream_cutover_days as u64 * 86_400); 149 150 // the poll -> stream task: poll until we're caught up, then switch to stream. 151 // on stream disconnect, fall back to polling to resync. 
152 let send_page_bg = send_page.clone(); 153 tasks.spawn(async move { 154 let mut current_seq = latest_seq; 155 loop { 156 log::info!("seq polling from seq {current_seq}"); 157 let (inner_tx, mut inner_rx) = mpsc::channel::<allegedly::SeqPage>(8); 158 159 // run poller; it ends only when the channel closes 160 let poll_url = export_url.clone(); 161 let poll_task = tokio::spawn(poll_upstream_seq( 162 Some(current_seq), 163 poll_url, 164 throttle, 165 inner_tx, 166 )); 167 168 // drain pages from poller until the last op is within cutover_age of now, 169 // meaning we're close enough to the tip that the stream can cover the rest 170 let mut last_seq_from_poll = current_seq; 171 172 while let Some(page) = inner_rx.recv().await { 173 let near_tip = page.ops.last().map_or(false, |op| { 174 let age = chrono::Utc::now().signed_duration_since(op.created_at); 175 age.to_std().map_or(false, |d| d <= cutover_age) 176 }); 177 if let Some(last) = page.ops.last() { 178 last_seq_from_poll = last.seq; 179 } 180 let _ = send_page_bg.send(page).await; 181 if near_tip { 182 break; 183 } 184 } 185 186 poll_task.abort(); 187 current_seq = last_seq_from_poll; 188 189 // switch to streaming 190 log::info!("caught up at seq {current_seq}, switching to /export/stream"); 191 let (stream_inner_tx, mut stream_inner_rx) = mpsc::channel::<allegedly::SeqPage>(8); 192 let stream_task = tokio::spawn(tail_upstream_stream( 193 Some(current_seq), 194 stream_url.clone(), 195 stream_inner_tx, 196 )); 197 198 while let Some(page) = stream_inner_rx.recv().await { 199 if let Some(last) = page.ops.last() { 200 current_seq = last.seq; 201 } 202 if send_page_bg.send(page).await.is_err() { 203 stream_task.abort(); 204 return anyhow::Ok("fjall-poll-stream (dest closed)"); 205 } 206 } 207 208 // stream ended/errored — loop back to polling to resync 209 match stream_task.await { 210 Ok(Ok(())) => log::info!("stream closed cleanly, resyncing via poll"), 211 Ok(Err(e)) => log::warn!("stream error: {e}, resyncing 
via poll"), 212 Err(e) => log::warn!("stream task join error: {e}"), 213 } 214 } 215 }); 216 217 tasks.spawn(seq_pages_to_fjall(db.clone(), recv_page)); 218 tasks.spawn(serve_fjall(upstream, listen_conf, experimental_conf, db)); 219 } else { 220 let wrap = wrap.ok_or(anyhow::anyhow!( 221 "--wrap is required unless using --wrap-fjall" 222 ))?; 223 224 let db: Option<Db> = if sync { 225 let wrap_pg = wrap_pg.ok_or(anyhow::anyhow!( 226 "a wrapped reference postgres (--wrap-pg) or fjall db (--wrap-fjall) must be provided to sync" 227 ))?; 228 let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 229 230 log::debug!("getting the latest op from the db..."); 231 let latest = db 232 .get_latest() 233 .await? 234 .expect("there to be at least one op in the db. did you backfill?"); 235 log::debug!("starting polling from {latest}..."); 236 237 let (send_page, recv_page) = mpsc::channel(8); 238 239 let mut poll_url = upstream.clone(); 240 poll_url.set_path("/export"); 241 let throttle = Duration::from_millis(upstream_throttle_ms); 242 243 tasks.spawn(poll_upstream(Some(latest), poll_url, throttle, send_page)); 244 tasks.spawn(pages_to_pg(db.clone(), recv_page)); 245 Some(db) 246 } else { 247 None 248 }; 249 250 tasks.spawn(serve(upstream, wrap, listen_conf, experimental_conf, db)); 251 } 252 253 while let Some(next) = tasks.join_next().await { 254 match next { 255 Err(e) if e.is_panic() => { 256 log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 257 return Err(e.into()); 258 } 259 Err(e) => { 260 log::error!("a joinset task failed to join: {e}"); 261 return Err(e.into()); 262 } 263 Ok(Err(e)) => { 264 log::error!("a joinset task completed with error: {e}"); 265 return Err(e); 266 } 267 Ok(Ok(name)) => { 268 log::trace!("a task completed: {name:?}. 
{} left", tasks.len()); 269 } 270 } 271 } 272 273 Ok(()) 274} 275 276#[derive(Debug, Parser)] 277struct CliArgs { 278 #[command(flatten)] 279 globals: GlobalArgs, 280 #[command(flatten)] 281 instrumentation: InstrumentationArgs, 282 #[command(flatten)] 283 args: Args, 284 /// Run the mirror in wrap mode, no upstream synchronization (read-only) 285 #[arg(long, action)] 286 wrap_mode: bool, 287} 288 289#[allow(dead_code)] 290#[tokio::main] 291async fn main() -> anyhow::Result<()> { 292 let args = CliArgs::parse(); 293 bin_init(args.instrumentation.enable_opentelemetry); 294 log::info!("{}", logo("mirror")); 295 run(args.globals, args.args, !args.wrap_mode).await?; 296 Ok(()) 297}