Tiny Async Wasi Preview 2 Runtime
wasm
wasi
preview2
async
runtime
rust
1use crate::Timer;
2use crate::{bindings::wasi::io::poll::Pollable, poll_tasks::PollTasks};
3use crate::{io::timer::TIMERS, poll_tasks::EventWithWaker};
4use crossbeam::queue::SegQueue;
5use futures::channel::oneshot;
6use futures::FutureExt;
7use lazy_static::lazy_static;
8use std::collections::hash_map::Entry;
9use std::collections::HashMap;
10use std::sync::RwLock;
11use std::{
12 future::Future,
13 pin::Pin,
14 sync::{Arc, Mutex},
15 task::{Context, Wake, Waker},
16};
17use uuid::Uuid;
18lazy_static! {
19 /// The global reactor for this runtime
20 pub static ref REACTOR: Reactor<'static> = Reactor::default();
21 //queue for ready tasks
22 #[derive(Debug)]
23 pub static ref READY_QUEUE: SegQueue<Uuid> = SegQueue::new();
24
25}
26/// The async engine instance
27pub struct WasmRuntimeAsyncEngine;
28
29/// the reactor that processes poll submissions. Still Experimental
30struct Task<'a> {
31 id: Uuid,
32 task: Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
33 waker: Arc<FutureWaker>,
34}
35
36impl<'a> Task<'a> {
37 fn new(task: Pin<Box<dyn Future<Output = ()> + Send + 'a>>) -> Self {
38 let id = Uuid::new_v4();
39 let waker = Arc::new(FutureWaker::new(id));
40 Self { id, task, waker }
41 }
42}
43#[derive(Default)]
44pub struct Reactor<'a> {
45 events: Mutex<PollTasks>,
46 spawn_queue: SegQueue<Task<'a>>, //right now the engine holds the tasks but depending
47 future_tasks: RwLock<HashMap<Uuid, Mutex<Task<'a>>>>,
48 timers: RwLock<HashMap<String, EventWithWaker<Timer>>>,
49}
50
51pub struct JoinHandle<T> {
52 id: Uuid,
53 receiver: oneshot::Receiver<T>,
54}
55
56impl<T> JoinHandle<T> {
57 pub fn cancel(&self) {
58 REACTOR.remove_task(self.id);
59 }
60}
61
62impl<T> Future for JoinHandle<T> {
63 type Output = T;
64 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
65 let this = self.get_mut();
66 match this.receiver.poll_unpin(cx) {
67 std::task::Poll::Ready(Ok(result)) => std::task::Poll::Ready(result),
68 std::task::Poll::Ready(_) => panic!("Could not join result"),
69 std::task::Poll::Pending => std::task::Poll::Pending,
70 }
71 }
72}
73impl<'a> Reactor<'a> {
74 //adds event to the queue
75 pub fn register(&self, event_name: String, pollable: EventWithWaker<Arc<Pollable>>) {
76 self.events.lock().unwrap().push(event_name, pollable);
77 }
78 //checks if descriptor has been added to the polling queue
79 pub fn is_pollable(&self, key: &str) -> bool {
80 self.events.lock().unwrap().contains(key)
81 }
82
83 //checks if timer is pollable
84 pub fn is_timer_pollable(&self, key: &str) -> bool {
85 self.timers.read().unwrap().contains_key(key)
86 }
87
88 //polls event queue to see if any of the events are readycar
89 pub fn wait_for_io(&self) {
90 self.events.lock().unwrap().wait_for_pollables();
91 }
92
93 //checks if event is ready
94 pub fn check_ready(&self, event_name: &str) -> bool {
95 self.events.lock().unwrap().check_if_ready(event_name)
96 }
97
98 pub fn is_empty(&self) -> bool {
99 self.events.lock().unwrap().is_empty() && self.future_tasks.read().unwrap().is_empty()
100 }
101
102 pub(crate) fn update_timers(&self) {
103 self.timers
104 .write()
105 .unwrap()
106 .iter_mut()
107 .for_each(|(_, cell)| {
108 cell.0.update_elapsed();
109 if cell.0.elapsed() {
110 cell.1.wake_by_ref();
111 }
112 });
113 }
114 pub(crate) fn remove_task(&self, id: Uuid) {
115 self.future_tasks.write().unwrap().remove(&id);
116 //Todo: See if you can remove timers and sockets as well
117 }
118
119 pub(crate) fn remove_pollable(&self, event_name: &str) {
120 self.events.lock().unwrap().remove(event_name);
121 }
122
123 pub(crate) fn timer_has_elapsed(&self, timer_key: &str) -> bool {
124 self.timers
125 .read()
126 .unwrap()
127 .get(timer_key)
128 .map(|s| s.0.elapsed())
129 .unwrap_or_default()
130 }
131
132 pub(crate) fn remove_timer(&self, timer_key: &str) {
133 self.timers.write().unwrap().remove(timer_key);
134 }
135
136 pub(crate) fn register_timer(&self, timer_name: String, event: EventWithWaker<Timer>) {
137 self.timers.write().unwrap().insert(timer_name, event);
138 }
139
140 pub(crate) fn push_task<K: Send + 'a, F: Future<Output = K> + Send + 'static>(
141 &self,
142 future: F,
143 ) -> JoinHandle<K> {
144 let (sender, receiver) = oneshot::channel();
145 let task = Task::new(Box::pin(async move {
146 let result = future.await;
147 let _ = sender.send(result);
148 }));
149 let id = task.id;
150 self.spawn_queue.push(task);
151 READY_QUEUE.push(id);
152 JoinHandle { id, receiver }
153 }
154
155 pub(crate) fn drain_queue(&self) {
156 while let Some(task) = self.spawn_queue.pop() {
157 self.future_tasks
158 .write()
159 .unwrap()
160 .insert(task.id, Mutex::new(task));
161 }
162 }
163}
164
165impl WasmRuntimeAsyncEngine {
166 /// function to execute futures
167 pub fn block_on<K: Send + 'static, F: Future<Output = K> + Send + 'static>(future: F) -> K {
168 let reactor = &REACTOR;
169 let mut join_handle = reactor.push_task(future);
170 loop {
171 reactor.update_timers();
172 reactor.wait_for_io();
173 while let Some(id) = READY_QUEUE.pop() {
174 reactor.drain_queue();
175 if let Entry::Occupied(mut entry) = reactor.future_tasks.write().unwrap().entry(id)
176 {
177 let task_info = entry.get_mut();
178 let task_ref = task_info.get_mut().unwrap();
179 let waker = task_ref.waker.clone();
180 let waker: Waker = waker.into();
181 let mut context = Context::from_waker(&waker);
182 let polling_state = task_ref.task.as_mut().poll(&mut context);
183 if polling_state.is_ready() {
184 entry.remove();
185 }
186 }
187 }
188
189 if TIMERS.is_empty() && reactor.is_empty() {
190 break;
191 }
192 }
193
194 loop {
195 let mut context = Context::from_waker(Waker::noop());
196 if let std::task::Poll::Ready(result) = join_handle.poll_unpin(&mut context) {
197 return result;
198 }
199 }
200 }
201
202 pub fn spawn<K: Send + 'static, F: Future<Output = K> + Send + 'static>(
203 future: F,
204 ) -> JoinHandle<K> {
205 REACTOR.push_task(future)
206 }
207}
208
209#[derive(Debug)]
210struct FutureWaker {
211 id: Uuid,
212}
213
214impl FutureWaker {
215 pub fn new(id: Uuid) -> Self {
216 Self { id }
217 }
218 fn wake_inner(&self) {
219 READY_QUEUE.push(self.id);
220 }
221}
222
223impl Wake for FutureWaker {
224 fn wake(self: std::sync::Arc<Self>) {
225 self.wake_inner();
226 }
227}
228
229#[cfg(test)]
230mod test {
231
232 use super::*;
233 use std::future::Future;
234
235 struct CountFuture {
236 min: u8,
237 max: u8,
238 }
239
240 impl Future for CountFuture {
241 type Output = u8;
242 fn poll(
243 self: std::pin::Pin<&mut Self>,
244 cx: &mut std::task::Context<'_>,
245 ) -> std::task::Poll<Self::Output> {
246 let count_fut_mut = self.get_mut();
247 if count_fut_mut.min == count_fut_mut.max {
248 return std::task::Poll::Ready(count_fut_mut.min);
249 }
250
251 count_fut_mut.min += 1;
252 cx.waker().wake_by_ref();
253 std::task::Poll::Pending
254 }
255 }
256
257 #[test]
258 fn test_enqueue() {
259 let count_future = CountFuture { max: 3, min: 0 };
260 let reactor = Reactor::default();
261 let handle = reactor.push_task(async move {
262 count_future.await;
263 });
264 reactor.drain_queue();
265 let mut future_task = reactor.future_tasks.write().unwrap();
266 let task = future_task.get_mut(&handle.id).unwrap();
267 let fut_waker = task.lock().unwrap().waker.clone();
268 let waker: Waker = fut_waker.into();
269 let count_future = &mut task.lock().unwrap().task;
270 let mut context = Context::from_waker(&waker);
271 futures::pin_mut!(count_future);
272 let _ = count_future.as_mut().poll(&mut context);
273 }
274
275 #[test]
276 fn test_block_on() {
277 let count_future = CountFuture { max: 3, min: 0 };
278
279 WasmRuntimeAsyncEngine::block_on(async move { assert_eq!(count_future.await, 3) });
280 }
281}