Next Generation WASM Microkernel Operating System
1// Copyright 2025 Jonas Kruckenberg
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use crate::executor::Scheduler;
9use crate::loom::sync::atomic::{AtomicUsize, Ordering};
10use crate::task::{Header, Task, TaskRef};
11use alloc::boxed::Box;
12use cordyceps::{MpscQueue, mpsc_queue};
13use core::fmt::Debug;
14use core::num::NonZeroUsize;
15
/// Reasons why a [`Stealer`] could not be obtained for a queue.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum TryStealError {
    /// Another consumer currently holds the targeted queue, so no tasks
    /// could be stolen from it.
    Busy,
    /// The targeted queue had no tasks available to steal.
    Empty,
}
25
/// A shared task queue that tasks are pushed onto with `push_task` and taken
/// from through a [`Stealer`] obtained via `try_steal`.
#[derive(Debug)]
pub struct Injector {
    /// Multi-producer queue of task [`Header`]s; `push_task` enqueues here and
    /// a [`Stealer`] dequeues.
    run_queue: MpscQueue<Header>,
    /// Number of tasks pushed but not yet stolen: incremented by `push_task`
    /// and decremented by `Stealer::spawn_one`.
    queued: AtomicUsize,
}
31
32impl Default for Injector {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl Injector {
39 pub fn new() -> Self {
40 let stub_task = Box::new(Task::new_stub());
41 let (stub_task, _) = TaskRef::new_allocated(stub_task);
42
43 Self {
44 run_queue: MpscQueue::new_with_stub(stub_task),
45 queued: AtomicUsize::new(0),
46 }
47 }
48
49 /// Attempt to steal from this `Injector`, the returned [`Stealer`] will grant exclusive access to
50 /// steal from the `Injector` until it is dropped.
51 ///
52 /// # Errors
53 ///
54 /// When stealing from the target is not possible, either because its queue is *empty*
55 /// or because there is *already an active stealer*, an error is returned.
56 pub fn try_steal(&self) -> Result<Stealer, TryStealError> {
57 Stealer::new(&self.run_queue, &self.queued)
58 }
59
60 pub fn push_task(&self, task: TaskRef) {
61 self.queued.fetch_add(1, Ordering::SeqCst);
62 self.run_queue.enqueue(task);
63 }
64}
65
/// A handle for taking tasks out of a queue and spawning them on a
/// `Scheduler`. It is created through `Stealer::new` (for example by
/// `Injector::try_steal`).
pub struct Stealer<'queue> {
    /// Consumer handle on the target queue, obtained from
    /// `MpscQueue::try_consume`.
    queue: mpsc_queue::Consumer<'queue, Header>,
    /// Task counter of the target queue; decremented once for every task
    /// this `Stealer` takes.
    tasks: &'queue AtomicUsize,
    /// The initial task count in the target queue when this `Stealer` was created.
    task_snapshot: NonZeroUsize,
}
72
73impl<'queue> Stealer<'queue> {
74 pub(crate) fn new(
75 queue: &'queue MpscQueue<Header>,
76 tasks: &'queue AtomicUsize,
77 ) -> Result<Self, TryStealError> {
78 let queue = queue.try_consume().ok_or(TryStealError::Busy)?;
79
80 let task_snapshot = tasks.load(Ordering::SeqCst);
81 let Some(task_snapshot) = NonZeroUsize::new(task_snapshot) else {
82 return Err(TryStealError::Empty);
83 };
84
85 Ok(Self {
86 queue,
87 tasks,
88 task_snapshot,
89 })
90 }
91
92 /// Steal a task from the queue and spawn it on the provided
93 /// `scheduler`. Returns `true` when a task got successfully stolen
94 /// and `false` if queue was empty.
95 pub fn spawn_one(&self, scheduler: &'static Scheduler) -> bool {
96 let Some(task) = self.queue.dequeue() else {
97 return false;
98 };
99
100 tracing::trace!(?task, "stole");
101
102 // decrement the target queue's task count
103 self.tasks.fetch_sub(1, Ordering::SeqCst);
104
105 // we're moving the task to a different scheduler so we need to
106 // bind to it
107 task.bind_scheduler(scheduler);
108
109 scheduler.schedule(task);
110
111 true
112 }
113
114 /// Steal up to `max` task from the queue and spawn them on the provided
115 /// `scheduler`.
116 ///
117 /// Note this will always steal at least one task.
118 pub fn spawn_n(&self, core: &'static Scheduler, max: usize) -> usize {
119 let mut stolen = 0;
120 while stolen <= max && self.spawn_one(core) {
121 stolen += 1;
122 }
123 stolen
124 }
125
126 /// Steal half the tasks in the current queue and spawn them on the provided
127 /// `scheduler`.
128 ///
129 /// Note this will always steal at least one task.
130 pub fn spawn_half(&self, core: &'static Scheduler) -> usize {
131 let max = self.task_snapshot.get().div_ceil(2);
132 self.spawn_n(core, max)
133 }
134}