forked from
parakeet.at/parakeet
Rust AppView - highly experimental!
1use crate::runtime::{Arc, Mutex};
2use crate::{yield_fn, BatchFn, WaitForWorkFn};
3use std::collections::{HashMap, HashSet};
4use std::hash::Hash;
5use std::iter::IntoIterator;
6
7pub trait AsyncCache {
8 type Key;
9 type Val;
10 async fn get(&mut self, key: &Self::Key) -> Option<Self::Val>;
11 async fn insert(&mut self, key: Self::Key, val: Self::Val);
12 async fn remove(&mut self, key: &Self::Key) -> Option<Self::Val>;
13 async fn clear(&mut self);
14}
15
16struct State<K, V, C>
17where
18 C: AsyncCache<Key = K, Val = V>,
19{
20 completed: C,
21 pending: HashSet<K>,
22}
23
24impl<K: Eq + Hash, V, C> State<K, V, C>
25where
26 C: AsyncCache<Key = K, Val = V>,
27{
28 fn with_cache(cache: C) -> Self {
29 State {
30 completed: cache,
31 pending: HashSet::new(),
32 }
33 }
34}
35
36#[derive(Clone)]
37pub struct Loader<K, V, F, C>
38where
39 K: Eq + Hash + Clone,
40 V: Clone,
41 F: BatchFn<K, V>,
42 C: AsyncCache<Key = K, Val = V>,
43{
44 state: Arc<Mutex<State<K, V, C>>>,
45 load_fn: Arc<Mutex<F>>,
46 wait_for_work_fn: Arc<dyn WaitForWorkFn>,
47 max_batch_size: usize,
48}
49
50impl<K, V, F, C> Loader<K, V, F, C>
51where
52 K: Eq + Hash + Clone,
53 V: Clone,
54 F: BatchFn<K, V>,
55 C: AsyncCache<Key = K, Val = V>,
56{
57 pub fn new(load_fn: F, cache: C) -> Self {
58 Loader {
59 state: Arc::new(Mutex::new(State::with_cache(cache))),
60 load_fn: Arc::new(Mutex::new(load_fn)),
61 max_batch_size: 200,
62 wait_for_work_fn: Arc::new(yield_fn(10)),
63 }
64 }
65
66 pub fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
67 self.max_batch_size = max_batch_size;
68 self
69 }
70
71 pub fn with_yield_count(mut self, yield_count: usize) -> Self {
72 self.wait_for_work_fn = Arc::new(yield_fn(yield_count));
73 self
74 }
75
76 /// Replaces the yielding for work behavior with an arbitrary future. Rather than yielding
77 /// the runtime repeatedly this will generate and `.await` a future of your choice.
78 /// ***This is incompatible with*** [`Self::with_yield_count()`].
79 pub fn with_custom_wait_for_work(mut self, wait_for_work_fn: impl WaitForWorkFn) -> Self {
80 self.wait_for_work_fn = Arc::new(wait_for_work_fn);
81 self
82 }
83
84 pub fn max_batch_size(&self) -> usize {
85 self.max_batch_size
86 }
87
88 pub async fn load(&self, key: K) -> Option<V> {
89 let mut state = self.state.lock().await;
90 if let Some(v) = state.completed.get(&key).await {
91 return Some(v.clone());
92 }
93
94 if !state.pending.contains(&key) {
95 state.pending.insert(key.clone());
96 if state.pending.len() >= self.max_batch_size {
97 let keys = state.pending.drain().collect::<Vec<K>>();
98 let mut load_fn = self.load_fn.lock().await;
99 let load_ret = load_fn.load(keys.as_ref()).await;
100 drop(load_fn);
101 for (k, v) in load_ret.into_iter() {
102 state.completed.insert(k, v).await;
103 }
104 return state.completed.get(&key).await.clone();
105 }
106 }
107 drop(state);
108
109 (self.wait_for_work_fn)().await;
110
111 let mut state = self.state.lock().await;
112 if let Some(v) = state.completed.get(&key).await {
113 return Some(v.clone());
114 }
115
116 if !state.pending.is_empty() {
117 let keys = state.pending.drain().collect::<Vec<K>>();
118 let mut load_fn = self.load_fn.lock().await;
119 let load_ret = load_fn.load(keys.as_ref()).await;
120 drop(load_fn);
121 for (k, v) in load_ret.into_iter() {
122 state.completed.insert(k, v).await;
123 }
124 }
125
126 state.completed.get(&key).await.clone()
127 }
128
129 pub async fn load_many(&self, keys: Vec<K>) -> HashMap<K, V> {
130 let mut state = self.state.lock().await;
131 let mut ret = HashMap::new();
132 let mut rest = Vec::new();
133 for key in keys.into_iter() {
134 if let Some(v) = state.completed.get(&key).await.clone() {
135 ret.insert(key, v);
136 continue;
137 }
138 if !state.pending.contains(&key) {
139 state.pending.insert(key.clone());
140
141 if state.pending.len() >= self.max_batch_size {
142 let keys = state.pending.drain().collect::<Vec<K>>();
143 let mut load_fn = self.load_fn.lock().await;
144 let load_ret = load_fn.load(keys.as_ref()).await;
145 drop(load_fn);
146 for (k, v) in load_ret.into_iter() {
147 state.completed.insert(k, v).await;
148 }
149 }
150 }
151 rest.push(key);
152 }
153 drop(state);
154
155 (self.wait_for_work_fn)().await;
156
157 if !rest.is_empty() {
158 let mut state = self.state.lock().await;
159 if !state.pending.is_empty() {
160 let keys = state.pending.drain().collect::<Vec<K>>();
161 let mut load_fn = self.load_fn.lock().await;
162 let load_ret = load_fn.load(keys.as_ref()).await;
163 drop(load_fn);
164 for (k, v) in load_ret.into_iter() {
165 state.completed.insert(k, v).await;
166 }
167 }
168
169 for key in rest.into_iter() {
170 if let Some(v) = state.completed.get(&key).await.clone() {
171 ret.insert(key, v);
172 }
173 }
174 }
175
176 ret
177 }
178
179 pub async fn prime(&self, key: K, val: V) {
180 let mut state = self.state.lock().await;
181 state.completed.insert(key, val).await;
182 }
183
184 pub async fn prime_many(&self, values: impl IntoIterator<Item = (K, V)>) {
185 let mut state = self.state.lock().await;
186 for (k, v) in values.into_iter() {
187 state.completed.insert(k, v).await;
188 }
189 }
190
191 pub async fn clear(&self, key: K) {
192 let mut state = self.state.lock().await;
193 state.completed.remove(&key).await;
194 }
195
196 pub async fn clear_all(&self) {
197 let mut state = self.state.lock().await;
198 state.completed.clear().await
199 }
200}