Next Generation WASM Microkernel Operating System
1// Copyright 2025 Jonas Kruckenberg
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::error::Closed;
9use crate::loom::cell::UnsafeCell;
10use crate::loom::sync::atomic::{AtomicUsize, Ordering};
11use bitflags::bitflags;
12use core::panic::{RefUnwindSafe, UnwindSafe};
13use core::pin::Pin;
14use core::task::{Context, Poll, Waker};
15use core::{fmt, task};
16use static_assertions::const_assert_eq;
17use util::{CachePadded, loom_const_fn};
18
19/// An atomically registered [`Waker`].
20///
21/// This cell stores the [`Waker`] of a single task. A [`Waker`] is stored in
22/// the cell either by calling [`poll_wait`], or by polling a [`wait`]
23/// future. Once a task's [`Waker`] is stored in a `WaitCell`, it can be woken
24/// by calling [`wake`] on the `WaitCell`.
25///
26/// # Implementation Notes
27///
28/// This type is copied from [`maitake-sync`](https://github.com/hawkw/mycelium/blob/dd0020892564c77ee4c20ffbc2f7f5b046ad54c8/maitake-sync/src/wait_cell.rs)
29/// which is in turn inspired by the [`AtomicWaker`] type used in Tokio's
30/// synchronization primitives, with the following modifications:
31///
32/// - An additional bit of state is added to allow [setting a "close"
33/// bit](Self::close).
34/// - A `WaitCell` is always woken by value (for now).
35///
36/// [`AtomicWaker`]: https://github.com/tokio-rs/tokio/blob/09b770c5db31a1f35631600e1d239679354da2dd/tokio/src/sync/task/atomic_waker.rs
37/// [`Waker`]: Waker
38/// [`poll_wait`]: Self::poll_wait
39/// [`wait`]: Self::wait
40/// [`wake`]: Self::wake
41pub struct WaitCell {
42 state: CachePadded<AtomicUsize>,
43 waker: UnsafeCell<Option<Waker>>,
44}
45
46bitflags! {
47 #[derive(Debug, PartialEq, Eq)]
48 struct State: usize {
49 const WAITING = 0b0000;
50 const REGISTERING = 0b0001;
51 const WAKING = 0b0010;
52 const WOKEN = 0b0100;
53 const CLOSED = 0b1000;
54 }
55}
56// WAITING MUST be zero
57const_assert_eq!(State::WAITING.bits(), 0);
58
59/// Future returned from [`WaitCell::wait()`].
60///
61/// This future is fused, so once it has completed, any future calls to poll
62/// will immediately return [`Poll::Ready`].
63#[derive(Debug)]
64#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
65pub struct Wait<'a> {
66 /// The [`WaitCell`] being waited on.
67 cell: &'a WaitCell,
68
69 presubscribe: Poll<Result<(), Closed>>,
70}
71
72/// Future returned from [`WaitCell::subscribe()`].
73///
74/// See the documentation for [`WaitCell::subscribe()`] for details.
75#[derive(Debug)]
76#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
77pub struct Subscribe<'a> {
78 /// The [`WaitCell`] being waited on.
79 cell: &'a WaitCell,
80}
81
82/// An error indicating that a [`WaitCell`] was closed or busy while
83/// attempting register a [`Waker`].
84///
85/// This error is returned by the [`WaitCell::poll_wait`] method.
86#[derive(Copy, Clone, Debug, Eq, PartialEq)]
87pub enum PollWaitError {
88 /// The [`Waker`] was not registered because the [`WaitCell`] has been
89 /// [closed](WaitCell::close).
90 Closed,
91
92 /// The [`Waker`] was not registered because another task was concurrently
93 /// storing its own [`Waker`] in the [`WaitCell`].
94 Busy,
95}
96
97// === impl WaitCell ===
98
99impl WaitCell {
100 loom_const_fn! {
101 pub const fn new() -> Self {
102 Self {
103 state: CachePadded(AtomicUsize::new(State::WAITING.bits())),
104 waker: UnsafeCell::new(None),
105 }
106 }
107 }
108
109 #[tracing::instrument]
110 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<Result<(), PollWaitError>> {
111 // this is based on tokio's AtomicWaker synchronization strategy
112 match self.compare_exchange(State::WAITING, State::REGISTERING, Ordering::Acquire) {
113 Err(actual) if actual.contains(State::CLOSED) => {
114 return Poll::Ready(Err(PollWaitError::Closed));
115 }
116 Err(actual) if actual.contains(State::WOKEN) => {
117 // take the wakeup
118 self.fetch_and(!State::WOKEN, Ordering::Release);
119 return Poll::Ready(Ok(()));
120 }
121 // someone else is notifying, so don't wait!
122 Err(actual) if actual.contains(State::WAKING) => {
123 return Poll::Ready(Ok(()));
124 }
125 Err(_) => return Poll::Ready(Err(PollWaitError::Busy)),
126 Ok(_) => {}
127 }
128
129 let waker = cx.waker();
130 tracing::trace!(
131 /*wait_cell = ?fmt::ptr(self),*/ ?waker,
132 "registering waker"
133 );
134
135 if let Some(prev_waker) = self.replace_waker(waker.clone()) {
136 tracing::debug!("Replaced an old waker in cell, waking");
137 prev_waker.wake();
138 }
139
140 if let Err(actual) =
141 self.compare_exchange(State::REGISTERING, State::WAITING, Ordering::AcqRel)
142 {
143 // If the `compare_exchange` fails above, this means that we were notified for one of
144 // two reasons: either the cell was awoken, or the cell was closed.
145 //
146 // Bail out of the parking state, and determine what to report to the caller.
147 tracing::trace!(state = ?actual, "was notified");
148
149 // Safety: No one else is touching the waker right now, so it is safe to access it
150 // mutably
151 let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() });
152
153 // Reset to the WAITING state by clearing everything *except*
154 // the closed bits (which must remain set). This `fetch_and`
155 // does *not* set the CLOSED bit if it is unset, it just doesn't
156 // clear it.
157 let state = self.fetch_and(State::CLOSED, Ordering::AcqRel);
158 // The only valid state transition while we were parking is to
159 // add the CLOSED bit.
160 debug_assert!(
161 state == actual || state == actual | State::CLOSED,
162 "state changed unexpectedly while parking!"
163 );
164
165 if let Some(waker) = waker {
166 waker.wake();
167 }
168
169 // Was the `CLOSED` bit set while we were clearing other bits?
170 // If so, the cell is closed. Otherwise, we must have been notified.
171 if state.contains(State::CLOSED) {
172 return Poll::Ready(Err(PollWaitError::Closed));
173 }
174
175 return Poll::Ready(Ok(()));
176 }
177
178 // Waker registered, time to yield!
179 Poll::Pending
180 }
181
182 /// Wait to be woken up by this cell.
183 ///
184 /// # Returns
185 ///
186 /// This future completes with the following values:
187 ///
188 /// - [`Ok`]`(())` if the future was woken by a call to [`wake`] or another
189 /// task calling [`poll_wait`] or [`wait`] on this [`WaitCell`].
190 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by a call to [`close`],
191 /// or the [`WaitCell`] was already closed.
192 ///
193 /// **Note**: The calling task's [`Waker`] is not registered until AFTER the
194 /// first time the returned [`Wait`] future is polled. This means that if a
195 /// call to [`wake`] occurs between when [`wait`] is called and when the
196 /// future is first polled, the future will *not* complete. If the caller is
197 /// responsible for performing an operation which will result in an eventual
198 /// wakeup, prefer calling [`subscribe`] _before_ performing that operation
199 /// and `.await`ing the [`Wait`] future returned by [`subscribe`].
200 ///
201 /// [`wake`]: Self::wake
202 /// [`poll_wait`]: Self::poll_wait
203 /// [`wait`]: Self::wait
204 /// [`close`]: Self::close
205 /// [`subscribe`]: Self::subscribe
206 pub fn wait(&self) -> Wait<'_> {
207 Wait {
208 cell: self,
209 presubscribe: Poll::Pending,
210 }
211 }
212
213 /// Eagerly subscribe to notifications from this `WaitCell`.
214 ///
215 /// This method returns a [`Subscribe`] [`Future`], which outputs a [`Wait`]
216 /// [`Future`]. Awaiting the [`Subscribe`] future will eagerly register the
217 /// calling task to be woken by this [`WaitCell`], so that the returned
218 /// [`Wait`] future will be woken by any calls to [`wake`] (or [`close`])
219 /// that occur between when the [`Subscribe`] future completes and when the
220 /// returned [`Wait`] future is `.await`ed.
221 ///
222 /// This is primarily intended for scenarios where the task that waits on a
223 /// [`WaitCell`] is responsible for performing some operation that
224 /// ultimately results in the [`WaitCell`] being woken. If the task were to
225 /// simply perform the operation and then call [`wait`] on the [`WaitCell`],
226 /// a potential race condition could occur where the operation completes and
227 /// wakes the [`WaitCell`] *before* the [`Wait`] future is first `.await`ed.
228 /// Using `subscribe`, the task can ensure that it is ready to be woken by
229 /// the cell *before* performing an operation that could result in it being
230 /// woken.
231 ///
232 /// These scenarios occur when a wakeup is triggered by another thread/CPU
233 /// core in response to an operation performed in the task waiting on the
234 /// `WaitCell`, or when the wakeup is triggered by a hardware interrupt
235 /// resulting from operations performed in the task.
236 ///
237 /// [`wait`]: Self::wait
238 /// [`wake`]: Self::wake
239 /// [`close`]: Self::close
240 pub fn subscribe(&self) -> Subscribe<'_> {
241 Subscribe { cell: self }
242 }
243
244 /// Wake the [`Waker`] stored in this cell.
245 ///
246 /// # Returns
247 ///
248 /// - `true` if a waiting task was woken.
249 /// - `false` if no task was woken (no [`Waker`] was stored in the cell)
250 #[tracing::instrument]
251 pub fn wake(&self) -> bool {
252 if let Some(waker) = self.take_waker(false) {
253 waker.wake();
254 true
255 } else {
256 false
257 }
258 }
259
260 /// Close the [`WaitCell`].
261 ///
262 /// This wakes any waiting task with an error indicating the `WaitCell` is
263 /// closed. Subsequent calls to [`wait`] or [`poll_wait`] will return an
264 /// error indicating that the cell has been closed.
265 ///
266 /// [`wait`]: Self::wait
267 /// [`poll_wait`]: Self::poll_wait
268 #[tracing::instrument]
269 pub fn close(&self) -> bool {
270 if let Some(waker) = self.take_waker(true) {
271 waker.wake();
272 true
273 } else {
274 false
275 }
276 }
277
278 /// Returns `true` if this `WaitCell` is [closed](Self::close).
279 #[must_use]
280 pub fn is_closed(&self) -> bool {
281 self.current_state() == State::CLOSED
282 }
283
284 /// Asynchronously poll the given function `f` until a condition occurs,
285 /// using the [`WaitCell`] to only re-poll when notified.
286 ///
287 /// This can be used to implement a "wait loop", turning a "try" function
288 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
289 /// "recv" or "send").
290 ///
291 /// In particular, this function correctly *registers* interest in the [`WaitCell`]
292 /// prior to polling the function, ensuring that there is not a chance of a race
293 /// where the condition occurs AFTER checking but BEFORE registering interest
294 /// in the [`WaitCell`], which could lead to deadlock.
295 ///
296 /// This is intended to have similar behavior to `Condvar` in the standard library,
297 /// but asynchronous, and not requiring operating system intervention (or existence).
298 ///
299 /// In particular, this can be used in cases where interrupts or events are used
300 /// to signify readiness or completion of some task, such as the completion of a
301 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
302 /// can wake the cell, allowing the polling function to check status fields for
303 /// partial progress or completion.
304 ///
305 /// Consider using [`Self::wait_for_value()`] if your function does return a value.
306 ///
307 // Consider using [`WaitQueue::wait_for()`](super::wait_queue::WaitQueue::wait_for)
308 // if you need multiple waiters.
309 ///
310 /// # Errors
311 ///
312 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
313 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> {
314 loop {
315 let wait = self.subscribe().await;
316 if f() {
317 return Ok(());
318 }
319 wait.await?;
320 }
321 }
322
323 /// Asynchronously poll the given function `f` until a condition occurs,
324 /// using the [`WaitCell`] to only re-poll when notified.
325 ///
326 /// This can be used to implement a "wait loop", turning a "try" function
327 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
328 /// "recv" or "send").
329 ///
330 /// In particular, this function correctly *registers* interest in the [`WaitCell`]
331 /// prior to polling the function, ensuring that there is not a chance of a race
332 /// where the condition occurs AFTER checking but BEFORE registering interest
333 /// in the [`WaitCell`], which could lead to deadlock.
334 ///
335 /// This is intended to have similar behavior to `Condvar` in the standard library,
336 /// but asynchronous, and not requiring operating system intervention (or existence).
337 ///
338 /// In particular, this can be used in cases where interrupts or events are used
339 /// to signify readiness or completion of some task, such as the completion of a
340 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
341 /// can wake the cell, allowing the polling function to check status fields for
342 /// partial progress or completion, and also return the status flags at the same time.
343 ///
344 /// Consider using [`Self::wait_for()`] if your function does not return a value.
345 ///
346 // Consider using [`WaitQueue::wait_for_value()`](super::wait_queue::WaitQueue::wait_for_value) if you need multiple waiters.
347 ///
348 /// # Errors
349 ///
350 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed.
351 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> {
352 loop {
353 let wait = self.subscribe().await;
354 if let Some(t) = f() {
355 return Ok(t);
356 }
357 wait.await?;
358 }
359 }
360
361 #[tracing::instrument]
362 fn take_waker(&self, close: bool) -> Option<Waker> {
363 // Set the WAKING bit (to indicate that we're touching the waker) and
364 // the WOKEN bit (to indicate that we intend to wake it up).
365 let state = {
366 let mut bits = State::WAKING | State::WOKEN;
367 if close {
368 bits.0 |= State::CLOSED.0;
369 }
370 self.fetch_or(bits, Ordering::AcqRel)
371 };
372
373 // Is anyone else touching the waker?
374 if !state.contains(State::WAKING | State::REGISTERING | State::CLOSED) {
375 // Safety: No one else is touching the waker right now, so it is safe to access it
376 // mutably
377 let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() });
378
379 // Release the lock.
380 self.fetch_and(!State::WAKING, Ordering::Release);
381
382 if let Some(waker) = waker {
383 tracing::trace!(wait_cell = ?self, ?close, ?waker, "took_waker");
384 return Some(waker);
385 }
386 }
387
388 None
389 }
390
391 #[tracing::instrument]
392 fn replace_waker(&self, waker: Waker) -> Option<Waker> {
393 // Set the WAKING bit (to indicate that we're touching the waker) and
394 // the WOKEN bit (to indicate that we intend to wake it up).
395 let state = self.fetch_or(State::WAKING, Ordering::AcqRel);
396
397 // Is anyone else touching the waker?
398 if !state.contains(State::WAKING | State::REGISTERING | State::CLOSED) {
399 // Safety: No one else is touching the waker right now, so it is safe to access it
400 // mutably
401 let prev_waker = self.waker.with_mut(|old_waker| unsafe {
402 match &mut *old_waker {
403 Some(old_waker) if waker.will_wake(old_waker) => None,
404 old => old.replace(waker.clone()),
405 }
406 });
407
408 // Release the lock.
409 self.fetch_and(!State::WAKING, Ordering::Release);
410
411 tracing::trace!(wait_cell = ?self, ?prev_waker, ?waker, "replaced_waker");
412 return prev_waker;
413 }
414
415 None
416 }
417
418 #[inline(always)]
419 fn compare_exchange(&self, curr: State, new: State, success: Ordering) -> Result<State, State> {
420 self.state
421 .0
422 .compare_exchange(curr.bits(), new.bits(), success, Ordering::Acquire)
423 .map(State::from_bits_retain)
424 .map_err(State::from_bits_retain)
425 }
426
427 #[inline(always)]
428 fn fetch_and(&self, state: State, order: Ordering) -> State {
429 State::from_bits_retain(self.state.0.fetch_and(state.bits(), order))
430 }
431
432 #[inline(always)]
433 fn fetch_or(&self, state: State, order: Ordering) -> State {
434 State::from_bits_retain(self.state.0.fetch_or(state.bits(), order))
435 }
436
437 #[inline(always)]
438 fn current_state(&self) -> State {
439 State::from_bits_retain(self.state.0.load(Ordering::Acquire))
440 }
441}
442
443impl Default for WaitCell {
444 fn default() -> Self {
445 WaitCell::new()
446 }
447}
448
449impl RefUnwindSafe for WaitCell {}
450impl UnwindSafe for WaitCell {}
451
452// Safety: `WaitCell` synchronizes all accesses through atomic operations
453unsafe impl Send for WaitCell {}
454// Safety: `WaitCell` synchronizes all accesses through atomic operations
455unsafe impl Sync for WaitCell {}
456
457impl fmt::Debug for WaitCell {
458 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459 f.debug_struct("WaitCell")
460 .field("state", &self.current_state())
461 .finish_non_exhaustive()
462 }
463}
464
465impl Drop for WaitCell {
466 fn drop(&mut self) {
467 self.close();
468 }
469}
470
471// === impl Wait ===
472
473impl Future for Wait<'_> {
474 type Output = Result<(), Closed>;
475
476 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
477 // Did a wakeup occur while we were pre-registering the future?
478 if self.presubscribe.is_ready() {
479 return self.presubscribe;
480 }
481
482 // Okay, actually poll the cell, then.
483 match task::ready!(self.cell.poll_wait(cx)) {
484 Ok(()) => Poll::Ready(Ok(())),
485 Err(PollWaitError::Closed) => Poll::Ready(Err(Closed(()))),
486 Err(PollWaitError::Busy) => {
487 // If some other task was registering, yield and try to re-register
488 // our waker when that task is done.
489 cx.waker().wake_by_ref();
490 Poll::Pending
491 }
492 }
493 }
494}
495
496// === impl Subscribe ===
497
498impl<'cell> Future for Subscribe<'cell> {
499 type Output = Wait<'cell>;
500
501 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
502 // Pre-register the waker in the cell.
503 let presubscribe = match self.cell.poll_wait(cx) {
504 Poll::Ready(Err(PollWaitError::Busy)) => {
505 // Someone else is in the process of registering. Yield now so we
506 // can wait until that task is done, and then try again.
507 cx.waker().wake_by_ref();
508 return Poll::Pending;
509 }
510 Poll::Ready(Err(PollWaitError::Closed)) => Poll::Ready(Err(Closed(()))),
511 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
512 Poll::Pending => Poll::Pending,
513 };
514
515 Poll::Ready(Wait {
516 cell: self.cell,
517 presubscribe,
518 })
519 }
520}