Next Generation WASM Microkernel Operating System
1// Copyright 2025 Jonas Kruckenberg
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use core::alloc::AllocError;
9use core::fmt::Debug;
10use core::num::NonZeroUsize;
11
12use cordyceps::{MpscQueue, mpsc_queue};
13
14use crate::executor::Scheduler;
15use crate::loom::sync::atomic::{AtomicUsize, Ordering};
16use crate::task::{Header, TaskRef};
17
/// Reasons an attempt to steal tasks from a queue can fail.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TryStealError {
    /// The targeted queue already has a consumer, so nothing could be
    /// stolen from it.
    Busy,
    /// The targeted queue had no tasks available to steal.
    Empty,
}
27
28#[derive(Debug)]
29pub struct Injector {
30 run_queue: MpscQueue<Header>,
31 queued: AtomicUsize,
32}
33
34impl Injector {
35 pub fn new() -> Result<Self, AllocError> {
36 let stub_task = TaskRef::new_stub()?;
37
38 Ok(Self {
39 run_queue: MpscQueue::new_with_stub(stub_task),
40 queued: AtomicUsize::new(0),
41 })
42 }
43
44 /// Attempt to steal from this `Injector`, the returned [`Stealer`] will grant exclusive access to
45 /// steal from the `Injector` until it is dropped.
46 ///
47 /// # Errors
48 ///
49 /// When stealing from the target is not possible, either because its queue is *empty*
50 /// or because there is *already an active stealer*, an error is returned.
51 pub fn try_steal(&self) -> Result<Stealer<'_>, TryStealError> {
52 Stealer::new(&self.run_queue, &self.queued)
53 }
54
55 pub fn push_task(&self, task: TaskRef) {
56 self.queued.fetch_add(1, Ordering::SeqCst);
57 self.run_queue.enqueue(task);
58 }
59}
60
61pub struct Stealer<'queue> {
62 queue: mpsc_queue::Consumer<'queue, Header>,
63 tasks: &'queue AtomicUsize,
64 /// The initial task count in the target queue when this `Stealer` was created.
65 task_snapshot: NonZeroUsize,
66}
67
68impl<'queue> Stealer<'queue> {
69 pub(crate) fn new(
70 queue: &'queue MpscQueue<Header>,
71 tasks: &'queue AtomicUsize,
72 ) -> Result<Self, TryStealError> {
73 let queue = queue.try_consume().ok_or(TryStealError::Busy)?;
74
75 let task_snapshot = tasks.load(Ordering::SeqCst);
76 let Some(task_snapshot) = NonZeroUsize::new(task_snapshot) else {
77 return Err(TryStealError::Empty);
78 };
79
80 Ok(Self {
81 queue,
82 tasks,
83 task_snapshot,
84 })
85 }
86
87 /// Steal a task from the queue and spawn it on the provided
88 /// `scheduler`. Returns `true` when a task got successfully stolen
89 /// and `false` if queue was empty.
90 pub fn spawn_one(&self, scheduler: &'static Scheduler) -> bool {
91 let Some(task) = self.queue.dequeue() else {
92 return false;
93 };
94
95 tracing::trace!(?task, "stole");
96
97 // decrement the target queue's task count
98 self.tasks.fetch_sub(1, Ordering::SeqCst);
99
100 // we're moving the task to a different scheduler so we need to
101 // bind to it
102 task.bind_scheduler(scheduler);
103
104 scheduler.schedule(task);
105
106 true
107 }
108
109 /// Steal up to `max` task from the queue and spawn them on the provided
110 /// `scheduler`.
111 ///
112 /// Note this will always steal at least one task.
113 pub fn spawn_n(&self, core: &'static Scheduler, max: usize) -> usize {
114 let mut stolen = 0;
115 while stolen <= max && self.spawn_one(core) {
116 stolen += 1;
117 }
118 stolen
119 }
120
121 /// Steal half the tasks in the current queue and spawn them on the provided
122 /// `scheduler`.
123 ///
124 /// Note this will always steal at least one task.
125 pub fn spawn_half(&self, core: &'static Scheduler) -> usize {
126 let max = self.task_snapshot.get().div_ceil(2);
127 self.spawn_n(core, max)
128 }
129}