//! Next Generation WASM Microkernel Operating System
1// Copyright 2025. Jonas Kruckenberg
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8mod steal;
9
10use crate::error::Closed;
11use crate::error::SpawnError;
12use crate::executor::steal::{Injector, Stealer, TryStealError};
13use crate::future::Either;
14use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
15use crate::sync::wait_queue::WaitQueue;
16use crate::task::{Header, JoinHandle, PollResult, Task, TaskBuilder, TaskRef};
17use alloc::boxed::Box;
18use cordyceps::mpsc_queue::{MpscQueue, TryDequeueError};
19use core::alloc::Allocator;
20use core::num::NonZeroUsize;
21use core::ptr;
22use core::ptr::NonNull;
23use core::sync::atomic::Ordering;
24use cpu_local::collection::CpuLocal;
25use fastrand::FastRand;
26use futures::pin_mut;
27use spin::Backoff;
28
/// A multi-core async task executor with per-CPU run queues and work stealing.
#[derive(Debug)]
pub struct Executor {
    /// One [`Scheduler`] per CPU, lazily created by [`Worker::new`].
    schedulers: CpuLocal<Scheduler>,
    /// Global overflow queue for tasks spawned from a CPU that has no local
    /// scheduler yet; workers steal from it in [`Worker::tick`].
    injector: Injector,
    /// Idle workers park on this queue; it also carries the executor's
    /// closed state (see `close`/`is_closed`).
    sleepers: WaitQueue,
}
35
/// A per-CPU worker that drives its local [`Scheduler`] and steals work
/// from the injector or from other workers when idle.
#[derive(Debug)]
pub struct Worker {
    /// This worker's index, used to skip itself when picking steal victims.
    // NOTE(review): derived from `schedulers.len()` at construction time in
    // `Worker::new`; concurrent construction could yield duplicate ids — confirm.
    id: usize,
    /// The executor this worker belongs to.
    executor: &'static Executor,
    /// The CPU-local scheduler this worker ticks.
    scheduler: &'static Scheduler,
    /// RNG used to choose a random starting victim for work stealing.
    rng: FastRand,
}
43
/// Information about the scheduler state produced after ticking.
#[derive(Debug)]
#[non_exhaustive]
pub struct Tick {
    /// `true` if the tick completed with any tasks remaining in the run queue.
    ///
    /// When `false` (and stealing also fails), the worker may go to sleep
    /// until woken.
    pub has_remaining: bool,

    /// The total number of tasks polled on this scheduler tick.
    pub polled: usize,

    /// The number of polled tasks that *completed* on this scheduler tick.
    ///
    /// This should always be <= `self.polled`.
    #[cfg(feature = "counters")]
    pub completed: usize,

    /// The number of tasks that were spawned since the last tick.
    #[cfg(feature = "counters")]
    pub spawned: usize,

    /// The number of tasks that were woken from outside their own `poll` calls since the last tick.
    #[cfg(feature = "counters")]
    pub woken_external: usize,

