Server tools to backfill, tail, mirror, and verify PLC logs
at main 73 lines 2.0 kB view raw
1use std::error::Error; 2use std::sync::Arc; 3use std::time::{Duration, Instant}; 4use tokio::sync::Mutex; 5 6pub trait Fetcher<T> { 7 fn fetch(&self) -> impl Future<Output = Result<T, Box<dyn Error>>>; 8} 9 10#[derive(Debug)] 11struct ExpiringValue<T: Clone> { 12 value: T, 13 expires: Instant, 14} 15 16impl<T: Clone> ExpiringValue<T> { 17 fn get(&self, now: Instant) -> Option<T> { 18 if now <= self.expires { 19 log::trace!("returning val (fresh for {:?})", self.expires - now); 20 Some(self.value.clone()) 21 } else { 22 log::trace!("hiding expired val"); 23 None 24 } 25 } 26} 27 28// TODO: generic over the fetcher's actual error type 29#[derive(Clone)] 30pub struct CachedValue<T: Clone, F: Fetcher<T>> { 31 latest: Arc<Mutex<Option<ExpiringValue<T>>>>, 32 fetcher: F, 33 validitiy: Duration, 34} 35 36impl<T: Clone, F: Fetcher<T>> CachedValue<T, F> { 37 pub fn new(f: F, validitiy: Duration) -> Self { 38 Self { 39 latest: Default::default(), 40 fetcher: f, 41 validitiy, 42 } 43 } 44 pub async fn get(&self) -> Result<T, Box<dyn Error>> { 45 let now = Instant::now(); 46 return self.get_impl(now).await; 47 } 48 async fn get_impl(&self, now: Instant) -> Result<T, Box<dyn Error>> { 49 let mut val = self.latest.lock().await; 50 if let Some(v) = val.as_ref().and_then(|v| v.get(now)) { 51 return Ok(v); 52 } 53 log::debug!( 54 "value {}, fetching...", 55 if val.is_some() { 56 "expired" 57 } else { 58 "not present" 59 } 60 ); 61 let new = self 62 .fetcher 63 .fetch() 64 .await 65 .inspect_err(|e| log::warn!("value fetch failed, next access will retry: {e}"))?; 66 log::debug!("fetched ok, saving a copy for cache."); 67 *val = Some(ExpiringValue { 68 value: new.clone(), 69 expires: now + self.validitiy, 70 }); 71 Ok(new) 72 } 73}