Tiny Async Wasi Preview 2 Runtime
wasm wasi preview2 async runtime rust
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Change around synchronization primitives

Instead of the lock around the reactor:
- use dashmaps on the future tasks
- use mutex for the poll tasks
- use integrated tests for testing functionality

+100 -44
+1 -1
Cargo.toml
··· 1 1 [package] 2 - name = "wasm-runtime" 2 + name = "tiny-wasm-runtime" 3 3 version = "0.1.0" 4 4 edition = "2021" 5 5
+50 -38
src/engine.rs
··· 2 2 use crate::{bindings::wasi::io::poll::Pollable, poll_tasks::PollTasks}; 3 3 use crate::{io::timer::TIMERS, poll_tasks::EventWithWaker}; 4 4 use dashmap::DashMap; 5 - use futures::pin_mut; 6 5 use lazy_static::lazy_static; 7 6 use std::{ 8 7 future::Future, ··· 13 12 }, 14 13 task::{Context, Wake, Waker}, 15 14 }; 15 + use uuid::Uuid; 16 16 lazy_static! { 17 17 /// The global reactor for this runtime 18 - pub static ref REACTOR: Mutex<Reactor<'static>> = Mutex::new(Reactor::default()); 18 + pub static ref REACTOR: Reactor<'static> = Reactor::default(); 19 19 20 20 } 21 21 /// The async engine instance ··· 35 35 } 36 36 #[derive(Default)] 37 37 pub struct Reactor<'a> { 38 - events: PollTasks, 39 - future_tasks: Vec<Task<'a>>, //right now the engine holds the tasks but depending 38 + events: Mutex<PollTasks>, 39 + future_tasks: DashMap<Uuid, Mutex<Task<'a>>>, //right now the engine holds the tasks but depending 40 40 timers: DashMap<String, EventWithWaker<Timer>>, 41 41 } 42 42 43 43 impl<'a> Reactor<'a> { 44 44 //adds event to the queue 45 - pub fn register(&mut self, event_name: String, pollable: EventWithWaker<Arc<Pollable>>) { 46 - self.events.push(event_name, pollable); 45 + pub fn register(&self, event_name: String, pollable: EventWithWaker<Arc<Pollable>>) { 46 + self.events.lock().unwrap().push(event_name, pollable); 47 47 } 48 48 //checks if descriptor has been added to the polling queue 49 49 pub fn is_pollable(&self, key: &str) -> bool { 50 - self.events.contains(key) 50 + self.events.lock().unwrap().contains(key) 51 51 } 52 52 53 53 //checks if timer is pollable ··· 56 56 } 57 57 58 58 //polls event queue to see if any of the events are readycar 59 - pub fn wait_for_io(&mut self) { 60 - self.events.wait_for_pollables(); 59 + pub fn wait_for_io(&self) { 60 + self.events.lock().unwrap().wait_for_pollables(); 61 61 } 62 62 63 63 //checks if event is ready 64 - pub fn check_ready(&mut self, event_name: &str) -> bool { 65 - self.events.check_if_ready(event_name) 64 + pub fn check_ready(&self, event_name: &str) -> bool { 65 + self.events.lock().unwrap().check_if_ready(event_name) 66 66 } 67 67 68 68 pub fn is_empty(&self) -> bool { 69 - self.events.is_empty() && self.future_tasks.is_empty() 69 + self.events.lock().unwrap().is_empty() && self.future_tasks.is_empty() 70 70 } 71 71 72 72 pub(crate) fn update_timers(&self) { ··· 85 85 .unwrap_or_default() 86 86 } 87 87 88 - pub(crate) fn register_timer(&mut self, timer_name: String, event: EventWithWaker<Timer>) { 88 + pub(crate) fn register_timer(&self, timer_name: String, event: EventWithWaker<Timer>) { 89 89 self.timers.insert(timer_name, event); 90 90 } 91 - } 92 91 93 - impl WasmRuntimeAsyncEngine { 94 - /// function to execute futures 95 - pub fn block_on<K, F: Future<Output = K> + Send, Fun: FnOnce() -> F>(async_closure: Fun) { 96 - let future = async_closure(); 97 - pin_mut!(future); 92 + pub(crate) fn push_task<K, F: Future<Output = K> + Send + 'static>(&self, future: F) -> Uuid { 98 93 let task = Task::new(Box::pin(async move { 99 94 let _result = future.await; 100 95 })); 101 - let mut future_tasks = Vec::new(); 102 - future_tasks.push(task); 96 + let id = Uuid::new_v4(); 97 + self.future_tasks.insert(id, Mutex::new(task)); 98 + id 99 + } 100 + } 101 + 102 + impl WasmRuntimeAsyncEngine { 103 + /// function to execute futures 104 + pub fn block_on<K, F: Future<Output = K> + Send + 'static>(future: F) { 105 + let reactor = &REACTOR; 106 + reactor.push_task(future); 103 107 loop { 104 - let mut reactor = REACTOR.lock().unwrap(); 105 108 reactor.update_timers(); 106 109 reactor.wait_for_io(); 107 - reactor.future_tasks.retain_mut(|task_info| { 108 - if task_info.waker.should_wake() { 109 - task_info.waker.reset(); 110 - let waker: Waker = task_info.waker.clone().into(); 110 + 111 + reactor.future_tasks.retain(|_, task_info| { 112 + let waker = task_info.get_mut().unwrap().waker.clone(); 113 + if waker.should_wake() { 114 + waker.reset(); 115 + let waker: Waker = waker.into(); 111 116 let mut context = Context::from_waker(&waker); 112 - if task_info.task.as_mut().poll(&mut context).is_ready() { 117 + 118 + if task_info 119 + .get_mut() 120 + .unwrap() 121 + .task 122 + .as_mut() 123 + .poll(&mut context) 124 + .is_ready() 125 + { 113 126 return false; 114 127 } 115 128 } 116 129 true 117 130 }); 131 + 118 132 if TIMERS.is_empty() && reactor.is_empty() { 119 133 break; 120 134 } ··· 122 136 } 123 137 124 138 pub fn spawn<K, F: Future<Output = ()> + Send + 'static>(future: F) { 125 - let task = Task::new(Box::pin(async move { 126 - let _result = future.await; 127 - })); 128 - REACTOR.lock().unwrap().future_tasks.push(task); 139 + REACTOR.push_task(future); 129 140 } 130 141 } 131 142 ··· 188 199 #[test] 189 200 fn test_enqueue() { 190 201 let count_future = CountFuture { max: 3, min: 0 }; 191 - let mut reactor = Reactor::default(); 192 - reactor.future_tasks.push(Task::new(Box::pin(async move { 202 + let reactor = Reactor::default(); 203 + let id = reactor.push_task(async move { 193 204 count_future.await; 194 - }))); 195 - let task = reactor.future_tasks.first_mut().unwrap(); 196 - let fut_waker = task.waker.clone(); 205 + }); 206 + let mut future_task = reactor.future_tasks.get_mut(&id).unwrap(); 207 + let task = future_task.value_mut(); 208 + let fut_waker = task.lock().unwrap().waker.clone(); 197 209 let waker: Waker = fut_waker.into(); 198 - let count_future = &mut task.task; 210 + let count_future = &mut task.lock().unwrap().task; 199 211 let mut context = Context::from_waker(&waker); 200 212 futures::pin_mut!(count_future); 201 213 let _ = count_future.as_mut().poll(&mut context); ··· 205 217 fn test_block_on() { 206 218 let count_future = CountFuture { max: 3, min: 0 }; 207 219 208 - WasmRuntimeAsyncEngine::block_on(|| async move { assert_eq!(count_future.await, 3) }); 220 + WasmRuntimeAsyncEngine::block_on(async move { assert_eq!(count_future.await, 3) }); 209 221 } 210 222 }
+3 -3
src/io/net.rs
··· 157 157 cx: &mut std::task::Context<'_>, 158 158 ) -> std::task::Poll<Self::Output> { 159 159 let this = self.get_mut(); 160 - if !REACTOR.lock().unwrap().is_pollable(&this.async_key) { 160 + if !REACTOR.is_pollable(&this.async_key) { 161 161 this.stream.start_connect(this.address, this.port)?; 162 - REACTOR.lock().unwrap().register( 162 + REACTOR.register( 163 163 this.async_key.clone(), 164 164 (this.stream.pollable.clone(), cx.waker().clone()), 165 165 ); 166 166 } 167 167 168 168 //A PLACE TO CHECK IF THE REACTOR UPDATED THIS KEY 169 - if REACTOR.lock().unwrap().check_ready(&this.async_key) { 169 + if REACTOR.check_ready(&this.async_key) { 170 170 this.stream.finish_connecting()?; 171 171 Poll::Ready(Ok(())) 172 172 } else {
+1 -2
src/io/timer.rs
··· 34 34 } 35 35 36 36 pub async fn timeout<K, F: Future<Output = K>>( 37 - &self, 38 37 fut: F, 39 38 deadline: std::time::Duration, 40 39 ) -> std::io::Result<K> { ··· 75 74 cx: &mut std::task::Context<'_>, 76 75 ) -> std::task::Poll<Self::Output> { 77 76 let this = self.get_mut(); 78 - let mut reactor = REACTOR.lock().unwrap(); 77 + let reactor = &REACTOR; 79 78 if reactor.is_timer_pollable(&this.timer_key) { 80 79 let has_elapsed = reactor.timer_has_elapsed(&this.timer_key); 81 80 if has_elapsed {
+45
tests/engine.rs
··· 1 + use std::sync::atomic::{AtomicBool, Ordering}; 2 + use std::sync::Arc; 3 + use tiny_wasm_runtime::{Timer, WasmRuntimeAsyncEngine}; 4 + 5 + pub async fn test_timers_with_assertions() { 6 + let task_a_done: Arc<AtomicBool> = Arc::new(AtomicBool::new(false)); 7 + let task_b_done = Arc::new(AtomicBool::new(false)); 8 + 9 + let task_a_done_clone = task_a_done.clone(); 10 + let task_b_done_clone = task_b_done.clone(); 11 + println!("spaw goes here"); 12 + WasmRuntimeAsyncEngine::spawn::<(), _>(async move { 13 + println!("sleep happens here"); 14 + Timer::sleep(std::time::Duration::from_secs(1)).await; 15 + task_a_done_clone.store(true, Ordering::SeqCst); 16 + }); 17 + 18 + println!("spaw goes here 2"); 19 + 20 + WasmRuntimeAsyncEngine::spawn::<(), _>(async move { 21 + Timer::sleep(std::time::Duration::from_millis(500)).await; 22 + task_b_done_clone.store(true, Ordering::SeqCst); 23 + }); 24 + 25 + // Do main timeout 26 + let slow_future = async { 27 + Timer::sleep(std::time::Duration::from_secs(2)).await; 28 + "completed" 29 + }; 30 + 31 + let res = Timer::timeout(slow_future, std::time::Duration::from_secs(1)).await; 32 + 33 + assert!(res.is_err(), "Expected the main future to time out"); 34 + assert!( 35 + task_a_done.load(Ordering::SeqCst) || task_b_done.load(Ordering::SeqCst), 36 + "At least one background task should have completed by now" 37 + ); 38 + } 39 + 40 + #[test] 41 + fn test_full_engine_runtime() { 42 + WasmRuntimeAsyncEngine::block_on(async { 43 + test_timers_with_assertions().await; 44 + }); 45 + }