    /// The number of tasks that were woken from within their own `poll` calls during this tick.
    #[cfg(feature = "counters")]
    pub woken_internal: usize,
}
72
/// Per-CPU scheduler state: a lock-free run queue plus bookkeeping counters.
#[derive(Debug)]
pub struct Scheduler {
    /// Intrusive MPSC queue of runnable tasks (consumed by `tick_n`).
    run_queue: MpscQueue<Header>,
    /// Header pointer of the task currently being polled, or null when no
    /// poll is in progress. Written around each `task.poll()` in `tick_n`.
    current_task: AtomicPtr<Header>,
    /// Count of tasks currently sitting in `run_queue`; used by `tick_n`
    /// (remaining-work check) and by the stealer.
    queued: AtomicUsize,
    /// Tasks spawned since the last tick; drained (swapped to 0) by `tick_n`.
    // NOTE(review): the increment site is not in this file — presumably in
    // the spawn/bind path; confirm.
    #[cfg(feature = "counters")]
    spawned: AtomicUsize,
    /// Externally-woken tasks since the last tick; drained by `tick_n`.
    #[cfg(feature = "counters")]
    woken: AtomicUsize,
}
83
84// === impl Executor ===
85
86impl Default for Executor {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl Executor {
93 pub fn new() -> Self {
94 Self {
95 schedulers: CpuLocal::new(),
96 injector: Injector::new(),
97 sleepers: WaitQueue::new(),
98 }
99 }
100
101 pub fn with_capacity(capacity: usize) -> Self {
102 Self {
103 schedulers: CpuLocal::with_capacity(capacity),
104 injector: Injector::new(),
105 sleepers: WaitQueue::new(),
106 }
107 }
108
109 /// Closes the executor.
110 ///
111 /// After calling close all ongoing and future calls to [`Worker::run`] will return `Err(Closed)`.
112 pub fn close(&self) {
113 self.sleepers.close();
114 }
115
116 /// Returns `true` if the executor has been closed.
117 ///
118 /// The executor is closed by calling [`close`][Self::close].
119 ///
120 /// If true is returned, a call to send will always result in an error.
121 pub fn is_closed(&self) -> bool {
122 self.sleepers.is_closed()
123 }
124
125 pub fn wake_one(&self) {
126 self.sleepers.wake();
127 }
128
129 pub fn current_scheduler(&self) -> Option<&Scheduler> {
130 self.schedulers.get()
131 }
132
133 pub fn build_task<'a>(
134 &'static self,
135 ) -> TaskBuilder<'a, impl Fn(TaskRef) -> Result<(), Closed>> {
136 TaskBuilder::new(|task| {
137 if self.is_closed() {
138 return Err(Closed(()));
139 }
140
141 if let Some(scheduler) = self.schedulers.get() {
142 // we need to bind the scheduler here
143 task.bind_scheduler(scheduler);
144
145 scheduler.schedule(task);
146
147 Ok(())
148 } else {
149 self.injector.push_task(task);
150
151 Ok(())
152 }
153 })
154 }
155
156 /// Attempt spawn this [`Future`] onto this executor.
157 ///
158 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`]
159 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects
160 /// of its runtime behaviour (such as cancelling it).
161 ///
162 /// # Errors
163 ///
164 /// Returns [`AllocError`] when allocation of the task fails.
165 pub fn try_spawn<F>(&'static self, future: F) -> Result<JoinHandle<F::Output>, SpawnError>
166 where
167 F: Future + Send + 'static,
168 F::Output: Send + 'static,
169 {
170 self.build_task().try_spawn(future)
171 }
172
173 /// Attempt spawn this [`Future`] onto this executor using a custom [`Allocator`].
174 ///
175 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`]
176 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects
177 /// of its runtime behaviour (such as cancelling it).
178 ///
179 /// # Errors
180 ///
181 /// Returns [`AllocError`] when allocation of the task fails.
182 pub fn try_spawn_in<F, A>(
183 &'static self,
184 future: F,
185 alloc: A,
186 ) -> Result<JoinHandle<F::Output>, SpawnError>
187 where
188 F: Future + Send + 'static,
189 F::Output: Send + 'static,
190 A: Allocator,
191 {
192 self.build_task().try_spawn_in(future, alloc)
193 }
194}
195
196// === impl Worker ===
197
impl Worker {
    /// Creates a worker for `executor`, binding it to the calling CPU's
    /// scheduler (creating the scheduler if this CPU has none yet).
    ///
    /// `rng` seeds random victim selection for work stealing.
    pub fn new(executor: &'static Executor, rng: FastRand) -> Self {
        // NOTE(review): the id is the scheduler count *before* `get_or` runs;
        // if workers are constructed concurrently, two could observe the same
        // length and get duplicate ids — confirm this is serialized elsewhere.
        let id = executor.schedulers.len();
        let core = executor.schedulers.get_or(Scheduler::new);

        Self {
            id,
            executor,
            scheduler: core,
            rng,
        }
    }

    /// Returns a reference to the task that's current being polled or `None`.
    #[must_use]
    pub fn current_task(&self) -> Option<TaskRef> {
        self.scheduler.current_task()
    }

    /// Run the worker main loop until the given future completes.
    ///
    /// # Errors
    ///
    /// Returns `Err(Closed)` if the executor has been closed before the future completes.
    pub async fn run<F>(&mut self, future: F) -> Result<F::Output, Closed>
    where
        F: Future + Send,
        F::Output: Send,
    {
        if self.executor.is_closed() {
            return Err(Closed(()));
        }

        let main_loop = self.main_loop();
        pin_mut!(future);
        pin_mut!(main_loop);

        // race the caller's future against the scheduling loop; whichever
        // completes first determines the result
        let res = crate::future::select(future, main_loop).await;
        match res {
            Either::Left((val, _)) => Ok(val),
            // The `main_loop` future either never returns or always returns Err(Closed)
            Either::Right((Err(err), _)) => Err(err),
        }
    }

