Server tools to backfill, tail, mirror, and verify PLC logs
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey};
2use reqwest::Url;
3use std::time::Duration;
4use thiserror::Error;
5use tokio::sync::mpsc;
6
/// Errors that can be returned when fetching one PLC export page
///
/// Every variant is `transparent`: it forwards `Display` and `source` to the
/// wrapped error, and the `#[from]` conversions let `?` lift the underlying
/// error types into this enum.
#[derive(Debug, Error)]
pub enum GetPageError {
    /// An error reported by `reqwest`
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
    /// An error reported by `reqwest_middleware`
    #[error(transparent)]
    ReqwestMiddleware(#[from] reqwest_middleware::Error),
    /// A JSON error reported by `serde_json`
    ///
    /// NOTE(review): `get_page` in this file logs and skips lines that fail to
    /// parse rather than returning this variant -- confirm whether any other
    /// caller still produces it.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}
16
/// The final op of an export page, kept as a cursor for requesting the next page
///
/// ops are primary-keyed by (did, cid)
/// plc orders by `created_at` but does not guarantee distinct times per op
/// we assume that the order will at least be deterministic: this may be unsound
#[derive(Debug, PartialEq)]
pub struct LastOp {
    /// Timestamp of the op: any op with a greater `created_at` is definitely
    /// not duplicated
    pub created_at: Dt,
    /// Primary key of the op as `(did, cid)`; both strings are empty when the
    /// value was built from a bare timestamp
    pk: (String, String),
}
25
26impl From<Op> for LastOp {
27 fn from(op: Op) -> Self {
28 Self {
29 created_at: op.created_at,
30 pk: (op.did, op.cid),
31 }
32 }
33}
34
35impl From<&Op> for LastOp {
36 fn from(op: &Op) -> Self {
37 Self {
38 created_at: op.created_at,
39 pk: (op.did.clone(), op.cid.clone()),
40 }
41 }
42}
43
44// bit of a hack
45impl From<Dt> for LastOp {
46 fn from(dt: Dt) -> Self {
47 Self {
48 created_at: dt,
49 pk: ("".to_string(), "".to_string()),
50 }
51 }
52}
53
/// State for removing duplicate ops between PLC export page boundaries
#[derive(Debug, PartialEq)]
pub struct PageBoundaryState {
    /// The previous page's last timestamp
    ///
    /// Duplicate ops from /export only occur for the same exact timestamp
    pub last_at: Dt,
    /// Keys of the ops already seen at `last_at`
    ///
    /// Starts as the previous page's ops at its last timestamp, and keeps
    /// accumulating while later pages end on that same timestamp.
    /// Expected to ~always be length one.
    keys_at: Vec<OpKey>,
}
64
65impl PageBoundaryState {
66 /// Initialize the boundary state with a PLC page
67 pub fn new(page: &ExportPage) -> Option<Self> {
68 // grab the very last op
69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?;
70
71 // set initial state
72 let mut me = Self {
73 last_at,
74 keys_at: vec![last_key],
75 };
76
77 // and make sure all keys at this time are captured from the back
78 me.capture_nth_last_at(page, last_at, 1);
79
80 Some(me)
81 }
82 /// Apply the deduplication and update state
83 ///
84 /// The beginning of the page will be modified to remove duplicates from the
85 /// previous page.
86 ///
87 /// The end of the page is inspected to update the deduplicator state for
88 /// the next page.
89 fn apply_to_next(&mut self, page: &mut ExportPage) {
90 // walk ops forward, kicking previously-seen ops until created_at advances
91 let to_remove: Vec<usize> = page
92 .ops
93 .iter()
94 .enumerate()
95 .take_while(|(_, op)| op.created_at == self.last_at)
96 .filter(|(_, op)| self.keys_at.contains(&(*op).into()))
97 .map(|(i, _)| i)
98 .collect();
99
100 // actually remove them. last to first so indices don't shift
101 for dup_idx in to_remove.into_iter().rev() {
102 page.ops.remove(dup_idx);
103 }
104
105 // grab the very last op
106 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else {
107 // there are no ops left? oop. bail.
108 // last_at and existing keys remain in tact
109 return;
110 };
111
112 // reset state (as long as time actually moved forward on this page)
113 if last_at > self.last_at {
114 self.last_at = last_at;
115 self.keys_at = vec![last_key];
116 } else {
117 // weird cases: either time didn't move (fine...) or went backwards (not fine)
118 assert_eq!(last_at, self.last_at, "time moved backwards on a page");
119 self.keys_at.push(last_key);
120 }
121 // and make sure all keys at this time are captured from the back
122 self.capture_nth_last_at(page, last_at, 1);
123 }
124
125 /// walk backwards from 2nd last and collect keys until created_at changes
126 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) {
127 page.ops
128 .iter()
129 .rev()
130 .skip(skips)
131 .take_while(|op| op.created_at == last_at)
132 .for_each(|op| {
133 self.keys_at.push(op.into());
134 });
135 }
136}
137
138/// Get one PLC export page
139///
140/// Extracts the final op so it can be used to fetch the following page
141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> {
142 log::trace!("Getting page: {url}");
143
144 let ops: Vec<Op> = CLIENT
145 .get(url)
146 .send()
147 .await?
148 .error_for_status()?
149 .text()
150 .await?
151 .trim()
152 .split('\n')
153 .filter_map(|s| {
154 serde_json::from_str::<Op>(s)
155 .inspect_err(|e| {
156 if !s.is_empty() {
157 log::warn!("failed to parse op: {e} ({s})")
158 }
159 })
160 .ok()
161 })
162 .collect();
163
164 let last_op = ops.last().map(Into::into);
165
166 Ok((ExportPage { ops }, last_op))
167}
168
169/// Poll an upstream PLC server for new ops
170///
171/// Pages of operations are written to the `dest` channel.
172///
173/// ```no_run
174/// # #[tokio::main]
175/// # async fn main() {
176/// use allegedly::{ExportPage, Op, poll_upstream};
177///
178/// let after = Some(chrono::Utc::now());
179/// let upstream = "https://plc.wtf/export".parse().unwrap();
180/// let throttle = std::time::Duration::from_millis(300);
181///
182/// let (tx, mut rx) = tokio::sync::mpsc::channel(1);
183/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx));
184///
185/// while let Some(ExportPage { ops }) = rx.recv().await {
186/// println!("received {} plc ops", ops.len());
187///
188/// for Op { did, cid, operation, .. } in ops {
189/// // in this example we're alerting when changes are found for one
190/// // specific identity
191/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" {
192/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get());
193/// }
194/// }
195/// }
196/// # }
197/// ```
198pub async fn poll_upstream(
199 after: Option<Dt>,
200 base: Url,
201 throttle: Duration,
202 dest: mpsc::Sender<ExportPage>,
203) -> anyhow::Result<&'static str> {
204 log::info!("starting upstream poller at {base} after {after:?}");
205 let mut tick = tokio::time::interval(throttle);
206 let mut prev_last: Option<LastOp> = after.map(Into::into);
207 let mut boundary_state: Option<PageBoundaryState> = None;
208 loop {
209 tick.tick().await;
210
211 let mut url = base.clone();
212 if let Some(ref pl) = prev_last {
213 url.query_pairs_mut()
214 .append_pair("after", &pl.created_at.to_rfc3339());
215 };
216
217 let (mut page, next_last) = get_page(url).await?;
218 if let Some(ref mut state) = boundary_state {
219 state.apply_to_next(&mut page);
220 } else {
221 boundary_state = PageBoundaryState::new(&page);
222 }
223 if !page.is_empty() {
224 match dest.try_send(page) {
225 Ok(()) => {}
226 Err(mpsc::error::TrySendError::Full(page)) => {
227 log::warn!("export: destination channel full, awaiting...");
228 dest.send(page).await?;
229 }
230 e => e?,
231 };
232 }
233
234 prev_last = next_last.or(prev_last);
235 }
236}
237
#[cfg(test)]
mod test {
    use super::*;

