Server tools to backfill, tail, mirror, and verify PLC logs
at main 147 lines 4.4 kB view raw
1use serde::{Deserialize, Serialize}; 2use tokio::sync::{mpsc, oneshot}; 3 4mod backfill; 5mod cached_value; 6mod client; 7mod mirror; 8mod plc_pg; 9mod poll; 10mod ratelimit; 11mod weekly; 12 13pub mod bin; 14 15pub use backfill::backfill; 16pub use cached_value::{CachedValue, Fetcher}; 17pub use client::{CLIENT, UA}; 18pub use mirror::{ExperimentalConf, ListenConf, serve}; 19pub use plc_pg::{Db, backfill_to_pg, pages_to_pg}; 20pub use poll::{PageBoundaryState, get_page, poll_upstream}; 21pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters}; 22pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages}; 23 24pub type Dt = chrono::DateTime<chrono::Utc>; 25 26/// One page of PLC export 27/// 28/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page. 29#[derive(Debug)] 30pub struct ExportPage { 31 pub ops: Vec<Op>, 32} 33 34impl ExportPage { 35 pub fn is_empty(&self) -> bool { 36 self.ops.is_empty() 37 } 38} 39 40/// A fully-deserialized plc operation 41/// 42/// including the plc's wrapping with timestmap and nullified state 43#[derive(Debug, Clone, Deserialize, Serialize)] 44#[serde(rename_all = "camelCase")] 45pub struct Op { 46 pub did: String, 47 pub cid: String, 48 pub created_at: Dt, 49 pub nullified: bool, 50 pub operation: Box<serde_json::value::RawValue>, 51} 52 53#[cfg(test)] 54impl PartialEq for Op { 55 fn eq(&self, other: &Self) -> bool { 56 self.did == other.did 57 && self.cid == other.cid 58 && self.created_at == other.created_at 59 && self.nullified == other.nullified 60 && serde_json::from_str::<serde_json::Value>(self.operation.get()).unwrap() 61 == serde_json::from_str::<serde_json::Value>(other.operation.get()).unwrap() 62 } 63} 64 65/// Database primary key for an op 66#[derive(Debug, PartialEq)] 67pub struct OpKey { 68 pub did: String, 69 pub cid: String, 70} 71 72impl From<&Op> for OpKey { 73 fn from(Op { did, cid, .. }: &Op) -> Self { 74 Self { 75 did: did.to_string(), 76 cid: cid.to_string(), 77 } 78 } 79} 80 81/// page forwarder who drops its channels on receipt of a small page 82/// 83/// PLC will return up to 1000 ops on a page, and returns full pages until it 84/// has caught up, so this is a (hacky?) way to stop polling once we're up. 85pub async fn full_pages( 86 mut rx: mpsc::Receiver<ExportPage>, 87 tx: mpsc::Sender<ExportPage>, 88) -> anyhow::Result<&'static str> { 89 while let Some(page) = rx.recv().await { 90 let n = page.ops.len(); 91 if n < 900 { 92 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 93 let Some(age) = last_age else { 94 log::info!("full_pages done, empty final page"); 95 return Ok("full pages (hmm)"); 96 }; 97 if age <= chrono::TimeDelta::hours(6) { 98 log::info!("full_pages done, final page of {n} ops"); 99 } else { 100 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 101 } 102 return Ok("full pages (cool)"); 103 } 104 log::trace!("full_pages: continuing with page of {n} ops"); 105 tx.send(page).await?; 106 } 107 Err(anyhow::anyhow!( 108 "full_pages ran out of source material, sender closed" 109 )) 110} 111 112pub async fn pages_to_stdout( 113 mut rx: mpsc::Receiver<ExportPage>, 114 notify_last_at: Option<oneshot::Sender<Option<Dt>>>, 115) -> anyhow::Result<&'static str> { 116 let mut last_at = None; 117 while let Some(page) = rx.recv().await { 118 for op in &page.ops { 119 println!("{}", serde_json::to_string(op)?); 120 } 121 if notify_last_at.is_some() 122 && let Some(s) = PageBoundaryState::new(&page) 123 { 124 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 125 } 126 } 127 if let Some(notify) = notify_last_at { 128 log::trace!("notifying last_at: {last_at:?}"); 129 if notify.send(last_at).is_err() { 130 log::error!("receiver for last_at dropped, can't notify"); 131 }; 132 } 133 Ok("pages_to_stdout") 134} 135 136pub fn logo(name: &str) -> String { 137 format!( 138 r" 139 140 \ | | | | 141 _ \ | | -_) _` | -_) _` | | | | ({name}) 142 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{}) 143 ____| __/ 144", 145 env!("CARGO_PKG_VERSION"), 146 ) 147}