// Next Generation WASM Microkernel Operating System
1// Copyright 2025. Jonas Kruckenberg 2// 3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5// http://opensource.org/licenses/MIT>, at your option. This file may not be 6// copied, modified, or distributed except according to those terms. 7 8mod steal; 9 10use crate::error::Closed; 11use crate::error::SpawnError; 12use crate::executor::steal::{Injector, Stealer, TryStealError}; 13use crate::future::Either; 14use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize}; 15use crate::sync::wait_queue::WaitQueue; 16use crate::task::{Header, JoinHandle, PollResult, Task, TaskBuilder, TaskRef}; 17use alloc::boxed::Box; 18use cordyceps::mpsc_queue::{MpscQueue, TryDequeueError}; 19use core::alloc::Allocator; 20use core::num::NonZeroUsize; 21use core::ptr; 22use core::ptr::NonNull; 23use core::sync::atomic::Ordering; 24use cpu_local::collection::CpuLocal; 25use fastrand::FastRand; 26use futures::pin_mut; 27use spin::Backoff; 28 29#[derive(Debug)] 30pub struct Executor { 31 schedulers: CpuLocal<Scheduler>, 32 injector: Injector, 33 sleepers: WaitQueue, 34} 35 36#[derive(Debug)] 37pub struct Worker { 38 id: usize, 39 executor: &'static Executor, 40 scheduler: &'static Scheduler, 41 rng: FastRand, 42} 43 44/// Information about the scheduler state produced after ticking. 45#[derive(Debug)] 46#[non_exhaustive] 47pub struct Tick { 48 /// `true` if the tick completed with any tasks remaining in the run queue. 49 pub has_remaining: bool, 50 51 /// The total number of tasks polled on this scheduler tick. 52 pub polled: usize, 53 54 /// The number of polled tasks that *completed* on this scheduler tick. 55 /// 56 /// This should always be <= `self.polled`. 57 #[cfg(feature = "counters")] 58 pub completed: usize, 59 60 /// The number of tasks that were spawned since the last tick. 
61 #[cfg(feature = "counters")] 62 pub spawned: usize, 63 64 /// The number of tasks that were woken from outside their own `poll` calls since the last tick. 65 #[cfg(feature = "counters")] 66 pub woken_external: usize, 67 68 /// The number of tasks that were woken from within their own `poll` calls during this tick. 69 #[cfg(feature = "counters")] 70 pub woken_internal: usize, 71} 72 73#[derive(Debug)] 74pub struct Scheduler { 75 run_queue: MpscQueue<Header>, 76 current_task: AtomicPtr<Header>, 77 queued: AtomicUsize, 78 #[cfg(feature = "counters")] 79 spawned: AtomicUsize, 80 #[cfg(feature = "counters")] 81 woken: AtomicUsize, 82} 83 84// === impl Executor === 85 86impl Default for Executor { 87 fn default() -> Self { 88 Self::new() 89 } 90} 91 92impl Executor { 93 pub fn new() -> Self { 94 Self { 95 schedulers: CpuLocal::new(), 96 injector: Injector::new(), 97 sleepers: WaitQueue::new(), 98 } 99 } 100 101 pub fn with_capacity(capacity: usize) -> Self { 102 Self { 103 schedulers: CpuLocal::with_capacity(capacity), 104 injector: Injector::new(), 105 sleepers: WaitQueue::new(), 106 } 107 } 108 109 /// Closes the executor. 110 /// 111 /// After calling close all ongoing and future calls to [`Worker::run`] will return `Err(Closed)`. 112 pub fn close(&self) { 113 self.sleepers.close(); 114 } 115 116 /// Returns `true` if the executor has been closed. 117 /// 118 /// The executor is closed by calling [`close`][Self::close]. 119 /// 120 /// If true is returned, a call to send will always result in an error. 
121 pub fn is_closed(&self) -> bool { 122 self.sleepers.is_closed() 123 } 124 125 pub fn wake_one(&self) { 126 self.sleepers.wake(); 127 } 128 129 pub fn current_scheduler(&self) -> Option<&Scheduler> { 130 self.schedulers.get() 131 } 132 133 pub fn build_task<'a>( 134 &'static self, 135 ) -> TaskBuilder<'a, impl Fn(TaskRef) -> Result<(), Closed>> { 136 TaskBuilder::new(|task| { 137 if self.is_closed() { 138 return Err(Closed(())); 139 } 140 141 if let Some(scheduler) = self.schedulers.get() { 142 // we need to bind the scheduler here 143 task.bind_scheduler(scheduler); 144 145 scheduler.schedule(task); 146 147 Ok(()) 148 } else { 149 self.injector.push_task(task); 150 151 Ok(()) 152 } 153 }) 154 } 155 156 /// Attempt spawn this [`Future`] onto this executor. 157 /// 158 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`] 159 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects 160 /// of its runtime behaviour (such as cancelling it). 161 /// 162 /// # Errors 163 /// 164 /// Returns [`AllocError`] when allocation of the task fails. 165 pub fn try_spawn<F>(&'static self, future: F) -> Result<JoinHandle<F::Output>, SpawnError> 166 where 167 F: Future + Send + 'static, 168 F::Output: Send + 'static, 169 { 170 self.build_task().try_spawn(future) 171 } 172 173 /// Attempt spawn this [`Future`] onto this executor using a custom [`Allocator`]. 174 /// 175 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`] 176 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects 177 /// of its runtime behaviour (such as cancelling it). 178 /// 179 /// # Errors 180 /// 181 /// Returns [`AllocError`] when allocation of the task fails. 
182 pub fn try_spawn_in<F, A>( 183 &'static self, 184 future: F, 185 alloc: A, 186 ) -> Result<JoinHandle<F::Output>, SpawnError> 187 where 188 F: Future + Send + 'static, 189 F::Output: Send + 'static, 190 A: Allocator, 191 { 192 self.build_task().try_spawn_in(future, alloc) 193 } 194} 195 196// === impl Worker === 197 198impl Worker { 199 pub fn new(executor: &'static Executor, rng: FastRand) -> Self { 200 let id = executor.schedulers.len(); 201 let core = executor.schedulers.get_or(Scheduler::new); 202 203 Self { 204 id, 205 executor, 206 scheduler: core, 207 rng, 208 } 209 } 210 211 /// Returns a reference to the task that's current being polled or `None`. 212 #[must_use] 213 pub fn current_task(&self) -> Option<TaskRef> { 214 self.scheduler.current_task() 215 } 216 217 /// Run the worker main loop until the given future completes. 218 /// 219 /// # Errors 220 /// 221 /// Returns `Err(Closed)` if the executor has been closed before the future completes. 222 pub async fn run<F>(&mut self, future: F) -> Result<F::Output, Closed> 223 where 224 F: Future + Send, 225 F::Output: Send, 226 { 227 if self.executor.is_closed() { 228 return Err(Closed(())); 229 } 230 231 let main_loop = self.main_loop(); 232 pin_mut!(future); 233 pin_mut!(main_loop); 234 235 let res = crate::future::select(future, main_loop).await; 236 match res { 237 Either::Left((val, _)) => Ok(val), 238 // The `main_loop` future either never returns or always returns Err(Closed) 239 Either::Right((Err(err), _)) => Err(err), 240 } 241 } 242 243 async fn main_loop(&mut self) -> Result<!, Closed> { 244 loop { 245 if self.tick().has_remaining { 246 continue; 247 } 248 249 tracing::trace!("worker {} going to sleep...", self.id); 250 self.executor.sleepers.wait().await?; 251 tracing::trace!("worker woke up"); 252 } 253 } 254 255 pub fn tick(&mut self) -> Tick { 256 let mut tick = self.scheduler.tick_n(256); 257 tracing::trace!(worker = self.id, ?tick, "worker tick"); 258 259 if tick.has_remaining { 260 
return tick; 261 } 262 263 // if there are no tasks remaining in this core's run queue, try to 264 // steal new tasks from the distributor queue. 265 if let Some(stolen) = self.try_steal() { 266 tracing::trace!(tick.stolen = stolen); 267 268 // if we stole tasks, we need to keep ticking 269 tick.has_remaining = true; 270 return tick; 271 } 272 273 // if we have no remaining woken tasks, and we didn't steal any new 274 // tasks, this core can sleep until an interrupt occurs. 275 tick 276 } 277 278 fn try_steal(&mut self) -> Option<NonZeroUsize> { 279 const ROUNDS: usize = 4; 280 const MAX_STOLEN_PER_TICK: usize = 256; 281 282 // attempt to steal from the global injector queue first 283 if let Ok(stealer) = self.executor.injector.try_steal() { 284 let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK); 285 tracing::trace!("stole {stolen} tasks from injector (in first attempt)"); 286 return NonZeroUsize::new(stolen); 287 } 288 289 // If that fails, attempt to steal from other workers 290 let num_workers = self.executor.schedulers.len(); 291 292 // if there is only one worker, there is no one to steal from anyway 293 if num_workers <= 1 { 294 return None; 295 } 296 297 let mut backoff = Backoff::new(); 298 299 for _ in 0..ROUNDS { 300 // Start from a random worker 301 let start = self.rng.fastrand_n(u32::try_from(num_workers).unwrap()) as usize; 302 303 if let Some(stolen) = self.try_steal_one_round(num_workers, start) { 304 return Some(stolen); 305 } 306 307 backoff.spin(); 308 } 309 310 // as a last resort try to steal from the global injector queue again 311 if let Ok(stealer) = self.executor.injector.try_steal() { 312 let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK); 313 tracing::trace!("stole {stolen} tasks from injector (in second attempt)"); 314 return NonZeroUsize::new(stolen); 315 } 316 317 None 318 } 319 320 fn try_steal_one_round(&mut self, num_workers: usize, start: usize) -> Option<NonZeroUsize> { 321 for i in 0..num_workers { 
322 let i = (start + i) % num_workers; 323 324 // Don't steal from ourselves! We know we don't have work. 325 if i == self.id { 326 continue; 327 } 328 329 let Some(victim) = self.executor.schedulers.iter().nth(i) else { 330 // The worker might not be online yet, just advance past 331 continue; 332 }; 333 334 let Ok(stealer) = victim.try_steal() else { 335 // the victim either doesn't have any tasks, or is already being stolen from 336 // either way, just advance past 337 continue; 338 }; 339 340 let stolen = stealer.spawn_half(self.scheduler); 341 tracing::trace!("stole {stolen} tasks from worker {i} {victim:?}"); 342 return NonZeroUsize::new(stolen); 343 } 344 345 None 346 } 347} 348 349// === impl Scheduler === 350 351impl Scheduler { 352 fn new() -> Self { 353 let stub_task = Box::new(Task::new_stub()); 354 let (stub_task, _) = TaskRef::new_allocated(stub_task); 355 356 Self { 357 run_queue: MpscQueue::new_with_stub(stub_task), 358 queued: AtomicUsize::new(0), 359 current_task: AtomicPtr::new(ptr::null_mut()), 360 #[cfg(feature = "counters")] 361 spawned: AtomicUsize::new(0), 362 #[cfg(feature = "counters")] 363 woken: AtomicUsize::new(0), 364 } 365 } 366 367 /// Returns a reference to the task that's current being polled or `None`. 
368 #[must_use] 369 pub fn current_task(&self) -> Option<TaskRef> { 370 let ptr = NonNull::new(self.current_task.load(Ordering::Acquire))?; 371 Some(TaskRef::clone_from_raw(ptr)) 372 } 373 374 pub fn schedule(&self, task: TaskRef) { 375 self.queued.fetch_add(1, Ordering::AcqRel); 376 self.run_queue.enqueue(task); 377 } 378 379 fn tick_n(&'static self, n: usize) -> Tick { 380 tracing::trace!("tick_n({self:p}, {n})"); 381 382 let mut tick = Tick { 383 has_remaining: false, 384 polled: 0, 385 #[cfg(feature = "counters")] 386 completed: 0, 387 #[cfg(feature = "counters")] 388 spawned: 0, 389 #[cfg(feature = "counters")] 390 woken_external: 0, 391 #[cfg(feature = "counters")] 392 woken_internal: 0, 393 }; 394 395 while tick.polled < n { 396 let task = match self.run_queue.try_dequeue() { 397 Ok(task) => task, 398 // If inconsistent, just try again. 399 Err(TryDequeueError::Inconsistent) => { 400 tracing::trace!("scheduler queue {:?} inconsistent", self.run_queue); 401 core::hint::spin_loop(); 402 continue; 403 } 404 // Queue is empty or busy (in use by something else), bail out. 405 Err(TryDequeueError::Busy | TryDequeueError::Empty) => { 406 tracing::trace!("scheduler queue {:?} busy or empty", self.run_queue); 407 break; 408 } 409 }; 410 411 self.queued.fetch_sub(1, Ordering::SeqCst); 412 413 let _span = tracing::trace_span!( 414 "poll", 415 task.addr = ?task.header_ptr(), 416 task.tid = task.id().as_u64(), 417 ) 418 .entered(); 419 420 // store the currently polled task in the `current_task` pointer. 421 // using `TaskRef::as_ptr` is safe here, since we will clear the 422 // `current_task` pointer before dropping the `TaskRef`. 423 self.current_task 424 .store(task.header_ptr().as_ptr(), Ordering::Release); 425 426 // poll the task 427 let poll_result = task.poll(); 428 429 // clear the current task cell before potentially dropping the 430 // `TaskRef`. 
431 self.current_task.store(ptr::null_mut(), Ordering::Release); 432 433 tick.polled += 1; 434 match poll_result { 435 PollResult::Ready | PollResult::ReadyJoined => { 436 #[cfg(feature = "counters")] 437 { 438 tick.completed += 1; 439 } 440 } 441 PollResult::PendingSchedule => { 442 self.schedule(task); 443 #[cfg(feature = "counters")] 444 { 445 tick.woken_internal += 1; 446 } 447 } 448 PollResult::Pending => {} 449 } 450 451 #[cfg(not(feature = "counters"))] 452 tracing::trace!(poll = ?poll_result, tick.polled); 453 #[cfg(feature = "counters")] 454 tracing::trace!(poll = ?poll_result, tick.polled, tick.completed); 455 } 456 457 #[cfg(feature = "counters")] 458 { 459 tick.spawned = self.spawned.swap(0, Ordering::Relaxed); 460 tick.woken_external = self.woken.swap(0, Ordering::Relaxed); 461 } 462 463 // are there still tasks in the queue? if so, we have more tasks to poll. 464 if self.queued.load(Ordering::SeqCst) > 0 { 465 tick.has_remaining = true; 466 } 467 468 if tick.polled > 0 { 469 // log scheduler metrics. 
470 #[cfg(not(feature = "counters"))] 471 tracing::trace!(tick.polled, tick.has_remaining); 472 #[cfg(feature = "counters")] 473 tracing::trace!( 474 tick.polled, 475 tick.has_remaining, 476 tick.completed, 477 tick.spawned, 478 tick.woken = tick.woken_external + tick.woken_internal, 479 tick.woken.external = tick.woken_external, 480 tick.woken.internal = tick.woken_internal 481 ); 482 } 483 484 tick 485 } 486 487 fn try_steal(&self) -> Result<Stealer, TryStealError> { 488 Stealer::new(&self.run_queue, &self.queued) 489 } 490} 491 492#[cfg(test)] 493mod tests { 494 use super::*; 495 use crate::{loom, test_util}; 496 use core::hint::black_box; 497 use core::sync::atomic::AtomicBool; 498 use tracing_subscriber::EnvFilter; 499 use tracing_subscriber::util::SubscriberInitExt; 500 501 async fn work() -> usize { 502 let val = 1 + 1; 503 crate::task::yield_now().await; 504 black_box(val) 505 } 506 507 #[test] 508 fn single_threaded() { 509 let _trace = tracing_subscriber::fmt() 510 .with_env_filter(EnvFilter::from_default_env()) 511 .set_default(); 512 513 loom::model(|| { 514 loom::lazy_static! { 515 static ref EXEC: Executor = Executor::new(); 516 static ref CALLED: AtomicBool = AtomicBool::new(false); 517 } 518 519 EXEC.try_spawn(async move { 520 work().await; 521 CALLED.store(true, Ordering::SeqCst); 522 EXEC.close(); 523 }) 524 .unwrap(); 525 526 let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)); 527 test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err( 528 "stopping the executor should always result in a Closed(()) error here", 529 ); 530 assert!(CALLED.load(Ordering::SeqCst)); 531 }) 532 } 533 534 #[test] 535 fn multi_threaded() { 536 let _trace = tracing_subscriber::fmt() 537 .with_env_filter(EnvFilter::from_default_env()) 538 .set_default(); 539 540 loom::model(|| { 541 const NUM_THREADS: usize = 3; 542 543 loom::lazy_static! 
{ 544 static ref EXEC: Executor = Executor::new(); 545 static ref CALLED: AtomicBool = AtomicBool::new(false); 546 } 547 548 EXEC.try_spawn(async move { 549 work().await; 550 CALLED.store(true, Ordering::SeqCst); 551 EXEC.close(); 552 }) 553 .unwrap(); 554 555 let joins: Vec<_> = (0..NUM_THREADS) 556 .map(|_| { 557 loom::thread::spawn(move || { 558 let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)); 559 560 test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err( 561 "stopping the executor should always result in a Closed(()) error here", 562 ); 563 }) 564 }) 565 .collect(); 566 567 for join in joins { 568 join.join().unwrap(); 569 } 570 assert!(CALLED.load(Ordering::SeqCst)); 571 }); 572 } 573 574 #[test] 575 fn join_handle_cross_thread() { 576 let _trace = tracing_subscriber::fmt() 577 .with_env_filter(EnvFilter::from_default_env()) 578 .set_default(); 579 580 loom::model(|| { 581 loom::lazy_static! { 582 static ref EXEC: Executor = Executor::new(); 583 } 584 585 let (tx, rx) = loom::sync::mpsc::channel::<JoinHandle<u32>>(); 586 587 let h0 = loom::thread::spawn(move || { 588 let tid = loom::thread::current().id(); 589 let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)); 590 591 let h = EXEC 592 .try_spawn(async move { 593 // make sure the task is actually polled on thread 0 594 assert_eq!(loom::thread::current().id(), tid); 595 596 crate::task::yield_now().await; 597 598 // make sure the task is actually polled on thread 0 599 assert_eq!(loom::thread::current().id(), tid); 600 601 42 602 }) 603 .unwrap(); 604 605 tx.send(h).unwrap(); 606 607 test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err( 608 "stopping the executor should always result in a Closed(()) error here", 609 ); 610 }); 611 612 let h1 = loom::thread::spawn(move || { 613 let h = rx.recv().unwrap(); 614 615 let ret_code = test_util::block_on(h).unwrap(); 616 617 assert_eq!(ret_code, 42); 618 619 EXEC.close(); 620 }); 621 622 
h0.join().unwrap(); 623 h1.join().unwrap(); 624 }); 625 } 626}