//! Next Generation WASM Microkernel Operating System
1// Copyright 2025. Jonas Kruckenberg
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8mod steal;
9
10use core::alloc::AllocError;
11use core::num::NonZeroUsize;
12use core::ptr;
13use core::ptr::NonNull;
14use core::sync::atomic::Ordering;
15
16use cordyceps::mpsc_queue::{MpscQueue, TryDequeueError};
17use futures::pin_mut;
18use k23_cpu_local::collection::CpuLocal;
19use k23_fastrand::FastRand;
20use k23_spin::Backoff;
21
22use crate::error::{Closed, SpawnError};
23use crate::executor::steal::{Injector, Stealer, TryStealError};
24use crate::future::Either;
25use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
26use crate::sync::wait_queue::WaitQueue;
27use crate::task::{Header, JoinHandle, PollResult, TaskBuilder, TaskRef};
28
/// A multi-CPU, work-stealing async task executor.
///
/// Tasks are spawned either directly onto the calling CPU's local
/// [`Scheduler`], or — when the caller has no local scheduler — onto the
/// global [`Injector`] queue for a worker to pick up later.
#[derive(Debug)]
pub struct Executor {
    /// One scheduler per CPU; each [`Worker`] ticks the scheduler of the CPU
    /// it runs on.
    schedulers: CpuLocal<Scheduler>,
    /// Global overflow/injection queue, used when no CPU-local scheduler is
    /// available and as a steal source for idle workers.
    injector: Injector,
    /// Parking lot for idle workers; closing this queue shuts the executor
    /// down (see [`Executor::close`]).
    sleepers: WaitQueue,
}
35
/// A per-CPU worker that drives an [`Executor`]: it repeatedly ticks its
/// local [`Scheduler`] and steals work from the injector or from other
/// workers when its own run queue runs dry.
#[derive(Debug)]
pub struct Worker {
    /// Index of this worker, used so a worker never tries to steal from
    /// itself.
    id: usize,
    /// The executor this worker belongs to.
    executor: &'static Executor,
    /// The CPU-local scheduler this worker ticks.
    scheduler: &'static Scheduler,
    /// Random source used to pick the starting victim for each steal round.
    rng: FastRand,
}
43
/// Information about the scheduler state produced after ticking.
///
/// Returned by [`Worker::tick`]. With the `counters` feature enabled it also
/// carries per-tick statistics drained from the scheduler's atomic counters.
#[derive(Debug)]
#[non_exhaustive]
pub struct Tick {
    /// `true` if the tick completed with any tasks remaining in the run queue.
    pub has_remaining: bool,

    /// The total number of tasks polled on this scheduler tick.
    pub polled: usize,

    /// The number of polled tasks that *completed* on this scheduler tick.
    ///
    /// This should always be <= `self.polled`.
    #[cfg(feature = "counters")]
    pub completed: usize,

    /// The number of tasks that were spawned since the last tick.
    #[cfg(feature = "counters")]
    pub spawned: usize,

    /// The number of tasks that were woken from outside their own `poll` calls since the last tick.
    #[cfg(feature = "counters")]
    pub woken_external: usize,

