Next Generation WASM Microkernel Operating System
at trap_handler 1171 lines 46 kB view raw
1// Copyright 2025 Jonas Kruckenberg 2// 3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5// http://opensource.org/licenses/MIT>, at your option. This file may not be 6// copied, modified, or distributed except according to those terms. 7 8use crate::error::Closed; 9use crate::sync::wake_batch::WakeBatch; 10use alloc::sync::Arc; 11use cordyceps::{Linked, List, list}; 12use core::cell::UnsafeCell; 13use core::marker::PhantomPinned; 14use core::pin::Pin; 15use core::ptr::NonNull; 16use core::sync::atomic::{AtomicUsize, Ordering}; 17use core::task::{Context, Poll, Waker}; 18use core::{fmt, mem, ptr}; 19use mycelium_bitfield::{FromBits, bitfield, enum_from_bits}; 20use pin_project::{pin_project, pinned_drop}; 21use spin::{Mutex, MutexGuard}; 22use util::{CachePadded, loom_const_fn}; 23 24/// A queue of waiting tasks which can be [woken in first-in, first-out 25/// order][wake], or [all at once][wake_all]. 26/// 27/// This type is taken from [maitake-sync](https://github.com/hawkw/mycelium/blob/dd0020892564c77ee4c20ffbc2f7f5b046ad54c8/maitake-sync/src/wait_queue.rs#L1577). 28/// 29/// A `WaitQueue` allows any number of tasks to [wait] asynchronously and be 30/// woken when some event occurs, either [individually][wake] in first-in, 31/// first-out order, or [all at once][wake_all]. This makes it a vital building 32/// block of runtime services (such as timers or I/O resources), where it may be 33/// used to wake a set of tasks when a timer completes or when a resource 34/// becomes available. It can be equally useful for implementing higher-level 35/// synchronization primitives: for example, a `WaitQueue` plus an 36/// [`UnsafeCell`] is essentially an entire implementation of a fair 37/// asynchronous mutex. Finally, a `WaitQueue` can be a useful 38/// synchronization primitive on its own: sometimes, you just need to have a 39/// bunch of tasks wait for something and then wake them all up. 40/// 41/// # Implementation Notes 42/// 43/// This type is currently implemented using [intrusive doubly-linked 44/// list][ilist]. 45/// 46/// The *[intrusive]* aspect of this map is important, as it means that it does 47/// not allocate memory. Instead, nodes in the linked list are stored in the 48/// futures of tasks trying to wait for capacity. This means that it is not 49/// necessary to allocate any heap memory for each task waiting to be woken. 50/// 51/// However, the intrusive linked list introduces one new danger: because 52/// futures can be *cancelled*, and the linked list nodes live within the 53/// futures trying to wait on the queue, we *must* ensure that the node 54/// is unlinked from the list before dropping a cancelled future. Failure to do 55/// so would result in the list containing dangling pointers. Therefore, we must 56/// use a *doubly-linked* list, so that nodes can edit both the previous and 57/// next node when they have to remove themselves. This is kind of a bummer, as 58/// it means we can't use something nice like this [intrusive queue by Dmitry 59/// Vyukov][2], and there are not really practical designs for lock-free 60/// doubly-linked lists that don't rely on some kind of deferred reclamation 61/// scheme such as hazard pointers or QSBR. 62/// 63/// Instead, we just stick a [`Mutex`] around the linked list, which must be 64/// acquired to pop nodes from it, or for nodes to remove themselves when 65/// futures are cancelled. This is a bit sad, but the critical sections for this 66/// mutex are short enough that we still get pretty good performance despite it. 67/// 68/// [intrusive]: https://fuchsia.dev/fuchsia-src/development/languages/c-cpp/fbl_containers_guide/introduction 69/// [2]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue 70/// [`Waker`]: Waker 71/// [wait]: WaitQueue::wait 72/// [wake]: WaitQueue::wake 73/// [wake_all]: WaitQueue::wake_all 74/// [`UnsafeCell`]: UnsafeCell 75/// [ilist]: linked_list::List 76#[derive(Debug)] 77pub struct WaitQueue { 78 /// The wait maps's state variable. 79 state: CachePadded<AtomicUsize>, 80 /// The linked list of waiters. 81 /// 82 /// # Safety 83 /// 84 /// This is protected by a mutex; the mutex *must* be acquired when 85 /// manipulating the linked list, OR when manipulating waiter nodes that may 86 /// be linked into the list. If a node is known to not be linked, it is safe 87 /// to modify that node (such as by waking the stored [`Waker`]) without 88 /// holding the lock; otherwise, it may be modified through the list, so the 89 /// lock must be held when modifying the 90 /// node. 91 queue: Mutex<List<Waiter>>, 92} 93 94bitfield! { 95 #[derive(Eq, PartialEq)] 96 struct State<usize> { 97 /// The queue's state. 98 const INNER: StateInner; 99 100 /// The number of times [`WaitQueue::wake_all`] has been called. 101 const WAKE_ALLS = ..; 102 } 103} 104 105/// The queue's current state. 106#[derive(Debug, Copy, Clone, Eq, PartialEq)] 107#[repr(u8)] 108enum StateInner { 109 /// No waiters are queued, and there is no pending notification. 110 /// Waiting while the queue is in this state will enqueue the waiter; 111 /// notifying while in this state will store a pending notification in the 112 /// queue, transitioning to [`StateInner::Woken`]. 113 Empty = 0b00, 114 /// There are one or more waiters in the queue. Waiting while 115 /// the queue is in this state will not transition the state. Waking while 116 /// in this state will wake the first waiter in the queue; if this empties 117 /// the queue, then the queue will transition to [`StateInner::Empty`]. 118 Waiting = 0b01, 119 /// The queue has a stored notification. Waiting while the queue 120 /// is in this state will consume the pending notification *without* 121 /// enqueueing the waiter and transition the queue to [`StateInner::Empty`]. 122 /// Waking while in this state will leave the queue in this state. 123 Woken = 0b10, 124 /// The queue is closed. Waiting while in this state will return 125 /// [`Closed`] without transitioning the queue's state. 126 /// 127 /// *Note*: This *must* correspond to all state bits being set, as it's set 128 /// via a [`fetch_or`]. 129 /// 130 /// [`fetch_or`]: AtomicUsize::fetch_or 131 Closed = 0b11, 132} 133 134#[pin_project(PinnedDrop)] 135#[derive(Debug)] 136#[must_use = "futures do nothing unless `.await`ed or `poll`ed"] 137pub struct Wait<'a> { 138 // The [`WaitQueue`] being waited on. 139 queue: &'a WaitQueue, 140 // Entry in the wait queue linked list. 141 #[pin] 142 waiter: Waiter, 143} 144 145/// Future returned from [`WaitQueue::wait_owned()`]. 146/// 147/// This is identical to the [`Wait`] future, except that it takes an 148/// [`Arc`] reference to the [`WaitQueue`], allowing the returned future to 149/// live for the `'static` lifetime. 150/// 151/// This future is fused, so once it has completed, any future calls to poll 152/// will immediately return [`Poll::Ready`]. 153/// 154/// # Notes 155/// 156/// This future is `!Unpin`, as it is unsafe to [`core::mem::forget`] a 157/// `WaitOwned` future once it has been polled. For instance, the following 158/// code must not compile: 159/// 160///```compile_fail 161/// use maitake_sync::wait_queue::WaitOwned; 162/// 163/// // Calls to this function should only compile if `T` is `Unpin`. 164/// fn assert_unpin<T: Unpin>() {} 165/// 166/// assert_unpin::<WaitOwned<'_>>(); 167/// ``` 168#[pin_project(PinnedDrop)] 169#[derive(Debug)] 170pub struct WaitOwned { 171 // The `WaitQueue` being waited on. 172 queue: Arc<WaitQueue>, 173 // Entry in the wait queue. 174 #[pin] 175 waiter: Waiter, 176} 177 178/// A waiter node which may be linked into a wait queue. 179#[pin_project] 180#[repr(C)] 181struct Waiter { 182 // The intrusive linked list node. 183 // 184 // This *must* be the first field in the struct in order for the `Linked` 185 // implementation to be sound. 186 #[pin] 187 node: UnsafeCell<WaiterInner>, 188 // The future's state. 189 state: WaitState, 190} 191 192struct WaiterInner { 193 /// Intrusive linked list pointers. 194 links: list::Links<Waiter>, 195 /// The node's waker 196 waker: Wakeup, 197 // This type is !Unpin due to the heuristic from: 198 // <https://github.com/rust-lang/rust/pull/82834> 199 _pin: PhantomPinned, 200} 201 202bitfield! { 203 #[derive(Eq, PartialEq)] 204 struct WaitState<usize> { 205 /// The waiter's state. 206 const INNER: WaitStateInner; 207 /// The number of times [`WaitQueue::wake_all`] has been called. 208 const WAKE_ALLS = ..; 209 } 210} 211 212enum_from_bits! { 213 /// The state of a [`Waiter`] node in a [`WaitQueue`]. 214 #[derive(Debug, Eq, PartialEq)] 215 enum WaitStateInner<u8> { 216 /// The waiter has not yet been enqueued. 217 /// 218 /// The number of times [`WaitQueue::wake_all`] has been called is stored 219 /// when the node is created, in order to determine whether it was woken by 220 /// a stored wakeup when enqueueing. 221 /// 222 /// When in this state, the node is **not** part of the linked list, and 223 /// can be dropped without removing it from the list. 224 Start = 0b00, 225 226 /// The waiter is waiting. 227 /// 228 /// When in this state, the node **is** part of the linked list. If the 229 /// node is dropped in this state, it **must** be removed from the list 230 /// before dropping it. Failure to ensure this will result in dangling 231 /// pointers in the linked list! 232 Waiting = 0b01, 233 234 /// The waiter has been woken. 235 /// 236 /// When in this state, the node is **not** part of the linked list, and 237 /// can be dropped without removing it from the list. 238 Woken = 0b10, 239 } 240} 241 242#[derive(Clone, Debug)] 243enum Wakeup { 244 Empty, 245 Waiting(Waker), 246 One, 247 All, 248 // Closed, 249} 250 251// === impl WaitQueue === 252 253impl Default for WaitQueue { 254 fn default() -> Self { 255 Self::new() 256 } 257} 258 259impl WaitQueue { 260 loom_const_fn! { 261 pub const fn new() -> Self { 262 Self { 263 state: CachePadded(AtomicUsize::new(StateInner::Empty.into_usize())), 264 queue: Mutex::new(List::new()), 265 } 266 } 267 } 268 269 /// Wake the next task in the queue. 270 /// 271 /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the 272 /// **next** call to [`wait().await`] will complete immediately. If one or more 273 /// tasks are currently in the queue, the first task in the queue is woken. 274 /// 275 /// At most one wakeup will be stored in the queue at any time. If `wake()` 276 /// is called many times while there are no tasks in the queue, only a 277 /// single wakeup is stored. 278 /// 279 /// [`wait().await`]: Self::wait() 280 #[inline] 281 pub fn wake(&self) { 282 // snapshot the queue's current state. 283 let mut state = self.load(); 284 285 // check if any tasks are currently waiting on this queue. if there are 286 // no waiting tasks, store the wakeup to be consumed by the next call to 287 // `wait`. 288 loop { 289 match state.get(State::INNER) { 290 // if the queue is closed, bail. 291 StateInner::Closed => return, 292 // if there are waiting tasks, break out of the loop and wake one. 293 StateInner::Waiting => break, 294 _ => {} 295 } 296 297 let next = state.with_inner(StateInner::Woken); 298 // advance the state to `Woken`, and return (if we did so 299 // successfully) 300 match self.compare_exchange(state, next) { 301 Ok(_) => return, 302 Err(actual) => state = actual, 303 } 304 } 305 306 // okay, there are tasks waiting on the queue; we must acquire the lock 307 // on the linked list and wake the next task from the queue. 308 let mut queue = self.queue.lock(); 309 310 // the queue's state may have changed while we were waiting to acquire 311 // the lock, so we need to acquire a new snapshot before we take the 312 // waker. 313 state = self.load(); 314 let waker = self.wake_locked(&mut queue, state); 315 drop(queue); 316 317 //now that we've released the lock, wake the waiting task (if we 318 //actually deuqueued one). 319 if let Some(waker) = waker { 320 waker.wake(); 321 } 322 } 323 324 /// Wake *all* tasks currently in the queue. 325 /// 326 /// All tasks currently waiting on the queue are woken. Unlike [`wake()`], a 327 /// wakeup is *not* stored in the queue to wake the next call to [`wait()`] 328 /// if the queue is empty. Instead, this method only wakes all currently 329 /// registered waiters. Registering a task to be woken is done by `await`ing 330 /// the [`Future`] returned by the [`wait()`] method on this queue. 331 /// 332 /// [`wake()`]: Self::wake 333 /// [`wait()`]: Self::wait 334 #[expect(clippy::missing_panics_doc, reason = "internal assertion")] 335 pub fn wake_all(&self) -> usize { 336 let mut batch = WakeBatch::new(); 337 let mut waiters_remaining = true; 338 339 let mut queue = self.queue.lock(); 340 let state = self.load(); 341 342 match state.get(State::INNER) { 343 StateInner::Woken | StateInner::Empty => { 344 self.state 345 .0 346 .fetch_add(State::ONE_WAKE_ALL, Ordering::SeqCst); 347 return 0; 348 } 349 StateInner::Closed => return 0, 350 StateInner::Waiting => { 351 let next_state = State::new() 352 .with_inner(StateInner::Empty) 353 .with(State::WAKE_ALLS, state.get(State::WAKE_ALLS) + 1); 354 self.compare_exchange(state, next_state) 355 .expect("state should not have transitioned while locked"); 356 } 357 } 358 359 // As long as there are waiters remaining to wake, lock the queue, drain 360 // another batch, release the lock, and wake them. 361 let mut woken = 0; 362 while waiters_remaining { 363 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All); 364 woken += batch.len(); 365 MutexGuard::unlocked(&mut queue, || batch.wake_all()); 366 } 367 368 woken 369 } 370 371 /// Wait to be woken up by this queue. 372 /// 373 /// Equivalent to: 374 /// 375 /// ```ignore 376 /// async fn wait(&self); 377 /// ``` 378 /// 379 /// This returns a [`Wait`] [`Future`] that will complete when the task is 380 /// woken by a call to [`wake()`] or [`wake_all()`], or when the `WaitQueue` 381 /// is dropped. 382 /// 383 /// Each `WaitQueue` holds a single wakeup. If [`wake()`] was previously 384 /// called while no tasks were waiting on the queue, then `wait().await` 385 /// will complete immediately, consuming the stored wakeup. Otherwise, 386 /// `wait().await` waits to be woken by the next call to [`wake()`] or 387 /// [`wake_all()`]. 388 /// 389 /// The [`Wait`] future is not guaranteed to receive wakeups from calls to 390 /// [`wake()`] if it has not yet been polled. See the documentation for the 391 /// [`Wait::subscribe()`] method for details on receiving wakeups from the 392 /// queue prior to polling the `Wait` future for the first time. 393 /// 394 /// A `Wait` future **is** is guaranteed to receive wakeups from calls to 395 /// [`wake_all()`] as soon as it is created, even if it has not yet been 396 /// polled. 397 /// 398 /// # Returns 399 /// 400 /// The [`Future`] returned by this method completes with one of the 401 /// following [outputs](Future::Output): 402 /// 403 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or 404 /// [`wake_all()`]. 405 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` being 406 /// [`close`d](WaitQueue::close). 407 /// 408 /// # Cancellation 409 /// 410 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the order 411 /// that they started to wait. If a [`Wait`] future is dropped, the task 412 /// will forfeit its position in the queue. 413 /// 414 /// [`wake()`]: Self::wake 415 /// [`wake_all()`]: Self::wake_all 416 pub fn wait(&self) -> Wait<'_> { 417 Wait { 418 queue: self, 419 waiter: self.waiter(), 420 } 421 } 422 423 /// Wait to be woken up by this queue, returning a future that's valid 424 /// for the `'static` lifetime. 425 /// 426 /// This returns a [`WaitOwned`] future that will complete when the task 427 /// is woken by a call to [`wake()`] or [`wake_all()`], or when the 428 /// `WaitQueue` is [closed]. 429 /// 430 /// This is identical to the [`wait()`] method, except that it takes a 431 /// [`Arc`] reference to the [`WaitQueue`], allowing the returned future 432 /// to live for the `'static` lifetime. See the documentation for 433 /// [`wait()`] for details on how to use the future returned by this 434 /// method. 435 /// 436 /// # Returns 437 /// 438 /// The [`Future`] returned by this method completes with one of the 439 /// following [outputs](Future::Output): 440 /// 441 /// - [`Ok`]`(())` if the task was woken by a call to [`wake()`] or 442 /// [`wake_all()`]. 443 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by the `WaitQueue` 444 /// being [closed]. 445 /// 446 /// # Cancellation 447 /// 448 /// A `WaitQueue` fairly distributes wakeups to waiting tasks in the 449 /// order that they started to wait. If a [`WaitOwned`] future is 450 /// dropped, the task will forfeit its position in the queue. 451 /// 452 /// [`wake()`]: Self::wake 453 /// [`wake_all()`]: Self::wake_all 454 /// [`wait()`]: Self::wait 455 /// [closed]: Self::close 456 pub fn wait_owned(self: &Arc<Self>) -> WaitOwned { 457 let waiter = self.waiter(); 458 let queue = self.clone(); 459 WaitOwned { queue, waiter } 460 } 461 462 /// Asynchronously poll the given function `f` until a condition occurs, 463 /// using the [`WaitQueue`] to only re-poll when notified. 464 /// 465 /// This can be used to implement a "wait loop", turning a "try" function 466 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. 467 /// "recv" or "send"). 468 /// 469 /// In particular, this function correctly *registers* interest in the [`WaitQueue`] 470 /// prior to polling the function, ensuring that there is not a chance of a race 471 /// where the condition occurs AFTER checking but BEFORE registering interest 472 /// in the [`WaitQueue`], which could lead to deadlock. 473 /// 474 /// This is intended to have similar behavior to `Condvar` in the standard library, 475 /// but asynchronous, and not requiring operating system intervention (or existence). 476 /// 477 /// In particular, this can be used in cases where interrupts or events are used 478 /// to signify readiness or completion of some task, such as the completion of a 479 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt 480 /// can wake the queue, allowing the polling function to check status fields for 481 /// partial progress or completion. 482 /// 483 /// Consider using [`Self::wait_for_value()`] if your function does return a value. 484 /// 485 /// Consider using [`WaitCell::wait_for()`](super::wait_cell::WaitCell::wait_for) 486 /// if you do not need multiple waiters. 487 /// 488 /// # Errors 489 /// 490 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed. 491 /// 492 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> { 493 loop { 494 let wait = self.wait(); 495 let mut wait = core::pin::pin!(wait); 496 let _ = wait.as_mut().subscribe()?; 497 if f() { 498 return Ok(()); 499 } 500 wait.await?; 501 } 502 } 503 504 /// Asynchronously poll the given function `f` until a condition occurs, 505 /// using the [`WaitQueue`] to only re-poll when notified. 506 /// 507 /// This can be used to implement a "wait loop", turning a "try" function 508 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. 509 /// "recv" or "send"). 510 /// 511 /// In particular, this function correctly *registers* interest in the [`WaitQueue`] 512 /// prior to polling the function, ensuring that there is not a chance of a race 513 /// where the condition occurs AFTER checking but BEFORE registering interest 514 /// in the [`WaitQueue`], which could lead to deadlock. 515 /// 516 /// This is intended to have similar behavior to `Condvar` in the standard library, 517 /// but asynchronous, and not requiring operating system intervention (or existence). 518 /// 519 /// In particular, this can be used in cases where interrupts or events are used 520 /// to signify readiness or completion of some task, such as the completion of a 521 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt 522 /// can wake the queue, allowing the polling function to check status fields for 523 /// partial progress or completion, and also return the status flags at the same time. 524 /// 525 /// Consider using [`Self::wait_for()`] if your function does not return a value. 526 /// 527 /// Consider using [`WaitCell::wait_for_value()`](super::wait_cell::WaitCell::wait_for_value) 528 /// if you do not need multiple waiters. 529 /// 530 /// # Errors 531 /// 532 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitQueue`] is closed. 533 /// 534 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> { 535 loop { 536 let wait = self.wait(); 537 let mut wait = core::pin::pin!(wait); 538 match wait.as_mut().subscribe() { 539 Poll::Ready(wr) => wr?, 540 Poll::Pending => {} 541 } 542 if let Some(t) = f() { 543 return Ok(t); 544 } 545 wait.await?; 546 } 547 } 548 549 /// Close the queue, indicating that it may no longer be used. 550 /// 551 /// Once a queue is closed, all [`wait()`] calls (current or future) will 552 /// return an error. 553 /// 554 /// This method is generally used when implementing higher-level 555 /// synchronization primitives or resources: when an event makes a resource 556 /// permanently unavailable, the queue can be closed. 557 /// 558 /// [`wait()`]: Self::wait 559 pub fn close(&self) { 560 let state = self 561 .state 562 .0 563 .fetch_or(StateInner::Closed.into_bits(), Ordering::SeqCst); 564 let state = State::from_bits(state); 565 if state.get(State::INNER) != StateInner::Waiting { 566 return; 567 } 568 569 let mut queue = self.queue.lock(); 570 let mut batch = WakeBatch::new(); 571 let mut waiters_remaining = true; 572 573 while waiters_remaining { 574 waiters_remaining = Self::drain_to_wake_batch(&mut batch, &mut queue, Wakeup::All); 575 MutexGuard::unlocked(&mut queue, || batch.wake_all()); 576 } 577 } 578 579 /// Returns `true` if this `WaitQueue` is [closed](Self::close). 580 #[must_use] 581 pub fn is_closed(&self) -> bool { 582 self.load().get(State::INNER) == StateInner::Closed 583 } 584 585 /// Returns a [`Waiter`] entry in this queue. 586 /// 587 /// This is factored out into a separate function because it's used by both 588 /// [`WaitQueue::wait`] and [`WaitQueue::wait_owned`]. 589 fn waiter(&self) -> Waiter { 590 // how many times has `wake_all` been called when this waiter is created? 591 let current_wake_alls = self.load().get(State::WAKE_ALLS); 592 let state = WaitState::new() 593 .with(WaitState::WAKE_ALLS, current_wake_alls) 594 .with(WaitState::INNER, WaitStateInner::Start); 595 Waiter { 596 state, 597 node: UnsafeCell::new(WaiterInner { 598 links: list::Links::new(), 599 waker: Wakeup::Empty, 600 _pin: PhantomPinned, 601 }), 602 } 603 } 604 605 // fn try_wait(&self) -> Poll<Result<(), Closed>> { 606 // let mut state = self.load(); 607 // let initial_wake_alls = state.get(State::WAKE_ALLS); 608 // while state.get(State::INNER) == StateInner::Woken { 609 // match self.compare_exchange(state, state.with_inner(StateInner::Empty)) { 610 // Ok(_) => return Poll::Ready(Ok(())), 611 // Err(actual) => state = actual, 612 // } 613 // } 614 // 615 // match state.get(State::INNER) { 616 // StateInner::Closed => Poll::Ready(Err(Closed(()))), 617 // _ if state.get(State::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())), 618 // StateInner::Empty | StateInner::Waiting => Poll::Pending, 619 // StateInner::Woken => Poll::Ready(Ok(())), 620 // } 621 // } 622 623 #[cold] 624 #[inline(never)] 625 fn wake_locked(&self, queue: &mut List<Waiter>, curr: State) -> Option<Waker> { 626 let inner = curr.get(State::INNER); 627 628 // is the queue still in the `Waiting` state? it is possible that we 629 // transitioned to a different state while locking the queue. 630 if inner != StateInner::Waiting { 631 // if there are no longer any queued tasks, try to store the 632 // wakeup in the queue and bail. 633 if let Err(actual) = self.compare_exchange(curr, curr.with_inner(StateInner::Woken)) { 634 debug_assert!(actual.get(State::INNER) != StateInner::Waiting); 635 self.store(actual.with_inner(StateInner::Woken)); 636 } 637 638 return None; 639 } 640 641 // otherwise, we have to dequeue a task and wake it. 642 let node = queue 643 .pop_back() 644 .expect("if we are in the Waiting state, there must be waiters in the queue"); 645 let waker = Waiter::wake(node, queue, Wakeup::One); 646 647 // if we took the final waiter currently in the queue, transition to the 648 // `Empty` state. 649 if queue.is_empty() { 650 self.store(curr.with_inner(StateInner::Empty)); 651 } 652 653 waker 654 } 655 656 /// Drain waiters from `queue` and add them to `batch`. Returns `true` if 657 /// the batch was filled while more waiters remain in the queue, indicating 658 /// that this function must be called again to wake all waiters. 659 fn drain_to_wake_batch( 660 batch: &mut WakeBatch, 661 queue: &mut List<Waiter>, 662 wakeup: Wakeup, 663 ) -> bool { 664 while let Some(node) = queue.pop_back() { 665 let Some(waker) = Waiter::wake(node, queue, wakeup.clone()) else { 666 // this waiter was enqueued by `Wait::register` and doesn't have 667 // a waker, just keep going. 668 continue; 669 }; 670 671 if batch.add_waker(waker) { 672 // there's still room in the wake set, just keep adding to it. 673 continue; 674 } 675 676 // wake set is full, drop the lock and wake everyone! 677 break; 678 } 679 680 !queue.is_empty() 681 } 682 683 fn load(&self) -> State { 684 State::from_bits(self.state.0.load(Ordering::SeqCst)) 685 } 686 687 fn store(&self, state: State) { 688 self.state.0.store(state.0, Ordering::SeqCst); 689 } 690 691 fn compare_exchange(&self, current: State, new: State) -> Result<State, State> { 692 self.state 693 .0 694 .compare_exchange(current.0, new.0, Ordering::SeqCst, Ordering::SeqCst) 695 .map(State::from_bits) 696 .map_err(State::from_bits) 697 } 698} 699 700// === impl State & StateInner === 701 702impl State { 703 const ONE_WAKE_ALL: usize = Self::WAKE_ALLS.first_bit(); 704 705 fn with_inner(self, inner: StateInner) -> Self { 706 self.with(Self::INNER, inner) 707 } 708} 709impl StateInner { 710 const fn into_usize(self) -> usize { 711 self as u8 as usize 712 } 713} 714 715impl FromBits<usize> for StateInner { 716 const BITS: u32 = 2; 717 type Error = core::convert::Infallible; 718 719 fn try_from_bits(bits: usize) -> Result<Self, Self::Error> { 720 #[expect( 721 clippy::cast_possible_truncation, 722 reason = "`bits` will only have 3 bits set" 723 )] 724 Ok(match bits as u8 { 725 bits if bits == Self::Empty as u8 => Self::Empty, 726 bits if bits == Self::Waiting as u8 => Self::Waiting, 727 bits if bits == Self::Woken as u8 => Self::Woken, 728 bits if bits == Self::Closed as u8 => Self::Closed, 729 _ => { 730 unreachable!(); 731 } 732 }) 733 } 734 735 fn into_bits(self) -> usize { 736 self as usize 737 } 738} 739 740// === impl Waiter === 741 742impl fmt::Debug for Waiter { 743 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 744 f.debug_struct("Waiter") 745 .field("state", &self.state) 746 .finish_non_exhaustive() 747 } 748} 749 750impl Waiter { 751 /// Returns the [`Waker`] for the task that owns this `Waiter`. 752 /// 753 /// # Safety 754 /// 755 /// This is only safe to call while the list is locked. The `list` 756 /// parameter ensures this method is only called while holding the lock, so 757 /// this can be safe. 758 /// 759 /// Of course, that must be the *same* list that this waiter is a member of, 760 /// and currently, there is no way to ensure that... 761 #[inline(always)] 762 fn wake(this: NonNull<Self>, list: &mut List<Self>, wakeup: Wakeup) -> Option<Waker> { 763 Waiter::with_inner(this, list, |node| { 764 let waker = mem::replace(&mut node.waker, wakeup); 765 match waker { 766 // the node has a registered waker, so wake the task. 767 Wakeup::Waiting(waker) => Some(waker), 768 // do nothing: the node was registered by `Wait::register` 769 // without a waker, so the future will already be woken when it is 770 // actually polled. 771 Wakeup::Empty => None, 772 // the node was already woken? this should not happen and 773 // probably indicates a race! 774 _ => unreachable!("tried to wake a waiter in the {:?} state!", waker), 775 } 776 }) 777 } 778 779 /// # Safety 780 /// 781 /// This is only safe to call while the list is locked. The dummy `_list` 782 /// parameter ensures this method is only called while holding the lock, so 783 /// this can be safe. 784 /// 785 /// Of course, that must be the *same* list that this waiter is a member of, 786 /// and currently, there is no way to ensure that... 787 #[inline(always)] 788 fn with_inner<T>( 789 mut this: NonNull<Self>, 790 _list: &mut List<Self>, 791 f: impl FnOnce(&mut WaiterInner) -> T, 792 ) -> T { 793 // safety: this is only called while holding the lock on the queue, 794 // so it's safe to mutate the waiter. 795 unsafe { f(&mut *this.as_mut().node.get()) } 796 } 797 798 fn poll_wait( 799 mut self: Pin<&mut Self>, 800 queue: &WaitQueue, 801 waker: Option<&Waker>, 802 ) -> Poll<Result<(), Closed>> { 803 // Safety: we never move out of `ptr` below, only mutate its fields 804 let ptr = unsafe { NonNull::from(Pin::into_inner_unchecked(self.as_mut())) }; 805 let mut this = self.as_mut().project(); 806 807 match this.state.get(WaitState::INNER) { 808 WaitStateInner::Start => { 809 let queue_state = queue.load(); 810 811 // can we consume a pending wakeup? 812 if queue 813 .compare_exchange( 814 queue_state.with_inner(StateInner::Woken), 815 queue_state.with_inner(StateInner::Empty), 816 ) 817 .is_ok() 818 { 819 this.state.set(WaitState::INNER, WaitStateInner::Woken); 820 return Poll::Ready(Ok(())); 821 } 822 823 // okay, no pending wakeups. try to wait... 824 825 let mut waiters = queue.queue.lock(); 826 let mut queue_state = queue.load(); 827 828 // the whole queue was woken while we were trying to acquire 829 // the lock! 830 if queue_state.get(State::WAKE_ALLS) != this.state.get(WaitState::WAKE_ALLS) { 831 this.state.set(WaitState::INNER, WaitStateInner::Woken); 832 return Poll::Ready(Ok(())); 833 } 834 835 // transition the queue to the waiting state 836 'to_waiting: loop { 837 match queue_state.get(State::INNER) { 838 // the queue is `Empty`, transition to `Waiting` 839 StateInner::Empty => { 840 match queue.compare_exchange( 841 queue_state, 842 queue_state.with_inner(StateInner::Waiting), 843 ) { 844 Ok(_) => break 'to_waiting, 845 Err(actual) => queue_state = actual, 846 } 847 } 848 // the queue is already `Waiting` 849 StateInner::Waiting => break 'to_waiting, 850 // the queue was woken, consume the wakeup. 851 StateInner::Woken => { 852 match queue.compare_exchange( 853 queue_state, 854 queue_state.with_inner(StateInner::Empty), 855 ) { 856 Ok(_) => { 857 this.state.set(WaitState::INNER, WaitStateInner::Woken); 858 return Poll::Ready(Ok(())); 859 } 860 Err(actual) => queue_state = actual, 861 } 862 } 863 StateInner::Closed => return Poll::Ready(Err(Closed(()))), 864 } 865 } 866 867 // enqueue the node 868 this.state.set(WaitState::INNER, WaitStateInner::Waiting); 869 if let Some(waker) = waker { 870 // safety: we may mutate the inner state because we are 871 // holding the lock. 872 unsafe { 873 let node = this.node.as_mut().get(); 874 debug_assert!(matches!((*node).waker, Wakeup::Empty)); 875 (*node).waker = Wakeup::Waiting(waker.clone()); 876 } 877 } 878 879 waiters.push_front(ptr); 880 881 Poll::Pending 882 } 883 WaitStateInner::Waiting => { 884 let _waiters = queue.queue.lock(); 885 // safety: we may mutate the inner state because we are 886 // holding the lock. 887 unsafe { 888 let node = &mut *this.node.get(); 889 match node.waker { 890 Wakeup::Waiting(ref mut curr_waker) => { 891 match waker { 892 Some(waker) if !curr_waker.will_wake(waker) => { 893 *curr_waker = waker.clone(); 894 } 895 _ => {} 896 } 897 Poll::Pending 898 } 899 Wakeup::All | Wakeup::One => { 900 this.state.set(WaitState::INNER, WaitStateInner::Woken); 901 Poll::Ready(Ok(())) 902 } 903 // Wakeup::Closed => { 904 // this.state.set(WaitState::INNER, WaitStateInner::Woken); 905 // Poll::Ready(Err(Closed(()))) 906 // } 907 Wakeup::Empty => { 908 if let Some(waker) = waker { 909 node.waker = Wakeup::Waiting(waker.clone()); 910 } 911 912 Poll::Pending 913 } 914 } 915 } 916 } 917 WaitStateInner::Woken => Poll::Ready(Ok(())), 918 } 919 } 920 921 /// Release this `Waiter` from the queue. 922 /// 923 /// This is called from the `drop` implementation for the [`Wait`] and 924 /// [`WaitOwned`] futures. 925 fn release(mut self: Pin<&mut Self>, queue: &WaitQueue) { 926 let state = *(self.as_mut().project().state); 927 // Safety: we never move out of `ptr` below, only mutate its fields 928 let ptr = NonNull::from(unsafe { Pin::into_inner_unchecked(self) }); 929 930 // if we're not enqueued, we don't have to do anything else. 931 if state.get(WaitState::INNER) != WaitStateInner::Waiting { 932 return; 933 } 934 935 let mut waiters = queue.queue.lock(); 936 let state = queue.load(); 937 // remove the node 938 // safety: we have the lock on the queue, so this is safe. 939 unsafe { 940 waiters.remove(ptr); 941 }; 942 943 // if we removed the last waiter from the queue, transition the state to 944 // `Empty`. 945 if waiters.is_empty() && state.get(State::INNER) == StateInner::Waiting { 946 queue.store(state.with_inner(StateInner::Empty)); 947 } 948 949 // if the node has an unconsumed wakeup, it must be assigned to the next 950 // node in the queue. 951 let next_waiter = 952 if Waiter::with_inner(ptr, &mut waiters, |node| matches!(&node.waker, Wakeup::One)) { 953 queue.wake_locked(&mut waiters, state) 954 } else { 955 None 956 }; 957 958 drop(waiters); 959 960 if let Some(next) = next_waiter { 961 next.wake(); 962 } 963 } 964} 965 966// Safety: TODO 967unsafe impl Linked<list::Links<Self>> for Waiter { 968 type Handle = NonNull<Waiter>; 969 970 fn into_ptr(r: Self::Handle) -> NonNull<Self> { 971 r 972 } 973 974 unsafe fn from_ptr(ptr: NonNull<Self>) -> Self::Handle { 975 ptr 976 } 977 978 unsafe fn links(target: NonNull<Self>) -> NonNull<list::Links<Waiter>> { 979 // Safety: ensured by caller 980 unsafe { 981 // Safety: using `ptr::addr_of!` avoids creating a temporary 982 // reference, which stacked borrows dislikes. 983 let node = &*ptr::addr_of!((*target.as_ptr()).node); 984 let links = ptr::addr_of_mut!((*node.get()).links); 985 // Safety: since the `target` pointer is `NonNull`, we can assume 986 // that pointers to its members are also not null, making this use 987 // of `new_unchecked` fine. 988 NonNull::new_unchecked(links) 989 } 990 } 991} 992 993// === impl Wait === 994 995impl Wait<'_> { 996 /// Returns `true` if this `WaitOwned` future is waiting for a 997 /// notification from the provided [`WaitQueue`]. 998 #[inline] 999 #[must_use] 1000 pub fn waits_on(&self, queue: &WaitQueue) -> bool { 1001 ptr::eq(self.queue, queue) 1002 } 1003 1004 /// Returns `true` if `self` and `other` are waiting on a notification 1005 /// from the same [`WaitQueue`]. 1006 #[inline] 1007 #[must_use] 1008 pub fn same_queue(&self, other: &Wait<'_>) -> bool { 1009 ptr::eq(self.queue, other.queue) 1010 } 1011 1012 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`]. 1013 /// 1014 /// Polling a `Wait` future adds that future to the list of waiters that may 1015 /// receive a wakeup from a `WaitQueue`. However, in some cases, it is 1016 /// desirable to subscribe to wakeups *prior* to actually waiting for one. 1017 /// This method should be used when it is necessary to ensure a `Wait` 1018 /// future is in the list of waiters before the future is `poll`ed for the 1019 /// first time. 1020 /// 1021 /// In general, this method is used in cases where a [`WaitQueue`] must 1022 /// synchronize with some additional state, such as an `AtomicBool` or 1023 /// counter. If a task first checks that state, and then chooses whether or 1024 /// not to wait on the `WaitQueue` based on that state, then a race 1025 /// condition may occur where the `WaitQueue` wakes waiters *between* when 1026 /// the task checked the external state and when it first polled its `Wait` 1027 /// future to wait on the queue. This method allows registering the `Wait` 1028 /// future with the queue *prior* to checking the external state, without 1029 /// actually sleeping, so that when the task does wait for the `Wait` future 1030 /// to complete, it will have received any wakeup that was sent between when 1031 /// the external state was checked and the `Wait` future was first polled. 1032 /// 1033 /// # Returns 1034 /// 1035 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready` a wakeup was 1036 /// already received. This method returns [`Poll::Ready`] in the following 1037 /// cases: 1038 /// 1039 /// 1. The [`WaitQueue::wake()`] method was called between the creation of the 1040 /// `Wait` and the call to this method. 1041 /// 2. This is the first call to `subscribe` or `poll` on this future, and the 1042 /// `WaitQueue` was holding a stored wakeup from a previous call to 1043 /// [`wake()`]. This method consumes the wakeup in that case. 1044 /// 3. The future has previously been `subscribe`d or polled, and it has since 1045 /// then been marked ready by either consuming a wakeup from the 1046 /// `WaitQueue`, or by a call to [`wake()`] or [`wake_all()`] that 1047 /// removed it from the list of futures ready to receive wakeups. 1048 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which case 1049 /// this method returns `Poll::Ready(Err(Closed))`. 1050 /// 1051 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of this 1052 /// `Wait` future will also immediately return [`Poll::Ready`]. 1053 /// 1054 /// If the [`Wait`] future subscribed to wakeups from the queue, and 1055 /// has not been woken, this method returns [`Poll::Pending`]. 1056 /// 1057 /// [`wake()`]: WaitQueue::wake 1058 /// [`wake_all()`]: WaitQueue::wake_all 1059 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> { 1060 let this = self.project(); 1061 this.waiter.poll_wait(this.queue, None) 1062 } 1063} 1064 1065impl Future for Wait<'_> { 1066 type Output = Result<(), Closed>; 1067 1068 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 1069 let this = self.project(); 1070 this.waiter.poll_wait(this.queue, Some(cx.waker())) 1071 } 1072} 1073 1074#[pinned_drop] 1075impl PinnedDrop for Wait<'_> { 1076 fn drop(mut self: Pin<&mut Self>) { 1077 let this = self.project(); 1078 this.waiter.release(this.queue); 1079 } 1080} 1081 1082// === impl WaitOwned === 1083 1084impl WaitOwned { 1085 /// Returns `true` if this `WaitOwned` future is waiting for a 1086 /// notification from the provided [`WaitQueue`]. 1087 #[inline] 1088 #[must_use] 1089 pub fn waits_on(&self, queue: &WaitQueue) -> bool { 1090 ptr::eq(&*self.queue, queue) 1091 } 1092 1093 /// Returns `true` if `self` and `other` are waiting on a notification 1094 /// from the same [`WaitQueue`]. 1095 #[inline] 1096 #[must_use] 1097 pub fn same_queue(&self, other: &Wait<'_>) -> bool { 1098 ptr::eq(&*self.queue, other.queue) 1099 } 1100 1101 /// Eagerly subscribe this future to wakeups from [`WaitQueue::wake()`]. 1102 /// 1103 /// Polling a `WaitOwned` future adds that future to the list of waiters 1104 /// that may receive a wakeup from a `WaitQueue`. However, in some 1105 /// cases, it is desirable to subscribe to wakeups *prior* to actually 1106 /// waiting for one. This method should be used when it is necessary to 1107 /// ensure a `WaitOwned` future is in the list of waiters before the 1108 /// future is `poll`ed for the rst time. 1109 /// 1110 /// In general, this method is used in cases where a [`WaitQueue`] must 1111 /// synchronize with some additional state, such as an `AtomicBool` or 1112 /// counter. If a task first checks that state, and then chooses whether or 1113 /// not to wait on the `WaitQueue` based on that state, then a race 1114 /// condition may occur where the `WaitQueue` wakes waiters *between* when 1115 /// the task checked the external state and when it first polled its 1116 /// `WaitOwned` future to wait on the queue. This method allows 1117 /// registering the `WaitOwned` future with the queue *prior* to 1118 /// checking the external state, without actually sleeping, so that when 1119 /// the task does wait for the `WaitOwned` future to complete, it will 1120 /// have received any wakeup that was sent between when the external 1121 /// state was checked and the `WaitOwned` future was first polled. 1122 /// 1123 /// # Returns 1124 /// 1125 /// This method returns a [`Poll`]`<`[`Result<(), Closed>`]`>` which is `Ready` 1126 /// a wakeup was already received. This method returns [`Poll::Ready`] 1127 /// in the following cases: 1128 /// 1129 /// 1. The [`WaitQueue::wake()`] method was called between the creation 1130 /// of the `WaitOwned` future and the call to this method. 1131 /// 2. This is the first call to `subscribe` or `poll` on this future, 1132 /// and the `WaitQueue` was holding a stored wakeup from a previous 1133 /// call to [`wake()`]. This method consumes the wakeup in that case. 1134 /// 3. The future has previously been `subscribe`d or polled, and it 1135 /// has since then been marked ready by either consuming a wakeup 1136 /// from the `WaitQueue`, or by a call to [`wake()`] or 1137 /// [`wake_all()`] that removed it from the list of futures ready to 1138 /// receive wakeups. 1139 /// 4. The `WaitQueue` has been [`close`d](WaitQueue::close), in which 1140 /// case this method returns `Poll::Ready(Err(Closed))`. 1141 /// 1142 /// If this method returns [`Poll::Ready`], any subsequent `poll`s of 1143 /// this `Wait` future will also immediately return [`Poll::Ready`]. 1144 /// 1145 /// If the [`WaitOwned`] future subscribed to wakeups from the queue, 1146 /// and has not been woken, this method returns [`Poll::Pending`]. 1147 /// 1148 /// [`wake()`]: WaitQueue::wake 1149 /// [`wake_all()`]: WaitQueue::wake_all 1150 pub fn subscribe(self: Pin<&mut Self>) -> Poll<Result<(), Closed>> { 1151 let this = self.project(); 1152 this.waiter.poll_wait(this.queue, None) 1153 } 1154} 1155 1156impl Future for WaitOwned { 1157 type Output = Result<(), Closed>; 1158 1159 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 1160 let this = self.project(); 1161 this.waiter.poll_wait(&*this.queue, Some(cx.waker())) 1162 } 1163} 1164 1165#[pinned_drop] 1166impl PinnedDrop for WaitOwned { 1167 fn drop(mut self: Pin<&mut Self>) { 1168 let this = self.project(); 1169 this.waiter.release(&*this.queue); 1170 } 1171}