Tiny Async Wasi Preview 2 Runtime
wasm wasi preview2 async runtime rust
at master 122 lines 3.3 kB view raw
1use crate::engine::REACTOR; 2use crate::poll_tasks::EventWithWaker; 3use dashmap::DashMap; 4use futures::FutureExt; 5use lazy_static::lazy_static; 6use std::future::Future; 7use std::task::Poll; 8use std::time::Duration; 9use std::time::Instant; 10use uuid::Uuid; 11lazy_static! { 12 pub static ref TIMERS: DashMap<u32, EventWithWaker<Timer>> = DashMap::new(); 13} 14 15#[derive(Debug, Clone)] 16pub struct Timer { 17 at: Instant, 18 deadline: Duration, 19 elapsed: bool, 20} 21 22impl Timer { 23 /// create a timer that resolves once it elapses 24 pub async fn sleep(until: std::time::Duration) { 25 let timeout = TimeFuture { 26 timer_key: format!("sleep-{}", Uuid::new_v4()), 27 timer: Self { 28 at: Instant::now(), 29 deadline: until, 30 elapsed: false, 31 }, 32 }; 33 timeout.await 34 } 35 36 pub async fn timeout<K, F: Future<Output = K>>( 37 fut: F, 38 deadline: std::time::Duration, 39 ) -> std::io::Result<K> { 40 let timer_future = TimeFuture { 41 timer_key: format!("timeout-{}", Uuid::new_v4()), 42 timer: Self { 43 at: Instant::now(), 44 deadline, 45 elapsed: false, 46 }, 47 }; 48 let timeout_future = TimeoutFuture { timer_future, fut }; 49 timeout_future.await 50 } 51 pub fn update_elapsed(&mut self) { 52 let new_now = Instant::now(); 53 let elapsed = new_now 54 .checked_duration_since(self.at) 55 .map(|s| s > self.deadline) 56 .unwrap_or_default(); 57 self.elapsed = elapsed; 58 } 59 60 pub fn elapsed(&self) -> bool { 61 self.elapsed 62 } 63} 64 65struct TimeFuture { 66 timer_key: String, 67 timer: Timer, 68} 69 70impl Future for TimeFuture { 71 type Output = (); 72 fn poll( 73 self: std::pin::Pin<&mut Self>, 74 cx: &mut std::task::Context<'_>, 75 ) -> std::task::Poll<Self::Output> { 76 let this = self.get_mut(); 77 let reactor = &REACTOR; 78 if reactor.is_timer_pollable(&this.timer_key) { 79 let has_elapsed = reactor.timer_has_elapsed(&this.timer_key); 80 if has_elapsed { 81 reactor.remove_timer(&this.timer_key); 82 return std::task::Poll::Ready(()); 83 } 84 } else { 85 reactor.register_timer( 86 this.timer_key.clone(), 87 (this.timer.clone(), cx.waker().clone()), 88 ); 89 } 90 91 std::task::Poll::Pending 92 } 93} 94 95pin_project_lite::pin_project! { 96 pub struct TimeoutFuture<K,F:Future<Output = K>> 97 { 98 #[pin] 99 fut:F, 100 #[pin] 101 timer_future:TimeFuture 102 } 103} 104 105impl<K, F: Future<Output = K>> Future for TimeoutFuture<K, F> { 106 type Output = Result<K, std::io::Error>; 107 fn poll( 108 self: std::pin::Pin<&mut Self>, 109 cx: &mut std::task::Context<'_>, 110 ) -> std::task::Poll<Self::Output> { 111 let mut this = self.project(); 112 if this.timer_future.poll_unpin(cx).is_pending() { 113 match this.fut.poll_unpin(cx) { 114 Poll::Ready(ready) => Poll::Ready(Ok(ready)), 115 Poll::Pending => Poll::Pending, 116 } 117 } else { 118 let error = std::io::Error::new(std::io::ErrorKind::TimedOut, "Timer has elapsed"); 119 Poll::Ready(Err(error)) 120 } 121 } 122}