Server tools to backfill, tail, mirror, and verify PLC logs
at main 484 lines 14 kB view raw
1use crate::{CLIENT, Dt, ExportPage, Op, OpKey}; 2use reqwest::Url; 3use std::time::Duration; 4use thiserror::Error; 5use tokio::sync::mpsc; 6 7#[derive(Debug, Error)] 8pub enum GetPageError { 9 #[error(transparent)] 10 Reqwest(#[from] reqwest::Error), 11 #[error(transparent)] 12 ReqwestMiddleware(#[from] reqwest_middleware::Error), 13 #[error(transparent)] 14 Serde(#[from] serde_json::Error), 15} 16 17/// ops are primary-keyed by (did, cid) 18/// plc orders by `created_at` but does not guarantee distinct times per op 19/// we assume that the order will at least be deterministic: this may be unsound 20#[derive(Debug, PartialEq)] 21pub struct LastOp { 22 pub created_at: Dt, // any op greater is definitely not duplicated 23 pk: (String, String), // did, cid 24} 25 26impl From<Op> for LastOp { 27 fn from(op: Op) -> Self { 28 Self { 29 created_at: op.created_at, 30 pk: (op.did, op.cid), 31 } 32 } 33} 34 35impl From<&Op> for LastOp { 36 fn from(op: &Op) -> Self { 37 Self { 38 created_at: op.created_at, 39 pk: (op.did.clone(), op.cid.clone()), 40 } 41 } 42} 43 44// bit of a hack 45impl From<Dt> for LastOp { 46 fn from(dt: Dt) -> Self { 47 Self { 48 created_at: dt, 49 pk: ("".to_string(), "".to_string()), 50 } 51 } 52} 53 54/// State for removing duplicates ops between PLC export page boundaries 55#[derive(Debug, PartialEq)] 56pub struct PageBoundaryState { 57 /// The previous page's last timestamp 58 /// 59 /// Duplicate ops from /export only occur for the same exact timestamp 60 pub last_at: Dt, 61 /// The previous page's ops at its last timestamp 62 keys_at: Vec<OpKey>, // expected to ~always be length one 63} 64 65impl PageBoundaryState { 66 /// Initialize the boundary state with a PLC page 67 pub fn new(page: &ExportPage) -> Option<Self> { 68 // grab the very last op 69 let (last_at, last_key) = page.ops.last().map(|op| (op.created_at, op.into()))?; 70 71 // set initial state 72 let mut me = Self { 73 last_at, 74 keys_at: vec![last_key], 75 }; 76 77 // and make sure all keys at this time are captured from the back 78 me.capture_nth_last_at(page, last_at, 1); 79 80 Some(me) 81 } 82 /// Apply the deduplication and update state 83 /// 84 /// The beginning of the page will be modified to remove duplicates from the 85 /// previous page. 86 /// 87 /// The end of the page is inspected to update the deduplicator state for 88 /// the next page. 89 fn apply_to_next(&mut self, page: &mut ExportPage) { 90 // walk ops forward, kicking previously-seen ops until created_at advances 91 let to_remove: Vec<usize> = page 92 .ops 93 .iter() 94 .enumerate() 95 .take_while(|(_, op)| op.created_at == self.last_at) 96 .filter(|(_, op)| self.keys_at.contains(&(*op).into())) 97 .map(|(i, _)| i) 98 .collect(); 99 100 // actually remove them. last to first so indices don't shift 101 for dup_idx in to_remove.into_iter().rev() { 102 page.ops.remove(dup_idx); 103 } 104 105 // grab the very last op 106 let Some((last_at, last_key)) = page.ops.last().map(|op| (op.created_at, op.into())) else { 107 // there are no ops left? oop. bail. 108 // last_at and existing keys remain in tact 109 return; 110 }; 111 112 // reset state (as long as time actually moved forward on this page) 113 if last_at > self.last_at { 114 self.last_at = last_at; 115 self.keys_at = vec![last_key]; 116 } else { 117 // weird cases: either time didn't move (fine...) or went backwards (not fine) 118 assert_eq!(last_at, self.last_at, "time moved backwards on a page"); 119 self.keys_at.push(last_key); 120 } 121 // and make sure all keys at this time are captured from the back 122 self.capture_nth_last_at(page, last_at, 1); 123 } 124 125 /// walk backwards from 2nd last and collect keys until created_at changes 126 fn capture_nth_last_at(&mut self, page: &ExportPage, last_at: Dt, skips: usize) { 127 page.ops 128 .iter() 129 .rev() 130 .skip(skips) 131 .take_while(|op| op.created_at == last_at) 132 .for_each(|op| { 133 self.keys_at.push(op.into()); 134 }); 135 } 136} 137 138/// Get one PLC export page 139/// 140/// Extracts the final op so it can be used to fetch the following page 141pub async fn get_page(url: Url) -> Result<(ExportPage, Option<LastOp>), GetPageError> { 142 log::trace!("Getting page: {url}"); 143 144 let ops: Vec<Op> = CLIENT 145 .get(url) 146 .send() 147 .await? 148 .error_for_status()? 149 .text() 150 .await? 151 .trim() 152 .split('\n') 153 .filter_map(|s| { 154 serde_json::from_str::<Op>(s) 155 .inspect_err(|e| { 156 if !s.is_empty() { 157 log::warn!("failed to parse op: {e} ({s})") 158 } 159 }) 160 .ok() 161 }) 162 .collect(); 163 164 let last_op = ops.last().map(Into::into); 165 166 Ok((ExportPage { ops }, last_op)) 167} 168 169/// Poll an upstream PLC server for new ops 170/// 171/// Pages of operations are written to the `dest` channel. 172/// 173/// ```no_run 174/// # #[tokio::main] 175/// # async fn main() { 176/// use allegedly::{ExportPage, Op, poll_upstream}; 177/// 178/// let after = Some(chrono::Utc::now()); 179/// let upstream = "https://plc.wtf/export".parse().unwrap(); 180/// let throttle = std::time::Duration::from_millis(300); 181/// 182/// let (tx, mut rx) = tokio::sync::mpsc::channel(1); 183/// tokio::task::spawn(poll_upstream(after, upstream, throttle, tx)); 184/// 185/// while let Some(ExportPage { ops }) = rx.recv().await { 186/// println!("received {} plc ops", ops.len()); 187/// 188/// for Op { did, cid, operation, .. } in ops { 189/// // in this example we're alerting when changes are found for one 190/// // specific identity 191/// if did == "did:plc:hdhoaan3xa3jiuq4fg4mefid" { 192/// println!("Update found for {did}! cid={cid}\n -> operation: {}", operation.get()); 193/// } 194/// } 195/// } 196/// # } 197/// ``` 198pub async fn poll_upstream( 199 after: Option<Dt>, 200 base: Url, 201 throttle: Duration, 202 dest: mpsc::Sender<ExportPage>, 203) -> anyhow::Result<&'static str> { 204 log::info!("starting upstream poller at {base} after {after:?}"); 205 let mut tick = tokio::time::interval(throttle); 206 let mut prev_last: Option<LastOp> = after.map(Into::into); 207 let mut boundary_state: Option<PageBoundaryState> = None; 208 loop { 209 tick.tick().await; 210 211 let mut url = base.clone(); 212 if let Some(ref pl) = prev_last { 213 url.query_pairs_mut() 214 .append_pair("after", &pl.created_at.to_rfc3339()); 215 }; 216 217 let (mut page, next_last) = get_page(url).await?; 218 if let Some(ref mut state) = boundary_state { 219 state.apply_to_next(&mut page); 220 } else { 221 boundary_state = PageBoundaryState::new(&page); 222 } 223 if !page.is_empty() { 224 match dest.try_send(page) { 225 Ok(()) => {} 226 Err(mpsc::error::TrySendError::Full(page)) => { 227 log::warn!("export: destination channel full, awaiting..."); 228 dest.send(page).await?; 229 } 230 e => e?, 231 }; 232 } 233 234 prev_last = next_last.or(prev_last); 235 } 236} 237 238#[cfg(test)] 239mod test { 240 use super::*; 241 242 const FIVES_TS: i64 = 1431648000; 243 const NEXT_TS: i64 = 1431648001; 244 245 fn valid_op() -> Op { 246 serde_json::from_value(serde_json::json!({ 247 "did": "did", 248 "cid": "cid", 249 "createdAt": "2015-05-15T00:00:00Z", 250 "nullified": false, 251 "operation": {}, 252 })) 253 .unwrap() 254 } 255 256 fn next_op() -> Op { 257 serde_json::from_value(serde_json::json!({ 258 "did": "didnext", 259 "cid": "cidnext", 260 "createdAt": "2015-05-15T00:00:01Z", 261 "nullified": false, 262 "operation": {}, 263 })) 264 .unwrap() 265 } 266 267 fn base_state() -> PageBoundaryState { 268 let page = ExportPage { 269 ops: vec![valid_op()], 270 }; 271 PageBoundaryState::new(&page).expect("to have a base page boundary state") 272 } 273 274 #[test] 275 fn test_boundary_new_empty() { 276 let page = ExportPage { ops: vec![] }; 277 let state = PageBoundaryState::new(&page); 278 assert!(state.is_none()); 279 } 280 281 #[test] 282 fn test_boundary_new_one_op() { 283 let page = ExportPage { 284 ops: vec![valid_op()], 285 }; 286 let state = PageBoundaryState::new(&page).unwrap(); 287 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 288 assert_eq!( 289 state.keys_at, 290 vec![OpKey { 291 cid: "cid".to_string(), 292 did: "did".to_string(), 293 }] 294 ); 295 } 296 297 #[test] 298 fn test_add_new_empty() { 299 let mut state = base_state(); 300 state.apply_to_next(&mut ExportPage { ops: vec![] }); 301 assert_eq!(state, base_state()); 302 } 303 304 #[test] 305 fn test_add_new_same_op() { 306 let mut page = ExportPage { 307 ops: vec![valid_op()], 308 }; 309 let mut state = base_state(); 310 state.apply_to_next(&mut page); 311 assert_eq!(state, base_state()); 312 } 313 314 #[test] 315 fn test_add_new_same_time() { 316 // make an op with a different OpKey 317 let mut op = valid_op(); 318 op.cid = "cid2".to_string(); 319 let mut page = ExportPage { ops: vec![op] }; 320 321 let mut state = base_state(); 322 state.apply_to_next(&mut page); 323 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 324 assert_eq!( 325 state.keys_at, 326 vec![ 327 OpKey { 328 cid: "cid".to_string(), 329 did: "did".to_string(), 330 }, 331 OpKey { 332 cid: "cid2".to_string(), 333 did: "did".to_string(), 334 }, 335 ] 336 ); 337 } 338 339 #[test] 340 fn test_add_new_same_time_dup_before() { 341 // make an op with a different OpKey 342 let mut op = valid_op(); 343 op.cid = "cid2".to_string(); 344 let mut page = ExportPage { 345 ops: vec![valid_op(), op], 346 }; 347 348 let mut state = base_state(); 349 state.apply_to_next(&mut page); 350 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 351 assert_eq!( 352 state.keys_at, 353 vec![ 354 OpKey { 355 cid: "cid".to_string(), 356 did: "did".to_string(), 357 }, 358 OpKey { 359 cid: "cid2".to_string(), 360 did: "did".to_string(), 361 }, 362 ] 363 ); 364 } 365 366 #[test] 367 fn test_add_new_same_time_dup_after() { 368 // make an op with a different OpKey 369 let mut op = valid_op(); 370 op.cid = "cid2".to_string(); 371 let mut page = ExportPage { 372 ops: vec![op, valid_op()], 373 }; 374 375 let mut state = base_state(); 376 state.apply_to_next(&mut page); 377 assert_eq!(state.last_at, Dt::from_timestamp(FIVES_TS, 0).unwrap()); 378 assert_eq!( 379 state.keys_at, 380 vec![ 381 OpKey { 382 cid: "cid".to_string(), 383 did: "did".to_string(), 384 }, 385 OpKey { 386 cid: "cid2".to_string(), 387 did: "did".to_string(), 388 }, 389 ] 390 ); 391 } 392 393 #[test] 394 fn test_add_new_next_time() { 395 let mut page = ExportPage { 396 ops: vec![next_op()], 397 }; 398 let mut state = base_state(); 399 state.apply_to_next(&mut page); 400 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 401 assert_eq!( 402 state.keys_at, 403 vec![OpKey { 404 cid: "cidnext".to_string(), 405 did: "didnext".to_string(), 406 },] 407 ); 408 } 409 410 #[test] 411 fn test_add_new_next_time_with_dup() { 412 let mut page = ExportPage { 413 ops: vec![valid_op(), next_op()], 414 }; 415 let mut state = base_state(); 416 state.apply_to_next(&mut page); 417 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 418 assert_eq!( 419 state.keys_at, 420 vec![OpKey { 421 cid: "cidnext".to_string(), 422 did: "didnext".to_string(), 423 },] 424 ); 425 assert_eq!(page.ops.len(), 1); 426 assert_eq!(page.ops[0], next_op()); 427 } 428 429 #[test] 430 fn test_add_new_next_time_with_dup_and_new_prev_same_time() { 431 // make an op with a different OpKey 432 let mut op = valid_op(); 433 op.cid = "cid2".to_string(); 434 435 let mut page = ExportPage { 436 ops: vec![ 437 valid_op(), // should get dropped 438 op.clone(), // should be kept 439 next_op(), 440 ], 441 }; 442 let mut state = base_state(); 443 state.apply_to_next(&mut page); 444 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 445 assert_eq!( 446 state.keys_at, 447 vec![OpKey { 448 cid: "cidnext".to_string(), 449 did: "didnext".to_string(), 450 },] 451 ); 452 assert_eq!(page.ops.len(), 2); 453 assert_eq!(page.ops[0], op); 454 assert_eq!(page.ops[1], next_op()); 455 } 456 457 #[test] 458 fn test_add_new_next_time_with_dup_later_and_new_prev_same_time() { 459 // make an op with a different OpKey 460 let mut op = valid_op(); 461 op.cid = "cid2".to_string(); 462 463 let mut page = ExportPage { 464 ops: vec![ 465 op.clone(), // should be kept 466 valid_op(), // should get dropped 467 next_op(), 468 ], 469 }; 470 let mut state = base_state(); 471 state.apply_to_next(&mut page); 472 assert_eq!(state.last_at, Dt::from_timestamp(NEXT_TS, 0).unwrap()); 473 assert_eq!( 474 state.keys_at, 475 vec![OpKey { 476 cid: "cidnext".to_string(), 477 did: "didnext".to_string(), 478 },] 479 ); 480 assert_eq!(page.ops.len(), 2); 481 assert_eq!(page.ops[0], op); 482 assert_eq!(page.ops[1], next_op()); 483 } 484}