Next Generation WASM Microkernel Operating System
1// Copyright 2025 Jonas Kruckenberg
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::error::Closed;
9use crate::sync::wake_batch::WakeBatch;
10use alloc::sync::Arc;
11use cordyceps::{Linked, List, list};
12use core::cell::UnsafeCell;
13use core::marker::PhantomPinned;
14use core::pin::Pin;
15use core::ptr::NonNull;
16use core::sync::atomic::{AtomicUsize, Ordering};
17use core::task::{Context, Poll, Waker};
18use core::{fmt, mem, ptr};
19use mycelium_bitfield::{FromBits, bitfield, enum_from_bits};
20use pin_project::{pin_project, pinned_drop};
21use spin::{Mutex, MutexGuard};
22use util::{CachePadded, loom_const_fn};
23
24/// A queue of waiting tasks which can be [woken in first-in, first-out
25/// order][wake], or [all at once][wake_all].
26///
27/// This type is taken from [maitake-sync](https://github.com/hawkw/mycelium/blob/dd0020892564c77ee4c20ffbc2f7f5b046ad54c8/maitake-sync/src/wait_queue.rs#L1577).
28///
29/// A `WaitQueue` allows any number of tasks to [wait] asynchronously and be
30/// woken when some event occurs, either [individually][wake] in first-in,
31/// first-out order, or [all at once][wake_all]. This makes it a vital building
32/// block of runtime services (such as timers or I/O resources), where it may be
33/// used to wake a set of tasks when a timer completes or when a resource
34/// becomes available. It can be equally useful for implementing higher-level
35/// synchronization primitives: for example, a `WaitQueue` plus an
36/// [`UnsafeCell`] is essentially an entire implementation of a fair
37/// asynchronous mutex. Finally, a `WaitQueue` can be a useful
38/// synchronization primitive on its own: sometimes, you just need to have a
39/// bunch of tasks wait for something and then wake them all up.
40///
41/// # Implementation Notes
42///
43/// This type is currently implemented using [intrusive doubly-linked
44/// list][ilist].
45///
46/// The *[intrusive]* aspect of this map is important, as it means that it does
47/// not allocate memory. Instead, nodes in the linked list are stored in the
48/// futures of tasks trying to wait for capacity. This means that it is not
49/// necessary to allocate any heap memory for each task waiting to be woken.
50///
51/// However, the intrusive linked list introduces one new danger: because
52/// futures can be *cancelled*, and the linked list nodes live within the
53/// futures trying to wait on the queue, we *must* ensure that the node
54/// is unlinked from the list before dropping a cancelled future. Failure to do
55/// so would result in the list containing dangling pointers. Therefore, we must
56/// use a *doubly-linked* list, so that nodes can edit both the previous and
57/// next node when they have to remove themselves. This is kind of a bummer, as
58/// it means we can't use something nice like this [intrusive queue by Dmitry
59/// Vyukov][2], and there are not really practical designs for lock-free
60/// doubly-linked lists that don't rely on some kind of deferred reclamation
61/// scheme such as hazard pointers or QSBR.
62///
63/// Instead, we just stick a [`Mutex`] around the linked list, which must be
64/// acquired to pop nodes from it, or for nodes to remove themselves when
65/// futures are cancelled. This is a bit sad, but the critical sections for this
66/// mutex are short enough that we still get pretty good performance despite it.
67///
68/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction
69/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
70/// [`Waker`]: Waker
71/// [wait]: WaitQueue::wait
72/// [wake]: WaitQueue::wake
73/// [wake_all]: WaitQueue::wake_all
74/// [`UnsafeCell`]: UnsafeCell
75/// [ilist]: linked_list::List
76#[derive(Debug)]
77pub struct WaitQueue {
78 /// The wait maps's state variable.
79 state: CachePadded<AtomicUsize>,
80 /// The linked list of waiters.
81 ///
82 /// # Safety
83 ///
84 /// This is protected by a mutex; the mutex *must* be acquired when
85 /// manipulating the linked list, OR when manipulating waiter nodes that may
86 /// be linked into the list. If a node is known to not be linked, it is safe
87 /// to modify that node (such as by waking the stored [`Waker`]) without
88 /// holding the lock; otherwise, it may be modified through the list, so the
89 /// lock must be held when modifying the
90 /// node.
91 queue: Mutex<List<Waiter>>,
92}
93
94bitfield! {
95 #[derive(Eq, PartialEq)]
96 struct State<usize> {
97 /// The queue's state.
98 const INNER: StateInner;
99
100 /// The number of times [`WaitQueue::wake_all`] has been called.
101 const WAKE_ALLS = ..;
102 }
103}
104
/// The lifecycle state of a [`WaitQueue`], stored in the low bits of its
/// state word.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
enum StateInner {
    /// Nothing is queued and no wakeup is pending.
    ///
    /// A waiter arriving now is enqueued; a `wake` arriving now is stored as a
    /// pending notification, moving the queue to [`StateInner::Woken`].
    Empty = 0b00,
    /// At least one waiter is linked into the queue.
    ///
    /// Further waiters leave the state unchanged. A `wake` dequeues and wakes
    /// the first waiter, and moves the queue back to [`StateInner::Empty`] if
    /// that was the last one.
    Waiting = 0b01,
    /// A notification is stored but has not been consumed yet.
    ///
    /// The next waiter consumes it *without* being enqueued and moves the queue
    /// to [`StateInner::Empty`]. Additional `wake` calls leave this state as-is.
    Woken = 0b10,
    /// The queue has been closed; waiting now yields [`Closed`] and the state
    /// never changes again.
    ///
    /// *Note*: this value **must** have every state bit set, because closing is
    /// implemented with a [`fetch_or`].
    ///
    /// [`fetch_or`]: AtomicUsize::fetch_or
    Closed = 0b11,
}
133
134#[pin_project(PinnedDrop)]
135#[derive(Debug)]
136#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
137pub struct Wait<'a> {
138 // The [`WaitQueue`] being waited on.
139 queue: &'a WaitQueue,
140 // Entry in the wait queue linked list.
141 #[pin]
142 waiter: Waiter,
143}
144
145/// Future returned from [`WaitQueue::wait_owned()`].
146///
147/// This is identical to the [`Wait`] future, except that it takes an
148/// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to
149/// live for the `'static` lifetime.
150///
151/// This future is fused, so once it has completed, any future calls to poll
152/// will immediately return [`Poll::Ready`].
153///
154/// # Notes
155///
156/// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a
157/// `WaitOwned` future once it has been polled. For instance, the following
158/// code must not compile:
159///
160///```compile_fail
161/// use maitake_sync::wait_queue::WaitOwned;
162///
163/// // Calls to this function should only compile if `T` is `Unpin`.
164/// fn assert_unpin<T: Unpin>() {}
165///
166/// assert_unpin::<WaitOwned<'_>>();
167/// ```
168#[pin_project(PinnedDrop)]
169#[derive(Debug)]
170pub struct WaitOwned {
171 // The `WaitQueue` being waited on.
172 queue: Arc<WaitQueue>,
173 // Entry in the wait queue.
174 #[pin]
175 waiter: Waiter,
176}
177
178/// A waiter node which may be linked into a wait queue.
179#[pin_project]
180#[repr(C)]
181struct Waiter {
182 // The intrusive linked list node.
183 //
184 // This *must* be the first field in the struct in order for the `Linked`
185 // implementation to be sound.
186 #[pin]
187 node: UnsafeCell<WaiterInner>,
188 // The future's state.
189 state: WaitState,
190}
191
192struct WaiterInner {
193 /// Intrusive linked list pointers.
194 links: list::Links<Waiter>,
195 /// The node's waker
196 waker: Wakeup,
197 // This type is !Unpin due to the heuristic from:
198 // <https://github.com/rust-lang/rust/pull/82834>
199 _pin: PhantomPinned,
200}
201
202bitfield! {
203 #[derive(Eq, PartialEq)]
204 struct WaitState<usize> {
205 /// The waiter's state.
206 const INNER: WaitStateInner;
207 /// The number of times [`WaitQueue::wake_all`] has been called.
208 const WAKE_ALLS = ..;
209 }
210}
211
212enum_from_bits! {
213 /// The state of a [`Waiter`] node in a [`WaitQueue`].
214 #[derive(Debug, Eq, PartialEq)]
215 enum WaitStateInner<u8> {
216 /// The waiter has not yet been enqueued.
217 ///
218 /// The number of times [`WaitQueue::wake_all`] has been called is stored
219 /// when the node is created, in order to determine whether it was woken by
220 /// a stored wakeup when enqueueing.
221 ///
222 /// When in this state, the node is **not** part of the linked list, and
223 /// can be dropped without removing it from the list.
224 Start = 0b00,
225
226 /// The waiter is waiting.
227 ///
228 /// When in this state, the node **is** part of the linked list. If the
229 /// node is dropped in this state, it **must** be removed from the list
230 /// before dropping it. Failure to ensure this will result in dangling
231 /// pointers in the linked list!
232 Waiting = 0b01,
233
234 /// The waiter has been woken.
235 ///
236 /// When in this state, the node is **not** part of the linked list, and
237 /// can be dropped without removing it from the list.
238 Woken = 0b10,
239 }
240}
241
/// The wakeup slot of a [`WaiterInner`].
///
/// There is currently no dedicated "closed" variant: `WaitQueue::close` drains
/// the queue with [`Wakeup::All`], the same value `wake_all` uses.
#[derive(Debug, Clone)]
enum Wakeup {
    /// Nothing has happened yet, and no waker is registered.
    Empty,
    /// The task to notify when this waiter is woken.
    Waiting(Waker),
    /// Woken by a single-task wakeup (`WaitQueue::wake`).
    One,
    /// Woken by waking the whole queue (`wake_all`, and currently `close`).
    All,
}
250
251// === impl WaitQueue ===
252
253impl Default for WaitQueue {
254 fn default() -> Self {
255 Self::new()
256 }
257}
258
259impl WaitQueue {
260 loom_const_fn! {
261 pub const fn new() -> Self {
262 Self {
263 state: CachePadded(AtomicUsize::new(StateInner::Empty.into_usize())),
264 queue: Mutex::new(List::new()),
265 }
266 }
267 }
268
269 /// Wake the next task in the queue.
270 ///
271 /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the
272 /// **next** call to [`wait().await`] will complete immediately. If one or more
273 /// tasks are currently in the queue, the first task in the queue is woken.
274 ///
275 /// At most one wakeup will be stored in the queue at any time. If `wake()`
276 /// is called many times while there are no tasks in the queue, only a
277 /// single wakeup is stored.
278 ///
279 /// [`wait().await`]: Self::wait()
280 #[inline]
281 pub fn wake(&self) {
282 // snapshot the queue's current state.
283 let mut state = self.load();
284
285 // check if any tasks are currently waiting on this queue. if there are
286 // no waiting tasks, store the wakeup to be consumed by the next call to
287 // `wait`.
288 loop {
289 match state.get(State::INNER) {
290 // if the queue is closed, bail.
291 StateInner::Closed => return,
292 // if there are waiting tasks, break out of the loop and wake one.
293 StateInner::Waiting => break,
294 _ => {}
295 }
296
297 let next = state.with_inner(StateInner::Woken);
298 // advance the state to `Woken`, and return (if we did so
299 // successfully)
300 match self.compare_exchange(state, next) {
301 Ok(_) => return,
302 Err(actual) => state = actual,
303 }
304 }
305
306 // okay, there are tasks waiting on the queue; we must acquire the lock
307 // on the linked list and wake the next task from the queue.
308 let mut queue = self.queue.lock();
309
310 // the queue's state may have changed while we were waiting to acquire
311 // the lock, so we need to acquire a new snapshot before we take the
312 // waker.
313 state = self.load();
314 let waker = self.wake_locked(&mut queue, state);
315 drop(queue);
316
317 //now that we've released the lock, wake the waiting task (if we
318 //actually deuqueued one).
319 if let Some(waker) = waker {
320 waker.wake();
321 }
322 }
323
324 /// Wake *all* tasks currently in the queue.
325 ///
326 /// All tasks currently waiting on the queue are woken. Unlike [`wake()`], a
327 /// wakeup is *not* stored in the queue to wake the next call to [`wait()`]
328 /// if the queue is empty. Instead, this method only wakes all currently
329 /// registered waiters. Registering a task to be woken is done by `await`ing
330 /// the [`Future`] returned by the [`wait()`] method on this queue.
331 ///
332 /// [`wake()`]: Self::wake
333 /// [`wait()`]: Self::wait
334 #[expect(clippy::missing_panics_doc, reason = "internal assertion")]
335 pub fn wake_all(&self) -> usize {
336 let mut batch = WakeBatch::new();
337 let mut waiters_remaining = true;
338
339 let mut queue = self.queue.lock();
340 let state = self.load();
341
342 match state.get(State::INNER) {
343 StateInner::Woken | StateInner::Empty => {
344 self.state
345 .0
346 .fetch_add(State::ONE_WAKE_ALL, Ordering::SeqCst);
347 return 0;
348 }
349 StateInner::Closed => return 0,
350 StateInner::Waiting => {
351 let next_state = State::new()
352 .with_inner(StateInner::Empty)
353 .with(State::WAKE_ALLS, state.get(State::WAKE_ALLS) + 1);
354 self.compare_exchange(state, next_state)
355 .expect("state should not have transitioned while locked");
356 }
357 }
358
359 // As long as there are waiters remaining to wake, lock the queue, drain
360 // another batch, release the lock, and wake them.
361 let mut woken = 0;
362 while waiters_remaining {
363 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All);
364 woken += batch.len();
365 MutexGuard::unlocked(&mut queue, || batch.wake_all());
366 }
367
368 woken
369 }
370
371 /// Wait to be woken up by this queue.
372 ///
373 /// Equivalent to:
374 ///
375 /// ```ignore
376 /// async fn wait(&self);
377 /// ```
378 ///
379 /// This returns a [`Wait`] [`Future`] that will complete when the task is
380 /// woken by a call to [`wake()`] or [`wake_all()`], or when the `WaitQueue`
381 /// is dropped.
382 ///
383 /// Each `WaitQueue` holds a single wakeup. If [`wake()`] was previously
384 /// called while no tasks were waiting on the queue, then `wait().await`
385 /// will complete immediately, consuming the stored wakeup. Otherwise,
386 /// `wait().await` waits to be woken by the next call to [`wake()`] or
387 /// [`wake_all()`].
388 ///
389 /// The [`Wait`] future is not guaranteed to receive wakeups from calls to
390 /// [`wake()`] if it has not yet been polled. See the documentation for the
391 /// [`Wait::subscribe()`] method for details on receiving wakeups from the
392 /// queue prior to polling the `Wait` future for the first time.
393 ///
394 /// A `Wait` future **is** is guaranteed to receive wakeups from calls to
395 /// [`wake_all()`] as soon as it is created, even if it has not yet been
396 /// polled.
397 ///
398 /// # Returns
399 ///
400 /// The [`Future`] returned by this method completes with one of the
401 /// following [outputs](Future::Output):
402 ///
403 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
404 /// [`wake_all()`].
405 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` being
406 /// [`close`d](WaitQueue::close).
407 ///
408 /// # Cancellation
409 ///
410 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the order
411 /// that they started to wait. If a [`Wait`] future is dropped, the task
412 /// will forfeit its position in the queue.
413 ///
414 /// [`wake()`]: Self::wake
415 /// [`wake_all()`]: Self::wake_all
416 pub fn wait(&self) -> Wait<'_> {
417 Wait {
418 queue: self,
419 waiter: self.waiter(),
420 }
421 }
422
423 /// Wait to be woken up by this queue, returning a future that's valid
424 /// for the `'static` lifetime.
425 ///
426 /// This returns a [`WaitOwned`] future that will complete when the task
427 /// is woken by a call to [`wake()`] or [`wake_all()`], or when the
428 /// `WaitQueue` is [closed].
429 ///
430 /// This is identical to the [`wait()`] method, except that it takes a
431 /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future
432 /// to live for the `'static` lifetime. See the documentation for
433 /// [`wait()`] for details on how to use the future returned by this
434 /// method.
435 ///
436 /// # Returns
437 ///
438 /// The [`Future`] returned by this method completes with one of the
439 /// following [outputs](Future::Output):
440 ///
441 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
442 /// [`wake_all()`].
443 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue`
444 /// being [closed].
445 ///
446 /// # Cancellation
447 ///
448 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the
449 /// order that they started to wait. If a [`WaitOwned`] future is
450 /// dropped, the task will forfeit its position in the queue.
451 ///
452 /// [`wake()`]: Self::wake
453 /// [`wake_all()`]: Self::wake_all
454 /// [`wait()`]: Self::wait
455 /// [closed]: Self::close
456 pub fn wait_owned(self: &Arc<Self>) -> WaitOwned {
457 let waiter = self.waiter();
458 let queue = self.clone();
459 WaitOwned { queue, waiter }
460 }
461
462 /// Asynchronously poll the given function `f` until a condition occurs,
463 /// using the [`WaitQueue`] to only re-poll when notified.
464 ///
465 /// This can be used to implement a "wait loop", turning a "try" function
466 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
467 /// "recv" or "send").
468 ///
469 /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
470 /// prior to polling the function, ensuring that there is not a chance of a race
471 /// where the condition occurs AFTER checking but BEFORE registering interest
472 /// in the [`WaitQueue`], which could lead to deadlock.
473 ///
474 /// This is intended to have similar behavior to `Condvar` in the standard library,
475 /// but asynchronous, and not requiring operating system intervention (or existence).
476 ///
477 /// In particular, this can be used in cases where interrupts or events are used
478 /// to signify readiness or completion of some task, such as the completion of a
479 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
480 /// can wake the queue, allowing the polling function to check status fields for
481 /// partial progress or completion.
482 ///
483 /// Consider using [`Self::wait_for_value()`] if your function does return a value.
484 ///
485 /// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for)
486 /// if you do not need multiple waiters.
487 ///
488 /// # Errors
489 ///
490 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed.
491 ///
492 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> {
493 loop {
494 let wait = self.wait();
495 let mut wait = core::pin::pin!(wait);
496 let _ = wait.as_mut().subscribe()?;
497 if f() {
498 return Ok(());
499 }
500 wait.await?;
501 }
502 }
503
504 /// Asynchronously poll the given function `f` until a condition occurs,
505 /// using the [`WaitQueue`] to only re-poll when notified.
506 ///
507 /// This can be used to implement a "wait loop", turning a "try" function
508 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
509 /// "recv" or "send").
510 ///
511 /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
512 /// prior to polling the function, ensuring that there is not a chance of a race
513 /// where the condition occurs AFTER checking but BEFORE registering interest
514 /// in the [`WaitQueue`], which could lead to deadlock.
515 ///
516 /// This is intended to have similar behavior to `Condvar` in the standard library,
517 /// but asynchronous, and not requiring operating system intervention (or existence).
518 ///
519 /// In particular, this can be used in cases where interrupts or events are used
520 /// to signify readiness or completion of some task, such as the completion of a
521 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
522 /// can wake the queue, allowing the polling function to check status fields for
523 /// partial progress or completion, and also return the status flags at the same time.
524 ///
525 /// Consider using [`Self::wait_for()`] if your function does not return a value.
526 ///
527 /// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value)
528 /// if you do not need multiple waiters.
529 ///
530 /// # Errors
531 ///
532 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed.
533 ///
534 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> {
535 loop {
536 let wait = self.wait();
537 let mut wait = core::pin::pin!(wait);
538 match wait.as_mut().subscribe() {
539 Poll::Ready(wr) => wr?,
540 Poll::Pending => {}
541 }
542 if let Some(t) = f() {
543 return Ok(t);
544 }
545 wait.await?;
546 }
547 }
548
549 /// Close the queue, indicating that it may no longer be used.
550 ///
551 /// Once a queue is closed, all [`wait()`] calls (current or future) will
552 /// return an error.
553 ///
554 /// This method is generally used when implementing higher-level
555 /// synchronization primitives or resources: when an event makes a resource
556 /// permanently unavailable, the queue can be closed.
557 ///
558 /// [`wait()`]: Self::wait
559 pub fn close(&self) {
560 let state = self
561 .state
562 .0
563 .fetch_or(StateInner::Closed.into_bits(), Ordering::SeqCst);
564 let state = State::from_bits(state);
565 if state.get(State::INNER) != StateInner::Waiting {
566 return;
567 }
568
569 let mut queue = self.queue.lock();
570 let mut batch = WakeBatch::new();
571 let mut waiters_remaining = true;
572
573 while waiters_remaining {
574 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All);
575 MutexGuard::unlocked(&mut queue, || batch.wake_all());
576 }
577 }
578
579 /// Returns `true` if this `WaitQueue` is [closed](Self::close).
580 #[must_use]
581 pub fn is_closed(&self) -> bool {
582 self.load().get(State::INNER) == StateInner::Closed
583 }
584
585 /// Returns a [`Waiter`] entry in this queue.
586 ///
587 /// This is factored out into a separate function because it's used by both
588 /// [`WaitQueue::wait`] and [`WaitQueue::wait_owned`].
589 fn waiter(&self) -> Waiter {
590 // how many times has `wake_all` been called when this waiter is created?
591 let current_wake_alls = self.load().get(State::WAKE_ALLS);
592 let state = WaitState::new()
593 .with(WaitState::WAKE_ALLS, current_wake_alls)
594 .with(WaitState::INNER, WaitStateInner::Start);
595 Waiter {
596 state,
597 node: UnsafeCell::new(WaiterInner {
598 links: list::Links::new(),
599 waker: Wakeup::Empty,
600 _pin: PhantomPinned,
601 }),
602 }
603 }
604
605 // fn try_wait(&self) -> Poll<Result<(), Closed>> {
606 // let mut state = self.load();
607 // let initial_wake_alls = state.get(State::WAKE_ALLS);
608 // while state.get(State::INNER) == StateInner::Woken {
609 // match self.compare_exchange(state, state.with_inner(StateInner::Empty)) {
610 // Ok(_) => return Poll::Ready(Ok(())),
611 // Err(actual) => state = actual,
612 // }
613 // }
614 //
615 // match state.get(State::INNER) {
616 // StateInner::Closed => Poll::Ready(Err(Closed(()))),
617 // _ if state.get(State::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())),
618 // StateInner::Empty | StateInner::Waiting => Poll::Pending,
619 // StateInner::Woken => Poll::Ready(Ok(())),
620 // }
621 // }
622
623 #[cold]
624 #[inline(never)]
625 fn wake_locked(&self, queue: &mut List<Waiter>, curr: State) -> Option<Waker> {
626 let inner = curr.get(State::INNER);
627
628 // is the queue still in the `Waiting` state? it is possible that we
629 // transitioned to a different state while locking the queue.
630 if inner != StateInner::Waiting {
631 // if there are no longer any queued tasks, try to store the
632 // wakeup in the queue and bail.
633 if let Err(actual) = self.compare_exchange(curr, curr.with_inner(StateInner::Woken)) {
634 debug_assert!(actual.get(State::INNER) != StateInner::Waiting);
635 self.store(actual.with_inner(StateInner::Woken));
636 }
637
638 return None;
639 }
640
641 // otherwise, we have to dequeue a task and wake it.
642 let node = queue
643 .pop_back()
644 .expect("if we are in the Waiting state, there must be waiters in the queue");
645 let waker = Waiter::wake(node, queue, Wakeup::One);
646
647 // if we took the final waiter currently in the queue, transition to the
648 // `Empty` state.
649 if queue.is_empty() {
650 self.store(curr.with_inner(StateInner::Empty));
651 }
652
653 waker
654 }
655
656 /// Drain waiters from `queue` and add them to `batch`. Returns `true` if
657 /// the batch was filled while more waiters remain in the queue, indicating
658 /// that this function must be called again to wake all waiters.
659 fn drain_to_wake_batch(
660 batch: &mut WakeBatch,
661 queue: &mut List<Waiter>,
662 wakeup: Wakeup,
663 ) -> bool {
664 while let Some(node) = queue.pop_back() {
665 let Some(waker) = Waiter::wake(node, queue, wakeup.clone()) else {
666 // this waiter was enqueued by `Wait::register` and doesn't have
667 // a waker, just keep going.
668 continue;
669 };
670
671 if batch.add_waker(waker) {
672 // there's still room in the wake set, just keep adding to it.
673 continue;
674 }
675
676 // wake set is full, drop the lock and wake everyone!
677 break;
678 }
679
680 !queue.is_empty()
681 }
682
683 fn load(&self) -> State {
684 State::from_bits(self.state.0.load(Ordering::SeqCst))
685 }
686
687 fn store(&self, state: State) {
688 self.state.0.store(state.0, Ordering::SeqCst);
689 }
690
691 fn compare_exchange(&self, current: State, new: State) -> Result<State, State> {
692 self.state
693 .0
694 .compare_exchange(current.0, new.0, Ordering::SeqCst, Ordering::SeqCst)
695 .map(State::from_bits)
696 .map_err(State::from_bits)
697 }
698}
699
700// === impl State & StateInner ===
701
702impl State {
703 const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit();
704
705 fn with_inner(self, inner: StateInner) -> Self {
706 self.with(Self::INNER, inner)
707 }
708}
709impl StateInner {
710 const fn into_usize(self) -> usize {
711 self as u8 as usize
712 }
713}
714
715impl FromBits<usize> for StateInner {
716 const BITS: u32 = 2;
717 type Error = core::convert::Infallible;
718
719 fn try_from_bits(bits: usize) -> Result<Self, Self::Error> {
720 #[expect(
721 clippy::cast_possible_truncation,
722 reason = "`bits` will only have 3 bits set"
723 )]
724 Ok(match bits as u8 {
725 bits if bits == Self::Empty as u8 => Self::Empty,
726 bits if bits == Self::Waiting as u8 => Self::Waiting,
727 bits if bits == Self::Woken as u8 => Self::Woken,
728 bits if bits == Self::Closed as u8 => Self::Closed,
729 _ => {
730 unreachable!();
731 }
732 })
733 }
734
735 fn into_bits(self) -> usize {
736 self as usize
737 }
738}
739
740// === impl Waiter ===
741
742impl fmt::Debug for Waiter {
743 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
744 f.debug_struct("Waiter")
745 .field("state", &self.state)
746 .finish_non_exhaustive()
747 }
748}
749
750impl Waiter {
751 /// Returns the [`Waker`] for the task that owns this `Waiter`.
752 ///
753 /// # Safety
754 ///
755 /// This is only safe to call while the list is locked. The `list`
756 /// parameter ensures this method is only called while holding the lock, so
757 /// this can be safe.
758 ///
759 /// Of course, that must be the *same* list that this waiter is a member of,
760 /// and currently, there is no way to ensure that...
761 #[inline(always)]
762 fn wake(this: NonNull<Self>, list: &mut List<Self>, wakeup: Wakeup) -> Option<Waker> {
763 Waiter::with_inner(this, list, |node| {
764 let waker = mem::replace(&mut node.waker, wakeup);
765 match waker {
766 // the node has a registered waker, so wake the task.
767 Wakeup::Waiting(waker) => Some(waker),
768 // do nothing: the node was registered by `Wait::register`
769 // without a waker, so the future will already be woken when it is
770 // actually polled.
771 Wakeup::Empty => None,
772 // the node was already woken? this should not happen and
773 // probably indicates a race!
774 _ => unreachable!("tried to wake a waiter in the {:?} state!", waker),
775 }
776 })
777 }
778
779 /// # Safety
780 ///
781 /// This is only safe to call while the list is locked. The dummy `_list`
782 /// parameter ensures this method is only called while holding the lock, so
783 /// this can be safe.
784 ///
785 /// Of course, that must be the *same* list that this waiter is a member of,
786 /// and currently, there is no way to ensure that...
787 #[inline(always)]
788 fn with_inner<T>(
789 mut this: NonNull<Self>,
790 _list: &mut List<Self>,
791 f: impl FnOnce(&mut WaiterInner) -> T,
792 ) -> T {
793 // safety: this is only called while holding the lock on the queue,
794 // so it's safe to mutate the waiter.
795 unsafe { f(&mut *this.as_mut().node.get()) }
796 }
797
798 fn poll_wait(
799 mut self: Pin<&mut Self>,
800 queue: &WaitQueue,
801 waker: Option<&Waker>,
802 ) -> Poll<Result<(), Closed>> {
803 // Safety: we never move out of `ptr` below, only mutate its fields
804 let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(self.as_mut())) };
805 let mut this = self.as_mut().project();
806
807 match this.state.get(WaitState::INNER) {
808 WaitStateInner::Start => {
809 let queue_state = queue.load();
810
811 // can we consume a pending wakeup?
812 if queue
813 .compare_exchange(
814 queue_state.with_inner(StateInner::Woken),
815 queue_state.with_inner(StateInner::Empty),
816 )
817 .is_ok()
818 {
819 this.state.set(WaitState::INNER, WaitStateInner::Woken);
820 return Poll::Ready(Ok(()));
821 }
822
823 // okay, no pending wakeups. try to wait...
824
825 let mut waiters = queue.queue.lock();
826 let mut queue_state = queue.load();
827
828 // the whole queue was woken while we were trying to acquire
829 // the lock!
830 if queue_state.get(State::WAKE_ALLS) != this.state.get(WaitState::WAKE_ALLS) {
831 this.state.set(WaitState::INNER, WaitStateInner::Woken);
832 return Poll::Ready(Ok(()));
833 }
834
835 // transition the queue to the waiting state
836 'to_waiting: loop {
837 match queue_state.get(State::INNER) {
838 // the queue is `Empty`, transition to `Waiting`
839 StateInner::Empty => {
840 match queue.compare_exchange(
841 queue_state,
842 queue_state.with_inner(StateInner::Waiting),
843 ) {
844 Ok(_) => break 'to_waiting,
845 Err(actual) => queue_state = actual,
846 }
847 }
848 // the queue is already `Waiting`
849 StateInner::Waiting => break 'to_waiting,
850 // the queue was woken, consume the wakeup.
851 StateInner::Woken => {
852 match queue.compare_exchange(
853 queue_state,
854 queue_state.with_inner(StateInner::Empty),
855 ) {
856 Ok(_) => {
857 this.state.set(WaitState::INNER, WaitStateInner::Woken);
858 return Poll::Ready(Ok(()));
859 }
860 Err(actual) => queue_state = actual,
861 }
862 }
863 StateInner::Closed => return Poll::Ready(Err(Closed(()))),
864 }
865 }
866
867 // enqueue the node
868 this.state.set(WaitState::INNER, WaitStateInner::Waiting);
869 if let Some(waker) = waker {
870 // safety: we may mutate the inner state because we are
871 // holding the lock.
872 unsafe {
873 let node = this.node.as_mut().get();
874 debug_assert!(matches!((*node).waker, Wakeup::Empty));
875 (*node).waker = Wakeup::Waiting(waker.clone());
876 }
877 }
878
879 waiters.push_front(ptr);
880
881 Poll::Pending
882 }
883 WaitStateInner::Waiting => {
884 let _waiters = queue.queue.lock();
885 // safety: we may mutate the inner state because we are
886 // holding the lock.
887 unsafe {
888 let node = &mut *this.node.get();
889 match node.waker {
890 Wakeup::Waiting(ref mut curr_waker) => {
891 match waker {
892 Some(waker) if !curr_waker.will_wake(waker) => {
893 *curr_waker = waker.clone();
894 }
895 _ => {}
896 }
897 Poll::Pending
898 }
899 Wakeup::All | Wakeup::One => {
900 this.state.set(WaitState::INNER, WaitStateInner::Woken);
901 Poll::Ready(Ok(()))
902 }
903 // Wakeup::Closed => {
904 // this.state.set(WaitState::INNER, WaitStateInner::Woken);
905 // Poll::Ready(Err(Closed(())))
906 // }
907 Wakeup::Empty => {
908 if let Some(waker) = waker {
909 node.waker = Wakeup::Waiting(waker.clone());
910 }
911
912 Poll::Pending
913 }
914 }
915 }
916 }
917 WaitStateInner::Woken => Poll::Ready(Ok(())),
918 }
919 }
920
921 /// Release this `Waiter` from the queue.
922 ///
923 /// This is called from the `drop` implementation for the [`Wait`] and
924 /// [`WaitOwned`] futures.
925 fn release(mut self: Pin<&mut Self>, queue: &WaitQueue) {
926 let state = *(self.as_mut().project().state);
927 // Safety: we never move out of `ptr` below, only mutate its fields
928 let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) });
929
930 // if we're not enqueued, we don't have to do anything else.
931 if state.get(WaitState::INNER) != WaitStateInner::Waiting {
932 return;
933 }
934
935 let mut waiters = queue.queue.lock();
936 let state = queue.load();
937 // remove the node
938 // safety: we have the lock on the queue, so this is safe.
939 unsafe {
940 waiters.remove(ptr);
941 };
942
943 // if we removed the last waiter from the queue, transition the state to
944 // `Empty`.
945 if waiters.is_empty() && state.get(State::INNER) == StateInner::Waiting {
946 queue.store(state.with_inner(StateInner::Empty));
947 }
948
949 // if the node has an unconsumed wakeup, it must be assigned to the next
950 // node in the queue.
951 let next_waiter =
952 if Waiter::with_inner(ptr, &mut waiters, |node| matches!(&node.waker, Wakeup::One)) {
953 queue.wake_locked(&mut waiters, state)
954 } else {
955 None
956 };
957
958 drop(waiters);
959
960 if let Some(next) = next_waiter {
961 next.wake();
962 }
963 }
964}
965
966// Safety: TODO
967unsafe impl Linked<list::Links<Self>> for Waiter {
968 type Handle = NonNull<Waiter>;
969
970 fn into_ptr(r: Self::Handle) -> NonNull<Self> {
971 r
972 }
973
974 unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
975 ptr
976 }
977
978 unsafe fn links(target: NonNull<Self>) -> NonNull<list::Links<Waiter>> {
979 // Safety: ensured by caller
980 unsafe {
981 // Safety: using `ptr::addr_of!` avoids creating a temporary
982 // reference, which stacked borrows dislikes.
983 let node = &*ptr::addr_of!((*target.as_ptr()).node);
984 let links = ptr::addr_of_mut!((*node.get()).links);
985 // Safety: since the `target` pointer is `NonNull`, we can assume
986 // that pointers to its members are also not null, making this use
987 // of `new_unchecked` fine.
988 NonNull::new_unchecked(links)
989 }
990 }
991}
992
993// === impl Wait ===
994
995impl Wait<'_> {
996 /// Returns `true` if this `WaitOwned` future is waiting for a
997 /// notification from the provided [`WaitQueue`].
998 #[inline]
999 #[must_use]
1000 pub fn waits_on(&self, queue: &WaitQueue) -> bool {
1001 ptr::eq(self.queue, queue)
1002 }
1003
1004 /// Returns `true` if `self` and `other` are waiting on a notification
1005 /// from the same [`WaitQueue`].
1006 #[inline]
1007 #[must_use]
1008 pub fn same_queue(&self, other: &Wait<'_>) -> bool {
1009 ptr::eq(self.queue, other.queue)
1010 }
1011
1012 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1013 ///
1014 /// Polling a `Wait` future adds that future to the list of waiters that may
1015 /// receive a wakeup from a `WaitQueue`. However, in some cases, it is
1016 /// desirable to subscribe to wakeups *prior* to actually waiting for one.
1017 /// This method should be used when it is necessary to ensure a `Wait`
1018 /// future is in the list of waiters before the future is `poll`ed for the
1019 /// first time.
1020 ///
1021 /// In general, this method is used in cases where a [`WaitQueue`] must
1022 /// synchronize with some additional state, such as an `AtomicBool` or
1023 /// counter. If a task first checks that state, and then chooses whether or
1024 /// not to wait on the `WaitQueue` based on that state, then a race
1025 /// condition may occur where the `WaitQueue` wakes waiters *between* when
1026 /// the task checked the external state and when it first polled its `Wait`
1027 /// future to wait on the queue. This method allows registering the `Wait`
1028 /// future with the queue *prior* to checking the external state, without
1029 /// actually sleeping, so that when the task does wait for the `Wait` future
1030 /// to complete, it will have received any wakeup that was sent between when
1031 /// the external state was checked and the `Wait` future was first polled.
1032 ///
1033 /// # Returns
1034 ///
1035 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready` a wakeup was
1036 /// already received. This method returns [`Poll::Ready`] in the following
1037 /// cases:
1038 ///
1039 /// 1. The [`WaitQueue::wake()`] method was called between the creation of the
1040 /// `Wait` and the call to this method.
1041 /// 2. This is the first call to `subscribe` or `poll` on this future, and the
1042 /// `WaitQueue` was holding a stored wakeup from a previous call to
1043 /// [`wake()`]. This method consumes the wakeup in that case.
1044 /// 3. The future has previously been `subscribe`d or polled, and it has since
1045 /// then been marked ready by either consuming a wakeup from the
1046 /// `WaitQueue`, or by a call to [`wake()`] or [`wake_all()`] that
1047 /// removed it from the list of futures ready to receive wakeups.
1048 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which case
1049 /// this method returns `Poll::Ready(Err(Closed))`.
1050 ///
1051 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of this
1052 /// `Wait` future will also immediately return [`Poll::Ready`].
1053 ///
1054 /// If the [`Wait`] future subscribed to wakeups from the queue, and
1055 /// has not been woken, this method returns [`Poll::Pending`].
1056 ///
1057 /// [`wake()`]: WaitQueue::wake
1058 /// [`wake_all()`]: WaitQueue::wake_all
1059 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> {
1060 let this = self.project();
1061 this.waiter.poll_wait(this.queue, None)
1062 }
1063}
1064
1065impl Future for Wait<'_> {
1066 type Output = Result<(), Closed>;
1067
1068 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1069 let this = self.project();
1070 this.waiter.poll_wait(this.queue, Some(cx.waker()))
1071 }
1072}
1073
1074#[pinned_drop]
1075impl PinnedDrop for Wait<'_> {
1076 fn drop(mut self: Pin<&mut Self>) {
1077 let this = self.project();
1078 this.waiter.release(this.queue);
1079 }
1080}
1081
1082// === impl WaitOwned ===
1083
1084impl WaitOwned {
1085 /// Returns `true` if this `WaitOwned` future is waiting for a
1086 /// notification from the provided [`WaitQueue`].
1087 #[inline]
1088 #[must_use]
1089 pub fn waits_on(&self, queue: &WaitQueue) -> bool {
1090 ptr::eq(&*self.queue, queue)
1091 }
1092
1093 /// Returns `true` if `self` and `other` are waiting on a notification
1094 /// from the same [`WaitQueue`].
1095 #[inline]
1096 #[must_use]
1097 pub fn same_queue(&self, other: &Wait<'_>) -> bool {
1098 ptr::eq(&*self.queue, other.queue)
1099 }
1100
1101 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1102 ///
1103 /// Polling a `WaitOwned` future adds that future to the list of waiters
1104 /// that may receive a wakeup from a `WaitQueue`. However, in some
1105 /// cases, it is desirable to subscribe to wakeups *prior* to actually
1106 /// waiting for one. This method should be used when it is necessary to
1107 /// ensure a `WaitOwned` future is in the list of waiters before the
1108 /// future is `poll`ed for the rst time.
1109 ///
1110 /// In general, this method is used in cases where a [`WaitQueue`] must
1111 /// synchronize with some additional state, such as an `AtomicBool` or
1112 /// counter. If a task first checks that state, and then chooses whether or
1113 /// not to wait on the `WaitQueue` based on that state, then a race
1114 /// condition may occur where the `WaitQueue` wakes waiters *between* when
1115 /// the task checked the external state and when it first polled its
1116 /// `WaitOwned` future to wait on the queue. This method allows
1117 /// registering the `WaitOwned` future with the queue *prior* to
1118 /// checking the external state, without actually sleeping, so that when
1119 /// the task does wait for the `WaitOwned` future to complete, it will
1120 /// have received any wakeup that was sent between when the external
1121 /// state was checked and the `WaitOwned` future was first polled.
1122 ///
1123 /// # Returns
1124 ///
1125 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready`
1126 /// a wakeup was already received. This method returns [`Poll::Ready`]
1127 /// in the following cases:
1128 ///
1129 /// 1. The [`WaitQueue::wake()`] method was called between the creation
1130 /// of the `WaitOwned` future and the call to this method.
1131 /// 2. This is the first call to `subscribe` or `poll` on this future,
1132 /// and the `WaitQueue` was holding a stored wakeup from a previous
1133 /// call to [`wake()`]. This method consumes the wakeup in that case.
1134 /// 3. The future has previously been `subscribe`d or polled, and it
1135 /// has since then been marked ready by either consuming a wakeup
1136 /// from the `WaitQueue`, or by a call to [`wake()`] or
1137 /// [`wake_all()`] that removed it from the list of futures ready to
1138 /// receive wakeups.
1139 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which
1140 /// case this method returns `Poll::Ready(Err(Closed))`.
1141 ///
1142 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of
1143 /// this `Wait` future will also immediately return [`Poll::Ready`].
1144 ///
1145 /// If the [`WaitOwned`] future subscribed to wakeups from the queue,
1146 /// and has not been woken, this method returns [`Poll::Pending`].
1147 ///
1148 /// [`wake()`]: WaitQueue::wake
1149 /// [`wake_all()`]: WaitQueue::wake_all
1150 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> {
1151 let this = self.project();
1152 this.waiter.poll_wait(this.queue, None)
1153 }
1154}
1155
1156impl Future for WaitOwned {
1157 type Output = Result<(), Closed>;
1158
1159 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1160 let this = self.project();
1161 this.waiter.poll_wait(&*this.queue, Some(cx.waker()))
1162 }
1163}
1164
1165#[pinned_drop]
1166impl PinnedDrop for WaitOwned {
1167 fn drop(mut self: Pin<&mut Self>) {
1168 let this = self.project();
1169 this.waiter.release(&*this.queue);
1170 }
1171}