    /// The core scheduling loop: tick while work remains, otherwise park on
    /// the sleepers queue. Only returns once the executor is closed (the `?`
    /// on `wait()` propagates the `Closed` error).
    async fn main_loop(&mut self) -> Result<!, Closed> {
        loop {
            if self.tick().has_remaining {
                continue;
            }

            tracing::trace!("worker {} going to sleep...", self.id);
            self.executor.sleepers.wait().await?;
            tracing::trace!("worker woke up");
        }
    }

    /// Polls up to 256 tasks from the local run queue; if the queue runs dry,
    /// attempts to steal more work before reporting whether work remains.
    pub fn tick(&mut self) -> Tick {
        let mut tick = self.scheduler.tick_n(256);
        tracing::trace!(worker = self.id, ?tick, "worker tick");

        if tick.has_remaining {
            return tick;
        }

        // if there are no tasks remaining in this core's run queue, try to
        // steal new tasks from the distributor queue.
        if let Some(stolen) = self.try_steal() {
            tracing::trace!(tick.stolen = stolen);

            // if we stole tasks, we need to keep ticking
            tick.has_remaining = true;
            return tick;
        }

        // if we have no remaining woken tasks, and we didn't steal any new
        // tasks, this core can sleep until an interrupt occurs.
        tick
    }

    /// Tries to refill the local run queue from elsewhere.
    ///
    /// Attempt order: the global injector queue first, then up to `ROUNDS`
    /// passes over the other workers (starting at a random victim, backing
    /// off between rounds), and finally the injector once more. Returns the
    /// number of stolen tasks, or `None` if nothing could be stolen
    /// (`NonZeroUsize::new(0)` is `None`).
    fn try_steal(&mut self) -> Option<NonZeroUsize> {
        const ROUNDS: usize = 4;
        const MAX_STOLEN_PER_TICK: usize = 256;

        // attempt to steal from the global injector queue first
        if let Ok(stealer) = self.executor.injector.try_steal() {
            let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK);
            tracing::trace!("stole {stolen} tasks from injector (in first attempt)");
            return NonZeroUsize::new(stolen);
        }

        // If that fails, attempt to steal from other workers
        let num_workers = self.executor.schedulers.len();

        // if there is only one worker, there is no one to steal from anyway
        if num_workers <= 1 {
            return None;
        }

        let mut backoff = Backoff::new();

        for _ in 0..ROUNDS {
            // Start from a random worker
            let start = self.rng.fastrand_n(u32::try_from(num_workers).unwrap()) as usize;

            if let Some(stolen) = self.try_steal_one_round(num_workers, start) {
                return Some(stolen);
            }

            backoff.spin();
        }

        // as a last resort try to steal from the global injector queue again
        if let Ok(stealer) = self.executor.injector.try_steal() {
            let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK);
            tracing::trace!("stole {stolen} tasks from injector (in second attempt)");
            return NonZeroUsize::new(stolen);
        }

        None
    }

    /// Makes one pass over all workers, starting at index `start` and wrapping
    /// around, stealing a batch (via `Stealer::spawn_half`) from the first
    /// victim whose queue can be claimed. Skips ourselves and workers that are
    /// not online yet or are already being stolen from.
    fn try_steal_one_round(&mut self, num_workers: usize, start: usize) -> Option<NonZeroUsize> {
        for i in 0..num_workers {
            let i = (start + i) % num_workers;

            // Don't steal from ourselves! We know we don't have work.
            if i == self.id {
                continue;
            }

            let Some(victim) = self.executor.schedulers.iter().nth(i) else {
                // The worker might not be online yet, just advance past
                continue;
            };

            let Ok(stealer) = victim.try_steal() else {
                // the victim either doesn't have any tasks, or is already being stolen from
                // either way, just advance past
                continue;
            };

            let stolen = stealer.spawn_half(self.scheduler);
            tracing::trace!("stole {stolen} tasks from worker {i} {victim:?}");
            return NonZeroUsize::new(stolen);
        }

        None
    }
}
348
349// === impl Scheduler ===
350
impl Scheduler {
    /// Creates an empty scheduler.
    ///
    /// The intrusive MPSC run queue requires a pre-allocated stub node, which
    /// is created here from a stub task.
    fn new() -> Self {
        let stub_task = Box::new(Task::new_stub());
        let (stub_task, _) = TaskRef::new_allocated(stub_task);

        Self {
            run_queue: MpscQueue::new_with_stub(stub_task),
            queued: AtomicUsize::new(0),
            current_task: AtomicPtr::new(ptr::null_mut()),
            #[cfg(feature = "counters")]
            spawned: AtomicUsize::new(0),
            #[cfg(feature = "counters")]
            woken: AtomicUsize::new(0),
        }
    }

