Tiny Async Wasi Preview 2 Runtime
wasm
wasi
preview2
async
runtime
rust
1use crate::engine::REACTOR;
2use crate::poll_tasks::EventWithWaker;
3use dashmap::DashMap;
4use futures::FutureExt;
5use lazy_static::lazy_static;
6use std::future::Future;
7use std::task::Poll;
8use std::time::Duration;
9use std::time::Instant;
10use uuid::Uuid;
11lazy_static! {
12 pub static ref TIMERS: DashMap<u32, EventWithWaker<Timer>> = DashMap::new();
13}
14
15#[derive(Debug, Clone)]
16pub struct Timer {
17 at: Instant,
18 deadline: Duration,
19 elapsed: bool,
20}
21
22impl Timer {
23 /// create a timer that resolves once it elapses
24 pub async fn sleep(until: std::time::Duration) {
25 let timeout = TimeFuture {
26 timer_key: format!("sleep-{}", Uuid::new_v4()),
27 timer: Self {
28 at: Instant::now(),
29 deadline: until,
30 elapsed: false,
31 },
32 };
33 timeout.await
34 }
35
36 pub async fn timeout<K, F: Future<Output = K>>(
37 fut: F,
38 deadline: std::time::Duration,
39 ) -> std::io::Result<K> {
40 let timer_future = TimeFuture {
41 timer_key: format!("timeout-{}", Uuid::new_v4()),
42 timer: Self {
43 at: Instant::now(),
44 deadline,
45 elapsed: false,
46 },
47 };
48 let timeout_future = TimeoutFuture { timer_future, fut };
49 timeout_future.await
50 }
51 pub fn update_elapsed(&mut self) {
52 let new_now = Instant::now();
53 let elapsed = new_now
54 .checked_duration_since(self.at)
55 .map(|s| s > self.deadline)
56 .unwrap_or_default();
57 self.elapsed = elapsed;
58 }
59
60 pub fn elapsed(&self) -> bool {
61 self.elapsed
62 }
63}
64
65struct TimeFuture {
66 timer_key: String,
67 timer: Timer,
68}
69
70impl Future for TimeFuture {
71 type Output = ();
72 fn poll(
73 self: std::pin::Pin<&mut Self>,
74 cx: &mut std::task::Context<'_>,
75 ) -> std::task::Poll<Self::Output> {
76 let this = self.get_mut();
77 let reactor = &REACTOR;
78 if reactor.is_timer_pollable(&this.timer_key) {
79 let has_elapsed = reactor.timer_has_elapsed(&this.timer_key);
80 if has_elapsed {
81 reactor.remove_timer(&this.timer_key);
82 return std::task::Poll::Ready(());
83 }
84 } else {
85 reactor.register_timer(
86 this.timer_key.clone(),
87 (this.timer.clone(), cx.waker().clone()),
88 );
89 }
90
91 std::task::Poll::Pending
92 }
93}
94
95pin_project_lite::pin_project! {
96 pub struct TimeoutFuture<K,F:Future<Output = K>>
97 {
98 #[pin]
99 fut:F,
100 #[pin]
101 timer_future:TimeFuture
102 }
103}
104
105impl<K, F: Future<Output = K>> Future for TimeoutFuture<K, F> {
106 type Output = Result<K, std::io::Error>;
107 fn poll(
108 self: std::pin::Pin<&mut Self>,
109 cx: &mut std::task::Context<'_>,
110 ) -> std::task::Poll<Self::Output> {
111 let mut this = self.project();
112 if this.timer_future.poll_unpin(cx).is_pending() {
113 match this.fut.poll_unpin(cx) {
114 Poll::Ready(ready) => Poll::Ready(Ok(ready)),
115 Poll::Pending => Poll::Pending,
116 }
117 } else {
118 let error = std::io::Error::new(std::io::ErrorKind::TimedOut, "Timer has elapsed");
119 Poll::Ready(Err(error))
120 }
121 }
122}