Next Generation WASM Microkernel Operating System
1// Copyright 2025 Jonas Kruckenberg
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use alloc::sync::Arc;
9use core::cell::UnsafeCell;
10use core::marker::PhantomPinned;
11use core::pin::Pin;
12use core::ptr::NonNull;
13use core::sync::atomic::{AtomicUsize, Ordering};
14use core::task::{Context, Poll, Waker};
15use core::{fmt, mem, ptr};
16
17use cordyceps::{Linked, List, list};
18use k23_spin::{Mutex, MutexGuard};
19use k32_util::{CachePadded, loom_const_fn};
20use mycelium_bitfield::{FromBits, bitfield, enum_from_bits};
21use pin_project::{pin_project, pinned_drop};
22
23use crate::error::Closed;
24use crate::sync::wake_batch::WakeBatch;
25
26/// A queue of waiting tasks which can be [woken in first-in, first-out
27/// order][wake], or [all at once][wake_all].
28///
29/// This type is taken from [maitake-sync](https://github.com/hawkw/mycelium/blob/dd0020892564c77ee4c20ffbc2f7f5b046ad54c8/maitake-sync/src/wait_queue.rs#L1577).
30///
31/// A `WaitQueue` allows any number of tasks to [wait] asynchronously and be
32/// woken when some event occurs, either [individually][wake] in first-in,
33/// first-out order, or [all at once][wake_all]. This makes it a vital building
34/// block of runtime services (such as timers or I/O resources), where it may be
35/// used to wake a set of tasks when a timer completes or when a resource
36/// becomes available. It can be equally useful for implementing higher-level
37/// synchronization primitives: for example, a `WaitQueue` plus an
38/// [`UnsafeCell`] is essentially an entire implementation of a fair
39/// asynchronous mutex. Finally, a `WaitQueue` can be a useful
40/// synchronization primitive on its own: sometimes, you just need to have a
41/// bunch of tasks wait for something and then wake them all up.
42///
43/// # Implementation Notes
44///
45/// This type is currently implemented using [intrusive doubly-linked
46/// list][ilist].
47///
48/// The *[intrusive]* aspect of this map is important, as it means that it does
49/// not allocate memory. Instead, nodes in the linked list are stored in the
50/// futures of tasks trying to wait for capacity. This means that it is not
51/// necessary to allocate any heap memory for each task waiting to be woken.
52///
53/// However, the intrusive linked list introduces one new danger: because
54/// futures can be *cancelled*, and the linked list nodes live within the
55/// futures trying to wait on the queue, we *must* ensure that the node
56/// is unlinked from the list before dropping a cancelled future. Failure to do
57/// so would result in the list containing dangling pointers. Therefore, we must
58/// use a *doubly-linked* list, so that nodes can edit both the previous and
59/// next node when they have to remove themselves. This is kind of a bummer, as
60/// it means we can't use something nice like this [intrusive queue by Dmitry
61/// Vyukov][2], and there are not really practical designs for lock-free
62/// doubly-linked lists that don't rely on some kind of deferred reclamation
63/// scheme such as hazard pointers or QSBR.
64///
65/// Instead, we just stick a [`Mutex`] around the linked list, which must be
66/// acquired to pop nodes from it, or for nodes to remove themselves when
67/// futures are cancelled. This is a bit sad, but the critical sections for this
68/// mutex are short enough that we still get pretty good performance despite it.
69///
70/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction
71/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
72/// [`Waker`]: Waker
73/// [wait]: WaitQueue::wait
74/// [wake]: WaitQueue::wake
75/// [wake_all]: WaitQueue::wake_all
76/// [`UnsafeCell`]: UnsafeCell
77/// [ilist]: linked_list::List
78#[derive(Debug)]
79pub struct WaitQueue {
80 /// The wait maps's state variable.
81 state: CachePadded<AtomicUsize>,
82 /// The linked list of waiters.
83 ///
84 /// # Safety
85 ///
86 /// This is protected by a mutex; the mutex *must* be acquired when
87 /// manipulating the linked list, OR when manipulating waiter nodes that may
88 /// be linked into the list. If a node is known to not be linked, it is safe
89 /// to modify that node (such as by waking the stored [`Waker`]) without
90 /// holding the lock; otherwise, it may be modified through the list, so the
91 /// lock must be held when modifying the
92 /// node.
93 queue: Mutex<List<Waiter>>,
94}
95
96bitfield! {
97 #[derive(Eq, PartialEq)]
98 struct State<usize> {
99 /// The queue's state.
100 const INNER: StateInner;
101
102 /// The number of times [`WaitQueue::wake_all`] has been called.
103 const WAKE_ALLS = ..;
104 }
105}
106
107/// The queue's current state.
108#[derive(Debug, Copy, Clone, Eq, PartialEq)]
109#[repr(u8)]
110enum StateInner {
111 /// No waiters are queued, and there is no pending notification.
112 /// Waiting while the queue is in this state will enqueue the waiter;
113 /// notifying while in this state will store a pending notification in the
114 /// queue, transitioning to [`StateInner::Woken`].
115 Empty = 0b00,
116 /// There are one or more waiters in the queue. Waiting while
117 /// the queue is in this state will not transition the state. Waking while
118 /// in this state will wake the first waiter in the queue; if this empties
119 /// the queue, then the queue will transition to [`StateInner::Empty`].
120 Waiting = 0b01,
121 /// The queue has a stored notification. Waiting while the queue
122 /// is in this state will consume the pending notification *without*
123 /// enqueueing the waiter and transition the queue to [`StateInner::Empty`].
124 /// Waking while in this state will leave the queue in this state.
125 Woken = 0b10,
126 /// The queue is closed. Waiting while in this state will return
127 /// [`Closed`] without transitioning the queue's state.
128 ///
129 /// *Note*: This *must* correspond to all state bits being set, as it's set
130 /// via a [`fetch_or`].
131 ///
132 /// [`fetch_or`]: AtomicUsize::fetch_or
133 Closed = 0b11,
134}
135
136#[pin_project(PinnedDrop)]
137#[derive(Debug)]
138#[must_use = "futures do nothing unless `.await`ed or `poll`ed"]
139pub struct Wait<'a> {
140 // The [`WaitQueue`] being waited on.
141 queue: &'a WaitQueue,
142 // Entry in the wait queue linked list.
143 #[pin]
144 waiter: Waiter,
145}
146
147/// Future returned from [`WaitQueue::wait_owned()`].
148///
149/// This is identical to the [`Wait`] future, except that it takes an
150/// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to
151/// live for the `'static` lifetime.
152///
153/// This future is fused, so once it has completed, any future calls to poll
154/// will immediately return [`Poll::Ready`].
155///
156/// # Notes
157///
158/// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a
159/// `WaitOwned` future once it has been polled. For instance, the following
160/// code must not compile:
161///
162///```compile_fail
163/// use maitake_sync::wait_queue::WaitOwned;
164///
165/// // Calls to this function should only compile if `T` is `Unpin`.
166/// fn assert_unpin<T: Unpin>() {}
167///
168/// assert_unpin::<WaitOwned<'_>>();
169/// ```
170#[pin_project(PinnedDrop)]
171#[derive(Debug)]
172pub struct WaitOwned {
173 // The `WaitQueue` being waited on.
174 queue: Arc<WaitQueue>,
175 // Entry in the wait queue.
176 #[pin]
177 waiter: Waiter,
178}
179
180/// A waiter node which may be linked into a wait queue.
181#[pin_project]
182#[repr(C)]
183struct Waiter {
184 // The intrusive linked list node.
185 //
186 // This *must* be the first field in the struct in order for the `Linked`
187 // implementation to be sound.
188 #[pin]
189 node: UnsafeCell<WaiterInner>,
190 // The future's state.
191 state: WaitState,
192}
193
194struct WaiterInner {
195 /// Intrusive linked list pointers.
196 links: list::Links<Waiter>,
197 /// The node's waker
198 waker: Wakeup,
199 // This type is !Unpin due to the heuristic from:
200 // <https://github.com/rust-lang/rust/pull/82834>
201 _pin: PhantomPinned,
202}
203
204bitfield! {
205 #[derive(Eq, PartialEq)]
206 struct WaitState<usize> {
207 /// The waiter's state.
208 const INNER: WaitStateInner;
209 /// The number of times [`WaitQueue::wake_all`] has been called.
210 const WAKE_ALLS = ..;
211 }
212}
213
214enum_from_bits! {
215 /// The state of a [`Waiter`] node in a [`WaitQueue`].
216 #[derive(Debug, Eq, PartialEq)]
217 enum WaitStateInner<u8> {
218 /// The waiter has not yet been enqueued.
219 ///
220 /// The number of times [`WaitQueue::wake_all`] has been called is stored
221 /// when the node is created, in order to determine whether it was woken by
222 /// a stored wakeup when enqueueing.
223 ///
224 /// When in this state, the node is **not** part of the linked list, and
225 /// can be dropped without removing it from the list.
226 Start = 0b00,
227
228 /// The waiter is waiting.
229 ///
230 /// When in this state, the node **is** part of the linked list. If the
231 /// node is dropped in this state, it **must** be removed from the list
232 /// before dropping it. Failure to ensure this will result in dangling
233 /// pointers in the linked list!
234 Waiting = 0b01,
235
236 /// The waiter has been woken.
237 ///
238 /// When in this state, the node is **not** part of the linked list, and
239 /// can be dropped without removing it from the list.
240 Woken = 0b10,
241 }
242}
243
244#[derive(Clone, Debug)]
245enum Wakeup {
246 Empty,
247 Waiting(Waker),
248 One,
249 All,
250 // Closed,
251}
252
253// === impl WaitQueue ===
254
255impl Default for WaitQueue {
256 fn default() -> Self {
257 Self::new()
258 }
259}
260
261impl WaitQueue {
262 loom_const_fn! {
263 pub const fn new() -> Self {
264 Self {
265 state: CachePadded(AtomicUsize::new(StateInner::Empty.into_usize())),
266 queue: Mutex::new(List::new()),
267 }
268 }
269 }
270
271 /// Wake the next task in the queue.
272 ///
273 /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the
274 /// **next** call to [`wait().await`] will complete immediately. If one or more
275 /// tasks are currently in the queue, the first task in the queue is woken.
276 ///
277 /// At most one wakeup will be stored in the queue at any time. If `wake()`
278 /// is called many times while there are no tasks in the queue, only a
279 /// single wakeup is stored.
280 ///
281 /// [`wait().await`]: Self::wait()
282 #[inline]
283 pub fn wake(&self) {
284 // snapshot the queue's current state.
285 let mut state = self.load();
286
287 // check if any tasks are currently waiting on this queue. if there are
288 // no waiting tasks, store the wakeup to be consumed by the next call to
289 // `wait`.
290 loop {
291 match state.get(State::INNER) {
292 // if the queue is closed, bail.
293 StateInner::Closed => return,
294 // if there are waiting tasks, break out of the loop and wake one.
295 StateInner::Waiting => break,
296 _ => {}
297 }
298
299 let next = state.with_inner(StateInner::Woken);
300 // advance the state to `Woken`, and return (if we did so
301 // successfully)
302 match self.compare_exchange(state, next) {
303 Ok(_) => return,
304 Err(actual) => state = actual,
305 }
306 }
307
308 // okay, there are tasks waiting on the queue; we must acquire the lock
309 // on the linked list and wake the next task from the queue.
310 let mut queue = self.queue.lock();
311
312 // the queue's state may have changed while we were waiting to acquire
313 // the lock, so we need to acquire a new snapshot before we take the
314 // waker.
315 state = self.load();
316 let waker = self.wake_locked(&mut queue, state);
317 drop(queue);
318
319 //now that we've released the lock, wake the waiting task (if we
320 //actually deuqueued one).
321 if let Some(waker) = waker {
322 waker.wake();
323 }
324 }
325
326 /// Wake *all* tasks currently in the queue.
327 ///
328 /// All tasks currently waiting on the queue are woken. Unlike [`wake()`], a
329 /// wakeup is *not* stored in the queue to wake the next call to [`wait()`]
330 /// if the queue is empty. Instead, this method only wakes all currently
331 /// registered waiters. Registering a task to be woken is done by `await`ing
332 /// the [`Future`] returned by the [`wait()`] method on this queue.
333 ///
334 /// [`wake()`]: Self::wake
335 /// [`wait()`]: Self::wait
336 #[expect(clippy::missing_panics_doc, reason = "internal assertion")]
337 pub fn wake_all(&self) -> usize {
338 let mut batch = WakeBatch::new();
339 let mut waiters_remaining = true;
340
341 let mut queue = self.queue.lock();
342 let state = self.load();
343
344 match state.get(State::INNER) {
345 StateInner::Woken | StateInner::Empty => {
346 self.state
347 .0
348 .fetch_add(State::ONE_WAKE_ALL, Ordering::SeqCst);
349 return 0;
350 }
351 StateInner::Closed => return 0,
352 StateInner::Waiting => {
353 let next_state = State::new()
354 .with_inner(StateInner::Empty)
355 .with(State::WAKE_ALLS, state.get(State::WAKE_ALLS) + 1);
356 self.compare_exchange(state, next_state)
357 .expect("state should not have transitioned while locked");
358 }
359 }
360
361 // As long as there are waiters remaining to wake, lock the queue, drain
362 // another batch, release the lock, and wake them.
363 let mut woken = 0;
364 while waiters_remaining {
365 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All);
366 woken += batch.len();
367 MutexGuard::unlocked(&mut queue, || batch.wake_all());
368 }
369
370 woken
371 }
372
373 /// Wait to be woken up by this queue.
374 ///
375 /// Equivalent to:
376 ///
377 /// ```ignore
378 /// async fn wait(&self);
379 /// ```
380 ///
381 /// This returns a [`Wait`] [`Future`] that will complete when the task is
382 /// woken by a call to [`wake()`] or [`wake_all()`], or when the `WaitQueue`
383 /// is dropped.
384 ///
385 /// Each `WaitQueue` holds a single wakeup. If [`wake()`] was previously
386 /// called while no tasks were waiting on the queue, then `wait().await`
387 /// will complete immediately, consuming the stored wakeup. Otherwise,
388 /// `wait().await` waits to be woken by the next call to [`wake()`] or
389 /// [`wake_all()`].
390 ///
391 /// The [`Wait`] future is not guaranteed to receive wakeups from calls to
392 /// [`wake()`] if it has not yet been polled. See the documentation for the
393 /// [`Wait::subscribe()`] method for details on receiving wakeups from the
394 /// queue prior to polling the `Wait` future for the first time.
395 ///
396 /// A `Wait` future **is** is guaranteed to receive wakeups from calls to
397 /// [`wake_all()`] as soon as it is created, even if it has not yet been
398 /// polled.
399 ///
400 /// # Returns
401 ///
402 /// The [`Future`] returned by this method completes with one of the
403 /// following [outputs](Future::Output):
404 ///
405 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
406 /// [`wake_all()`].
407 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` being
408 /// [`close`d](WaitQueue::close).
409 ///
410 /// # Cancellation
411 ///
412 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the order
413 /// that they started to wait. If a [`Wait`] future is dropped, the task
414 /// will forfeit its position in the queue.
415 ///
416 /// [`wake()`]: Self::wake
417 /// [`wake_all()`]: Self::wake_all
418 pub fn wait(&self) -> Wait<'_> {
419 Wait {
420 queue: self,
421 waiter: self.waiter(),
422 }
423 }
424
425 /// Wait to be woken up by this queue, returning a future that's valid
426 /// for the `'static` lifetime.
427 ///
428 /// This returns a [`WaitOwned`] future that will complete when the task
429 /// is woken by a call to [`wake()`] or [`wake_all()`], or when the
430 /// `WaitQueue` is [closed].
431 ///
432 /// This is identical to the [`wait()`] method, except that it takes a
433 /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future
434 /// to live for the `'static` lifetime. See the documentation for
435 /// [`wait()`] for details on how to use the future returned by this
436 /// method.
437 ///
438 /// # Returns
439 ///
440 /// The [`Future`] returned by this method completes with one of the
441 /// following [outputs](Future::Output):
442 ///
443 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or
444 /// [`wake_all()`].
445 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue`
446 /// being [closed].
447 ///
448 /// # Cancellation
449 ///
450 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the
451 /// order that they started to wait. If a [`WaitOwned`] future is
452 /// dropped, the task will forfeit its position in the queue.
453 ///
454 /// [`wake()`]: Self::wake
455 /// [`wake_all()`]: Self::wake_all
456 /// [`wait()`]: Self::wait
457 /// [closed]: Self::close
458 pub fn wait_owned(self: &Arc<Self>) -> WaitOwned {
459 let waiter = self.waiter();
460 let queue = self.clone();
461 WaitOwned { queue, waiter }
462 }
463
464 /// Asynchronously poll the given function `f` until a condition occurs,
465 /// using the [`WaitQueue`] to only re-poll when notified.
466 ///
467 /// This can be used to implement a "wait loop", turning a "try" function
468 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
469 /// "recv" or "send").
470 ///
471 /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
472 /// prior to polling the function, ensuring that there is not a chance of a race
473 /// where the condition occurs AFTER checking but BEFORE registering interest
474 /// in the [`WaitQueue`], which could lead to deadlock.
475 ///
476 /// This is intended to have similar behavior to `Condvar` in the standard library,
477 /// but asynchronous, and not requiring operating system intervention (or existence).
478 ///
479 /// In particular, this can be used in cases where interrupts or events are used
480 /// to signify readiness or completion of some task, such as the completion of a
481 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
482 /// can wake the queue, allowing the polling function to check status fields for
483 /// partial progress or completion.
484 ///
485 /// Consider using [`Self::wait_for_value()`] if your function does return a value.
486 ///
487 /// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for)
488 /// if you do not need multiple waiters.
489 ///
490 /// # Errors
491 ///
492 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed.
493 ///
494 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> {
495 loop {
496 let wait = self.wait();
497 let mut wait = core::pin::pin!(wait);
498 let _ = wait.as_mut().subscribe()?;
499 if f() {
500 return Ok(());
501 }
502 wait.await?;
503 }
504 }
505
506 /// Asynchronously poll the given function `f` until a condition occurs,
507 /// using the [`WaitQueue`] to only re-poll when notified.
508 ///
509 /// This can be used to implement a "wait loop", turning a "try" function
510 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g.
511 /// "recv" or "send").
512 ///
513 /// In particular, this function correctly *registers* interest in the [`WaitQueue`]
514 /// prior to polling the function, ensuring that there is not a chance of a race
515 /// where the condition occurs AFTER checking but BEFORE registering interest
516 /// in the [`WaitQueue`], which could lead to deadlock.
517 ///
518 /// This is intended to have similar behavior to `Condvar` in the standard library,
519 /// but asynchronous, and not requiring operating system intervention (or existence).
520 ///
521 /// In particular, this can be used in cases where interrupts or events are used
522 /// to signify readiness or completion of some task, such as the completion of a
523 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt
524 /// can wake the queue, allowing the polling function to check status fields for
525 /// partial progress or completion, and also return the status flags at the same time.
526 ///
527 /// Consider using [`Self::wait_for()`] if your function does not return a value.
528 ///
529 /// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value)
530 /// if you do not need multiple waiters.
531 ///
532 /// # Errors
533 ///
534 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed.
535 ///
536 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> {
537 loop {
538 let wait = self.wait();
539 let mut wait = core::pin::pin!(wait);
540 match wait.as_mut().subscribe() {
541 Poll::Ready(wr) => wr?,
542 Poll::Pending => {}
543 }
544 if let Some(t) = f() {
545 return Ok(t);
546 }
547 wait.await?;
548 }
549 }
550
551 /// Close the queue, indicating that it may no longer be used.
552 ///
553 /// Once a queue is closed, all [`wait()`] calls (current or future) will
554 /// return an error.
555 ///
556 /// This method is generally used when implementing higher-level
557 /// synchronization primitives or resources: when an event makes a resource
558 /// permanently unavailable, the queue can be closed.
559 ///
560 /// [`wait()`]: Self::wait
561 pub fn close(&self) {
562 let state = self
563 .state
564 .0
565 .fetch_or(StateInner::Closed.into_bits(), Ordering::SeqCst);
566 let state = State::from_bits(state);
567 if state.get(State::INNER) != StateInner::Waiting {
568 return;
569 }
570
571 let mut queue = self.queue.lock();
572 let mut batch = WakeBatch::new();
573 let mut waiters_remaining = true;
574
575 while waiters_remaining {
576 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All);
577 MutexGuard::unlocked(&mut queue, || batch.wake_all());
578 }
579 }
580
581 /// Returns `true` if this `WaitQueue` is [closed](Self::close).
582 #[must_use]
583 pub fn is_closed(&self) -> bool {
584 self.load().get(State::INNER) == StateInner::Closed
585 }
586
587 /// Returns a [`Waiter`] entry in this queue.
588 ///
589 /// This is factored out into a separate function because it's used by both
590 /// [`WaitQueue::wait`] and [`WaitQueue::wait_owned`].
591 fn waiter(&self) -> Waiter {
592 // how many times has `wake_all` been called when this waiter is created?
593 let current_wake_alls = self.load().get(State::WAKE_ALLS);
594 let state = WaitState::new()
595 .with(WaitState::WAKE_ALLS, current_wake_alls)
596 .with(WaitState::INNER, WaitStateInner::Start);
597 Waiter {
598 state,
599 node: UnsafeCell::new(WaiterInner {
600 links: list::Links::new(),
601 waker: Wakeup::Empty,
602 _pin: PhantomPinned,
603 }),
604 }
605 }
606
607 // fn try_wait(&self) -> Poll<Result<(), Closed>> {
608 // let mut state = self.load();
609 // let initial_wake_alls = state.get(State::WAKE_ALLS);
610 // while state.get(State::INNER) == StateInner::Woken {
611 // match self.compare_exchange(state, state.with_inner(StateInner::Empty)) {
612 // Ok(_) => return Poll::Ready(Ok(())),
613 // Err(actual) => state = actual,
614 // }
615 // }
616 //
617 // match state.get(State::INNER) {
618 // StateInner::Closed => Poll::Ready(Err(Closed(()))),
619 // _ if state.get(State::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())),
620 // StateInner::Empty | StateInner::Waiting => Poll::Pending,
621 // StateInner::Woken => Poll::Ready(Ok(())),
622 // }
623 // }
624
625 #[cold]
626 #[inline(never)]
627 fn wake_locked(&self, queue: &mut List<Waiter>, curr: State) -> Option<Waker> {
628 let inner = curr.get(State::INNER);
629
630 // is the queue still in the `Waiting` state? it is possible that we
631 // transitioned to a different state while locking the queue.
632 if inner != StateInner::Waiting {
633 // if there are no longer any queued tasks, try to store the
634 // wakeup in the queue and bail.
635 if let Err(actual) = self.compare_exchange(curr, curr.with_inner(StateInner::Woken)) {
636 debug_assert!(actual.get(State::INNER) != StateInner::Waiting);
637 self.store(actual.with_inner(StateInner::Woken));
638 }
639
640 return None;
641 }
642
643 // otherwise, we have to dequeue a task and wake it.
644 let node = queue
645 .pop_back()
646 .expect("if we are in the Waiting state, there must be waiters in the queue");
647 let waker = Waiter::wake(node, queue, Wakeup::One);
648
649 // if we took the final waiter currently in the queue, transition to the
650 // `Empty` state.
651 if queue.is_empty() {
652 self.store(curr.with_inner(StateInner::Empty));
653 }
654
655 waker
656 }
657
658 /// Drain waiters from `queue` and add them to `batch`. Returns `true` if
659 /// the batch was filled while more waiters remain in the queue, indicating
660 /// that this function must be called again to wake all waiters.
661 fn drain_to_wake_batch(
662 batch: &mut WakeBatch,
663 queue: &mut List<Waiter>,
664 wakeup: Wakeup,
665 ) -> bool {
666 while let Some(node) = queue.pop_back() {
667 let Some(waker) = Waiter::wake(node, queue, wakeup.clone()) else {
668 // this waiter was enqueued by `Wait::register` and doesn't have
669 // a waker, just keep going.
670 continue;
671 };
672
673 if batch.add_waker(waker) {
674 // there's still room in the wake set, just keep adding to it.
675 continue;
676 }
677
678 // wake set is full, drop the lock and wake everyone!
679 break;
680 }
681
682 !queue.is_empty()
683 }
684
685 fn load(&self) -> State {
686 State::from_bits(self.state.0.load(Ordering::SeqCst))
687 }
688
689 fn store(&self, state: State) {
690 self.state.0.store(state.0, Ordering::SeqCst);
691 }
692
693 fn compare_exchange(&self, current: State, new: State) -> Result<State, State> {
694 self.state
695 .0
696 .compare_exchange(current.0, new.0, Ordering::SeqCst, Ordering::SeqCst)
697 .map(State::from_bits)
698 .map_err(State::from_bits)
699 }
700}
701
702// === impl State & StateInner ===
703
704impl State {
705 const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit();
706
707 fn with_inner(self, inner: StateInner) -> Self {
708 self.with(Self::INNER, inner)
709 }
710}
711impl StateInner {
712 const fn into_usize(self) -> usize {
713 self as u8 as usize
714 }
715}
716
717impl FromBits<usize> for StateInner {
718 const BITS: u32 = 2;
719 type Error = core::convert::Infallible;
720
721 fn try_from_bits(bits: usize) -> Result<Self, Self::Error> {
722 #[expect(
723 clippy::cast_possible_truncation,
724 reason = "`bits` will only have 3 bits set"
725 )]
726 Ok(match bits as u8 {
727 bits if bits == Self::Empty as u8 => Self::Empty,
728 bits if bits == Self::Waiting as u8 => Self::Waiting,
729 bits if bits == Self::Woken as u8 => Self::Woken,
730 bits if bits == Self::Closed as u8 => Self::Closed,
731 _ => {
732 unreachable!();
733 }
734 })
735 }
736
737 fn into_bits(self) -> usize {
738 self as usize
739 }
740}
741
742// === impl Waiter ===
743
744impl fmt::Debug for Waiter {
745 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
746 f.debug_struct("Waiter")
747 .field("state", &self.state)
748 .finish_non_exhaustive()
749 }
750}
751
752impl Waiter {
753 /// Returns the [`Waker`] for the task that owns this `Waiter`.
754 ///
755 /// # Safety
756 ///
757 /// This is only safe to call while the list is locked. The `list`
758 /// parameter ensures this method is only called while holding the lock, so
759 /// this can be safe.
760 ///
761 /// Of course, that must be the *same* list that this waiter is a member of,
762 /// and currently, there is no way to ensure that...
763 #[inline(always)]
764 fn wake(this: NonNull<Self>, list: &mut List<Self>, wakeup: Wakeup) -> Option<Waker> {
765 Waiter::with_inner(this, list, |node| {
766 let waker = mem::replace(&mut node.waker, wakeup);
767 match waker {
768 // the node has a registered waker, so wake the task.
769 Wakeup::Waiting(waker) => Some(waker),
770 // do nothing: the node was registered by `Wait::register`
771 // without a waker, so the future will already be woken when it is
772 // actually polled.
773 Wakeup::Empty => None,
774 // the node was already woken? this should not happen and
775 // probably indicates a race!
776 _ => unreachable!("tried to wake a waiter in the {:?} state!", waker),
777 }
778 })
779 }
780
781 /// # Safety
782 ///
783 /// This is only safe to call while the list is locked. The dummy `_list`
784 /// parameter ensures this method is only called while holding the lock, so
785 /// this can be safe.
786 ///
787 /// Of course, that must be the *same* list that this waiter is a member of,
788 /// and currently, there is no way to ensure that...
789 #[inline(always)]
790 fn with_inner<T>(
791 mut this: NonNull<Self>,
792 _list: &mut List<Self>,
793 f: impl FnOnce(&mut WaiterInner) -> T,
794 ) -> T {
795 // safety: this is only called while holding the lock on the queue,
796 // so it's safe to mutate the waiter.
797 unsafe { f(&mut *this.as_mut().node.get()) }
798 }
799
800 fn poll_wait(
801 mut self: Pin<&mut Self>,
802 queue: &WaitQueue,
803 waker: Option<&Waker>,
804 ) -> Poll<Result<(), Closed>> {
805 // Safety: we never move out of `ptr` below, only mutate its fields
806 let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(self.as_mut())) };
807 let mut this = self.as_mut().project();
808
809 match this.state.get(WaitState::INNER) {
810 WaitStateInner::Start => {
811 let queue_state = queue.load();
812
813 // can we consume a pending wakeup?
814 if queue
815 .compare_exchange(
816 queue_state.with_inner(StateInner::Woken),
817 queue_state.with_inner(StateInner::Empty),
818 )
819 .is_ok()
820 {
821 this.state.set(WaitState::INNER, WaitStateInner::Woken);
822 return Poll::Ready(Ok(()));
823 }
824
825 // okay, no pending wakeups. try to wait...
826
827 let mut waiters = queue.queue.lock();
828 let mut queue_state = queue.load();
829
830 // the whole queue was woken while we were trying to acquire
831 // the lock!
832 if queue_state.get(State::WAKE_ALLS) != this.state.get(WaitState::WAKE_ALLS) {
833 this.state.set(WaitState::INNER, WaitStateInner::Woken);
834 return Poll::Ready(Ok(()));
835 }
836
837 // transition the queue to the waiting state
838 'to_waiting: loop {
839 match queue_state.get(State::INNER) {
840 // the queue is `Empty`, transition to `Waiting`
841 StateInner::Empty => {
842 match queue.compare_exchange(
843 queue_state,
844 queue_state.with_inner(StateInner::Waiting),
845 ) {
846 Ok(_) => break 'to_waiting,
847 Err(actual) => queue_state = actual,
848 }
849 }
850 // the queue is already `Waiting`
851 StateInner::Waiting => break 'to_waiting,
852 // the queue was woken, consume the wakeup.
853 StateInner::Woken => {
854 match queue.compare_exchange(
855 queue_state,
856 queue_state.with_inner(StateInner::Empty),
857 ) {
858 Ok(_) => {
859 this.state.set(WaitState::INNER, WaitStateInner::Woken);
860 return Poll::Ready(Ok(()));
861 }
862 Err(actual) => queue_state = actual,
863 }
864 }
865 StateInner::Closed => return Poll::Ready(Err(Closed(()))),
866 }
867 }
868
869 // enqueue the node
870 this.state.set(WaitState::INNER, WaitStateInner::Waiting);
871 if let Some(waker) = waker {
872 // safety: we may mutate the inner state because we are
873 // holding the lock.
874 unsafe {
875 let node = this.node.as_mut().get();
876 debug_assert!(matches!((*node).waker, Wakeup::Empty));
877 (*node).waker = Wakeup::Waiting(waker.clone());
878 }
879 }
880
881 waiters.push_front(ptr);
882
883 Poll::Pending
884 }
885 WaitStateInner::Waiting => {
886 let _waiters = queue.queue.lock();
887 // safety: we may mutate the inner state because we are
888 // holding the lock.
889 unsafe {
890 let node = &mut *this.node.get();
891 match node.waker {
892 Wakeup::Waiting(ref mut curr_waker) => {
893 match waker {
894 Some(waker) if !curr_waker.will_wake(waker) => {
895 *curr_waker = waker.clone();
896 }
897 _ => {}
898 }
899 Poll::Pending
900 }
901 Wakeup::All | Wakeup::One => {
902 this.state.set(WaitState::INNER, WaitStateInner::Woken);
903 Poll::Ready(Ok(()))
904 }
905 // Wakeup::Closed => {
906 // this.state.set(WaitState::INNER, WaitStateInner::Woken);
907 // Poll::Ready(Err(Closed(())))
908 // }
909 Wakeup::Empty => {
910 if let Some(waker) = waker {
911 node.waker = Wakeup::Waiting(waker.clone());
912 }
913
914 Poll::Pending
915 }
916 }
917 }
918 }
919 WaitStateInner::Woken => Poll::Ready(Ok(())),
920 }
921 }
922
    /// Release this `Waiter` from the queue.
    ///
    /// This is called from the `drop` implementation for the [`Wait`] and
    /// [`WaitOwned`] futures.
    ///
    /// If this waiter's local state is not `Waiting` (it was never enqueued,
    /// or it already observed its wakeup), this is a no-op. Otherwise, while
    /// holding the queue's lock, it:
    ///
    /// 1. unlinks this waiter's node from the queue's list of waiters;
    /// 2. transitions the queue from `Waiting` to `Empty` if that left the
    ///    list empty; and
    /// 3. if the node had been handed a single-waiter wakeup (`Wakeup::One`)
    ///    that this future never consumed, passes that wakeup on to the next
    ///    waiter via `wake_locked`, so the wakeup is not lost.
    ///
    /// Any wakeup produced in step 3 is delivered (`next.wake()`) only after
    /// the lock guard has been dropped.
    fn release(mut self: Pin<&mut Self>, queue: &WaitQueue) {
        // Snapshot this waiter's local state before the pin is turned into a
        // raw pointer below.
        let state = *(self.as_mut().project().state);
        // Safety: we never move out of `ptr` below, only mutate its fields
        let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) });

        // if we're not enqueued, we don't have to do anything else.
        if state.get(WaitState::INNER) != WaitStateInner::Waiting {
            return;
        }

        let mut waiters = queue.queue.lock();
        // Read the queue's state after taking the lock. Both the `Empty`
        // transition and the `wake_locked` call below are decided from this
        // value.
        let state = queue.load();
        // remove the node
        // safety: we have the lock on the queue, so this is safe.
        unsafe {
            waiters.remove(ptr);
        };

        // if we removed the last waiter from the queue, transition the state to
        // `Empty`.
        if waiters.is_empty() && state.get(State::INNER) == StateInner::Waiting {
            queue.store(state.with_inner(StateInner::Empty));
        }

        // if the node has an unconsumed wakeup, it must be assigned to the next
        // node in the queue.
        // NOTE(review): `state` here is the value loaded before the possible
        // `store(Empty)` above. This appears to rely on a node that holds
        // `Wakeup::One` never still being in the list (so the two branches
        // cannot both fire). Confirm against `wake_locked`.
        let next_waiter =
            if Waiter::with_inner(ptr, &mut waiters, |node| matches!(&node.waker, Wakeup::One)) {
                queue.wake_locked(&mut waiters, state)
            } else {
                None
            };

        // Release the lock before waking anyone, so the woken task does not
        // immediately contend on it.
        drop(waiters);

        if let Some(next) = next_waiter {
            next.wake();
        }
    }
966}
967
968// Safety: TODO
969unsafe impl Linked<list::Links<Self>> for Waiter {
970 type Handle = NonNull<Waiter>;
971
972 fn into_ptr(r: Self::Handle) -> NonNull<Self> {
973 r
974 }
975
976 unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle {
977 ptr
978 }
979
980 unsafe fn links(target: NonNull<Self>) -> NonNull<list::Links<Waiter>> {
981 // Safety: ensured by caller
982 unsafe {
983 // Safety: using `ptr::addr_of!` avoids creating a temporary
984 // reference, which stacked borrows dislikes.
985 let node = &*ptr::addr_of!((*target.as_ptr()).node);
986 let links = ptr::addr_of_mut!((*node.get()).links);
987 // Safety: since the `target` pointer is `NonNull`, we can assume
988 // that pointers to its members are also not null, making this use
989 // of `new_unchecked` fine.
990 NonNull::new_unchecked(links)
991 }
992 }
993}
994
995// === impl Wait ===
996
997impl Wait<'_> {
998 /// Returns `true` if this `WaitOwned` future is waiting for a
999 /// notification from the provided [`WaitQueue`].
1000 #[inline]
1001 #[must_use]
1002 pub fn waits_on(&self, queue: &WaitQueue) -> bool {
1003 ptr::eq(self.queue, queue)
1004 }
1005
1006 /// Returns `true` if `self` and `other` are waiting on a notification
1007 /// from the same [`WaitQueue`].
1008 #[inline]
1009 #[must_use]
1010 pub fn same_queue(&self, other: &Wait<'_>) -> bool {
1011 ptr::eq(self.queue, other.queue)
1012 }
1013
1014 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1015 ///
1016 /// Polling a `Wait` future adds that future to the list of waiters that may
1017 /// receive a wakeup from a `WaitQueue`. However, in some cases, it is
1018 /// desirable to subscribe to wakeups *prior* to actually waiting for one.
1019 /// This method should be used when it is necessary to ensure a `Wait`
1020 /// future is in the list of waiters before the future is `poll`ed for the
1021 /// first time.
1022 ///
1023 /// In general, this method is used in cases where a [`WaitQueue`] must
1024 /// synchronize with some additional state, such as an `AtomicBool` or
1025 /// counter. If a task first checks that state, and then chooses whether or
1026 /// not to wait on the `WaitQueue` based on that state, then a race
1027 /// condition may occur where the `WaitQueue` wakes waiters *between* when
1028 /// the task checked the external state and when it first polled its `Wait`
1029 /// future to wait on the queue. This method allows registering the `Wait`
1030 /// future with the queue *prior* to checking the external state, without
1031 /// actually sleeping, so that when the task does wait for the `Wait` future
1032 /// to complete, it will have received any wakeup that was sent between when
1033 /// the external state was checked and the `Wait` future was first polled.
1034 ///
1035 /// # Returns
1036 ///
1037 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready` a wakeup was
1038 /// already received. This method returns [`Poll::Ready`] in the following
1039 /// cases:
1040 ///
1041 /// 1. The [`WaitQueue::wake()`] method was called between the creation of the
1042 /// `Wait` and the call to this method.
1043 /// 2. This is the first call to `subscribe` or `poll` on this future, and the
1044 /// `WaitQueue` was holding a stored wakeup from a previous call to
1045 /// [`wake()`]. This method consumes the wakeup in that case.
1046 /// 3. The future has previously been `subscribe`d or polled, and it has since
1047 /// then been marked ready by either consuming a wakeup from the
1048 /// `WaitQueue`, or by a call to [`wake()`] or [`wake_all()`] that
1049 /// removed it from the list of futures ready to receive wakeups.
1050 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which case
1051 /// this method returns `Poll::Ready(Err(Closed))`.
1052 ///
1053 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of this
1054 /// `Wait` future will also immediately return [`Poll::Ready`].
1055 ///
1056 /// If the [`Wait`] future subscribed to wakeups from the queue, and
1057 /// has not been woken, this method returns [`Poll::Pending`].
1058 ///
1059 /// [`wake()`]: WaitQueue::wake
1060 /// [`wake_all()`]: WaitQueue::wake_all
1061 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> {
1062 let this = self.project();
1063 this.waiter.poll_wait(this.queue, None)
1064 }
1065}
1066
1067impl Future for Wait<'_> {
1068 type Output = Result<(), Closed>;
1069
1070 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1071 let this = self.project();
1072 this.waiter.poll_wait(this.queue, Some(cx.waker()))
1073 }
1074}
1075
1076#[pinned_drop]
1077impl PinnedDrop for Wait<'_> {
1078 fn drop(mut self: Pin<&mut Self>) {
1079 let this = self.project();
1080 this.waiter.release(this.queue);
1081 }
1082}
1083
1084// === impl WaitOwned ===
1085
1086impl WaitOwned {
1087 /// Returns `true` if this `WaitOwned` future is waiting for a
1088 /// notification from the provided [`WaitQueue`].
1089 #[inline]
1090 #[must_use]
1091 pub fn waits_on(&self, queue: &WaitQueue) -> bool {
1092 ptr::eq(&*self.queue, queue)
1093 }
1094
1095 /// Returns `true` if `self` and `other` are waiting on a notification
1096 /// from the same [`WaitQueue`].
1097 #[inline]
1098 #[must_use]
1099 pub fn same_queue(&self, other: &Wait<'_>) -> bool {
1100 ptr::eq(&*self.queue, other.queue)
1101 }
1102
1103 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`].
1104 ///
1105 /// Polling a `WaitOwned` future adds that future to the list of waiters
1106 /// that may receive a wakeup from a `WaitQueue`. However, in some
1107 /// cases, it is desirable to subscribe to wakeups *prior* to actually
1108 /// waiting for one. This method should be used when it is necessary to
1109 /// ensure a `WaitOwned` future is in the list of waiters before the
1110 /// future is `poll`ed for the rst time.
1111 ///
1112 /// In general, this method is used in cases where a [`WaitQueue`] must
1113 /// synchronize with some additional state, such as an `AtomicBool` or
1114 /// counter. If a task first checks that state, and then chooses whether or
1115 /// not to wait on the `WaitQueue` based on that state, then a race
1116 /// condition may occur where the `WaitQueue` wakes waiters *between* when
1117 /// the task checked the external state and when it first polled its
1118 /// `WaitOwned` future to wait on the queue. This method allows
1119 /// registering the `WaitOwned` future with the queue *prior* to
1120 /// checking the external state, without actually sleeping, so that when
1121 /// the task does wait for the `WaitOwned` future to complete, it will
1122 /// have received any wakeup that was sent between when the external
1123 /// state was checked and the `WaitOwned` future was first polled.
1124 ///
1125 /// # Returns
1126 ///
1127 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready`
1128 /// a wakeup was already received. This method returns [`Poll::Ready`]
1129 /// in the following cases:
1130 ///
1131 /// 1. The [`WaitQueue::wake()`] method was called between the creation
1132 /// of the `WaitOwned` future and the call to this method.
1133 /// 2. This is the first call to `subscribe` or `poll` on this future,
1134 /// and the `WaitQueue` was holding a stored wakeup from a previous
1135 /// call to [`wake()`]. This method consumes the wakeup in that case.
1136 /// 3. The future has previously been `subscribe`d or polled, and it
1137 /// has since then been marked ready by either consuming a wakeup
1138 /// from the `WaitQueue`, or by a call to [`wake()`] or
1139 /// [`wake_all()`] that removed it from the list of futures ready to
1140 /// receive wakeups.
1141 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which
1142 /// case this method returns `Poll::Ready(Err(Closed))`.
1143 ///
1144 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of
1145 /// this `Wait` future will also immediately return [`Poll::Ready`].
1146 ///
1147 /// If the [`WaitOwned`] future subscribed to wakeups from the queue,
1148 /// and has not been woken, this method returns [`Poll::Pending`].
1149 ///
1150 /// [`wake()`]: WaitQueue::wake
1151 /// [`wake_all()`]: WaitQueue::wake_all
1152 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> {
1153 let this = self.project();
1154 this.waiter.poll_wait(this.queue, None)
1155 }
1156}
1157
1158impl Future for WaitOwned {
1159 type Output = Result<(), Closed>;
1160
1161 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1162 let this = self.project();
1163 this.waiter.poll_wait(&*this.queue, Some(cx.waker()))
1164 }
1165}
1166
1167#[pinned_drop]
1168impl PinnedDrop for WaitOwned {
1169 fn drop(mut self: Pin<&mut Self>) {
1170 let this = self.project();
1171 this.waiter.release(&*this.queue);
1172 }
1173}