//! Server tools to backfill, tail, mirror, and verify PLC logs.
1use serde::{Deserialize, Serialize};
2use tokio::sync::{mpsc, oneshot};
3
4mod backfill;
5mod cached_value;
6mod client;
7mod mirror;
8mod plc_pg;
9mod poll;
10mod ratelimit;
11mod weekly;
12
13pub mod bin;
14
15pub use backfill::backfill;
16pub use cached_value::{CachedValue, Fetcher};
17pub use client::{CLIENT, UA};
18pub use mirror::{ExperimentalConf, ListenConf, serve};
19pub use plc_pg::{Db, backfill_to_pg, pages_to_pg};
20pub use poll::{PageBoundaryState, get_page, poll_upstream};
21pub use ratelimit::{CreatePlcOpLimiter, GovernorMiddleware, IpLimiters};
22pub use weekly::{BundleSource, FolderSource, HttpSource, Week, pages_to_weeks, week_to_pages};
23
24pub type Dt = chrono::DateTime<chrono::Utc>;
25
26/// One page of PLC export
27///
28/// plc.directory caps /export at 1000 ops; backfill tasks may send more in a page.
29#[derive(Debug)]
30pub struct ExportPage {
31 pub ops: Vec<Op>,
32}
33
34impl ExportPage {
35 pub fn is_empty(&self) -> bool {
36 self.ops.is_empty()
37 }
38}
39
/// A deserialized PLC export entry.
///
/// Includes PLC's wrapping around the operation itself: the DID, CID,
/// creation timestamp, and nullified state. Field names (de)serialize in
/// camelCase (e.g. `created_at` <-> `createdAt`).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Op {
    pub did: String,
    pub cid: String,
    pub created_at: Dt,
    pub nullified: bool,
    /// The inner operation, kept as raw JSON text rather than parsed into a
    /// `serde_json::Value`.
    pub operation: Box<serde_json::value::RawValue>,
}
52
/// Test-only equality for [`Op`].
///
/// The wrapper fields are compared directly. The raw `operation` payloads are
/// compared as parsed JSON values rather than as text, so formatting
/// differences between otherwise-equal ops don't cause a mismatch. Parsing
/// unwraps, so an invalid payload panics, which is acceptable in tests.
#[cfg(test)]
impl PartialEq for Op {
    fn eq(&self, other: &Self) -> bool {
        let parse = |json: &str| serde_json::from_str::<serde_json::Value>(json).unwrap();
        (&self.did, &self.cid, &self.created_at, self.nullified)
            == (&other.did, &other.cid, &other.created_at, other.nullified)
            && parse(self.operation.get()) == parse(other.operation.get())
    }
}
64
/// Database primary key for an op: its DID together with its CID.
///
/// Derives `Clone`, `Eq` and `Hash` in addition to `Debug`/`PartialEq` so keys
/// can be copied around and used directly in `HashSet`/`HashMap` (e.g. for
/// de-duplicating ops).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpKey {
    pub did: String,
    pub cid: String,
}
71
72impl From<&Op> for OpKey {
73 fn from(Op { did, cid, .. }: &Op) -> Self {
74 Self {
75 did: did.to_string(),
76 cid: cid.to_string(),
77 }
78 }
79}
80
81/// page forwarder who drops its channels on receipt of a small page
82///
83/// PLC will return up to 1000 ops on a page, and returns full pages until it
84/// has caught up, so this is a (hacky?) way to stop polling once we're up.
85pub async fn full_pages(
86 mut rx: mpsc::Receiver<ExportPage>,
87 tx: mpsc::Sender<ExportPage>,
88) -> anyhow::Result<&'static str> {
89 while let Some(page) = rx.recv().await {
90 let n = page.ops.len();
91 if n < 900 {
92 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at);
93 let Some(age) = last_age else {
94 log::info!("full_pages done, empty final page");
95 return Ok("full pages (hmm)");
96 };
97 if age <= chrono::TimeDelta::hours(6) {
98 log::info!("full_pages done, final page of {n} ops");
99 } else {
100 log::warn!("full_pages finished with small page of {n} ops, but it's {age} old");
101 }
102 return Ok("full pages (cool)");
103 }
104 log::trace!("full_pages: continuing with page of {n} ops");
105 tx.send(page).await?;
106 }
107 Err(anyhow::anyhow!(
108 "full_pages ran out of source material, sender closed"
109 ))
110}
111
112pub async fn pages_to_stdout(
113 mut rx: mpsc::Receiver<ExportPage>,
114 notify_last_at: Option<oneshot::Sender<Option<Dt>>>,
115) -> anyhow::Result<&'static str> {
116 let mut last_at = None;
117 while let Some(page) = rx.recv().await {
118 for op in &page.ops {
119 println!("{}", serde_json::to_string(op)?);
120 }
121 if notify_last_at.is_some()
122 && let Some(s) = PageBoundaryState::new(&page)
123 {
124 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at));
125 }
126 }
127 if let Some(notify) = notify_last_at {
128 log::trace!("notifying last_at: {last_at:?}");
129 if notify.send(last_at).is_err() {
130 log::error!("receiver for last_at dropped, can't notify");
131 };
132 }
133 Ok("pages_to_stdout")
134}
135
/// Build the startup banner: an ASCII-art wordmark followed by `name` and the
/// crate version, which `env!("CARGO_PKG_VERSION")` embeds at compile time.
pub fn logo(name: &str) -> String {
    format!(
        r"

 \ | | | |
 _ \ | | -_) _` | -_) _` | | | | ({name})
 _/ _\ _| _| \___| \__, | \___| \__,_| _| \_, | (v{})
 ____| __/
",
        env!("CARGO_PKG_VERSION"),
    )
}