    /// The number of tasks that were woken from within their own `poll` calls during this tick.
    #[cfg(feature = "counters")]
    pub woken_internal: usize,
}
72
/// A single CPU's task scheduler: a lock-free run queue plus bookkeeping.
#[derive(Debug)]
pub struct Scheduler {
    /// MPSC queue of tasks that are ready to be polled.
    run_queue: MpscQueue<Header>,
    /// Header pointer of the task currently inside `poll`, or null when no
    /// task is being polled (maintained by `tick_n`).
    current_task: AtomicPtr<Header>,
    /// Count of tasks currently in `run_queue` (incremented in `schedule`,
    /// decremented on dequeue in `tick_n`).
    queued: AtomicUsize,
    /// Tasks spawned since the last tick; drained (swapped to 0) by `tick_n`.
    #[cfg(feature = "counters")]
    spawned: AtomicUsize,
    /// Tasks woken externally since the last tick; drained (swapped to 0) by
    /// `tick_n`.
    #[cfg(feature = "counters")]
    woken: AtomicUsize,
}
83
84// === impl Executor ===
85
86impl Executor {
87 /// # Errors
88 ///
89 /// Returns `AllocError` when allocating the underlying resources fails.
90 pub fn new() -> Result<Self, AllocError> {
91 Ok(Self {
92 schedulers: CpuLocal::new(),
93 injector: Injector::new()?,
94 sleepers: WaitQueue::new(),
95 })
96 }
97
98 /// # Errors
99 ///
100 /// Returns `AllocError` when allocating the underlying resources fails.
101 pub fn with_capacity(capacity: usize) -> Result<Self, AllocError> {
102 Ok(Self {
103 schedulers: CpuLocal::with_capacity(capacity),
104 injector: Injector::new()?,
105 sleepers: WaitQueue::new(),
106 })
107 }
108
109 /// Closes the executor.
110 ///
111 /// After calling close all ongoing and future calls to [`Worker::run`] will return `Err(Closed)`.
112 pub fn close(&self) {
113 self.sleepers.close();
114 }
115
116 /// Returns `true` if the executor has been closed.
117 ///
118 /// The executor is closed by calling [`close`][Self::close].
119 ///
120 /// If true is returned, a call to send will always result in an error.
121 pub fn is_closed(&self) -> bool {
122 self.sleepers.is_closed()
123 }
124
125 pub fn wake_one(&self) {
126 self.sleepers.wake();
127 }
128
129 pub fn current_scheduler(&self) -> Option<&Scheduler> {
130 self.schedulers.get()
131 }
132
133 pub fn build_task<'a>(
134 &'static self,
135 ) -> TaskBuilder<'a, impl Fn(TaskRef) -> Result<(), Closed>> {
136 TaskBuilder::new(|task| {
137 if self.is_closed() {
138 return Err(Closed(()));
139 }
140
141 if let Some(scheduler) = self.schedulers.get() {
142 // we need to bind the scheduler here
143 task.bind_scheduler(scheduler);
144
145 scheduler.schedule(task);
146
147 Ok(())
148 } else {
149 self.injector.push_task(task);
150
151 Ok(())
152 }
153 })
154 }
155
156 /// Attempt spawn this [`Future`] onto this executor.
157 ///
158 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`]
159 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects
160 /// of its runtime behaviour (such as cancelling it).
161 ///
162 /// # Errors
163 ///
164 /// Returns [`AllocError`] when allocation of the task fails.
165 #[track_caller]
166 pub fn try_spawn<F>(&'static self, future: F) -> Result<JoinHandle<F::Output, ()>, SpawnError>
167 where
168 F: Future + Send + 'static,
169 F::Output: Send + 'static,
170 {
171 self.build_task().try_spawn(future)
172 }
173
174 /// Attempt spawn this [`Future`] with the provided metadata onto this executor.
175 ///
176 /// This method returns a [`TaskRef`] which can be used to spawn it onto an [`crate::executor::Executor`]
177 /// and a [`JoinHandle`] which can be used to await the futures output as well as control some aspects
178 /// of its runtime behaviour (such as cancelling it).
179 ///
180 /// # Errors
181 ///
182 /// Returns [`AllocError`] when allocation of the task fails.
183 #[track_caller]
184 pub fn try_spawn_with_metadata<F, M>(
185 &'static self,
186 future: F,
187 metadata: M,
188 ) -> Result<JoinHandle<F::Output, M>, SpawnError>
189 where
190 F: Future + Send,
191 F::Output: Send,
192 M: Send,
193 {
194 self.build_task().try_spawn_with_metadata(future, metadata)
195 }
196}
197
198// === impl Worker ===
199
impl Worker {
    /// Creates a worker bound to the calling CPU's scheduler, allocating the
    /// scheduler on first use.
    ///
    /// # Errors
    ///
    /// Returns `AllocError` when allocating the underlying resources fails.
    pub fn new(executor: &'static Executor, rng: FastRand) -> Result<Self, AllocError> {
        // NOTE(review): the id is read *before* the scheduler is inserted; if
        // two CPUs race through here they could observe the same length.
        // Confirm that `CpuLocal` serializes first-time insertion.
        let id = executor.schedulers.len();
        let core = executor.schedulers.get_or_try(Scheduler::new)?;

        Ok(Self {
            id,
            executor,
            scheduler: core,
            rng,
        })
    }

    /// Returns a reference to the task that's currently being polled or `None`.
    #[must_use]
    pub fn current_task(&self) -> Option<TaskRef> {
        self.scheduler.current_task()
    }

    /// Run the worker main loop until the given future completes.
    ///
    /// # Errors
    ///
    /// Returns `Err(Closed)` if the executor has been closed before the future completes.
    pub async fn run<F>(&mut self, future: F) -> Result<F::Output, Closed>
    where
        F: Future + Send,
        F::Output: Send,
    {
        if self.executor.is_closed() {
            return Err(Closed(()));
        }

        let main_loop = self.main_loop();
        pin_mut!(future);
        pin_mut!(main_loop);

        // Race the caller-provided future against the worker loop; whichever
        // finishes first decides the result.
        let res = crate::future::select(future, main_loop).await;
        match res {
            Either::Left((val, _)) => Ok(val),
            // The `main_loop` future either never returns or always returns Err(Closed).
            // Its success type is `!`, so the `Ok` arm is uninhabited and this
            // match is exhaustive.
            Either::Right((Err(err), _)) => Err(err),
        }
    }

    /// Worker main loop: tick until there is no more work, then sleep on the
    /// executor's wait queue. Returns `Err(Closed)` once the executor has
    /// been closed (propagated from `sleepers.wait()`).
    async fn main_loop(&mut self) -> Result<!, Closed> {
        loop {
            if self.tick().has_remaining {
                continue;
            }

            // No local work and nothing to steal: park until someone wakes us.
            tracing::trace!("worker {} going to sleep...", self.id);
            self.executor.sleepers.wait().await?;
            tracing::trace!("worker woke up");
        }
    }

    /// Runs one scheduler tick: polls up to 256 tasks from the local run
    /// queue, then attempts to steal more work if the queue ran dry.
    pub fn tick(&mut self) -> Tick {
        let mut tick = self.scheduler.tick_n(256);
        tracing::trace!(worker = self.id, ?tick, "worker tick");

        if tick.has_remaining {
            return tick;
        }

        // if there are no tasks remaining in this core's run queue, try to
        // steal new tasks from the distributor queue.
        if let Some(stolen) = self.try_steal() {
            tracing::trace!(tick.stolen = stolen);

            // if we stole tasks, we need to keep ticking
            tick.has_remaining = true;
            return tick;
        }

        // if we have no remaining woken tasks, and we didn't steal any new
        // tasks, this core can sleep until an interrupt occurs.
        tick
    }

    /// Attempts to refill the local run queue by stealing tasks.
    ///
    /// Strategy: try the global injector first; then up to `ROUNDS` rounds
    /// over the other workers (starting from a random victim, backing off
    /// between rounds); finally the injector once more as a last resort.
    /// Returns the number of tasks stolen, or `None` if nothing was stolen.
    fn try_steal(&mut self) -> Option<NonZeroUsize> {
        const ROUNDS: usize = 4;
        const MAX_STOLEN_PER_TICK: usize = 256;

        // attempt to steal from the global injector queue first
        if let Ok(stealer) = self.executor.injector.try_steal() {
            let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK);
            tracing::trace!("stole {stolen} tasks from injector (in first attempt)");
            return NonZeroUsize::new(stolen);
        }

        // If that fails, attempt to steal from other workers
        let num_workers = self.executor.schedulers.len();

        // if there is only one worker, there is no one to steal from anyway
        if num_workers <= 1 {
            return None;
        }

        let mut backoff = Backoff::new();

        for _ in 0..ROUNDS {
            // Start from a random worker so that steal pressure spreads
            // across victims instead of always hitting worker 0.
            let start = self.rng.fastrand_n(u32::try_from(num_workers).unwrap()) as usize;

            if let Some(stolen) = self.try_steal_one_round(num_workers, start) {
                return Some(stolen);
            }

            backoff.spin();
        }

        // as a last resort try to steal from the global injector queue again
        if let Ok(stealer) = self.executor.injector.try_steal() {
            let stolen = stealer.spawn_n(self.scheduler, MAX_STOLEN_PER_TICK);
            tracing::trace!("stole {stolen} tasks from injector (in second attempt)");
            return NonZeroUsize::new(stolen);
        }

        None
    }

    /// One steal round: visits every other worker once, in ring order
    /// starting at `start`, and takes half the first victim's queue that can
    /// be stolen from.
    fn try_steal_one_round(&mut self, num_workers: usize, start: usize) -> Option<NonZeroUsize> {
        for i in 0..num_workers {
            let i = (start + i) % num_workers;

            // Don't steal from ourselves! We know we don't have work.
            if i == self.id {
                continue;
            }

            // NOTE(review): `iter().nth(i)` is a linear scan, making this
            // round O(num_workers^2); an indexable accessor on `CpuLocal`
            // would avoid that if it ever shows up in profiles.
            let Some(victim) = self.executor.schedulers.iter().nth(i) else {
                // The worker might not be online yet, just advance past
                continue;
            };

            let Ok(stealer) = victim.try_steal() else {
                // the victim either doesn't have any tasks, or is already being stolen from
                // either way, just advance past
                continue;
            };

            let stolen = stealer.spawn_half(self.scheduler);
            tracing::trace!("stole {stolen} tasks from worker {i} {victim:?}");
            return NonZeroUsize::new(stolen);
        }

        None
    }
}
353
354// === impl Scheduler ===
355
impl Scheduler {
    /// Creates a new, empty scheduler.
    ///
    /// Allocates the stub task that `MpscQueue::new_with_stub` requires to
    /// anchor the lock-free queue.
    fn new() -> Result<Self, AllocError> {
        let stub_task = TaskRef::new_stub()?;

        Ok(Self {
            run_queue: MpscQueue::new_with_stub(stub_task),
            queued: AtomicUsize::new(0),
            current_task: AtomicPtr::new(ptr::null_mut()),
            #[cfg(feature = "counters")]
            spawned: AtomicUsize::new(0),
            #[cfg(feature = "counters")]
            woken: AtomicUsize::new(0),
        })
    }

    /// Returns a reference to the task that's currently being polled or `None`.
    #[must_use]
    pub fn current_task(&self) -> Option<TaskRef> {
        let ptr = NonNull::new(self.current_task.load(Ordering::Acquire))?;
        Some(TaskRef::clone_from_raw(ptr))
    }

    /// Enqueues `task` onto this scheduler's run queue.
    pub fn schedule(&self, task: TaskRef) {
        // bump the queue count first, then enqueue
        self.queued.fetch_add(1, Ordering::AcqRel);
        self.run_queue.enqueue(task);
    }

    /// Polls up to `n` tasks from the run queue, rescheduling any task that
    /// asks to be polled again, and returns a [`Tick`] describing the work
    /// performed.
    fn tick_n(&'static self, n: usize) -> Tick {
        tracing::trace!("tick_n({self:p}, {n})");

        let mut tick = Tick {
            has_remaining: false,
            polled: 0,
            #[cfg(feature = "counters")]
            completed: 0,
            #[cfg(feature = "counters")]
            spawned: 0,
            #[cfg(feature = "counters")]
            woken_external: 0,
            #[cfg(feature = "counters")]
            woken_internal: 0,
        };

        while tick.polled < n {
            let task = match self.run_queue.try_dequeue() {
                Ok(task) => task,
                // If inconsistent, just try again.
                Err(TryDequeueError::Inconsistent) => {
                    tracing::trace!("scheduler queue {:?} inconsistent", self.run_queue);
                    core::hint::spin_loop();
                    continue;
                }
                // Queue is empty or busy (in use by something else), bail out.
                Err(TryDequeueError::Busy | TryDequeueError::Empty) => {
                    tracing::trace!("scheduler queue {:?} busy or empty", self.run_queue);
                    break;
                }
            };

            // the task left the queue; keep `queued` in sync with it.
            self.queued.fetch_sub(1, Ordering::SeqCst);

            let _span = tracing::trace_span!(
                "poll",
                task.addr = ?task.header_ptr(),
                task.tid = task.id().as_u64(),
            )
            .entered();

            // store the currently polled task in the `current_task` pointer.
            // using `TaskRef::as_ptr` is safe here, since we will clear the
            // `current_task` pointer before dropping the `TaskRef`.
            self.current_task
                .store(task.header_ptr().as_ptr(), Ordering::Release);

            // poll the task
            let poll_result = task.poll();

            // clear the current task cell before potentially dropping the
            // `TaskRef`.
            self.current_task.store(ptr::null_mut(), Ordering::Release);

            tick.polled += 1;
            match poll_result {
                PollResult::Ready | PollResult::ReadyJoined => {
                    #[cfg(feature = "counters")]
                    {
                        tick.completed += 1;
                    }
                }
                PollResult::PendingSchedule => {
                    // the task woke itself during `poll`; put it back in line.
                    self.schedule(task);
                    #[cfg(feature = "counters")]
                    {
                        tick.woken_internal += 1;
                    }
                }
                PollResult::Pending => {}
            }

            #[cfg(not(feature = "counters"))]
            tracing::trace!(poll = ?poll_result, tick.polled);
            #[cfg(feature = "counters")]
            tracing::trace!(poll = ?poll_result, tick.polled, tick.completed);
        }

        // drain the per-tick counters accumulated since the last tick.
        #[cfg(feature = "counters")]
        {
            tick.spawned = self.spawned.swap(0, Ordering::Relaxed);
            tick.woken_external = self.woken.swap(0, Ordering::Relaxed);
        }

        // are there still tasks in the queue? if so, we have more tasks to poll.
        if self.queued.load(Ordering::SeqCst) > 0 {
            tick.has_remaining = true;
        }

        if tick.polled > 0 {
            // log scheduler metrics.
            #[cfg(not(feature = "counters"))]
            tracing::trace!(tick.polled, tick.has_remaining);
            #[cfg(feature = "counters")]
            tracing::trace!(
                tick.polled,
                tick.has_remaining,
                tick.completed,
                tick.spawned,
                tick.woken = tick.woken_external + tick.woken_internal,
                tick.woken.external = tick.woken_external,
                tick.woken.internal = tick.woken_internal
            );
        }

        tick
    }

    /// Attempts to acquire a [`Stealer`] over this scheduler's run queue.
    ///
    /// Fails when the queue has no tasks or is already being stolen from
    /// (see [`TryStealError`]).
    fn try_steal(&self) -> Result<Stealer<'_>, TryStealError> {
        Stealer::new(&self.run_queue, &self.queued)
    }
}
495
#[cfg(test)]
mod tests {
    use core::hint::black_box;
    use core::sync::atomic::AtomicBool;

    use tracing_subscriber::EnvFilter;
    use tracing_subscriber::util::SubscriberInitExt;

    use super::*;
    use crate::{loom, test_util};

    // A minimal unit of async work that forces one reschedule via `yield_now`,
    // so a task is guaranteed to be polled at least twice.
    async fn work() -> usize {
        let val = 1 + 1;
        crate::task::yield_now().await;
        black_box(val)
    }

    // A single worker drives one task to completion; the task closes the
    // executor, which must unblock `Worker::run` with `Err(Closed)`.
    #[test]
    fn single_threaded() {
        let _trace = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .set_default();

        loom::model(|| {
            loom::lazy_static! {
                static ref EXEC: Executor = Executor::new().unwrap();
                static ref CALLED: AtomicBool = AtomicBool::new(false);
            }

            EXEC.try_spawn(async move {
                work().await;
                CALLED.store(true, Ordering::SeqCst);
                EXEC.close();
            })
            .unwrap();

            let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)).unwrap();
            test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err(
                "stopping the executor should always result in a Closed(()) error here",
            );
            assert!(CALLED.load(Ordering::SeqCst));
        })
    }

    // Same scenario as `single_threaded`, but with several workers racing:
    // exactly one task runs, and closing the executor must release all of
    // them with `Err(Closed)`.
    #[test]
    fn multi_threaded() {
        let _trace = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .set_default();

        loom::model(|| {
            const NUM_THREADS: usize = 3;

            loom::lazy_static! {
                static ref EXEC: Executor = Executor::new().unwrap();
                static ref CALLED: AtomicBool = AtomicBool::new(false);
            }

            EXEC.try_spawn(async move {
                work().await;
                CALLED.store(true, Ordering::SeqCst);
                EXEC.close();
            })
            .unwrap();

            let joins: Vec<_> = (0..NUM_THREADS)
                .map(|_| {
                    loom::thread::spawn(move || {
                        let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)).unwrap();

                        test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err(
                            "stopping the executor should always result in a Closed(()) error here",
                        );
                    })
                })
                .collect();

            for join in joins {
                join.join().unwrap();
            }
            assert!(CALLED.load(Ordering::SeqCst));
        });
    }

    // A task is spawned and polled on one thread while its `JoinHandle` is
    // sent to, and awaited on, another thread: the output must cross threads
    // while polling stays on the spawning worker.
    #[test]
    fn join_handle_cross_thread() {
        let _trace = tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::from_default_env())
            .set_default();

        loom::model(|| {
            loom::lazy_static! {
                static ref EXEC: Executor = Executor::new().unwrap();
            }

            let (tx, rx) = loom::sync::mpsc::channel::<JoinHandle<u32, ()>>();

            let h0 = loom::thread::spawn(move || {
                let tid = loom::thread::current().id();
                let mut worker = Worker::new(&EXEC, FastRand::from_seed(0)).unwrap();

                let h = EXEC
                    .try_spawn(async move {
                        // make sure the task is actually polled on thread 0
                        assert_eq!(loom::thread::current().id(), tid);

                        crate::task::yield_now().await;

                        // make sure the task is actually polled on thread 0
                        assert_eq!(loom::thread::current().id(), tid);

                        42
                    })
                    .unwrap();

                tx.send(h).unwrap();

                test_util::block_on(worker.run(crate::future::pending::<()>())).expect_err(
                    "stopping the executor should always result in a Closed(()) error here",
                );
            });

            let h1 = loom::thread::spawn(move || {
                let h = rx.recv().unwrap();

                let ret_code = test_util::block_on(h).unwrap();

                assert_eq!(ret_code, 42);

                EXEC.close();
            });

            h0.join().unwrap();
            h1.join().unwrap();
        });
    }
}