Forked from microcosm.blue/Allegedly — server tools to backfill, tail, mirror, and verify PLC logs.
1use crate::{CLIENT, Dt, ExportPage, Op};
2use async_compression::tokio::bufread::GzipDecoder;
3use async_compression::tokio::write::GzipEncoder;
4use core::pin::pin;
5use reqwest::Url;
6use std::future::Future;
7use std::ops::{Bound, RangeBounds};
8use std::path::PathBuf;
9use tokio::{
10 fs::File,
11 io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader},
12 sync::mpsc,
13};
14use tokio_stream::wrappers::LinesStream;
15use tokio_util::compat::FuturesAsyncReadCompatExt;
16
/// Seconds in one week (7 × 86 400). Week buckets are epoch-aligned
/// multiples of this value (see `From<Dt> for Week`).
const WEEK_IN_SECONDS: i64 = 7 * 86_400;

/// A one-week bucket of PLC log activity, identified by the Unix timestamp
/// (seconds) of its start. Weeks produced via `From<Dt>` are aligned to
/// epoch multiples of [`WEEK_IN_SECONDS`]; `from_n` accepts any raw value.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Week(i64);
21
22impl Week {
23 pub const fn from_n(n: i64) -> Self {
24 Self(n)
25 }
26 pub fn range(r: impl RangeBounds<Week>) -> Vec<Self> {
27 let first = match r.start_bound() {
28 Bound::Included(week) => *week,
29 Bound::Excluded(week) => week.next(),
30 Bound::Unbounded => panic!("week range must have a defined start bound"),
31 };
32 let last = match r.end_bound() {
33 Bound::Included(week) => *week,
34 Bound::Excluded(week) => week.prev(),
35 Bound::Unbounded => Self(Self::nullification_cutoff()).prev(),
36 };
37 let mut out = Vec::new();
38 let mut current = first;
39 while current <= last {
40 out.push(current);
41 current = current.next();
42 }
43 out
44 }
45 pub fn n_ago(&self) -> i64 {
46 let now = chrono::Utc::now().timestamp();
47 (now - self.0) / WEEK_IN_SECONDS
48 }
49 pub fn n_until(&self, other: Week) -> i64 {
50 let Self(until) = other;
51 (until - self.0) / WEEK_IN_SECONDS
52 }
53 pub fn next(&self) -> Week {
54 Self(self.0 + WEEK_IN_SECONDS)
55 }
56 pub fn prev(&self) -> Week {
57 Self(self.0 - WEEK_IN_SECONDS)
58 }
59 /// whether the plc log for this week outside the 72h nullification window
60 ///
61 /// plus one hour for safety (week must have ended > 73 hours ago)
62 pub fn is_immutable(&self) -> bool {
63 self.next().0 <= Self::nullification_cutoff()
64 }
65 fn nullification_cutoff() -> i64 {
66 const HOUR_IN_SECONDS: i64 = 3600;
67 let now = chrono::Utc::now().timestamp();
68 now - (73 * HOUR_IN_SECONDS)
69 }
70}
71
72impl From<Dt> for Week {
73 fn from(dt: Dt) -> Self {
74 let ts = dt.timestamp();
75 let truncated = (ts / WEEK_IN_SECONDS) * WEEK_IN_SECONDS;
76 Week(truncated)
77 }
78}
79
80impl From<Week> for Dt {
81 fn from(week: Week) -> Dt {
82 let Week(ts) = week;
83 Dt::from_timestamp(ts, 0).expect("the week to be in valid range")
84 }
85}
86
/// A provider of weekly gzipped-jsonl PLC log bundles, addressed by [`Week`].
///
/// Implementations hand back an async reader over the raw `.jsonl.gz` bytes
/// for one week — from disk ([`FolderSource`]) or over HTTP ([`HttpSource`]).
pub trait BundleSource: Clone {
    /// Open an async reader for the bundle covering `week`.
    fn reader_for(
        &self,
        week: Week,
    ) -> impl Future<Output = anyhow::Result<impl AsyncRead + Send>> + Send;
}
93
94#[derive(Debug, Clone)]
95pub struct FolderSource(pub PathBuf);
96impl BundleSource for FolderSource {
97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
98 let FolderSource(dir) = self;
99 let path = dir.join(format!("{}.jsonl.gz", week.0));
100 log::debug!("opening folder source: {path:?}");
101 let file = File::open(path)
102 .await
103 .inspect_err(|e| log::error!("failed to open file: {e}"))?;
104 Ok(file)
105 }
106}
107
108#[derive(Debug, Clone)]
109pub struct HttpSource(pub Url);
110impl BundleSource for HttpSource {
111 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> {
112 use futures::TryStreamExt;
113 let HttpSource(base) = self;
114 let url = base.join(&format!("{}.jsonl.gz", week.0))?;
115 Ok(CLIENT
116 .get(url)
117 .send()
118 .await?
119 .error_for_status()?
120 .bytes_stream()
121 .map_err(futures::io::Error::other)
122 .into_async_read()
123 .compat())
124 }
125}
126
127pub async fn pages_to_weeks(
128 mut rx: mpsc::Receiver<ExportPage>,
129 dir: PathBuf,
130 clobber: bool,
131) -> anyhow::Result<()> {
132 pub use std::time::Instant;
133
134 // ...there is certainly a nicer way to write this
135 let mut current_week: Option<Week> = None;
136 let dummy_file = File::create(dir.join("_dummy")).await?;
137 let mut encoder = GzipEncoder::new(dummy_file);
138
139 let mut total_ops = 0;
140 let total_t0 = Instant::now();
141 let mut week_ops = 0;
142 let mut week_t0 = total_t0;
143
144 while let Some(page) = rx.recv().await {
145 for op in page.ops {
146 let op_week = op.created_at.into();
147 if current_week.map(|w| w != op_week).unwrap_or(true) {
148 encoder.shutdown().await?;
149 let now = Instant::now();
150
151 log::info!(
152 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)",
153 current_week.map(|w| -w.n_ago()).unwrap_or(0),
154 current_week.unwrap_or(Week(0)).0,
155 (week_ops as f64) / (now - week_t0).as_secs_f64(),
156 total_ops / 1000,
157 (total_ops as f64) / (now - total_t0).as_secs_f64(),
158 );
159 let path = dir.join(format!("{}.jsonl.gz", op_week.0));
160 let file = if clobber {
161 File::create(path).await?
162 } else {
163 File::create_new(path).await?
164 };
165 encoder = GzipEncoder::with_quality(file, async_compression::Level::Best);
166 current_week = Some(op_week);
167 week_ops = 0;
168 week_t0 = now;
169 }
170 log::trace!("writing: {op:?}");
171 encoder
172 .write_all(serde_json::to_string(&op)?.as_bytes())
173 .await?;
174 total_ops += 1;
175 week_ops += 1;
176 }
177 }
178
179 // don't forget the final file
180 encoder.shutdown().await?;
181 let now = Instant::now();
182 log::info!(
183 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)",
184 current_week.map(|w| -w.n_ago()).unwrap_or(0),
185 current_week.unwrap_or(Week(0)).0,
186 (week_ops as f64) / (now - week_t0).as_secs_f64(),
187 total_ops / 1000,
188 (total_ops as f64) / (now - total_t0).as_secs_f64(),
189 );
190
191 Ok(())
192}
193
194pub async fn week_to_pages(
195 source: impl BundleSource,
196 week: Week,
197 dest: mpsc::Sender<ExportPage>,
198) -> anyhow::Result<()> {
199 use futures::TryStreamExt;
200 let reader = source
201 .reader_for(week)
202 .await
203 .inspect_err(|e| log::error!("week_to_pages reader failed: {e}"))?;
204 let decoder = GzipDecoder::new(BufReader::new(reader));
205 let mut chunks = pin!(LinesStream::new(BufReader::new(decoder).lines()).try_chunks(10000));
206
207 while let Some(chunk) = chunks
208 .try_next()
209 .await
210 .inspect_err(|e| log::error!("failed to get next chunk: {e}"))?
211 {
212 let ops: Vec<Op> = chunk
213 .into_iter()
214 .filter_map(|s| {
215 serde_json::from_str::<Op>(&s)
216 .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})"))
217 .ok()
218 })
219 .collect();
220 let page = ExportPage { ops };
221 dest.send(page)
222 .await
223 .inspect_err(|e| log::error!("failed to send page: {e}"))?;
224 }
225 Ok(())
226}