Rust AppView - highly experimental!
at nix 200 lines 6.2 kB view raw
1use crate::runtime::{Arc, Mutex}; 2use crate::{yield_fn, BatchFn, WaitForWorkFn}; 3use std::collections::{HashMap, HashSet}; 4use std::hash::Hash; 5use std::iter::IntoIterator; 6 7pub trait AsyncCache { 8 type Key; 9 type Val; 10 async fn get(&mut self, key: &Self::Key) -> Option<Self::Val>; 11 async fn insert(&mut self, key: Self::Key, val: Self::Val); 12 async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val>; 13 async fn clear(&mut self); 14} 15 16struct State<K, V, C> 17where 18 C: AsyncCache<Key = K, Val = V>, 19{ 20 completed: C, 21 pending: HashSet<K>, 22} 23 24impl<K: Eq + Hash, V, C> State<K, V, C> 25where 26 C: AsyncCache<Key = K, Val = V>, 27{ 28 fn with_cache(cache: C) -> Self { 29 State { 30 completed: cache, 31 pending: HashSet::new(), 32 } 33 } 34} 35 36#[derive(Clone)] 37pub struct Loader<K, V, F, C> 38where 39 K: Eq + Hash + Clone, 40 V: Clone, 41 F: BatchFn<K, V>, 42 C: AsyncCache<Key = K, Val = V>, 43{ 44 state: Arc<Mutex<State<K, V, C>>>, 45 load_fn: Arc<Mutex<F>>, 46 wait_for_work_fn: Arc<dyn WaitForWorkFn>, 47 max_batch_size: usize, 48} 49 50impl<K, V, F, C> Loader<K, V, F, C> 51where 52 K: Eq + Hash + Clone, 53 V: Clone, 54 F: BatchFn<K, V>, 55 C: AsyncCache<Key = K, Val = V>, 56{ 57 pub fn new(load_fn: F, cache: C) -> Self { 58 Loader { 59 state: Arc::new(Mutex::new(State::with_cache(cache))), 60 load_fn: Arc::new(Mutex::new(load_fn)), 61 max_batch_size: 200, 62 wait_for_work_fn: Arc::new(yield_fn(10)), 63 } 64 } 65 66 pub fn with_max_batch_size(mut self, max_batch_size: usize) -> Self { 67 self.max_batch_size = max_batch_size; 68 self 69 } 70 71 pub fn with_yield_count(mut self, yield_count: usize) -> Self { 72 self.wait_for_work_fn = Arc::new(yield_fn(yield_count)); 73 self 74 } 75 76 /// Replaces the yielding for work behavior with an arbitrary future. Rather than yielding 77 /// the runtime repeatedly this will generate and `.await` a future of your choice. 78 /// ***This is incompatible with*** [`Self::with_yield_count()`]. 79 pub fn with_custom_wait_for_work(mut self, wait_for_work_fn: impl WaitForWorkFn) -> Self { 80 self.wait_for_work_fn = Arc::new(wait_for_work_fn); 81 self 82 } 83 84 pub fn max_batch_size(&self) -> usize { 85 self.max_batch_size 86 } 87 88 pub async fn load(&self, key: K) -> Option<V> { 89 let mut state = self.state.lock().await; 90 if let Some(v) = state.completed.get(&key).await { 91 return Some(v.clone()); 92 } 93 94 if !state.pending.contains(&key) { 95 state.pending.insert(key.clone()); 96 if state.pending.len() >= self.max_batch_size { 97 let keys = state.pending.drain().collect::<Vec<K>>(); 98 let mut load_fn = self.load_fn.lock().await; 99 let load_ret = load_fn.load(keys.as_ref()).await; 100 drop(load_fn); 101 for (k, v) in load_ret.into_iter() { 102 state.completed.insert(k, v).await; 103 } 104 return state.completed.get(&key).await.clone(); 105 } 106 } 107 drop(state); 108 109 (self.wait_for_work_fn)().await; 110 111 let mut state = self.state.lock().await; 112 if let Some(v) = state.completed.get(&key).await { 113 return Some(v.clone()); 114 } 115 116 if !state.pending.is_empty() { 117 let keys = state.pending.drain().collect::<Vec<K>>(); 118 let mut load_fn = self.load_fn.lock().await; 119 let load_ret = load_fn.load(keys.as_ref()).await; 120 drop(load_fn); 121 for (k, v) in load_ret.into_iter() { 122 state.completed.insert(k, v).await; 123 } 124 } 125 126 state.completed.get(&key).await.clone() 127 } 128 129 pub async fn load_many(&self, keys: Vec<K>) -> HashMap<K, V> { 130 let mut state = self.state.lock().await; 131 let mut ret = HashMap::new(); 132 let mut rest = Vec::new(); 133 for key in keys.into_iter() { 134 if let Some(v) = state.completed.get(&key).await.clone() { 135 ret.insert(key, v); 136 continue; 137 } 138 if !state.pending.contains(&key) { 139 state.pending.insert(key.clone()); 140 141 if state.pending.len() >= self.max_batch_size { 142 let keys = state.pending.drain().collect::<Vec<K>>(); 143 let mut load_fn = self.load_fn.lock().await; 144 let load_ret = load_fn.load(keys.as_ref()).await; 145 drop(load_fn); 146 for (k, v) in load_ret.into_iter() { 147 state.completed.insert(k, v).await; 148 } 149 } 150 } 151 rest.push(key); 152 } 153 drop(state); 154 155 (self.wait_for_work_fn)().await; 156 157 if !rest.is_empty() { 158 let mut state = self.state.lock().await; 159 if !state.pending.is_empty() { 160 let keys = state.pending.drain().collect::<Vec<K>>(); 161 let mut load_fn = self.load_fn.lock().await; 162 let load_ret = load_fn.load(keys.as_ref()).await; 163 drop(load_fn); 164 for (k, v) in load_ret.into_iter() { 165 state.completed.insert(k, v).await; 166 } 167 } 168 169 for key in rest.into_iter() { 170 if let Some(v) = state.completed.get(&key).await.clone() { 171 ret.insert(key, v); 172 } 173 } 174 } 175 176 ret 177 } 178 179 pub async fn prime(&self, key: K, val: V) { 180 let mut state = self.state.lock().await; 181 state.completed.insert(key, val).await; 182 } 183 184 pub async fn prime_many(&self, values: impl IntoIterator<Item = (K, V)>) { 185 let mut state = self.state.lock().await; 186 for (k, v) in values.into_iter() { 187 state.completed.insert(k, v).await; 188 } 189 } 190 191 pub async fn clear(&self, key: K) { 192 let mut state = self.state.lock().await; 193 state.completed.remove(&key).await; 194 } 195 196 pub async fn clear_all(&self) { 197 let mut state = self.state.lock().await; 198 state.completed.clear().await 199 } 200}