Tiny Async Wasi Preview 2 Runtime
wasm wasi preview2 async runtime rust
at master 281 lines 8.5 kB view raw
1use crate::Timer; 2use crate::{bindings::wasi::io::poll::Pollable, poll_tasks::PollTasks}; 3use crate::{io::timer::TIMERS, poll_tasks::EventWithWaker}; 4use crossbeam::queue::SegQueue; 5use futures::channel::oneshot; 6use futures::FutureExt; 7use lazy_static::lazy_static; 8use std::collections::hash_map::Entry; 9use std::collections::HashMap; 10use std::sync::RwLock; 11use std::{ 12 future::Future, 13 pin::Pin, 14 sync::{Arc, Mutex}, 15 task::{Context, Wake, Waker}, 16}; 17use uuid::Uuid; 18lazy_static! { 19 /// The global reactor for this runtime 20 pub static ref REACTOR: Reactor<'static> = Reactor::default(); 21 //queue for ready tasks 22 #[derive(Debug)] 23 pub static ref READY_QUEUE: SegQueue<Uuid> = SegQueue::new(); 24 25} 26/// The async engine instance 27pub struct WasmRuntimeAsyncEngine; 28 29/// the reactor that processes poll submissions. Still Experimental 30struct Task<'a> { 31 id: Uuid, 32 task: Pin<Box<dyn Future<Output = ()> + Send + 'a>>, 33 waker: Arc<FutureWaker>, 34} 35 36impl<'a> Task<'a> { 37 fn new(task: Pin<Box<dyn Future<Output = ()> + Send + 'a>>) -> Self { 38 let id = Uuid::new_v4(); 39 let waker = Arc::new(FutureWaker::new(id)); 40 Self { id, task, waker } 41 } 42} 43#[derive(Default)] 44pub struct Reactor<'a> { 45 events: Mutex<PollTasks>, 46 spawn_queue: SegQueue<Task<'a>>, //right now the engine holds the tasks but depending 47 future_tasks: RwLock<HashMap<Uuid, Mutex<Task<'a>>>>, 48 timers: RwLock<HashMap<String, EventWithWaker<Timer>>>, 49} 50 51pub struct JoinHandle<T> { 52 id: Uuid, 53 receiver: oneshot::Receiver<T>, 54} 55 56impl<T> JoinHandle<T> { 57 pub fn cancel(&self) { 58 REACTOR.remove_task(self.id); 59 } 60} 61 62impl<T> Future for JoinHandle<T> { 63 type Output = T; 64 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { 65 let this = self.get_mut(); 66 match this.receiver.poll_unpin(cx) { 67 std::task::Poll::Ready(Ok(result)) => std::task::Poll::Ready(result), 68 std::task::Poll::Ready(_) => panic!("Could not join result"), 69 std::task::Poll::Pending => std::task::Poll::Pending, 70 } 71 } 72} 73impl<'a> Reactor<'a> { 74 //adds event to the queue 75 pub fn register(&self, event_name: String, pollable: EventWithWaker<Arc<Pollable>>) { 76 self.events.lock().unwrap().push(event_name, pollable); 77 } 78 //checks if descriptor has been added to the polling queue 79 pub fn is_pollable(&self, key: &str) -> bool { 80 self.events.lock().unwrap().contains(key) 81 } 82 83 //checks if timer is pollable 84 pub fn is_timer_pollable(&self, key: &str) -> bool { 85 self.timers.read().unwrap().contains_key(key) 86 } 87 88 //polls event queue to see if any of the events are readycar 89 pub fn wait_for_io(&self) { 90 self.events.lock().unwrap().wait_for_pollables(); 91 } 92 93 //checks if event is ready 94 pub fn check_ready(&self, event_name: &str) -> bool { 95 self.events.lock().unwrap().check_if_ready(event_name) 96 } 97 98 pub fn is_empty(&self) -> bool { 99 self.events.lock().unwrap().is_empty() && self.future_tasks.read().unwrap().is_empty() 100 } 101 102 pub(crate) fn update_timers(&self) { 103 self.timers 104 .write() 105 .unwrap() 106 .iter_mut() 107 .for_each(|(_, cell)| { 108 cell.0.update_elapsed(); 109 if cell.0.elapsed() { 110 cell.1.wake_by_ref(); 111 } 112 }); 113 } 114 pub(crate) fn remove_task(&self, id: Uuid) { 115 self.future_tasks.write().unwrap().remove(&id); 116 //Todo: See if you can remove timers and sockets as well 117 } 118 119 pub(crate) fn remove_pollable(&self, event_name: &str) { 120 self.events.lock().unwrap().remove(event_name); 121 } 122 123 pub(crate) fn timer_has_elapsed(&self, timer_key: &str) -> bool { 124 self.timers 125 .read() 126 .unwrap() 127 .get(timer_key) 128 .map(|s| s.0.elapsed()) 129 .unwrap_or_default() 130 } 131 132 pub(crate) fn remove_timer(&self, timer_key: &str) { 133 self.timers.write().unwrap().remove(timer_key); 134 } 135 136 pub(crate) fn register_timer(&self, timer_name: String, event: EventWithWaker<Timer>) { 137 self.timers.write().unwrap().insert(timer_name, event); 138 } 139 140 pub(crate) fn push_task<K: Send + 'a, F: Future<Output = K> + Send + 'static>( 141 &self, 142 future: F, 143 ) -> JoinHandle<K> { 144 let (sender, receiver) = oneshot::channel(); 145 let task = Task::new(Box::pin(async move { 146 let result = future.await; 147 let _ = sender.send(result); 148 })); 149 let id = task.id; 150 self.spawn_queue.push(task); 151 READY_QUEUE.push(id); 152 JoinHandle { id, receiver } 153 } 154 155 pub(crate) fn drain_queue(&self) { 156 while let Some(task) = self.spawn_queue.pop() { 157 self.future_tasks 158 .write() 159 .unwrap() 160 .insert(task.id, Mutex::new(task)); 161 } 162 } 163} 164 165impl WasmRuntimeAsyncEngine { 166 /// function to execute futures 167 pub fn block_on<K: Send + 'static, F: Future<Output = K> + Send + 'static>(future: F) -> K { 168 let reactor = &REACTOR; 169 let mut join_handle = reactor.push_task(future); 170 loop { 171 reactor.update_timers(); 172 reactor.wait_for_io(); 173 while let Some(id) = READY_QUEUE.pop() { 174 reactor.drain_queue(); 175 if let Entry::Occupied(mut entry) = reactor.future_tasks.write().unwrap().entry(id) 176 { 177 let task_info = entry.get_mut(); 178 let task_ref = task_info.get_mut().unwrap(); 179 let waker = task_ref.waker.clone(); 180 let waker: Waker = waker.into(); 181 let mut context = Context::from_waker(&waker); 182 let polling_state = task_ref.task.as_mut().poll(&mut context); 183 if polling_state.is_ready() { 184 entry.remove(); 185 } 186 } 187 } 188 189 if TIMERS.is_empty() && reactor.is_empty() { 190 break; 191 } 192 } 193 194 loop { 195 let mut context = Context::from_waker(Waker::noop()); 196 if let std::task::Poll::Ready(result) = join_handle.poll_unpin(&mut context) { 197 return result; 198 } 199 } 200 } 201 202 pub fn spawn<K: Send + 'static, F: Future<Output = K> + Send + 'static>( 203 future: F, 204 ) -> JoinHandle<K> { 205 REACTOR.push_task(future) 206 } 207} 208 209#[derive(Debug)] 210struct FutureWaker { 211 id: Uuid, 212} 213 214impl FutureWaker { 215 pub fn new(id: Uuid) -> Self { 216 Self { id } 217 } 218 fn wake_inner(&self) { 219 READY_QUEUE.push(self.id); 220 } 221} 222 223impl Wake for FutureWaker { 224 fn wake(self: std::sync::Arc<Self>) { 225 self.wake_inner(); 226 } 227} 228 229#[cfg(test)] 230mod test { 231 232 use super::*; 233 use std::future::Future; 234 235 struct CountFuture { 236 min: u8, 237 max: u8, 238 } 239 240 impl Future for CountFuture { 241 type Output = u8; 242 fn poll( 243 self: std::pin::Pin<&mut Self>, 244 cx: &mut std::task::Context<'_>, 245 ) -> std::task::Poll<Self::Output> { 246 let count_fut_mut = self.get_mut(); 247 if count_fut_mut.min == count_fut_mut.max { 248 return std::task::Poll::Ready(count_fut_mut.min); 249 } 250 251 count_fut_mut.min += 1; 252 cx.waker().wake_by_ref(); 253 std::task::Poll::Pending 254 } 255 } 256 257 #[test] 258 fn test_enqueue() { 259 let count_future = CountFuture { max: 3, min: 0 }; 260 let reactor = Reactor::default(); 261 let handle = reactor.push_task(async move { 262 count_future.await; 263 }); 264 reactor.drain_queue(); 265 let mut future_task = reactor.future_tasks.write().unwrap(); 266 let task = future_task.get_mut(&handle.id).unwrap(); 267 let fut_waker = task.lock().unwrap().waker.clone(); 268 let waker: Waker = fut_waker.into(); 269 let count_future = &mut task.lock().unwrap().task; 270 let mut context = Context::from_waker(&waker); 271 futures::pin_mut!(count_future); 272 let _ = count_future.as_mut().poll(&mut context); 273 } 274 275 #[test] 276 fn test_block_on() { 277 let count_future = CountFuture { max: 3, min: 0 }; 278 279 WasmRuntimeAsyncEngine::block_on(async move { assert_eq!(count_future.await, 3) }); 280 } 281}