Server tools to backfill, tail, mirror, and verify PLC logs
52
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 226 lines 7.3 kB view raw
1use crate::{CLIENT, Dt, ExportPage, Op}; 2use async_compression::tokio::bufread::GzipDecoder; 3use async_compression::tokio::write::GzipEncoder; 4use core::pin::pin; 5use reqwest::Url; 6use std::future::Future; 7use std::ops::{Bound, RangeBounds}; 8use std::path::PathBuf; 9use tokio::{ 10 fs::File, 11 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader}, 12 sync::mpsc, 13}; 14use tokio_stream::wrappers::LinesStream; 15use tokio_util::compat::FuturesAsyncReadCompatExt; 16 17const WEEK_IN_SECONDS: i64 = 7 * 86_400; 18 19#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)] 20pub struct Week(i64); 21 22impl Week { 23 pub const fn from_n(n: i64) -> Self { 24 Self(n) 25 } 26 pub fn range(r: impl RangeBounds<Week>) -> Vec<Self> { 27 let first = match r.start_bound() { 28 Bound::Included(week) => *week, 29 Bound::Excluded(week) => week.next(), 30 Bound::Unbounded => panic!("week range must have a defined start bound"), 31 }; 32 let last = match r.end_bound() { 33 Bound::Included(week) => *week, 34 Bound::Excluded(week) => week.prev(), 35 Bound::Unbounded => Self(Self::nullification_cutoff()).prev(), 36 }; 37 let mut out = Vec::new(); 38 let mut current = first; 39 while current <= last { 40 out.push(current); 41 current = current.next(); 42 } 43 out 44 } 45 pub fn n_ago(&self) -> i64 { 46 let now = chrono::Utc::now().timestamp(); 47 (now - self.0) / WEEK_IN_SECONDS 48 } 49 pub fn n_until(&self, other: Week) -> i64 { 50 let Self(until) = other; 51 (until - self.0) / WEEK_IN_SECONDS 52 } 53 pub fn next(&self) -> Week { 54 Self(self.0 + WEEK_IN_SECONDS) 55 } 56 pub fn prev(&self) -> Week { 57 Self(self.0 - WEEK_IN_SECONDS) 58 } 59 /// whether the plc log for this week outside the 72h nullification window 60 /// 61 /// plus one hour for safety (week must have ended > 73 hours ago) 62 pub fn is_immutable(&self) -> bool { 63 self.next().0 <= Self::nullification_cutoff() 64 } 65 fn nullification_cutoff() -> i64 { 66 const HOUR_IN_SECONDS: i64 = 3600; 67 let now = chrono::Utc::now().timestamp(); 68 now - (73 * HOUR_IN_SECONDS) 69 } 70} 71 72impl From<Dt> for Week { 73 fn from(dt: Dt) -> Self { 74 let ts = dt.timestamp(); 75 let truncated = (ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS; 76 Week(truncated) 77 } 78} 79 80impl From<Week> for Dt { 81 fn from(week: Week) -> Dt { 82 let Week(ts) = week; 83 Dt::from_timestamp(ts, 0).expect("the week to be in valid range") 84 } 85} 86 87pub trait BundleSource: Clone { 88 fn reader_for( 89 &self, 90 week: Week, 91 ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send; 92} 93 94#[derive(Debug, Clone)] 95pub struct FolderSource(pub PathBuf); 96impl BundleSource for FolderSource { 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 98 let FolderSource(dir) = self; 99 let path = dir.join(format!("{}.jsonl.gz", week.0)); 100 log::debug!("opening folder source: {path:?}"); 101 let file = File::open(path) 102 .await 103 .inspect_err(|e| log::error!("failed to open file: {e}"))?; 104 Ok(file) 105 } 106} 107 108#[derive(Debug, Clone)] 109pub struct HttpSource(pub Url); 110impl BundleSource for HttpSource { 111 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 112 use futures::TryStreamExt; 113 let HttpSource(base) = self; 114 let url = base.join(&format!("{}.jsonl.gz", week.0))?; 115 Ok(CLIENT 116 .get(url) 117 .send() 118 .await? 119 .error_for_status()? 120 .bytes_stream() 121 .map_err(futures::io::Error::other) 122 .into_async_read() 123 .compat()) 124 } 125} 126 127pub async fn pages_to_weeks( 128 mut rx: mpsc::Receiver<ExportPage>, 129 dir: PathBuf, 130 clobber: bool, 131) -> anyhow::Result<()> { 132 pub use std::time::Instant; 133 134 // ...there is certainly a nicer way to write this 135 let mut current_week: Option<Week> = None; 136 let dummy_file = File::create(dir.join("_dummy")).await?; 137 let mut encoder = GzipEncoder::new(dummy_file); 138 139 let mut total_ops = 0; 140 let total_t0 = Instant::now(); 141 let mut week_ops = 0; 142 let mut week_t0 = total_t0; 143 144 while let Some(page) = rx.recv().await { 145 for op in page.ops { 146 let op_week = op.created_at.into(); 147 if current_week.map(|w| w != op_week).unwrap_or(true) { 148 encoder.shutdown().await?; 149 let now = Instant::now(); 150 151 log::info!( 152 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 153 current_week.map(|w| -w.n_ago()).unwrap_or(0), 154 current_week.unwrap_or(Week(0)).0, 155 (week_ops as f64) / (now - week_t0).as_secs_f64(), 156 total_ops / 1000, 157 (total_ops as f64) / (now - total_t0).as_secs_f64(), 158 ); 159 let path = dir.join(format!("{}.jsonl.gz", op_week.0)); 160 let file = if clobber { 161 File::create(path).await? 162 } else { 163 File::create_new(path).await? 164 }; 165 encoder = GzipEncoder::with_quality(file, async_compression::Level::Best); 166 current_week = Some(op_week); 167 week_ops = 0; 168 week_t0 = now; 169 } 170 log::trace!("writing: {op:?}"); 171 encoder 172 .write_all(serde_json::to_string(&op)?.as_bytes()) 173 .await?; 174 total_ops += 1; 175 week_ops += 1; 176 } 177 } 178 179 // don't forget the final file 180 encoder.shutdown().await?; 181 let now = Instant::now(); 182 log::info!( 183 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 184 current_week.map(|w| -w.n_ago()).unwrap_or(0), 185 current_week.unwrap_or(Week(0)).0, 186 (week_ops as f64) / (now - week_t0).as_secs_f64(), 187 total_ops / 1000, 188 (total_ops as f64) / (now - total_t0).as_secs_f64(), 189 ); 190 191 Ok(()) 192} 193 194pub async fn week_to_pages( 195 source: impl BundleSource, 196 week: Week, 197 dest: mpsc::Sender<ExportPage>, 198) -> anyhow::Result<()> { 199 use futures::TryStreamExt; 200 let reader = source 201 .reader_for(week) 202 .await 203 .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?; 204 let decoder = GzipDecoder::new(BufReader::new(reader)); 205 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000)); 206 207 while let Some(chunk) = chunks 208 .try_next() 209 .await 210 .inspect_err(|e| log::error!("failed to get next chunk: {e}"))? 211 { 212 let ops: Vec<Op> = chunk 213 .into_iter() 214 .filter_map(|s| { 215 serde_json::from_str::<Op>(&s) 216 .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 217 .ok() 218 }) 219 .collect(); 220 let page = ExportPage { ops }; 221 dest.send(page) 222 .await 223 .inspect_err(|e| log::error!("failed to send page: {e}"))?; 224 } 225 Ok(()) 226}