Next Generation WASM Microkernel Operating System
at trap_handler 520 lines 20 kB view raw
1// Copyright 2025 Jonas Kruckenberg 2// 3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5// http://opensource.org/licenses/MIT>, at your option. This file may not be 6// copied, modified, or distributed except according to those terms. 7 8use crate::error::Closed; 9use crate::loom::cell::UnsafeCell; 10use crate::loom::sync::atomic::{AtomicUsize, Ordering}; 11use bitflags::bitflags; 12use core::panic::{RefUnwindSafe, UnwindSafe}; 13use core::pin::Pin; 14use core::task::{Context, Poll, Waker}; 15use core::{fmt, task}; 16use static_assertions::const_assert_eq; 17use util::{CachePadded, loom_const_fn}; 18 19/// An atomically registered [`Waker`]. 20/// 21/// This cell stores the [`Waker`] of a single task. A [`Waker`] is stored in 22/// the cell either by calling [`poll_wait`], or by polling a [`wait`] 23/// future. Once a task's [`Waker`] is stored in a `WaitCell`, it can be woken 24/// by calling [`wake`] on the `WaitCell`. 25/// 26/// # Implementation Notes 27/// 28/// This type is copied from [`maitake-sync`](https://github.com/hawkw/mycelium/blob/dd0020892564c77ee4c20ffbc2f7f5b046ad54c8/maitake-sync/src/wait_cell.rs) 29/// which is in turn inspired by the [`AtomicWaker`] type used in Tokio's 30/// synchronization primitives, with the following modifications: 31/// 32/// - An additional bit of state is added to allow [setting a "close" 33/// bit](Self::close). 34/// - A `WaitCell` is always woken by value (for now). 35/// 36/// [`AtomicWaker`]: https://github.com/tokio-rs/tokio/blob/09b770c5db31a1f35631600e1d239679354da2dd/tokio/src/sync/task/atomic_waker.rs 37/// [`Waker`]: Waker 38/// [`poll_wait`]: Self::poll_wait 39/// [`wait`]: Self::wait 40/// [`wake`]: Self::wake 41pub struct WaitCell { 42 state: CachePadded<AtomicUsize>, 43 waker: UnsafeCell<Option<Waker>>, 44} 45 46bitflags! { 47 #[derive(Debug, PartialEq, Eq)] 48 struct State: usize { 49 const WAITING = 0b0000; 50 const REGISTERING = 0b0001; 51 const WAKING = 0b0010; 52 const WOKEN = 0b0100; 53 const CLOSED = 0b1000; 54 } 55} 56// WAITING MUST be zero 57const_assert_eq!(State::WAITING.bits(), 0); 58 59/// Future returned from [`WaitCell::wait()`]. 60/// 61/// This future is fused, so once it has completed, any future calls to poll 62/// will immediately return [`Poll::Ready`]. 63#[derive(Debug)] 64#[must_use = "futures do nothing unless `.await`ed or `poll`ed"] 65pub struct Wait<'a> { 66 /// The [`WaitCell`] being waited on. 67 cell: &'a WaitCell, 68 69 presubscribe: Poll<Result<(), Closed>>, 70} 71 72/// Future returned from [`WaitCell::subscribe()`]. 73/// 74/// See the documentation for [`WaitCell::subscribe()`] for details. 75#[derive(Debug)] 76#[must_use = "futures do nothing unless `.await`ed or `poll`ed"] 77pub struct Subscribe<'a> { 78 /// The [`WaitCell`] being waited on. 79 cell: &'a WaitCell, 80} 81 82/// An error indicating that a [`WaitCell`] was closed or busy while 83/// attempting register a [`Waker`]. 84/// 85/// This error is returned by the [`WaitCell::poll_wait`] method. 86#[derive(Copy, Clone, Debug, Eq, PartialEq)] 87pub enum PollWaitError { 88 /// The [`Waker`] was not registered because the [`WaitCell`] has been 89 /// [closed](WaitCell::close). 90 Closed, 91 92 /// The [`Waker`] was not registered because another task was concurrently 93 /// storing its own [`Waker`] in the [`WaitCell`]. 94 Busy, 95} 96 97// === impl WaitCell === 98 99impl WaitCell { 100 loom_const_fn! { 101 pub const fn new() -> Self { 102 Self { 103 state: CachePadded(AtomicUsize::new(State::WAITING.bits())), 104 waker: UnsafeCell::new(None), 105 } 106 } 107 } 108 109 #[tracing::instrument] 110 pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll<Result<(), PollWaitError>> { 111 // this is based on tokio's AtomicWaker synchronization strategy 112 match self.compare_exchange(State::WAITING, State::REGISTERING, Ordering::Acquire) { 113 Err(actual) if actual.contains(State::CLOSED) => { 114 return Poll::Ready(Err(PollWaitError::Closed)); 115 } 116 Err(actual) if actual.contains(State::WOKEN) => { 117 // take the wakeup 118 self.fetch_and(!State::WOKEN, Ordering::Release); 119 return Poll::Ready(Ok(())); 120 } 121 // someone else is notifying, so don't wait! 122 Err(actual) if actual.contains(State::WAKING) => { 123 return Poll::Ready(Ok(())); 124 } 125 Err(_) => return Poll::Ready(Err(PollWaitError::Busy)), 126 Ok(_) => {} 127 } 128 129 let waker = cx.waker(); 130 tracing::trace!( 131 /*wait_cell = ?fmt::ptr(self),*/ ?waker, 132 "registering waker" 133 ); 134 135 if let Some(prev_waker) = self.replace_waker(waker.clone()) { 136 tracing::debug!("Replaced an old waker in cell, waking"); 137 prev_waker.wake(); 138 } 139 140 if let Err(actual) = 141 self.compare_exchange(State::REGISTERING, State::WAITING, Ordering::AcqRel) 142 { 143 // If the `compare_exchange` fails above, this means that we were notified for one of 144 // two reasons: either the cell was awoken, or the cell was closed. 145 // 146 // Bail out of the parking state, and determine what to report to the caller. 147 tracing::trace!(state = ?actual, "was notified"); 148 149 // Safety: No one else is touching the waker right now, so it is safe to access it 150 // mutably 151 let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() }); 152 153 // Reset to the WAITING state by clearing everything *except* 154 // the closed bits (which must remain set). This `fetch_and` 155 // does *not* set the CLOSED bit if it is unset, it just doesn't 156 // clear it. 157 let state = self.fetch_and(State::CLOSED, Ordering::AcqRel); 158 // The only valid state transition while we were parking is to 159 // add the CLOSED bit. 160 debug_assert!( 161 state == actual || state == actual | State::CLOSED, 162 "state changed unexpectedly while parking!" 163 ); 164 165 if let Some(waker) = waker { 166 waker.wake(); 167 } 168 169 // Was the `CLOSED` bit set while we were clearing other bits? 170 // If so, the cell is closed. Otherwise, we must have been notified. 171 if state.contains(State::CLOSED) { 172 return Poll::Ready(Err(PollWaitError::Closed)); 173 } 174 175 return Poll::Ready(Ok(())); 176 } 177 178 // Waker registered, time to yield! 179 Poll::Pending 180 } 181 182 /// Wait to be woken up by this cell. 183 /// 184 /// # Returns 185 /// 186 /// This future completes with the following values: 187 /// 188 /// - [`Ok`]`(())` if the future was woken by a call to [`wake`] or another 189 /// task calling [`poll_wait`] or [`wait`] on this [`WaitCell`]. 190 /// - [`Err`]`(`[`Closed`]`)` if the task was woken by a call to [`close`], 191 /// or the [`WaitCell`] was already closed. 192 /// 193 /// **Note**: The calling task's [`Waker`] is not registered until AFTER the 194 /// first time the returned [`Wait`] future is polled. This means that if a 195 /// call to [`wake`] occurs between when [`wait`] is called and when the 196 /// future is first polled, the future will *not* complete. If the caller is 197 /// responsible for performing an operation which will result in an eventual 198 /// wakeup, prefer calling [`subscribe`] _before_ performing that operation 199 /// and `.await`ing the [`Wait`] future returned by [`subscribe`]. 200 /// 201 /// [`wake`]: Self::wake 202 /// [`poll_wait`]: Self::poll_wait 203 /// [`wait`]: Self::wait 204 /// [`close`]: Self::close 205 /// [`subscribe`]: Self::subscribe 206 pub fn wait(&self) -> Wait<'_> { 207 Wait { 208 cell: self, 209 presubscribe: Poll::Pending, 210 } 211 } 212 213 /// Eagerly subscribe to notifications from this `WaitCell`. 214 /// 215 /// This method returns a [`Subscribe`] [`Future`], which outputs a [`Wait`] 216 /// [`Future`]. Awaiting the [`Subscribe`] future will eagerly register the 217 /// calling task to be woken by this [`WaitCell`], so that the returned 218 /// [`Wait`] future will be woken by any calls to [`wake`] (or [`close`]) 219 /// that occur between when the [`Subscribe`] future completes and when the 220 /// returned [`Wait`] future is `.await`ed. 221 /// 222 /// This is primarily intended for scenarios where the task that waits on a 223 /// [`WaitCell`] is responsible for performing some operation that 224 /// ultimately results in the [`WaitCell`] being woken. If the task were to 225 /// simply perform the operation and then call [`wait`] on the [`WaitCell`], 226 /// a potential race condition could occur where the operation completes and 227 /// wakes the [`WaitCell`] *before* the [`Wait`] future is first `.await`ed. 228 /// Using `subscribe`, the task can ensure that it is ready to be woken by 229 /// the cell *before* performing an operation that could result in it being 230 /// woken. 231 /// 232 /// These scenarios occur when a wakeup is triggered by another thread/CPU 233 /// core in response to an operation performed in the task waiting on the 234 /// `WaitCell`, or when the wakeup is triggered by a hardware interrupt 235 /// resulting from operations performed in the task. 236 /// 237 /// [`wait`]: Self::wait 238 /// [`wake`]: Self::wake 239 /// [`close`]: Self::close 240 pub fn subscribe(&self) -> Subscribe<'_> { 241 Subscribe { cell: self } 242 } 243 244 /// Wake the [`Waker`] stored in this cell. 245 /// 246 /// # Returns 247 /// 248 /// - `true` if a waiting task was woken. 249 /// - `false` if no task was woken (no [`Waker`] was stored in the cell) 250 #[tracing::instrument] 251 pub fn wake(&self) -> bool { 252 if let Some(waker) = self.take_waker(false) { 253 waker.wake(); 254 true 255 } else { 256 false 257 } 258 } 259 260 /// Close the [`WaitCell`]. 261 /// 262 /// This wakes any waiting task with an error indicating the `WaitCell` is 263 /// closed. Subsequent calls to [`wait`] or [`poll_wait`] will return an 264 /// error indicating that the cell has been closed. 265 /// 266 /// [`wait`]: Self::wait 267 /// [`poll_wait`]: Self::poll_wait 268 #[tracing::instrument] 269 pub fn close(&self) -> bool { 270 if let Some(waker) = self.take_waker(true) { 271 waker.wake(); 272 true 273 } else { 274 false 275 } 276 } 277 278 /// Returns `true` if this `WaitCell` is [closed](Self::close). 279 #[must_use] 280 pub fn is_closed(&self) -> bool { 281 self.current_state() == State::CLOSED 282 } 283 284 /// Asynchronously poll the given function `f` until a condition occurs, 285 /// using the [`WaitCell`] to only re-poll when notified. 286 /// 287 /// This can be used to implement a "wait loop", turning a "try" function 288 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. 289 /// "recv" or "send"). 290 /// 291 /// In particular, this function correctly *registers* interest in the [`WaitCell`] 292 /// prior to polling the function, ensuring that there is not a chance of a race 293 /// where the condition occurs AFTER checking but BEFORE registering interest 294 /// in the [`WaitCell`], which could lead to deadlock. 295 /// 296 /// This is intended to have similar behavior to `Condvar` in the standard library, 297 /// but asynchronous, and not requiring operating system intervention (or existence). 298 /// 299 /// In particular, this can be used in cases where interrupts or events are used 300 /// to signify readiness or completion of some task, such as the completion of a 301 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt 302 /// can wake the cell, allowing the polling function to check status fields for 303 /// partial progress or completion. 304 /// 305 /// Consider using [`Self::wait_for_value()`] if your function does return a value. 306 /// 307 // Consider using [`WaitQueue::wait_for()`](super::wait_queue::WaitQueue::wait_for) 308 // if you need multiple waiters. 309 /// 310 /// # Errors 311 /// 312 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed. 313 pub async fn wait_for<F: FnMut() -> bool>(&self, mut f: F) -> Result<(), Closed> { 314 loop { 315 let wait = self.subscribe().await; 316 if f() { 317 return Ok(()); 318 } 319 wait.await?; 320 } 321 } 322 323 /// Asynchronously poll the given function `f` until a condition occurs, 324 /// using the [`WaitCell`] to only re-poll when notified. 325 /// 326 /// This can be used to implement a "wait loop", turning a "try" function 327 /// (e.g. "try_recv" or "try_send") into an asynchronous function (e.g. 328 /// "recv" or "send"). 329 /// 330 /// In particular, this function correctly *registers* interest in the [`WaitCell`] 331 /// prior to polling the function, ensuring that there is not a chance of a race 332 /// where the condition occurs AFTER checking but BEFORE registering interest 333 /// in the [`WaitCell`], which could lead to deadlock. 334 /// 335 /// This is intended to have similar behavior to `Condvar` in the standard library, 336 /// but asynchronous, and not requiring operating system intervention (or existence). 337 /// 338 /// In particular, this can be used in cases where interrupts or events are used 339 /// to signify readiness or completion of some task, such as the completion of a 340 /// DMA transfer, or reception of an ethernet frame. In cases like this, the interrupt 341 /// can wake the cell, allowing the polling function to check status fields for 342 /// partial progress or completion, and also return the status flags at the same time. 343 /// 344 /// Consider using [`Self::wait_for()`] if your function does not return a value. 345 /// 346 // Consider using [`WaitQueue::wait_for_value()`](super::wait_queue::WaitQueue::wait_for_value) if you need multiple waiters. 347 /// 348 /// # Errors 349 /// 350 /// Returns [`Err`]`(`[`Closed`]`)` if the [`WaitCell`] is closed. 351 pub async fn wait_for_value<T, F: FnMut() -> Option<T>>(&self, mut f: F) -> Result<T, Closed> { 352 loop { 353 let wait = self.subscribe().await; 354 if let Some(t) = f() { 355 return Ok(t); 356 } 357 wait.await?; 358 } 359 } 360 361 #[tracing::instrument] 362 fn take_waker(&self, close: bool) -> Option<Waker> { 363 // Set the WAKING bit (to indicate that we're touching the waker) and 364 // the WOKEN bit (to indicate that we intend to wake it up). 365 let state = { 366 let mut bits = State::WAKING | State::WOKEN; 367 if close { 368 bits.0 |= State::CLOSED.0; 369 } 370 self.fetch_or(bits, Ordering::AcqRel) 371 }; 372 373 // Is anyone else touching the waker? 374 if !state.contains(State::WAKING | State::REGISTERING | State::CLOSED) { 375 // Safety: No one else is touching the waker right now, so it is safe to access it 376 // mutably 377 let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() }); 378 379 // Release the lock. 380 self.fetch_and(!State::WAKING, Ordering::Release); 381 382 if let Some(waker) = waker { 383 tracing::trace!(wait_cell = ?self, ?close, ?waker, "took_waker"); 384 return Some(waker); 385 } 386 } 387 388 None 389 } 390 391 #[tracing::instrument] 392 fn replace_waker(&self, waker: Waker) -> Option<Waker> { 393 // Set the WAKING bit (to indicate that we're touching the waker) and 394 // the WOKEN bit (to indicate that we intend to wake it up). 395 let state = self.fetch_or(State::WAKING, Ordering::AcqRel); 396 397 // Is anyone else touching the waker? 398 if !state.contains(State::WAKING | State::REGISTERING | State::CLOSED) { 399 // Safety: No one else is touching the waker right now, so it is safe to access it 400 // mutably 401 let prev_waker = self.waker.with_mut(|old_waker| unsafe { 402 match &mut *old_waker { 403 Some(old_waker) if waker.will_wake(old_waker) => None, 404 old => old.replace(waker.clone()), 405 } 406 }); 407 408 // Release the lock. 409 self.fetch_and(!State::WAKING, Ordering::Release); 410 411 tracing::trace!(wait_cell = ?self, ?prev_waker, ?waker, "replaced_waker"); 412 return prev_waker; 413 } 414 415 None 416 } 417 418 #[inline(always)] 419 fn compare_exchange(&self, curr: State, new: State, success: Ordering) -> Result<State, State> { 420 self.state 421 .0 422 .compare_exchange(curr.bits(), new.bits(), success, Ordering::Acquire) 423 .map(State::from_bits_retain) 424 .map_err(State::from_bits_retain) 425 } 426 427 #[inline(always)] 428 fn fetch_and(&self, state: State, order: Ordering) -> State { 429 State::from_bits_retain(self.state.0.fetch_and(state.bits(), order)) 430 } 431 432 #[inline(always)] 433 fn fetch_or(&self, state: State, order: Ordering) -> State { 434 State::from_bits_retain(self.state.0.fetch_or(state.bits(), order)) 435 } 436 437 #[inline(always)] 438 fn current_state(&self) -> State { 439 State::from_bits_retain(self.state.0.load(Ordering::Acquire)) 440 } 441} 442 443impl Default for WaitCell { 444 fn default() -> Self { 445 WaitCell::new() 446 } 447} 448 449impl RefUnwindSafe for WaitCell {} 450impl UnwindSafe for WaitCell {} 451 452// Safety: `WaitCell` synchronizes all accesses through atomic operations 453unsafe impl Send for WaitCell {} 454// Safety: `WaitCell` synchronizes all accesses through atomic operations 455unsafe impl Sync for WaitCell {} 456 457impl fmt::Debug for WaitCell { 458 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 459 f.debug_struct("WaitCell") 460 .field("state", &self.current_state()) 461 .finish_non_exhaustive() 462 } 463} 464 465impl Drop for WaitCell { 466 fn drop(&mut self) { 467 self.close(); 468 } 469} 470 471// === impl Wait === 472 473impl Future for Wait<'_> { 474 type Output = Result<(), Closed>; 475 476 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 477 // Did a wakeup occur while we were pre-registering the future? 478 if self.presubscribe.is_ready() { 479 return self.presubscribe; 480 } 481 482 // Okay, actually poll the cell, then. 483 match task::ready!(self.cell.poll_wait(cx)) { 484 Ok(()) => Poll::Ready(Ok(())), 485 Err(PollWaitError::Closed) => Poll::Ready(Err(Closed(()))), 486 Err(PollWaitError::Busy) => { 487 // If some other task was registering, yield and try to re-register 488 // our waker when that task is done. 489 cx.waker().wake_by_ref(); 490 Poll::Pending 491 } 492 } 493 } 494} 495 496// === impl Subscribe === 497 498impl<'cell> Future for Subscribe<'cell> { 499 type Output = Wait<'cell>; 500 501 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 502 // Pre-register the waker in the cell. 503 let presubscribe = match self.cell.poll_wait(cx) { 504 Poll::Ready(Err(PollWaitError::Busy)) => { 505 // Someone else is in the process of registering. Yield now so we 506 // can wait until that task is done, and then try again. 507 cx.waker().wake_by_ref(); 508 return Poll::Pending; 509 } 510 Poll::Ready(Err(PollWaitError::Closed)) => Poll::Ready(Err(Closed(()))), 511 Poll::Ready(Ok(())) => Poll::Ready(Ok(())), 512 Poll::Pending => Poll::Pending, 513 }; 514 515 Poll::Ready(Wait { 516 cell: self.cell, 517 presubscribe, 518 }) 519 } 520}