Next Generation WASM Microkernel Operating System
at main 1173 lines 46 kB view raw
1// Copyright 2025 Jonas Kruckenberg 2// 3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5// http://opensource.org/licenses/MIT>, at your option. This file may not be 6// copied, modified, or distributed except according to those terms. 7 8use alloc::sync::Arc; 9use core::cell::UnsafeCell; 10use core::marker::PhantomPinned; 11use core::pin::Pin; 12use core::ptr::NonNull; 13use core::sync::atomic::{AtomicUsize, Ordering}; 14use core::task::{Context, Poll, Waker}; 15use core::{fmt, mem, ptr}; 16 17use cordyceps::{Linked, List, list}; 18use k23_spin::{Mutex, MutexGuard}; 19use k32_util::{CachePadded, loom_const_fn}; 20use mycelium_bitfield::{FromBits, bitfield, enum_from_bits}; 21use pin_project::{pin_project, pinned_drop}; 22 23use crate::error::Closed; 24use crate::sync::wake_batch::WakeBatch; 25 26/// A queue of waiting tasks which can be [woken in first-in, first-out 27/// order][wake], or [all at once][wake_all]. 28/// 29/// This type is taken from [maitake-sync](https://github.com/hawkw/mycelium/blob/dd0020892564c77ee4c20ffbc2f7f5b046ad54c8/maitake-sync/src/wait_queue.rs#L1577). 30/// 31/// A `WaitQueue` allows any number of tasks to [wait] asynchronously and be 32/// woken when some event occurs, either [individually][wake] in first-in, 33/// first-out order, or [all at once][wake_all]. This makes it a vital building 34/// block of runtime services (such as timers or I/O resources), where it may be 35/// used to wake a set of tasks when a timer completes or when a resource 36/// becomes available. It can be equally useful for implementing higher-level 37/// synchronization primitives: for example, a `WaitQueue` plus an 38/// [`UnsafeCell`] is essentially an entire implementation of a fair 39/// asynchronous mutex. Finally, a `WaitQueue` can be a useful 40/// synchronization primitive on its own: sometimes, you just need to have a 41/// bunch of tasks wait for something and then wake them all up. 42/// 43/// # Implementation Notes 44/// 45/// This type is currently implemented using [intrusive doubly-linked 46/// list][ilist]. 47/// 48/// The *[intrusive]* aspect of this map is important, as it means that it does 49/// not allocate memory. Instead, nodes in the linked list are stored in the 50/// futures of tasks trying to wait for capacity. This means that it is not 51/// necessary to allocate any heap memory for each task waiting to be woken. 52/// 53/// However, the intrusive linked list introduces one new danger: because 54/// futures can be *cancelled*, and the linked list nodes live within the 55/// futures trying to wait on the queue, we *must* ensure that the node 56/// is unlinked from the list before dropping a cancelled future. Failure to do 57/// so would result in the list containing dangling pointers. Therefore, we must 58/// use a *doubly-linked* list, so that nodes can edit both the previous and 59/// next node when they have to remove themselves. This is kind of a bummer, as 60/// it means we can't use something nice like this [intrusive queue by Dmitry 61/// Vyukov][2], and there are not really practical designs for lock-free 62/// doubly-linked lists that don't rely on some kind of deferred reclamation 63/// scheme such as hazard pointers or QSBR. 64/// 65/// Instead, we just stick a [`Mutex`] around the linked list, which must be 66/// acquired to pop nodes from it, or for nodes to remove themselves when 67/// futures are cancelled. This is a bit sad, but the critical sections for this 68/// mutex are short enough that we still get pretty good performance despite it. 69/// 70/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction 71/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue 72/// [`Waker`]: Waker 73/// [wait]: WaitQueue::wait 74/// [wake]: WaitQueue::wake 75/// [wake_all]: WaitQueue::wake_all 76/// [`UnsafeCell`]: UnsafeCell 77/// [ilist]: linked_list::List 78#[derive(Debug)] 79pub struct WaitQueue { 80 /// The wait maps's state variable. 81 state: CachePadded<AtomicUsize>, 82 /// The linked list of waiters. 83 /// 84 /// # Safety 85 /// 86 /// This is protected by a mutex; the mutex *must* be acquired when 87 /// manipulating the linked list, OR when manipulating waiter nodes that may 88 /// be linked into the list. If a node is known to not be linked, it is safe 89 /// to modify that node (such as by waking the stored [`Waker`]) without 90 /// holding the lock; otherwise, it may be modified through the list, so the 91 /// lock must be held when modifying the 92 /// node. 93 queue: Mutex<List<Waiter>>, 94} 95 96bitfield! { 97 #[derive(Eq, PartialEq)] 98 struct State<usize> { 99 /// The queue's state. 100 const INNER: StateInner; 101 102 /// The number of times [`WaitQueue::wake_all`] has been called. 103 const WAKE_ALLS = ..; 104 } 105} 106 107/// The queue's current state. 108#[derive(Debug, Copy, Clone, Eq, PartialEq)] 109#[repr(u8)] 110enum StateInner { 111 /// No waiters are queued, and there is no pending notification. 112 /// Waiting while the queue is in this state will enqueue the waiter; 113 /// notifying while in this state will store a pending notification in the 114 /// queue, transitioning to [`StateInner::Woken`]. 115 Empty = 0b00, 116 /// There are one or more waiters in the queue. Waiting while 117 /// the queue is in this state will not transition the state. Waking while 118 /// in this state will wake the first waiter in the queue; if this empties 119 /// the queue, then the queue will transition to [`StateInner::Empty`]. 120 Waiting = 0b01, 121 /// The queue has a stored notification. Waiting while the queue 122 /// is in this state will consume the pending notification *without* 123 /// enqueueing the waiter and transition the queue to [`StateInner::Empty`]. 124 /// Waking while in this state will leave the queue in this state. 125 Woken = 0b10, 126 /// The queue is closed. Waiting while in this state will return 127 /// [`Closed`] without transitioning the queue's state. 128 /// 129 /// *Note*: This *must* correspond to all state bits being set, as it's set 130 /// via a [`fetch_or`]. 131 /// 132 /// [`fetch_or`]: AtomicUsize::fetch_or 133 Closed = 0b11, 134} 135 136#[pin_project(PinnedDrop)] 137#[derive(Debug)] 138#[must_use = "futures do nothing unless `.await`ed or `poll`ed"] 139pub struct Wait<'a> { 140 // The [`WaitQueue`] being waited on. 141 queue: &'a WaitQueue, 142 // Entry in the wait queue linked list. 143 #[pin] 144 waiter: Waiter, 145} 146 147/// Future returned from [`WaitQueue::wait_owned()`]. 148/// 149/// This is identical to the [`Wait`] future, except that it takes an 150/// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to 151/// live for the `'static` lifetime. 152/// 153/// This future is fused, so once it has completed, any future calls to poll 154/// will immediately return [`Poll::Ready`]. 155/// 156/// # Notes 157/// 158/// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a 159/// `WaitOwned` future once it has been polled. For instance, the following 160/// code must not compile: 161/// 162///```compile_fail 163/// use maitake_sync::wait_queue::WaitOwned; 164/// 165/// // Calls to this function should only compile if `T` is `Unpin`. 166/// fn assert_unpin<T: Unpin>() {} 167/// 168/// assert_unpin::<WaitOwned<'_>>(); 169/// ``` 170#[pin_project(PinnedDrop)] 171#[derive(Debug)] 172pub struct WaitOwned { 173 // The `WaitQueue` being waited on. 174 queue: Arc<WaitQueue>, 175 // Entry in the wait queue. 176 #[pin] 177 waiter: Waiter, 178} 179 180/// A waiter node which may be linked into a wait queue. 181#[pin_project] 182#[repr(C)] 183struct Waiter { 184 // The intrusive linked list node. 185 // 186 // This *must* be the first field in the struct in order for the `Linked` 187 // implementation to be sound. 188 #[pin] 189 node: UnsafeCell<WaiterInner>, 190 // The future's state. 191 state: WaitState, 192} 193 194struct WaiterInner { 195 /// Intrusive linked list pointers. 196 links: list::Links<Waiter>, 197 /// The node's waker 198 waker: Wakeup, 199 // This type is !Unpin due to the heuristic from: 200 // <https://github.com/rust-lang/rust/pull/82834> 201 _pin: PhantomPinned, 202} 203 204bitfield! { 205 #[derive(Eq, PartialEq)] 206 struct WaitState<usize> { 207 /// The waiter's state. 208 const INNER: WaitStateInner; 209 /// The number of times [`WaitQueue::wake_all`] has been called. 210 const WAKE_ALLS = ..; 211 } 212} 213 214enum_from_bits! { 215 /// The state of a [`Waiter`] node in a [`WaitQueue`]. 216 #[derive(Debug, Eq, PartialEq)] 217 enum WaitStateInner<u8> { 218 /// The waiter has not yet been enqueued. 219 /// 220 /// The number of times [`WaitQueue::wake_all`] has been called is stored 221 /// when the node is created, in order to determine whether it was woken by 222 /// a stored wakeup when enqueueing. 223 /// 224 /// When in this state, the node is **not** part of the linked list, and 225 /// can be dropped without removing it from the list. 226 Start = 0b00, 227 228 /// The waiter is waiting. 229 /// 230 /// When in this state, the node **is** part of the linked list. If the 231 /// node is dropped in this state, it **must** be removed from the list 232 /// before dropping it. Failure to ensure this will result in dangling 233 /// pointers in the linked list! 234 Waiting = 0b01, 235 236 /// The waiter has been woken. 237 /// 238 /// When in this state, the node is **not** part of the linked list, and 239 /// can be dropped without removing it from the list. 240 Woken = 0b10, 241 } 242} 243 244#[derive(Clone, Debug)] 245enum Wakeup { 246 Empty, 247 Waiting(Waker), 248 One, 249 All, 250 // Closed, 251} 252 253// === impl WaitQueue === 254 255impl Default for WaitQueue { 256 fn default() -> Self { 257 Self::new() 258 } 259} 260 261impl WaitQueue { 262 loom_const_fn! { 263 pub const fn new() -> Self { 264 Self { 265 state: CachePadded(AtomicUsize::new(StateInner::Empty.into_usize())), 266 queue: Mutex::new(List::new()), 267 } 268 } 269 } 270 271 /// Wake the next task in the queue. 272 /// 273 /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the 274 /// **next** call to [`wait().await`] will complete immediately. If one or more 275 /// tasks are currently in the queue, the first task in the queue is woken. 276 /// 277 /// At most one wakeup will be stored in the queue at any time. If `wake()` 278 /// is called many times while there are no tasks in the queue, only a 279 /// single wakeup is stored. 280 /// 281 /// [`wait().await`]: Self::wait() 282 #[inline] 283 pub fn wake(&self) { 284 // snapshot the queue's current state. 285 let mut state = self.load(); 286 287 // check if any tasks are currently waiting on this queue. if there are 288 // no waiting tasks, store the wakeup to be consumed by the next call to 289 // `wait`. 290 loop { 291 match state.get(State::INNER) { 292 // if the queue is closed, bail. 293 StateInner::Closed => return, 294 // if there are waiting tasks, break out of the loop and wake one. 295 StateInner::Waiting => break, 296 _ => {} 297 } 298 299 let next = state.with_inner(StateInner::Woken); 300 // advance the state to `Woken`, and return (if we did so 301 // successfully) 302 match self.compare_exchange(state, next) { 303 Ok(_) => return, 304 Err(actual) => state = actual, 305 } 306 } 307 308 // okay, there are tasks waiting on the queue; we must acquire the lock 309 // on the linked list and wake the next task from the queue. 310 let mut queue = self.queue.lock(); 311 312 // the queue's state may have changed while we were waiting to acquire 313 // the lock, so we need to acquire a new snapshot before we take the 314 // waker. 315 state = self.load(); 316 let waker = self.wake_locked(&mut queue, state); 317 drop(queue); 318 319 //now that we've released the lock, wake the waiting task (if we 320 //actually deuqueued one). 321 if let Some(waker) = waker { 322 waker.wake(); 323 } 324 } 325 326 /// Wake *all* tasks currently in the queue. 327 /// 328 /// All tasks currently waiting on the queue are woken. Unlike [`wake()`], a 329 /// wakeup is *not* stored in the queue to wake the next call to [`wait()`] 330 /// if the queue is empty. Instead, this method only wakes all currently 331 /// registered waiters. Registering a task to be woken is done by `await`ing 332 /// the [`Future`] returned by the [`wait()`] method on this queue. 333 /// 334 /// [`wake()`]: Self::wake 335 /// [`wait()`]: Self::wait 336 #[expect(clippy::missing_panics_doc, reason = "internal assertion")] 337 pub fn wake_all(&self) -> usize { 338 let mut batch = WakeBatch::new(); 339 let mut waiters_remaining = true; 340 341 let mut queue = self.queue.lock(); 342 let state = self.load(); 343 344 match state.get(State::INNER) { 345 StateInner::Woken | StateInner::Empty => { 346 self.state 347 .0 348 .fetch_add(State::ONE_WAKE_ALL, Ordering::SeqCst); 349 return 0; 350 } 351 StateInner::Closed => return 0, 352 StateInner::Waiting => { 353 let next_state = State::new() 354 .with_inner(StateInner::Empty) 355 .with(State::WAKE_ALLS, state.get(State::WAKE_ALLS) + 1); 356 self.compare_exchange(state, next_state) 357 .expect("state should not have transitioned while locked"); 358 } 359 } 360 361 // As long as there are waiters remaining to wake, lock the queue, drain 362 // another batch, release the lock, and wake them. 363 let mut woken = 0; 364 while waiters_remaining { 365 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All); 366 woken += batch.len(); 367 MutexGuard::unlocked(&mut queue, || batch.wake_all()); 368 } 369 370 woken 371 } 372 373 /// Wait to be woken up by this queue. 374 /// 375 /// Equivalent to: 376 /// 377 /// ```ignore 378 /// async fn wait(&self); 379 /// ``` 380 /// 381 /// This returns a [`Wait`] [`Future`] that will complete when the task is 382 /// woken by a call to [`wake()`] or [`wake_all()`], or when the `WaitQueue` 383 /// is dropped. 384 /// 385 /// Each `WaitQueue` holds a single wakeup. If [`wake()`] was previously 386 /// called while no tasks were waiting on the queue, then `wait().await` 387 /// will complete immediately, consuming the stored wakeup. Otherwise, 388 /// `wait().await` waits to be woken by the next call to [`wake()`] or 389 /// [`wake_all()`]. 390 /// 391 /// The [`Wait`] future is not guaranteed to receive wakeups from calls to 392 /// [`wake()`] if it has not yet been polled. See the documentation for the 393 /// [`Wait::subscribe()`] method for details on receiving wakeups from the 394 /// queue prior to polling the `Wait` future for the first time. 395 /// 396 /// A `Wait` future **is** is guaranteed to receive wakeups from calls to 397 /// [`wake_all()`] as soon as it is created, even if it has not yet been 398 /// polled. 399 /// 400 /// # Returns 401 /// 402 /// The [`Future`] returned by this method completes with one of the 403 /// following [outputs](Future::Output): 404 /// 405 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or 406 /// [`wake_all()`]. 407 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` being 408 /// [`close`d](WaitQueue::close). 409 /// 410 /// # Cancellation 411 /// 412 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the order 413 /// that they started to wait. If a [`Wait`] future is dropped, the task 414 /// will forfeit its position in the queue. 415 /// 416 /// [`wake()`]: Self::wake 417 /// [`wake_all()`]: Self::wake_all 418 pub fn wait(&self) -> Wait<'_> { 419 Wait { 420 queue: self, 421 waiter: self.waiter(), 422 } 423 } 424 425 /// Wait to be woken up by this queue, returning a future that's valid 426 /// for the `'static` lifetime. 427 /// 428 /// This returns a [`WaitOwned`] future that will complete when the task 429 /// is woken by a call to [`wake()`] or [`wake_all()`], or when the 430 /// `WaitQueue` is [closed]. 431 /// 432 /// This is identical to the [`wait()`] method, except that it takes a 433 /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future 434 /// to live for the `'static` lifetime. See the documentation for 435 /// [`wait()`] for details on how to use the future returned by this 436 /// method. 437 /// 438 /// # Returns 439 /// 440 /// The [`Future`] returned by this method completes with one of the 441 /// following [outputs](Future::Output): 442 /// 443 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or 444 /// [`wake_all()`]. 445 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` 446 /// being [closed]. 447 /// 448 /// # Cancellation 449 /// 450 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the 451 /// order that they started to wait. If a [`WaitOwned`] future is 452 /// dropped, the task will forfeit its position in the queue. 453 /// 454 /// [`wake()`]: Self::wake 455 /// [`wake_all()`]: Self::wake_all 456 /// [`wait()`]: Self::wait 457 /// [closed]: Self::close 458 pub fn wait_owned(self: &Arc<Self>) -> WaitOwned { 459 let waiter = self.waiter(); 460 let queue = self.clone(); 461 WaitOwned { queue, waiter } 462 } 463 464 /// Asynchronously poll the given function `f` until a condition occurs, 465 /// using the [`WaitQueue`] to only re-poll when notified. 466 /// 467 /// This can be used to implement a "wait loop", turning a "try" function 468 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. 469 /// "recv" or "send"). 470 /// 471 /// In particular, this function correctly *registers* interest in the [`WaitQueue`] 472 /// prior to polling the function, ensuring that there is not a chance of a race 473 /// where the condition occurs AFTER checking but BEFORE registering interest 474 /// in the [`WaitQueue`], which could lead to deadlock. 475 /// 476 /// This is intended to have similar behavior to `Condvar` in the standard library, 477 /// but asynchronous, and not requiring operating system intervention (or existence). 478 /// 479 /// In particular, this can be used in cases where interrupts or events are used 480 /// to signify readiness or completion of some task, such as the completion of a 481 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt 482 /// can wake the queue, allowing the polling function to check status fields for 483 /// partial progress or completion. 484 /// 485 /// Consider using [`Self::wait_for_value()`] if your function does return a value. 486 /// 487 /// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for) 488 /// if you do not need multiple waiters. 489 /// 490 /// # Errors 491 /// 492 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed. 493 /// 494 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> { 495 loop { 496 let wait = self.wait(); 497 let mut wait = core::pin::pin!(wait); 498 let _ = wait.as_mut().subscribe()?; 499 if f() { 500 return Ok(()); 501 } 502 wait.await?; 503 } 504 } 505 506 /// Asynchronously poll the given function `f` until a condition occurs, 507 /// using the [`WaitQueue`] to only re-poll when notified. 508 /// 509 /// This can be used to implement a "wait loop", turning a "try" function 510 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. 511 /// "recv" or "send"). 512 /// 513 /// In particular, this function correctly *registers* interest in the [`WaitQueue`] 514 /// prior to polling the function, ensuring that there is not a chance of a race 515 /// where the condition occurs AFTER checking but BEFORE registering interest 516 /// in the [`WaitQueue`], which could lead to deadlock. 517 /// 518 /// This is intended to have similar behavior to `Condvar` in the standard library, 519 /// but asynchronous, and not requiring operating system intervention (or existence). 520 /// 521 /// In particular, this can be used in cases where interrupts or events are used 522 /// to signify readiness or completion of some task, such as the completion of a 523 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt 524 /// can wake the queue, allowing the polling function to check status fields for 525 /// partial progress or completion, and also return the status flags at the same time. 526 /// 527 /// Consider using [`Self::wait_for()`] if your function does not return a value. 528 /// 529 /// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value) 530 /// if you do not need multiple waiters. 531 /// 532 /// # Errors 533 /// 534 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed. 535 /// 536 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> { 537 loop { 538 let wait = self.wait(); 539 let mut wait = core::pin::pin!(wait); 540 match wait.as_mut().subscribe() { 541 Poll::Ready(wr) => wr?, 542 Poll::Pending => {} 543 } 544 if let Some(t) = f() { 545 return Ok(t); 546 } 547 wait.await?; 548 } 549 } 550 551 /// Close the queue, indicating that it may no longer be used. 552 /// 553 /// Once a queue is closed, all [`wait()`] calls (current or future) will 554 /// return an error. 555 /// 556 /// This method is generally used when implementing higher-level 557 /// synchronization primitives or resources: when an event makes a resource 558 /// permanently unavailable, the queue can be closed. 559 /// 560 /// [`wait()`]: Self::wait 561 pub fn close(&self) { 562 let state = self 563 .state 564 .0 565 .fetch_or(StateInner::Closed.into_bits(), Ordering::SeqCst); 566 let state = State::from_bits(state); 567 if state.get(State::INNER) != StateInner::Waiting { 568 return; 569 } 570 571 let mut queue = self.queue.lock(); 572 let mut batch = WakeBatch::new(); 573 let mut waiters_remaining = true; 574 575 while waiters_remaining { 576 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All); 577 MutexGuard::unlocked(&mut queue, || batch.wake_all()); 578 } 579 } 580 581 /// Returns `true` if this `WaitQueue` is [closed](Self::close). 582 #[must_use] 583 pub fn is_closed(&self) -> bool { 584 self.load().get(State::INNER) == StateInner::Closed 585 } 586 587 /// Returns a [`Waiter`] entry in this queue. 588 /// 589 /// This is factored out into a separate function because it's used by both 590 /// [`WaitQueue::wait`] and [`WaitQueue::wait_owned`]. 591 fn waiter(&self) -> Waiter { 592 // how many times has `wake_all` been called when this waiter is created? 593 let current_wake_alls = self.load().get(State::WAKE_ALLS); 594 let state = WaitState::new() 595 .with(WaitState::WAKE_ALLS, current_wake_alls) 596 .with(WaitState::INNER, WaitStateInner::Start); 597 Waiter { 598 state, 599 node: UnsafeCell::new(WaiterInner { 600 links: list::Links::new(), 601 waker: Wakeup::Empty, 602 _pin: PhantomPinned, 603 }), 604 } 605 } 606 607 // fn try_wait(&self) -> Poll<Result<(), Closed>> { 608 // let mut state = self.load(); 609 // let initial_wake_alls = state.get(State::WAKE_ALLS); 610 // while state.get(State::INNER) == StateInner::Woken { 611 // match self.compare_exchange(state, state.with_inner(StateInner::Empty)) { 612 // Ok(_) => return Poll::Ready(Ok(())), 613 // Err(actual) => state = actual, 614 // } 615 // } 616 // 617 // match state.get(State::INNER) { 618 // StateInner::Closed => Poll::Ready(Err(Closed(()))), 619 // _ if state.get(State::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())), 620 // StateInner::Empty | StateInner::Waiting => Poll::Pending, 621 // StateInner::Woken => Poll::Ready(Ok(())), 622 // } 623 // } 624 625 #[cold] 626 #[inline(never)] 627 fn wake_locked(&self, queue: &mut List<Waiter>, curr: State) -> Option<Waker> { 628 let inner = curr.get(State::INNER); 629 630 // is the queue still in the `Waiting` state? it is possible that we 631 // transitioned to a different state while locking the queue. 632 if inner != StateInner::Waiting { 633 // if there are no longer any queued tasks, try to store the 634 // wakeup in the queue and bail. 635 if let Err(actual) = self.compare_exchange(curr, curr.with_inner(StateInner::Woken)) { 636 debug_assert!(actual.get(State::INNER) != StateInner::Waiting); 637 self.store(actual.with_inner(StateInner::Woken)); 638 } 639 640 return None; 641 } 642 643 // otherwise, we have to dequeue a task and wake it. 644 let node = queue 645 .pop_back() 646 .expect("if we are in the Waiting state, there must be waiters in the queue"); 647 let waker = Waiter::wake(node, queue, Wakeup::One); 648 649 // if we took the final waiter currently in the queue, transition to the 650 // `Empty` state. 651 if queue.is_empty() { 652 self.store(curr.with_inner(StateInner::Empty)); 653 } 654 655 waker 656 } 657 658 /// Drain waiters from `queue` and add them to `batch`. Returns `true` if 659 /// the batch was filled while more waiters remain in the queue, indicating 660 /// that this function must be called again to wake all waiters. 661 fn drain_to_wake_batch( 662 batch: &mut WakeBatch, 663 queue: &mut List<Waiter>, 664 wakeup: Wakeup, 665 ) -> bool { 666 while let Some(node) = queue.pop_back() { 667 let Some(waker) = Waiter::wake(node, queue, wakeup.clone()) else { 668 // this waiter was enqueued by `Wait::register` and doesn't have 669 // a waker, just keep going. 670 continue; 671 }; 672 673 if batch.add_waker(waker) { 674 // there's still room in the wake set, just keep adding to it. 675 continue; 676 } 677 678 // wake set is full, drop the lock and wake everyone! 679 break; 680 } 681 682 !queue.is_empty() 683 } 684 685 fn load(&self) -> State { 686 State::from_bits(self.state.0.load(Ordering::SeqCst)) 687 } 688 689 fn store(&self, state: State) { 690 self.state.0.store(state.0, Ordering::SeqCst); 691 } 692 693 fn compare_exchange(&self, current: State, new: State) -> Result<State, State> { 694 self.state 695 .0 696 .compare_exchange(current.0, new.0, Ordering::SeqCst, Ordering::SeqCst) 697 .map(State::from_bits) 698 .map_err(State::from_bits) 699 } 700} 701 702// === impl State & StateInner === 703 704impl State { 705 const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit(); 706 707 fn with_inner(self, inner: StateInner) -> Self { 708 self.with(Self::INNER, inner) 709 } 710} 711impl StateInner { 712 const fn into_usize(self) -> usize { 713 self as u8 as usize 714 } 715} 716 717impl FromBits<usize> for StateInner { 718 const BITS: u32 = 2; 719 type Error = core::convert::Infallible; 720 721 fn try_from_bits(bits: usize) -> Result<Self, Self::Error> { 722 #[expect( 723 clippy::cast_possible_truncation, 724 reason = "`bits` will only have 3 bits set" 725 )] 726 Ok(match bits as u8 { 727 bits if bits == Self::Empty as u8 => Self::Empty, 728 bits if bits == Self::Waiting as u8 => Self::Waiting, 729 bits if bits == Self::Woken as u8 => Self::Woken, 730 bits if bits == Self::Closed as u8 => Self::Closed, 731 _ => { 732 unreachable!(); 733 } 734 }) 735 } 736 737 fn into_bits(self) -> usize { 738 self as usize 739 } 740} 741 742// === impl Waiter === 743 744impl fmt::Debug for Waiter { 745 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 746 f.debug_struct("Waiter") 747 .field("state", &self.state) 748 .finish_non_exhaustive() 749 } 750} 751 752impl Waiter { 753 /// Returns the [`Waker`] for the task that owns this `Waiter`. 754 /// 755 /// # Safety 756 /// 757 /// This is only safe to call while the list is locked. The `list` 758 /// parameter ensures this method is only called while holding the lock, so 759 /// this can be safe. 760 /// 761 /// Of course, that must be the *same* list that this waiter is a member of, 762 /// and currently, there is no way to ensure that... 763 #[inline(always)] 764 fn wake(this: NonNull<Self>, list: &mut List<Self>, wakeup: Wakeup) -> Option<Waker> { 765 Waiter::with_inner(this, list, |node| { 766 let waker = mem::replace(&mut node.waker, wakeup); 767 match waker { 768 // the node has a registered waker, so wake the task. 769 Wakeup::Waiting(waker) => Some(waker), 770 // do nothing: the node was registered by `Wait::register` 771 // without a waker, so the future will already be woken when it is 772 // actually polled. 773 Wakeup::Empty => None, 774 // the node was already woken? this should not happen and 775 // probably indicates a race! 776 _ => unreachable!("tried to wake a waiter in the {:?} state!", waker), 777 } 778 }) 779 } 780 781 /// # Safety 782 /// 783 /// This is only safe to call while the list is locked. The dummy `_list` 784 /// parameter ensures this method is only called while holding the lock, so 785 /// this can be safe. 786 /// 787 /// Of course, that must be the *same* list that this waiter is a member of, 788 /// and currently, there is no way to ensure that... 789 #[inline(always)] 790 fn with_inner<T>( 791 mut this: NonNull<Self>, 792 _list: &mut List<Self>, 793 f: impl FnOnce(&mut WaiterInner) -> T, 794 ) -> T { 795 // safety: this is only called while holding the lock on the queue, 796 // so it's safe to mutate the waiter. 797 unsafe { f(&mut *this.as_mut().node.get()) } 798 } 799 800 fn poll_wait( 801 mut self: Pin<&mut Self>, 802 queue: &WaitQueue, 803 waker: Option<&Waker>, 804 ) -> Poll<Result<(), Closed>> { 805 // Safety: we never move out of `ptr` below, only mutate its fields 806 let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(self.as_mut())) }; 807 let mut this = self.as_mut().project(); 808 809 match this.state.get(WaitState::INNER) { 810 WaitStateInner::Start => { 811 let queue_state = queue.load(); 812 813 // can we consume a pending wakeup? 814 if queue 815 .compare_exchange( 816 queue_state.with_inner(StateInner::Woken), 817 queue_state.with_inner(StateInner::Empty), 818 ) 819 .is_ok() 820 { 821 this.state.set(WaitState::INNER, WaitStateInner::Woken); 822 return Poll::Ready(Ok(())); 823 } 824 825 // okay, no pending wakeups. try to wait... 826 827 let mut waiters = queue.queue.lock(); 828 let mut queue_state = queue.load(); 829 830 // the whole queue was woken while we were trying to acquire 831 // the lock! 832 if queue_state.get(State::WAKE_ALLS) != this.state.get(WaitState::WAKE_ALLS) { 833 this.state.set(WaitState::INNER, WaitStateInner::Woken); 834 return Poll::Ready(Ok(())); 835 } 836 837 // transition the queue to the waiting state 838 'to_waiting: loop { 839 match queue_state.get(State::INNER) { 840 // the queue is `Empty`, transition to `Waiting` 841 StateInner::Empty => { 842 match queue.compare_exchange( 843 queue_state, 844 queue_state.with_inner(StateInner::Waiting), 845 ) { 846 Ok(_) => break 'to_waiting, 847 Err(actual) => queue_state = actual, 848 } 849 } 850 // the queue is already `Waiting` 851 StateInner::Waiting => break 'to_waiting, 852 // the queue was woken, consume the wakeup. 853 StateInner::Woken => { 854 match queue.compare_exchange( 855 queue_state, 856 queue_state.with_inner(StateInner::Empty), 857 ) { 858 Ok(_) => { 859 this.state.set(WaitState::INNER, WaitStateInner::Woken); 860 return Poll::Ready(Ok(())); 861 } 862 Err(actual) => queue_state = actual, 863 } 864 } 865 StateInner::Closed => return Poll::Ready(Err(Closed(()))), 866 } 867 } 868 869 // enqueue the node 870 this.state.set(WaitState::INNER, WaitStateInner::Waiting); 871 if let Some(waker) = waker { 872 // safety: we may mutate the inner state because we are 873 // holding the lock. 874 unsafe { 875 let node = this.node.as_mut().get(); 876 debug_assert!(matches!((*node).waker, Wakeup::Empty)); 877 (*node).waker = Wakeup::Waiting(waker.clone()); 878 } 879 } 880 881 waiters.push_front(ptr); 882 883 Poll::Pending 884 } 885 WaitStateInner::Waiting => { 886 let _waiters = queue.queue.lock(); 887 // safety: we may mutate the inner state because we are 888 // holding the lock. 889 unsafe { 890 let node = &mut *this.node.get(); 891 match node.waker { 892 Wakeup::Waiting(ref mut curr_waker) => { 893 match waker { 894 Some(waker) if !curr_waker.will_wake(waker) => { 895 *curr_waker = waker.clone(); 896 } 897 _ => {} 898 } 899 Poll::Pending 900 } 901 Wakeup::All | Wakeup::One => { 902 this.state.set(WaitState::INNER, WaitStateInner::Woken); 903 Poll::Ready(Ok(())) 904 } 905 // Wakeup::Closed => { 906 // this.state.set(WaitState::INNER, WaitStateInner::Woken); 907 // Poll::Ready(Err(Closed(()))) 908 // } 909 Wakeup::Empty => { 910 if let Some(waker) = waker { 911 node.waker = Wakeup::Waiting(waker.clone()); 912 } 913 914 Poll::Pending 915 } 916 } 917 } 918 } 919 WaitStateInner::Woken => Poll::Ready(Ok(())), 920 } 921 } 922 923 /// Release this `Waiter` from the queue. 924 /// 925 /// This is called from the `drop` implementation for the [`Wait`] and 926 /// [`WaitOwned`] futures. 927 fn release(mut self: Pin<&mut Self>, queue: &WaitQueue) { 928 let state = *(self.as_mut().project().state); 929 // Safety: we never move out of `ptr` below, only mutate its fields 930 let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) }); 931 932 // if we're not enqueued, we don't have to do anything else. 933 if state.get(WaitState::INNER) != WaitStateInner::Waiting { 934 return; 935 } 936 937 let mut waiters = queue.queue.lock(); 938 let state = queue.load(); 939 // remove the node 940 // safety: we have the lock on the queue, so this is safe. 941 unsafe { 942 waiters.remove(ptr); 943 }; 944 945 // if we removed the last waiter from the queue, transition the state to 946 // `Empty`. 947 if waiters.is_empty() && state.get(State::INNER) == StateInner::Waiting { 948 queue.store(state.with_inner(StateInner::Empty)); 949 } 950 951 // if the node has an unconsumed wakeup, it must be assigned to the next 952 // node in the queue. 953 let next_waiter = 954 if Waiter::with_inner(ptr, &mut waiters, |node| matches!(&node.waker, Wakeup::One)) { 955 queue.wake_locked(&mut waiters, state) 956 } else { 957 None 958 }; 959 960 drop(waiters); 961 962 if let Some(next) = next_waiter { 963 next.wake(); 964 } 965 } 966} 967 968// Safety: TODO 969unsafe impl Linked<list::Links<Self>> for Waiter { 970 type Handle = NonNull<Waiter>; 971 972 fn into_ptr(r: Self::Handle) -> NonNull<Self> { 973 r 974 } 975 976 unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle { 977 ptr 978 } 979 980 unsafe fn links(target: NonNull<Self>) -> NonNull<list::Links<Waiter>> { 981 // Safety: ensured by caller 982 unsafe { 983 // Safety: using `ptr::addr_of!` avoids creating a temporary 984 // reference, which stacked borrows dislikes. 985 let node = &*ptr::addr_of!((*target.as_ptr()).node); 986 let links = ptr::addr_of_mut!((*node.get()).links); 987 // Safety: since the `target` pointer is `NonNull`, we can assume 988 // that pointers to its members are also not null, making this use 989 // of `new_unchecked` fine. 990 NonNull::new_unchecked(links) 991 } 992 } 993} 994 995// === impl Wait === 996 997impl Wait<'_> { 998 /// Returns `true` if this `WaitOwned` future is waiting for a 999 /// notification from the provided [`WaitQueue`]. 1000 #[inline] 1001 #[must_use] 1002 pub fn waits_on(&self, queue: &WaitQueue) -> bool { 1003 ptr::eq(self.queue, queue) 1004 } 1005 1006 /// Returns `true` if `self` and `other` are waiting on a notification 1007 /// from the same [`WaitQueue`]. 1008 #[inline] 1009 #[must_use] 1010 pub fn same_queue(&self, other: &Wait<'_>) -> bool { 1011 ptr::eq(self.queue, other.queue) 1012 } 1013 1014 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`]. 1015 /// 1016 /// Polling a `Wait` future adds that future to the list of waiters that may 1017 /// receive a wakeup from a `WaitQueue`. However, in some cases, it is 1018 /// desirable to subscribe to wakeups *prior* to actually waiting for one. 1019 /// This method should be used when it is necessary to ensure a `Wait` 1020 /// future is in the list of waiters before the future is `poll`ed for the 1021 /// first time. 1022 /// 1023 /// In general, this method is used in cases where a [`WaitQueue`] must 1024 /// synchronize with some additional state, such as an `AtomicBool` or 1025 /// counter. If a task first checks that state, and then chooses whether or 1026 /// not to wait on the `WaitQueue` based on that state, then a race 1027 /// condition may occur where the `WaitQueue` wakes waiters *between* when 1028 /// the task checked the external state and when it first polled its `Wait` 1029 /// future to wait on the queue. This method allows registering the `Wait` 1030 /// future with the queue *prior* to checking the external state, without 1031 /// actually sleeping, so that when the task does wait for the `Wait` future 1032 /// to complete, it will have received any wakeup that was sent between when 1033 /// the external state was checked and the `Wait` future was first polled. 1034 /// 1035 /// # Returns 1036 /// 1037 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready` a wakeup was 1038 /// already received. This method returns [`Poll::Ready`] in the following 1039 /// cases: 1040 /// 1041 /// 1. The [`WaitQueue::wake()`] method was called between the creation of the 1042 /// `Wait` and the call to this method. 1043 /// 2. This is the first call to `subscribe` or `poll` on this future, and the 1044 /// `WaitQueue` was holding a stored wakeup from a previous call to 1045 /// [`wake()`]. This method consumes the wakeup in that case. 1046 /// 3. The future has previously been `subscribe`d or polled, and it has since 1047 /// then been marked ready by either consuming a wakeup from the 1048 /// `WaitQueue`, or by a call to [`wake()`] or [`wake_all()`] that 1049 /// removed it from the list of futures ready to receive wakeups. 1050 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which case 1051 /// this method returns `Poll::Ready(Err(Closed))`. 1052 /// 1053 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of this 1054 /// `Wait` future will also immediately return [`Poll::Ready`]. 1055 /// 1056 /// If the [`Wait`] future subscribed to wakeups from the queue, and 1057 /// has not been woken, this method returns [`Poll::Pending`]. 1058 /// 1059 /// [`wake()`]: WaitQueue::wake 1060 /// [`wake_all()`]: WaitQueue::wake_all 1061 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> { 1062 let this = self.project(); 1063 this.waiter.poll_wait(this.queue, None) 1064 } 1065} 1066 1067impl Future for Wait<'_> { 1068 type Output = Result<(), Closed>; 1069 1070 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 1071 let this = self.project(); 1072 this.waiter.poll_wait(this.queue, Some(cx.waker())) 1073 } 1074} 1075 1076#[pinned_drop] 1077impl PinnedDrop for Wait<'_> { 1078 fn drop(mut self: Pin<&mut Self>) { 1079 let this = self.project(); 1080 this.waiter.release(this.queue); 1081 } 1082} 1083 1084// === impl WaitOwned === 1085 1086impl WaitOwned { 1087 /// Returns `true` if this `WaitOwned` future is waiting for a 1088 /// notification from the provided [`WaitQueue`]. 1089 #[inline] 1090 #[must_use] 1091 pub fn waits_on(&self, queue: &WaitQueue) -> bool { 1092 ptr::eq(&*self.queue, queue) 1093 } 1094 1095 /// Returns `true` if `self` and `other` are waiting on a notification 1096 /// from the same [`WaitQueue`]. 1097 #[inline] 1098 #[must_use] 1099 pub fn same_queue(&self, other: &Wait<'_>) -> bool { 1100 ptr::eq(&*self.queue, other.queue) 1101 } 1102 1103 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`]. 1104 /// 1105 /// Polling a `WaitOwned` future adds that future to the list of waiters 1106 /// that may receive a wakeup from a `WaitQueue`. However, in some 1107 /// cases, it is desirable to subscribe to wakeups *prior* to actually 1108 /// waiting for one. This method should be used when it is necessary to 1109 /// ensure a `WaitOwned` future is in the list of waiters before the 1110 /// future is `poll`ed for the rst time. 1111 /// 1112 /// In general, this method is used in cases where a [`WaitQueue`] must 1113 /// synchronize with some additional state, such as an `AtomicBool` or 1114 /// counter. If a task first checks that state, and then chooses whether or 1115 /// not to wait on the `WaitQueue` based on that state, then a race 1116 /// condition may occur where the `WaitQueue` wakes waiters *between* when 1117 /// the task checked the external state and when it first polled its 1118 /// `WaitOwned` future to wait on the queue. This method allows 1119 /// registering the `WaitOwned` future with the queue *prior* to 1120 /// checking the external state, without actually sleeping, so that when 1121 /// the task does wait for the `WaitOwned` future to complete, it will 1122 /// have received any wakeup that was sent between when the external 1123 /// state was checked and the `WaitOwned` future was first polled. 1124 /// 1125 /// # Returns 1126 /// 1127 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready` 1128 /// a wakeup was already received. This method returns [`Poll::Ready`] 1129 /// in the following cases: 1130 /// 1131 /// 1. The [`WaitQueue::wake()`] method was called between the creation 1132 /// of the `WaitOwned` future and the call to this method. 1133 /// 2. This is the first call to `subscribe` or `poll` on this future, 1134 /// and the `WaitQueue` was holding a stored wakeup from a previous 1135 /// call to [`wake()`]. This method consumes the wakeup in that case. 1136 /// 3. The future has previously been `subscribe`d or polled, and it 1137 /// has since then been marked ready by either consuming a wakeup 1138 /// from the `WaitQueue`, or by a call to [`wake()`] or 1139 /// [`wake_all()`] that removed it from the list of futures ready to 1140 /// receive wakeups. 1141 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which 1142 /// case this method returns `Poll::Ready(Err(Closed))`. 1143 /// 1144 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of 1145 /// this `Wait` future will also immediately return [`Poll::Ready`]. 1146 /// 1147 /// If the [`WaitOwned`] future subscribed to wakeups from the queue, 1148 /// and has not been woken, this method returns [`Poll::Pending`]. 1149 /// 1150 /// [`wake()`]: WaitQueue::wake 1151 /// [`wake_all()`]: WaitQueue::wake_all 1152 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> { 1153 let this = self.project(); 1154 this.waiter.poll_wait(this.queue, None) 1155 } 1156} 1157 1158impl Future for WaitOwned { 1159 type Output = Result<(), Closed>; 1160 1161 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 1162 let this = self.project(); 1163 this.waiter.poll_wait(&*this.queue, Some(cx.waker())) 1164 } 1165} 1166 1167#[pinned_drop] 1168impl PinnedDrop for WaitOwned { 1169 fn drop(mut self: Pin<&mut Self>) { 1170 let this = self.project(); 1171 this.waiter.release(&*this.queue); 1172 } 1173}