Rust AppView - highly experimental!
at nix 256 lines 7.4 kB view raw
1use crate::runtime::{Arc, Mutex}; 2use crate::{yield_fn, BatchFn, WaitForWorkFn}; 3use std::collections::{HashMap, HashSet}; 4use std::fmt::Debug; 5use std::hash::{BuildHasher, Hash}; 6use std::iter::IntoIterator; 7 8pub trait Cache { 9 type Key; 10 type Val; 11 fn get(&mut self, key: &Self::Key) -> Option<&Self::Val>; 12 fn insert(&mut self, key: Self::Key, val: Self::Val); 13 fn remove(&mut self, key: &Self::Key) -> Option<Self::Val>; 14 fn clear(&mut self); 15} 16 17impl<K, V, S: BuildHasher> Cache for HashMap<K, V, S> 18where 19 K: Eq + Hash, 20{ 21 type Key = K; 22 type Val = V; 23 24 #[inline] 25 fn get(&mut self, key: &K) -> Option<&V> { 26 HashMap::get(self, key) 27 } 28 29 #[inline] 30 fn insert(&mut self, key: K, val: V) { 31 HashMap::insert(self, key, val); 32 } 33 34 #[inline] 35 fn remove(&mut self, key: &K) -> Option<V> { 36 HashMap::remove(self, key) 37 } 38 39 #[inline] 40 fn clear(&mut self) { 41 HashMap::clear(self) 42 } 43} 44 45struct State<K, V, C = HashMap<K, V>> 46where 47 C: Cache<Key = K, Val = V>, 48{ 49 completed: C, 50 pending: HashSet<K>, 51} 52 53impl<K: Eq + Hash, V, C> State<K, V, C> 54where 55 C: Cache<Key = K, Val = V>, 56{ 57 fn with_cache(cache: C) -> Self { 58 State { 59 completed: cache, 60 pending: HashSet::new(), 61 } 62 } 63} 64 65pub struct Loader<K, V, F, C = HashMap<K, V>> 66where 67 K: Eq + Hash + Clone, 68 V: Clone, 69 F: BatchFn<K, V>, 70 C: Cache<Key = K, Val = V>, 71{ 72 state: Arc<Mutex<State<K, V, C>>>, 73 load_fn: Arc<Mutex<F>>, 74 wait_for_work_fn: Arc<dyn WaitForWorkFn>, 75 max_batch_size: usize, 76} 77 78impl<K, V, F, C> Clone for Loader<K, V, F, C> 79where 80 K: Eq + Hash + Clone, 81 V: Clone, 82 F: BatchFn<K, V>, 83 C: Cache<Key = K, Val = V>, 84{ 85 fn clone(&self) -> Self { 86 Loader { 87 state: self.state.clone(), 88 max_batch_size: self.max_batch_size, 89 load_fn: self.load_fn.clone(), 90 wait_for_work_fn: self.wait_for_work_fn.clone(), 91 } 92 } 93} 94 95#[allow(clippy::implicit_hasher)] 96impl<K, V, F> Loader<K, V, F, HashMap<K, V>> 97where 98 K: Eq + Hash + Clone + Debug, 99 V: Clone, 100 F: BatchFn<K, V>, 101{ 102 pub fn new(load_fn: F) -> Loader<K, V, F, HashMap<K, V>> { 103 Loader::with_cache(load_fn, HashMap::new()) 104 } 105} 106 107impl<K, V, F, C> Loader<K, V, F, C> 108where 109 K: Eq + Hash + Clone + Debug, 110 V: Clone, 111 F: BatchFn<K, V>, 112 C: Cache<Key = K, Val = V>, 113{ 114 pub fn with_cache(load_fn: F, cache: C) -> Loader<K, V, F, C> { 115 Loader { 116 state: Arc::new(Mutex::new(State::with_cache(cache))), 117 load_fn: Arc::new(Mutex::new(load_fn)), 118 max_batch_size: 200, 119 wait_for_work_fn: Arc::new(yield_fn(10)), 120 } 121 } 122 123 pub fn with_max_batch_size(mut self, max_batch_size: usize) -> Self { 124 self.max_batch_size = max_batch_size; 125 self 126 } 127 128 pub fn with_yield_count(mut self, yield_count: usize) -> Self { 129 self.wait_for_work_fn = Arc::new(yield_fn(yield_count)); 130 self 131 } 132 133 /// Replaces the yielding for work behavior with an arbitrary future. Rather than yielding 134 /// the runtime repeatedly this will generate and `.await` a future of your choice. 135 /// ***This is incompatible with*** [`Self::with_yield_count()`]. 136 pub fn with_custom_wait_for_work(mut self, wait_for_work_fn: impl WaitForWorkFn) -> Self { 137 self.wait_for_work_fn = Arc::new(wait_for_work_fn); 138 self 139 } 140 141 pub fn max_batch_size(&self) -> usize { 142 self.max_batch_size 143 } 144 145 pub async fn load(&self, key: K) -> Option<V> { 146 let mut state = self.state.lock().await; 147 if let Some(v) = state.completed.get(&key) { 148 return Some((*v).clone()); 149 } 150 151 if !state.pending.contains(&key) { 152 state.pending.insert(key.clone()); 153 if state.pending.len() >= self.max_batch_size { 154 let keys = state.pending.drain().collect::<Vec<K>>(); 155 let mut load_fn = self.load_fn.lock().await; 156 let load_ret = load_fn.load(keys.as_ref()).await; 157 drop(load_fn); 158 for (k, v) in load_ret.into_iter() { 159 state.completed.insert(k, v); 160 } 161 return state.completed.get(&key).cloned(); 162 } 163 } 164 drop(state); 165 166 (self.wait_for_work_fn)().await; 167 168 let mut state = self.state.lock().await; 169 if let Some(v) = state.completed.get(&key) { 170 return Some((*v).clone()); 171 } 172 173 if !state.pending.is_empty() { 174 let keys = state.pending.drain().collect::<Vec<K>>(); 175 let mut load_fn = self.load_fn.lock().await; 176 let load_ret = load_fn.load(keys.as_ref()).await; 177 drop(load_fn); 178 for (k, v) in load_ret.into_iter() { 179 state.completed.insert(k, v); 180 } 181 } 182 183 state.completed.get(&key).cloned() 184 } 185 186 pub async fn load_many(&self, keys: Vec<K>) -> HashMap<K, V> { 187 let mut state = self.state.lock().await; 188 let mut ret = HashMap::new(); 189 let mut rest = Vec::new(); 190 for key in keys.into_iter() { 191 if let Some(v) = state.completed.get(&key).cloned() { 192 ret.insert(key, v); 193 continue; 194 } 195 if !state.pending.contains(&key) { 196 state.pending.insert(key.clone()); 197 if state.pending.len() >= self.max_batch_size { 198 let keys = state.pending.drain().collect::<Vec<K>>(); 199 let mut load_fn = self.load_fn.lock().await; 200 let load_ret = load_fn.load(keys.as_ref()).await; 201 drop(load_fn); 202 for (k, v) in load_ret.into_iter() { 203 state.completed.insert(k, v); 204 } 205 } 206 } 207 rest.push(key); 208 } 209 drop(state); 210 211 (self.wait_for_work_fn)().await; 212 213 if !rest.is_empty() { 214 let mut state = self.state.lock().await; 215 if !state.pending.is_empty() { 216 let keys = state.pending.drain().collect::<Vec<K>>(); 217 let mut load_fn = self.load_fn.lock().await; 218 let load_ret = load_fn.load(keys.as_ref()).await; 219 drop(load_fn); 220 for (k, v) in load_ret.into_iter() { 221 state.completed.insert(k, v); 222 } 223 } 224 225 for key in rest.into_iter() { 226 if let Some(v) = state.completed.get(&key).cloned() { 227 ret.insert(key, v); 228 } 229 } 230 } 231 232 ret 233 } 234 235 pub async fn prime(&self, key: K, val: V) { 236 let mut state = self.state.lock().await; 237 state.completed.insert(key, val); 238 } 239 240 pub async fn prime_many(&self, values: impl IntoIterator<Item = (K, V)>) { 241 let mut state = self.state.lock().await; 242 for (k, v) in values.into_iter() { 243 state.completed.insert(k, v); 244 } 245 } 246 247 pub async fn clear(&self, key: K) { 248 let mut state = self.state.lock().await; 249 state.completed.remove(&key); 250 } 251 252 pub async fn clear_all(&self) { 253 let mut state = self.state.lock().await; 254 state.completed.clear() 255 } 256}