    /// Returns a reference to the task that's current being polled or `None`.
    #[must_use]
    pub fn current_task(&self) -> Option<TaskRef> {
        let ptr = NonNull::new(self.current_task.load(Ordering::Acquire))?;
        Some(TaskRef::clone_from_raw(ptr))
    }

    /// Enqueues `task` onto this scheduler's run queue.
    pub fn schedule(&self, task: TaskRef) {
        // keep `queued` in sync with the queue; `tick_n` and the stealer rely
        // on it to detect remaining work
        self.queued.fetch_add(1, Ordering::AcqRel);
        self.run_queue.enqueue(task);
    }

    /// Dequeues and polls up to `n` tasks from the run queue, returning
    /// statistics about the work performed during this tick.
    fn tick_n(&'static self, n: usize) -> Tick {
        tracing::trace!("tick_n({self:p}, {n})");

        let mut tick = Tick {
            has_remaining: false,
            polled: 0,
            #[cfg(feature = "counters")]
            completed: 0,
            #[cfg(feature = "counters")]
            spawned: 0,
            #[cfg(feature = "counters")]
            woken_external: 0,
            #[cfg(feature = "counters")]
            woken_internal: 0,
        };

        while tick.polled < n {
            let task = match self.run_queue.try_dequeue() {
                Ok(task) => task,
                // If inconsistent, just try again.
                Err(TryDequeueError::Inconsistent) => {
                    tracing::trace!("scheduler queue {:?} inconsistent", self.run_queue);
                    core::hint::spin_loop();
                    continue;
                }
                // Queue is empty or busy (in use by something else), bail out.
                Err(TryDequeueError::Busy | TryDequeueError::Empty) => {
                    tracing::trace!("scheduler queue {:?} busy or empty", self.run_queue);
                    break;
                }
            };

            self.queued.fetch_sub(1, Ordering::SeqCst);

            let _span = tracing::trace_span!(
                "poll",
                task.addr = ?task.header_ptr(),
                task.tid = task.id().as_u64(),
            )
            .entered();

            // store the currently polled task in the `current_task` pointer.
            // using the raw header pointer is safe here, since we will clear
            // the `current_task` pointer before dropping the `TaskRef`.
            self.current_task
                .store(task.header_ptr().as_ptr(), Ordering::Release);

            // poll the task
            let poll_result = task.poll();

            // clear the current task cell before potentially dropping the
            // `TaskRef`.
            self.current_task.store(ptr::null_mut(), Ordering::Release);

            tick.polled += 1;
            match poll_result {
                PollResult::Ready | PollResult::ReadyJoined => {
                    #[cfg(feature = "counters")]
                    {
                        tick.completed += 1;
                    }
                }
                PollResult::PendingSchedule => {
                    // the task woke itself during poll; put it back in the queue
                    self.schedule(task);
                    #[cfg(feature = "counters")]
                    {
                        tick.woken_internal += 1;
                    }
                }
                PollResult::Pending => {}
            }

            #[cfg(not(feature = "counters"))]
            tracing::trace!(poll = ?poll_result, tick.polled);
            #[cfg(feature = "counters")]
            tracing::trace!(poll = ?poll_result, tick.polled, tick.completed);
        }

        // drain the per-tick counters (reset them to zero for the next tick)
        #[cfg(feature = "counters")]
        {
            tick.spawned = self.spawned.swap(0, Ordering::Relaxed);
            tick.woken_external = self.woken.swap(0, Ordering::Relaxed);
        }

        // are there still tasks in the queue? if so, we have more tasks to poll.
        if self.queued.load(Ordering::SeqCst) > 0 {
            tick.has_remaining = true;
        }

        if tick.polled > 0 {
            // log scheduler metrics.
            #[cfg(not(feature = "counters"))]
            tracing::trace!(tick.polled, tick.has_remaining);
            #[cfg(feature = "counters")]
            tracing::trace!(
                tick.polled,
                tick.has_remaining,
                tick.completed,
                tick.spawned,
                tick.woken = tick.woken_external + tick.woken_internal,
                tick.woken.external = tick.woken_external,
                tick.woken.internal = tick.woken_internal
            );
        }

        tick
    }