    const FIVES_TS: i64 = 1431648000;
    const NEXT_TS: i64 = 1431648001;

    /// Deserialize an op fixture with the given key and RFC 3339 timestamp
    fn make_op(did: &str, cid: &str, created_at: &str) -> Op {
        serde_json::from_value(serde_json::json!({
            "did": did,
            "cid": cid,
            "createdAt": created_at,
            "nullified": false,
            "operation": {},
        }))
        .unwrap()
    }

    /// The op every base state is built from (created at `FIVES_TS`)
    fn valid_op() -> Op {
        make_op("did", "cid", "2015-05-15T00:00:00Z")
    }

    /// An op one second after `valid_op` (created at `NEXT_TS`)
    fn next_op() -> Op {
        make_op("didnext", "cidnext", "2015-05-15T00:00:01Z")
    }

    /// Same timestamp as `valid_op` but a different `OpKey`
    fn same_time_other_op() -> Op {
        let mut op = valid_op();
        op.cid = "cid2".to_string();
        op
    }

    fn key(did: &str, cid: &str) -> OpKey {
        OpKey {
            cid: cid.to_string(),
            did: did.to_string(),
        }
    }

    fn ts(secs: i64) -> Dt {
        Dt::from_timestamp(secs, 0).unwrap()
    }

