// Forked from parakeet.at/parakeet.
// Rust AppView - highly experimental!
1use crate::runtime::{Arc, Mutex};
2use crate::{yield_fn, BatchFn, WaitForWorkFn};
3use std::collections::{HashMap, HashSet};
4use std::fmt::Debug;
5use std::hash::{BuildHasher, Hash};
6use std::iter::IntoIterator;
7
/// Key/value storage abstraction used to hold values that have already been loaded.
///
/// Every method takes `&mut self` — including [`Cache::get`] — so implementations
/// are free to update internal bookkeeping on reads.
pub trait Cache {
    /// Type used to look entries up.
    type Key;
    /// Type of the stored values.
    type Val;

    /// Returns a reference to the value stored under `key`, or `None` if absent.
    fn get(&mut self, key: &Self::Key) -> Option<&Self::Val>;

    /// Stores `val` under `key`.
    fn insert(&mut self, key: Self::Key, val: Self::Val);

    /// Removes the entry for `key`, returning the value it held, if any.
    fn remove(&mut self, key: &Self::Key) -> Option<Self::Val>;

    /// Removes every entry.
    fn clear(&mut self);
}
16
17impl<K, V, S: BuildHasher> Cache for HashMap<K, V, S>
18where
19 K: Eq + Hash,
20{
21 type Key = K;
22 type Val = V;
23
24 #[inline]
25 fn get(&mut self, key: &K) -> Option<&V> {
26 HashMap::get(self, key)
27 }
28
29 #[inline]
30 fn insert(&mut self, key: K, val: V) {
31 HashMap::insert(self, key, val);
32 }
33
34 #[inline]
35 fn remove(&mut self, key: &K) -> Option<V> {
36 HashMap::remove(self, key)
37 }
38
39 #[inline]
40 fn clear(&mut self) {
41 HashMap::clear(self)
42 }
43}
44
45struct State<K, V, C = HashMap<K, V>>
46where
47 C: Cache<Key = K, Val = V>,
48{
49 completed: C,
50 pending: HashSet<K>,
51}
52
53impl<K: Eq + Hash, V, C> State<K, V, C>
54where
55 C: Cache<Key = K, Val = V>,
56{
57 fn with_cache(cache: C) -> Self {
58 State {
59 completed: cache,
60 pending: HashSet::new(),
61 }
62 }
63}
64
65pub struct Loader<K, V, F, C = HashMap<K, V>>
66where
67 K: Eq + Hash + Clone,
68 V: Clone,
69 F: BatchFn<K, V>,
70 C: Cache<Key = K, Val = V>,
71{
72 state: Arc<Mutex<State<K, V, C>>>,
73 load_fn: Arc<Mutex<F>>,
74 wait_for_work_fn: Arc<dyn WaitForWorkFn>,
75 max_batch_size: usize,
76}
77
78impl<K, V, F, C> Clone for Loader<K, V, F, C>
79where
80 K: Eq + Hash + Clone,
81 V: Clone,
82 F: BatchFn<K, V>,
83 C: Cache<Key = K, Val = V>,
84{
85 fn clone(&self) -> Self {
86 Loader {
87 state: self.state.clone(),
88 max_batch_size: self.max_batch_size,
89 load_fn: self.load_fn.clone(),
90 wait_for_work_fn: self.wait_for_work_fn.clone(),
91 }
92 }
93}
94
95#[allow(clippy::implicit_hasher)]
96impl<K, V, F> Loader<K, V, F, HashMap<K, V>>
97where
98 K: Eq + Hash + Clone + Debug,
99 V: Clone,
100 F: BatchFn<K, V>,
101{
102 pub fn new(load_fn: F) -> Loader<K, V, F, HashMap<K, V>> {
103 Loader::with_cache(load_fn, HashMap::new())
104 }
105}
106
107impl<K, V, F, C> Loader<K, V, F, C>
108where
109 K: Eq + Hash + Clone + Debug,
110 V: Clone,
111 F: BatchFn<K, V>,
112 C: Cache<Key = K, Val = V>,
113{
114 pub fn with_cache(load_fn: F, cache: C) -> Loader<K, V, F, C> {
115 Loader {
116 state: Arc::new(Mutex::new(State::with_cache(cache))),
117 load_fn: Arc::new(Mutex::new(load_fn)),
118 max_batch_size: 200,
119 wait_for_work_fn: Arc::new(yield_fn(10)),
120 }
121 }
122
123 pub fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
124 self.max_batch_size = max_batch_size;
125 self
126 }
127
128 pub fn with_yield_count(mut self, yield_count: usize) -> Self {
129 self.wait_for_work_fn = Arc::new(yield_fn(yield_count));
130 self
131 }
132
133 /// Replaces the yielding for work behavior with an arbitrary future. Rather than yielding
134 /// the runtime repeatedly this will generate and `.await` a future of your choice.
135 /// ***This is incompatible with*** [`Self::with_yield_count()`].
136 pub fn with_custom_wait_for_work(mut self, wait_for_work_fn: impl WaitForWorkFn) -> Self {
137 self.wait_for_work_fn = Arc::new(wait_for_work_fn);
138 self
139 }
140
141 pub fn max_batch_size(&self) -> usize {
142 self.max_batch_size
143 }
144
145 pub async fn load(&self, key: K) -> Option<V> {
146 let mut state = self.state.lock().await;
147 if let Some(v) = state.completed.get(&key) {
148 return Some((*v).clone());
149 }
150
151 if !state.pending.contains(&key) {
152 state.pending.insert(key.clone());
153 if state.pending.len() >= self.max_batch_size {
154 let keys = state.pending.drain().collect::<Vec<K>>();
155 let mut load_fn = self.load_fn.lock().await;
156 let load_ret = load_fn.load(keys.as_ref()).await;
157 drop(load_fn);
158 for (k, v) in load_ret.into_iter() {
159 state.completed.insert(k, v);
160 }
161 return state.completed.get(&key).cloned();
162 }
163 }
164 drop(state);
165
166 (self.wait_for_work_fn)().await;
167
168 let mut state = self.state.lock().await;
169 if let Some(v) = state.completed.get(&key) {
170 return Some((*v).clone());
171 }
172
173 if !state.pending.is_empty() {
174 let keys = state.pending.drain().collect::<Vec<K>>();
175 let mut load_fn = self.load_fn.lock().await;
176 let load_ret = load_fn.load(keys.as_ref()).await;
177 drop(load_fn);
178 for (k, v) in load_ret.into_iter() {
179 state.completed.insert(k, v);
180 }
181 }
182
183 state.completed.get(&key).cloned()
184 }
185
186 pub async fn load_many(&self, keys: Vec<K>) -> HashMap<K, V> {
187 let mut state = self.state.lock().await;
188 let mut ret = HashMap::new();
189 let mut rest = Vec::new();
190 for key in keys.into_iter() {
191 if let Some(v) = state.completed.get(&key).cloned() {
192 ret.insert(key, v);
193 continue;
194 }
195 if !state.pending.contains(&key) {
196 state.pending.insert(key.clone());
197 if state.pending.len() >= self.max_batch_size {
198 let keys = state.pending.drain().collect::<Vec<K>>();
199 let mut load_fn = self.load_fn.lock().await;
200 let load_ret = load_fn.load(keys.as_ref()).await;
201 drop(load_fn);
202 for (k, v) in load_ret.into_iter() {
203 state.completed.insert(k, v);
204 }
205 }
206 }
207 rest.push(key);
208 }
209 drop(state);
210
211 (self.wait_for_work_fn)().await;
212
213 if !rest.is_empty() {
214 let mut state = self.state.lock().await;
215 if !state.pending.is_empty() {
216 let keys = state.pending.drain().collect::<Vec<K>>();
217 let mut load_fn = self.load_fn.lock().await;
218 let load_ret = load_fn.load(keys.as_ref()).await;
219 drop(load_fn);
220 for (k, v) in load_ret.into_iter() {
221 state.completed.insert(k, v);
222 }
223 }
224
225 for key in rest.into_iter() {
226 if let Some(v) = state.completed.get(&key).cloned() {
227 ret.insert(key, v);
228 }
229 }
230 }
231
232 ret
233 }
234
235 pub async fn prime(&self, key: K, val: V) {
236 let mut state = self.state.lock().await;
237 state.completed.insert(key, val);
238 }
239
240 pub async fn prime_many(&self, values: impl IntoIterator<Item = (K, V)>) {
241 let mut state = self.state.lock().await;
242 for (k, v) in values.into_iter() {
243 state.completed.insert(k, v);
244 }
245 }
246
247 pub async fn clear(&self, key: K) {
248 let mut state = self.state.lock().await;
249 state.completed.remove(&key);
250 }
251
252 pub async fn clear_all(&self) {
253 let mut state = self.state.lock().await;
254 state.completed.clear()
255 }
256}