//! Next Generation WASM Microkernel Operating System
1// Copyright 2025. Jonas Kruckenberg 2// 3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5// http://opensource.org/licenses/MIT>, at your option. This file may not be 6// copied, modified, or distributed except according to those terms. 7 8mod steal; 9 10use core::alloc::AllocError; 11use core::num::NonZeroUsize; 12use core::ptr; 13use core::ptr::NonNull; 14use core::sync::atomic::Ordering; 15 16use cordyceps::mpsc_queue::{MpscQueue, TryDequeueError}; 17use futures::pin_mut; 18use k23_cpu_local::collection::CpuLocal; 19use k23_fastrand::FastRand; 20use k23_spin::Backoff; 21 22use crate::error::{Closed, SpawnError}; 23use crate::executor::steal::{Injector, Stealer, TryStealError}; 24use crate::future::Either; 25use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize}; 26use crate::sync::wait_queue::WaitQueue; 27use crate::task::{Header, JoinHandle, PollResult, TaskBuilder, TaskRef}; 28 29#[derive(Debug)] 30pub struct Executor { 31 schedulers: CpuLocal<Scheduler>, 32 injector: Injector, 33 sleepers: WaitQueue, 34} 35 36#[derive(Debug)] 37pub struct Worker { 38 id: usize, 39 executor: &'static Executor, 40 scheduler: &'static Scheduler, 41 rng: FastRand, 42} 43 44/// Information about the scheduler state produced after ticking. 45#[derive(Debug)] 46#[non_exhaustive] 47pub struct Tick { 48 /// `true` if the tick completed with any tasks remaining in the run queue. 49 pub has_remaining: bool, 50 51 /// The total number of tasks polled on this scheduler tick. 52 pub polled: usize, 53 54 /// The number of polled tasks that *completed* on this scheduler tick. 55 /// 56 /// This should always be <= `self.polled`. 57 #[cfg(feature = "counters")] 58 pub completed: usize, 59 60 /// The number of tasks that were spawned since the last tick. 
61 #[cfg(feature = "counters")] 62 pub spawned: usize, 63 64 /// The number of tasks that were woken from outside their own `poll` calls since the last tick. 65 #[cfg(feature = "counters")] 66 pub woken_external: usize, 67 68 /// The number of tasks that were woken from within their own `poll` calls during this tick. 69 #[cfg(feature = "counters")] 70 pub woken_internal: usize, 71} 72 73#[derive(Debug)] 74pub struct Scheduler { 75 run_queue: MpscQueue<Header>, 76 current_task: AtomicPtr<Header>, 77 queued: AtomicUsize, 78 #[cfg(feature = "counters")] 79 spawned: AtomicUsize, 80 #[cfg(feature = "counters")] 81 woken: AtomicUsize, 82} 83 84// === impl Executor === 85 86impl Executor { 87 /// # Errors 88 /// 89 /// Returns `AllocError` when allocating the underlying resources fails. 90 pub fn new() -> Result<Self, AllocError> { 91 Ok(Self { 92 schedulers: CpuLocal::new(), 93 injector: Injector::new()?, 94 sleepers: WaitQueue::new(), 95 }) 96 } 97 98 /// # Errors 99 /// 100 /// Returns `AllocError` when allocating the underlying resources fails. 101 pub fn with_capacity(capacity: usize) -> Result<Self, AllocError> { 102 Ok(Self { 103 schedulers: CpuLocal::with_capacity(capacity), 104 injector: Injector::new()?, 105 sleepers: WaitQueue::new(), 106 }) 107 } 108 109 /// Closes the executor. 110 /// 111 /// After calling close all ongoing and future calls to [`Worker::run`] will return `Err(Closed)`. 112 pub fn close(&self) { 113 self.sleepers.close(); 114 } 115 116 /// Returns `true` if the executor has been closed. 117 /// 118 /// The executor is closed by calling [`close`][Self::close]. 119 /// 120 /// If true is returned, a call to send will always result in an error. 
121 pub fn is_closed(&self) -> bool { 122 self.sleepers.is_closed() 123 } 124 125 pub fn wake_one(&self) { 126 self.sleepers.wake(); 127 } 128 129 pub fn current_scheduler(&self) -> Option<&Scheduler> { 130 self.schedulers.get() 131 } 132 133 pub fn build_task<'a>( 134 &'static self, 135 ) -> TaskBuilder<'a, impl Fn(TaskRef) -> Result<(), Closed>> { 136 TaskBuilder::new(|task| { 137 if self.is_closed() { 138 return Err(Closed(())); 139 } 140 141 if let Some(scheduler) = self.schedulers.get() { 142 // we need to bind the scheduler here 143 task.bind_scheduler(scheduler); 144 145 scheduler.schedule(task); 146 147 Ok(()) 148 } else { 149 self.injector.push_task(task); 150 151 Ok(()) 152 } 153 }) 154 } 155 156 /// Attempt spawn this [`Future`] onto this executor. 157 /// 158 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`] 159 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects 160 /// of its runtime behaviour (such as cancelling it). 161 /// 162 /// # Errors 163 /// 164 /// Returns [`AllocError`] when allocation of the task fails. 165 #[track_caller] 166 pub fn try_spawn<F>(&'static self, future: F) -> Result<JoinHandle<F::Output, ()>, SpawnError> 167 where 168 F: Future + Send + 'static, 169 F::Output: Send + 'static, 170 { 171 self.build_task().try_spawn(future) 172 } 173 174 /// Attempt spawn this [`Future`] with the provided metadata onto this executor. 175 /// 176 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`] 177 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects 178 /// of its runtime behaviour (such as cancelling it). 179 /// 180 /// # Errors 181 /// 182 /// Returns [`AllocError`] when allocation of the task fails. 
183 #[track_caller] 184 pub fn try_spawn_with_metadata<F, M>( 185 &'static self, 186 future: F, 187 metadata: M, 188 ) -> Result<JoinHandle<F::Output, M>, SpawnError> 189 where 190 F: Future + Send, 191 F::Output: Send, 192 M: Send, 193 { 194 self.build_task().try_spawn_with_metadata(future, metadata) 195 } 196} 197 198// === impl Worker === 199 200impl Worker { 201 /// # Errors 202 /// 203 /// Returns `AllocError` when allocating the underlying resources fails. 204 pub fn new(executor: &'static Executor, rng: FastRand) -> Result<Self, AllocError> { 205 let id = executor.schedulers.len(); 206 let core = executor.schedulers.get_or_try(Scheduler::new)?; 207 208 Ok(Self { 209 id, 210 executor, 211 scheduler: core, 212 rng, 213 }) 214 } 215 216 /// Returns a reference to the task that's current being polled or `None`. 217 #[must_use] 218 pub fn current_task(&self) -> Option<TaskRef> { 219 self.scheduler.current_task() 220 } 221 222 /// Run the worker main loop until the given future completes. 223 /// 224 /// # Errors 225 /// 226 /// Returns `Err(Closed)` if the executor has been closed before the future completes. 
227 pub async fn run<F>(&mut self, future: F) -> Result<F::Output, Closed> 228 where 229 F: Future + Send, 230 F::Output: Send, 231 { 232 if self.executor.is_closed() { 233 return Err(Closed(())); 234 } 235 236 let main_loop = self.main_loop(); 237 pin_mut!(future); 238 pin_mut!(main_loop); 239 240 let res = crate::future::select(future, main_loop).await; 241 match res { 242 Either::Left((val, _)) => Ok(val), 243 // The `main_loop` future either never returns or always returns Err(Closed) 244 Either::Right((Err(err), _)) => Err(err), 245 } 246 } 247 248 async fn main_loop(&mut self) -> Result<!, Closed> { 249 loop { 250 if self.tick().has_remaining { 251 continue; 252 } 253 254 tracing::trace!("worker {} going to sleep...", self.id); 255 self.executor.sleepers.wait().await?; 256 tracing::trace!("worker woke up"); 257 } 258 } 259 260 pub fn tick(&mut self) -> Tick { 261 let mut tick = self.scheduler.tick_n(256); 262 tracing::trace!(worker = self.id, ?tick, "worker tick"); 263 264 if tick.has_remaining { 265 return tick; 266 } 267 268 // if there are no tasks remaining in this core's run queue, try to 269 // steal new tasks from the distributor queue. 270 if let Some(stolen) = self.try_steal() { 271 tracing::trace!(tick.stolen = stolen); 272 273 // if we stole tasks, we need to keep ticking 274 tick.has_remaining = true; 275 return tick; 276 } 277 278 // if we have no remaining woken tasks, and we didn't steal any new 279 // tasks, this core can sleep until an interrupt occurs. 
280 tick 281 } 282 283 fn try_steal(&mut self) -> Option<NonZeroUsize> { 284 const ROUNDS: usize = 4; 285 const MAX_STOLEN_PER_TICK: usize = 256; 286 287 // attempt to steal from the global injector queue first 288 if let Ok(stealer) = self.executor.injector.try_steal() { 289 let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK); 290 tracing::trace!("stole {stolen} tasks from injector (in first attempt)"); 291 return NonZeroUsize::new(stolen); 292 } 293 294 // If that fails, attempt to steal from other workers 295 let num_workers = self.executor.schedulers.len(); 296 297 // if there is only one worker, there is no one to steal from anyway 298 if num_workers <= 1 { 299 return None; 300 } 301 302 let mut backoff = Backoff::new(); 303 304 for _ in 0..ROUNDS { 305 // Start from a random worker 306 let start = self.rng.fastrand_n(u32::try_from(num_workers).unwrap()) as usize; 307 308 if let Some(stolen) = self.try_steal_one_round(num_workers, start) { 309 return Some(stolen); 310 } 311 312 backoff.spin(); 313 } 314 315 // as a last resort try to steal from the global injector queue again 316 if let Ok(stealer) = self.executor.injector.try_steal() { 317 let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK); 318 tracing::trace!("stole {stolen} tasks from injector (in second attempt)"); 319 return NonZeroUsize::new(stolen); 320 } 321 322 None 323 } 324 325 fn try_steal_one_round(&mut self, num_workers: usize, start: usize) -> Option<NonZeroUsize> { 326 for i in 0..num_workers { 327 let i = (start + i) % num_workers; 328 329 // Don't steal from ourselves! We know we don't have work. 
330 if i == self.id { 331 continue; 332 } 333 334 let Some(victim) = self.executor.schedulers.iter().nth(i) else { 335 // The worker might not be online yet, just advance past 336 continue; 337 }; 338 339 let Ok(stealer) = victim.try_steal() else { 340 // the victim either doesn't have any tasks, or is already being stolen from 341 // either way, just advance past 342 continue; 343 }; 344 345 let stolen = stealer.spawn_half(self.scheduler); 346 tracing::trace!("stole {stolen} tasks from worker {i} {victim:?}"); 347 return NonZeroUsize::new(stolen); 348 } 349 350 None 351 } 352} 353 354// === impl Scheduler === 355 356impl Scheduler { 357 fn new() -> Result<Self, AllocError> { 358 let stub_task = TaskRef::new_stub()?; 359 360 Ok(Self { 361 run_queue: MpscQueue::new_with_stub(stub_task), 362 queued: AtomicUsize::new(0), 363 current_task: AtomicPtr::new(ptr::null_mut()), 364 #[cfg(feature = "counters")] 365 spawned: AtomicUsize::new(0), 366 #[cfg(feature = "counters")] 367 woken: AtomicUsize::new(0), 368 }) 369 } 370 371 /// Returns a reference to the task that's current being polled or `None`. 372 #[must_use] 373 pub fn current_task(&self) -> Option<TaskRef> { 374 let ptr = NonNull::new(self.current_task.load(Ordering::Acquire))?; 375 Some(TaskRef::clone_from_raw(ptr)) 376 } 377 378 pub fn schedule(&self, task: TaskRef) { 379 self.queued.fetch_add(1, Ordering::AcqRel); 380 self.run_queue.enqueue(task); 381 } 382 383 fn tick_n(&'static self, n: usize) -> Tick { 384 tracing::trace!("tick_n({self:p}, {n})"); 385 386 let mut tick = Tick { 387 has_remaining: false, 388 polled: 0, 389 #[cfg(feature = "counters")] 390 completed: 0, 391 #[cfg(feature = "counters")] 392 spawned: 0, 393 #[cfg(feature = "counters")] 394 woken_external: 0, 395 #[cfg(feature = "counters")] 396 woken_internal: 0, 397 }; 398 399 while tick.polled < n { 400 let task = match self.run_queue.try_dequeue() { 401 Ok(task) => task, 402 // If inconsistent, just try again. 
403 Err(TryDequeueError::Inconsistent) => { 404 tracing::trace!("scheduler queue {:?} inconsistent", self.run_queue); 405 core::hint::spin_loop(); 406 continue; 407 } 408 // Queue is empty or busy (in use by something else), bail out. 409 Err(TryDequeueError::Busy | TryDequeueError::Empty) => { 410 tracing::trace!("scheduler queue {:?} busy or empty", self.run_queue); 411 break; 412 } 413 }; 414 415 self.queued.fetch_sub(1, Ordering::SeqCst); 416 417 let _span = tracing::trace_span!( 418 "poll", 419 task.addr = ?task.header_ptr(), 420 task.tid = task.id().as_u64(), 421 ) 422 .entered(); 423 424 // store the currently polled task in the `current_task` pointer. 425 // using `TaskRef::as_ptr` is safe here, since we will clear the 426 // `current_task` pointer before dropping the `TaskRef`. 427 self.current_task 428 .store(task.header_ptr().as_ptr(), Ordering::Release); 429 430 // poll the task 431 let poll_result = task.poll(); 432 433 // clear the current task cell before potentially dropping the 434 // `TaskRef`. 435 self.current_task.store(ptr::null_mut(), Ordering::Release); 436 437 tick.polled += 1; 438 match poll_result { 439 PollResult::Ready | PollResult::ReadyJoined => { 440 #[cfg(feature = "counters")] 441 { 442 tick.completed += 1; 443 } 444 } 445 PollResult::PendingSchedule => { 446 self.schedule(task); 447 #[cfg(feature = "counters")] 448 { 449 tick.woken_internal += 1; 450 } 451 } 452 PollResult::Pending => {} 453 } 454 455 #[cfg(not(feature = "counters"))] 456 tracing::trace!(poll = ?poll_result, tick.polled); 457 #[cfg(feature = "counters")] 458 tracing::trace!(poll = ?poll_result, tick.polled, tick.completed); 459 } 460 461 #[cfg(feature = "counters")] 462 { 463 tick.spawned = self.spawned.swap(0, Ordering::Relaxed); 464 tick.woken_external = self.woken.swap(0, Ordering::Relaxed); 465 } 466 467 // are there still tasks in the queue? if so, we have more tasks to poll. 
468 if self.queued.load(Ordering::SeqCst) > 0 { 469 tick.has_remaining = true; 470 } 471 472 if tick.polled > 0 { 473 // log scheduler metrics. 474 #[cfg(not(feature = "counters"))] 475 tracing::trace!(tick.polled, tick.has_remaining); 476 #[cfg(feature = "counters")] 477 tracing::trace!( 478 tick.polled, 479 tick.has_remaining, 480 tick.completed, 481 tick.spawned, 482 tick.woken = tick.woken_external + tick.woken_internal, 483 tick.woken.external = tick.woken_external, 484 tick.woken.internal = tick.woken_internal 485 ); 486 } 487 488 tick 489 } 490 491 fn try_steal(&self) -> Result<Stealer<'_>, TryStealError> { 492 Stealer::new(&self.run_queue, &self.queued) 493 } 494} 495 496#[cfg(test)] 497mod tests { 498 use core::hint::black_box; 499 use core::sync::atomic::AtomicBool; 500 501 use tracing_subscriber::EnvFilter; 502 use tracing_subscriber::util::SubscriberInitExt; 503 504 use super::*; 505 use crate::{loom, test_util}; 506 507 async fn work() -> usize { 508 let val = 1 + 1; 509 crate::task::yield_now().await; 510 black_box(val) 511 } 512 513 #[test] 514 fn single_threaded() { 515 let _trace = tracing_subscriber::fmt() 516 .with_env_filter(EnvFilter::from_default_env()) 517 .set_default(); 518 519 loom::model(|| { 520 loom::lazy_static! 
{ 521 static ref EXEC: Executor = Executor::new().unwrap(); 522 static ref CALLED: AtomicBool = AtomicBool::new(false); 523 } 524 525 EXEC.try_spawn(async move { 526 work().await; 527 CALLED.store(true, Ordering::SeqCst); 528 EXEC.close(); 529 }) 530 .unwrap(); 531 532 let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)).unwrap(); 533 test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err( 534 "stopping the executor should always result in a Closed(()) error here", 535 ); 536 assert!(CALLED.load(Ordering::SeqCst)); 537 }) 538 } 539 540 #[test] 541 fn multi_threaded() { 542 let _trace = tracing_subscriber::fmt() 543 .with_env_filter(EnvFilter::from_default_env()) 544 .set_default(); 545 546 loom::model(|| { 547 const NUM_THREADS: usize = 3; 548 549 loom::lazy_static! { 550 static ref EXEC: Executor = Executor::new().unwrap(); 551 static ref CALLED: AtomicBool = AtomicBool::new(false); 552 } 553 554 EXEC.try_spawn(async move { 555 work().await; 556 CALLED.store(true, Ordering::SeqCst); 557 EXEC.close(); 558 }) 559 .unwrap(); 560 561 let joins: Vec<_> = (0..NUM_THREADS) 562 .map(|_| { 563 loom::thread::spawn(move || { 564 let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)).unwrap(); 565 566 test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err( 567 "stopping the executor should always result in a Closed(()) error here", 568 ); 569 }) 570 }) 571 .collect(); 572 573 for join in joins { 574 join.join().unwrap(); 575 } 576 assert!(CALLED.load(Ordering::SeqCst)); 577 }); 578 } 579 580 #[test] 581 fn join_handle_cross_thread() { 582 let _trace = tracing_subscriber::fmt() 583 .with_env_filter(EnvFilter::from_default_env()) 584 .set_default(); 585 586 loom::model(|| { 587 loom::lazy_static! 
{ 588 static ref EXEC: Executor = Executor::new().unwrap(); 589 } 590 591 let (tx, rx) = loom::sync::mpsc::channel::<JoinHandle<u32, ()>>(); 592 593 let h0 = loom::thread::spawn(move || { 594 let tid = loom::thread::current().id(); 595 let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)).unwrap(); 596 597 let h = EXEC 598 .try_spawn(async move { 599 // make sure the task is actually polled on thread 0 600 assert_eq!(loom::thread::current().id(), tid); 601 602 crate::task::yield_now().await; 603 604 // make sure the task is actually polled on thread 0 605 assert_eq!(loom::thread::current().id(), tid); 606 607 42 608 }) 609 .unwrap(); 610 611 tx.send(h).unwrap(); 612 613 test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err( 614 "stopping the executor should always result in a Closed(()) error here", 615 ); 616 }); 617 618 let h1 = loom::thread::spawn(move || { 619 let h = rx.recv().unwrap(); 620 621 let ret_code = test_util::block_on(h).unwrap(); 622 623 assert_eq!(ret_code, 42); 624 625 EXEC.close(); 626 }); 627 628 h0.join().unwrap(); 629 h1.join().unwrap(); 630 }); 631 } 632}