    fn page_of(ops: Vec<Op>) -> ExportPage {
        ExportPage { ops }
    }

    fn base_state() -> PageBoundaryState {
        PageBoundaryState::new(&page_of(vec![valid_op()]))
            .expect("to have a base page boundary state")
    }

    /// Run `apply_to_next` on a fresh base state; returns the updated state
    /// and the deduplicated page
    fn apply_to_base(ops: Vec<Op>) -> (PageBoundaryState, ExportPage) {
        let mut state = base_state();
        let mut page = page_of(ops);
        state.apply_to_next(&mut page);
        (state, page)
    }

    #[test]
    fn test_boundary_new_empty() {
        assert!(PageBoundaryState::new(&page_of(vec![])).is_none());
    }

    #[test]
    fn test_boundary_new_one_op() {
        let state = PageBoundaryState::new(&page_of(vec![valid_op()])).unwrap();
        assert_eq!(state.last_at, ts(FIVES_TS));
        assert_eq!(state.keys_at, vec![key("did", "cid")]);
    }

    #[test]
    fn test_add_new_empty() {
        let (state, _page) = apply_to_base(vec![]);
        assert_eq!(state, base_state());
    }

    #[test]
    fn test_add_new_same_op() {
        let (state, _page) = apply_to_base(vec![valid_op()]);
        assert_eq!(state, base_state());
    }

    #[test]
    fn test_add_new_same_time() {
        let (state, _page) = apply_to_base(vec![same_time_other_op()]);
        assert_eq!(state.last_at, ts(FIVES_TS));
        assert_eq!(state.keys_at, vec![key("did", "cid"), key("did", "cid2")]);
    }

    #[test]
    fn test_add_new_same_time_dup_before() {
        let (state, _page) = apply_to_base(vec![valid_op(), same_time_other_op()]);
        assert_eq!(state.last_at, ts(FIVES_TS));
        assert_eq!(state.keys_at, vec![key("did", "cid"), key("did", "cid2")]);
    }

    #[test]
    fn test_add_new_same_time_dup_after() {
        let (state, _page) = apply_to_base(vec![same_time_other_op(), valid_op()]);
        assert_eq!(state.last_at, ts(FIVES_TS));
        assert_eq!(state.keys_at, vec![key("did", "cid"), key("did", "cid2")]);
    }

    #[test]
    fn test_add_new_next_time() {
        let (state, _page) = apply_to_base(vec![next_op()]);
        assert_eq!(state.last_at, ts(NEXT_TS));
        assert_eq!(state.keys_at, vec![key("didnext", "cidnext")]);
    }

    #[test]
    fn test_add_new_next_time_with_dup() {
        let (state, page) = apply_to_base(vec![valid_op(), next_op()]);
        assert_eq!(state.last_at, ts(NEXT_TS));
        assert_eq!(state.keys_at, vec![key("didnext", "cidnext")]);
        assert_eq!(page.ops.len(), 1);
        assert_eq!(page.ops[0], next_op());
    }

    #[test]
    fn test_add_new_next_time_with_dup_and_new_prev_same_time() {
        let kept = same_time_other_op();
        let (state, page) = apply_to_base(vec![
            valid_op(),   // should get dropped
            kept.clone(), // should be kept
            next_op(),
        ]);
        assert_eq!(state.last_at, ts(NEXT_TS));
        assert_eq!(state.keys_at, vec![key("didnext", "cidnext")]);
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], kept);
        assert_eq!(page.ops[1], next_op());
    }

    #[test]
    fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() {
        let kept = same_time_other_op();
        let (state, page) = apply_to_base(vec![
            kept.clone(), // should be kept
            valid_op(),   // should get dropped
            next_op(),
        ]);
        assert_eq!(state.last_at, ts(NEXT_TS));
        assert_eq!(state.keys_at, vec![key("didnext", "cidnext")]);
        assert_eq!(page.ops.len(), 2);
        assert_eq!(page.ops[0], kept);
        assert_eq!(page.ops[1], next_op());
    }
}