    /// Attempts to acquire a [`Stealer`] over this scheduler's run queue.
    ///
    /// Fails (per `Stealer::new`) when there is nothing to steal or the queue
    /// is already being stolen from — see [`TryStealError`].
    fn try_steal(&self) -> Result<Stealer, TryStealError> {
        Stealer::new(&self.run_queue, &self.queued)
    }
}
491
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{loom, test_util};
    use core::hint::black_box;
    use core::sync::atomic::AtomicBool;
    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::util::SubscriberInitExt;

    /// Minimal async workload: yields once so the task gets polled more than
    /// once before completing.
    async fn work() -> usize {
        let val = 1 + 1;
        crate::task::yield_now().await;
        // black_box keeps the computation from being optimized away
        black_box(val)
    }

    /// A single worker polls a spawned task to completion, and `close()` makes
    /// the worker's `run` return `Err(Closed)`.
    #[test]
    fn single_threaded() {
        let _trace = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .set_default();

        loom::model(|| {
            loom::lazy_static! {
                static ref EXEC: Executor = Executor::new();
                static ref CALLED: AtomicBool = AtomicBool::new(false);
            }

            // the task flips CALLED and then shuts the executor down
            EXEC.try_spawn(async move {
                work().await;
                CALLED.store(true, Ordering::SeqCst);
                EXEC.close();
            })
            .unwrap();

            let mut worker = Worker::new(&EXEC, FastRand::from_seed(0));
            test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err(
                "stopping the executor should always result in a Closed(()) error here",
            );
            assert!(CALLED.load(Ordering::SeqCst));
        })
    }

    /// Same as `single_threaded`, but with several workers racing; every
    /// worker's `run` must observe the close, and the task must still run
    /// exactly as in the single-threaded case.
    #[test]
    fn multi_threaded() {
        let _trace = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .set_default();

        loom::model(|| {
            const NUM_THREADS: usize = 3;

            loom::lazy_static! {
                static ref EXEC: Executor = Executor::new();
                static ref CALLED: AtomicBool = AtomicBool::new(false);
            }

            EXEC.try_spawn(async move {
                work().await;
                CALLED.store(true, Ordering::SeqCst);
                EXEC.close();
            })
            .unwrap();

            let joins: Vec<_> = (0..NUM_THREADS)
                .map(|_| {
                    loom::thread::spawn(move || {
                        let mut worker = Worker::new(&EXEC, FastRand::from_seed(0));

                        test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err(
                            "stopping the executor should always result in a Closed(()) error here",
                        );
                    })
                })
                .collect();

            for join in joins {
                join.join().unwrap();
            }
            assert!(CALLED.load(Ordering::SeqCst));
        });
    }

    /// A `JoinHandle` sent to another thread must still resolve with the
    /// task's output, while the task itself is polled on the worker's thread.
    #[test]
    fn join_handle_cross_thread() {
        let _trace = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .set_default();

        loom::model(|| {
            loom::lazy_static! {
                static ref EXEC: Executor = Executor::new();
            }

            // channel used to move the JoinHandle from the worker thread to
            // the observer thread
            let (tx, rx) = loom::sync::mpsc::channel::<JoinHandle<u32>>();

            let h0 = loom::thread::spawn(move || {
                let tid = loom::thread::current().id();
                let mut worker = Worker::new(&EXEC, FastRand::from_seed(0));

                let h = EXEC
                    .try_spawn(async move {
                        // make sure the task is actually polled on thread 0
                        assert_eq!(loom::thread::current().id(), tid);

                        crate::task::yield_now().await;

                        // make sure the task is actually polled on thread 0
                        assert_eq!(loom::thread::current().id(), tid);

                        42
                    })
                    .unwrap();

                tx.send(h).unwrap();

                test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err(
                    "stopping the executor should always result in a Closed(()) error here",
                );
            });

            let h1 = loom::thread::spawn(move || {
                let h = rx.recv().unwrap();

                let ret_code = test_util::block_on(h).unwrap();

                assert_eq!(ret_code, 42);

                EXEC.close();
            });

            h0.join().unwrap();
            h1.join().unwrap();
        });
    }
}