Realtime safe, waitfree, concurrency library
1// SPDX-FileCopyrightText: 2025 Lucas Baumann
2// SPDX-FileCopyrightText: Lucas Baumann
3//
4// SPDX-License-Identifier: Apache-2.0
5// SPDX-License-Identifier: MIT
6
//! Simpler version of the `left-right` library by Jon Gjengset.
//!
//! Uses two copies of the value to allow making small changes while still allowing non-blocking reads.
//! Writing can block, while reading doesn't.
11#![no_std]
12
13extern crate alloc;
14
15use core::{marker::PhantomData, ops::Deref, ptr::NonNull};
16
17use alloc::collections::vec_deque::VecDeque;
18
19mod shared;
20
21use shared::{Ptr, Shared};
22
/// Implemented by types that want to be shared through this library.
///
/// `O` is the operation type used to describe a change to the value.
pub trait Absorb<O> {
    /// Applies `operation` to `self`.
    ///
    /// Has to be deterministic: the operations are applied in the same order to both buffers.
    fn absorb(&mut self, operation: O);
}
28
29/// Dropping the Reader isn't realtime safe, because if dropped after the Writer, it deallocates.
30/// Should only get dropped, when closing the real-time thread
31///
32/// Reader will be able to read data even if Writer has been dropped. Obviously that data won't change anymore
33/// When there is no Reader the Writer is able to create a new one. The other way around doesn't work.
34///
35/// Isn't Sync as there is no methos that takes &self, so it is useless anyways.
36#[derive(Debug)]
37pub struct Reader<T> {
38 shared: NonNull<Shared<T>>,
39 locked: bool,
40 /// for drop check
41 _own: PhantomData<Shared<T>>,
42}
43
44impl<T> Reader<T> {
45 fn shared_ref(&self) -> &Shared<T> {
46 // SAFETY: Reader always has a valid Shared<T>, a mut ref to a shared is never created,
47 // only to the UnsafeCell<T>s inside of it
48 unsafe { self.shared.as_ref() }
49 }
50
51 /// this function never blocks. (`fetch_update` loop doesn't count)
52 pub fn lock(&mut self) -> ReadGuard<'_, T> {
53 if self.locked {
54 self.locked = false;
55 panic!("ReadGuard was forgotten");
56 }
57 self.locked = true;
58 // SAFETY: value just locked
59 let value = unsafe { &*self.shared_ref().lock_read().get() };
60 ReadGuard {
61 value,
62 reader: self,
63 }
64 }
65}
66
67/// SAFETY: Owns a T
68unsafe impl<T: Send> Send for Reader<T> {}
69
70impl<T> Drop for Reader<T> {
71 fn drop(&mut self) {
72 // SAFETY: self.shared is valid and not used after this.
73 unsafe { Shared::drop(self.shared) };
74 assert!(!self.locked, "ReadGuard was forgotten");
75 }
76}
77
78/// Data won't change while holding the Guard. This also means the Writer can only issue one swap, while Guard is being held
79/// If T: !Sync this is guaranteed to be the only ref to this T
80///
81/// Doesn't implement Clone as that would require refcounting to know when to unlock.
82#[derive(Debug)]
83pub struct ReadGuard<'a, T> {
84 reader: &'a mut Reader<T>,
85 value: &'a T,
86}
87
88impl<T> Deref for ReadGuard<'_, T> {
89 type Target = T;
90
91 fn deref(&self) -> &Self::Target {
92 self.value
93 }
94}
95
96impl<T, E> AsRef<E> for ReadGuard<'_, T>
97where
98 E: ?Sized,
99 T: AsRef<E>,
100{
101 fn as_ref(&self) -> &E {
102 self.deref().as_ref()
103 }
104}
105
106impl<T> Drop for ReadGuard<'_, T> {
107 fn drop(&mut self) {
108 // release the read lock
109 self.reader.shared_ref().release_read_lock();
110 self.reader.locked = false;
111 }
112}
113
114/// Not realtime safe object which can change the internal T value.
115#[derive(Debug)]
116pub struct Writer<T, O> {
117 shared: NonNull<Shared<T>>,
118 // sets which buffer the next write is applied to
119 // write_ptr doesn't need to be Atomics as it only changes, when the Writer itself swaps
120 write_ptr: Ptr,
121 // buffer is pushed at the back and popped at the front.
122 op_buffer: VecDeque<O>,
123 locked: bool,
124 // needed for drop_check
125 _own: PhantomData<Shared<T>>,
126}
127
128impl<T, O> Writer<T, O> {
129 fn shared_ref(&self) -> &Shared<T> {
130 // SAFETY: Reader always has a valid Shared<T>, the only possibility to get a &mut Shared requires &mut self
131 unsafe { self.shared.as_ref() }
132 }
133
134 /// if no Reader exists this gives a mut ref to Shared.
135 fn shared_mut(&mut self) -> Option<&mut Shared<T>> {
136 self.shared_ref()
137 .is_unique()
138 // SAFETY: No `Reader` exists, as `is_unique` returns true
139 .then(|| unsafe { &mut *self.shared.as_ptr() })
140 }
141
142 /// swaps the read and write values. If no changes were made since the last swap nothing happens. Never blocks
143 /// not public as swapping without creating a before `WriteGuard` is pretty useless
144 fn swap(&mut self) {
145 if self.op_buffer.is_empty() {
146 return;
147 }
148
149 self.shared_ref().set_read_ptr(self.write_ptr);
150
151 self.write_ptr.switch();
152 }
153
154 /// get a Reader if none exists
155 pub fn build_reader(&mut self) -> Option<Reader<T>> {
156 let shared_ref = self.shared_ref();
157 // SAFETY: all is_unique_with_increase requirements are satisfied.
158 unsafe {
159 shared_ref.is_unique().then(|| {
160 shared_ref.set_shared();
161 Reader {
162 shared: self.shared,
163 _own: PhantomData,
164 locked: false,
165 }
166 })
167 }
168 }
169}
170
171impl<T: Absorb<O>, O> Writer<T, O> {
172 /// doesn't block. Returns None if the Reader has a `ReadGuard` pointing to the old value.
173 pub fn try_lock(&mut self) -> Option<WriteGuard<'_, T, O>> {
174 if self.locked {
175 self.locked = false;
176 panic!("WriteGuard was forgotten");
177 }
178 self.shared_ref()
179 .lock_write(self.write_ptr)
180 .ok()
181 // locking was successful
182 .map(|()| {
183 self.locked = true;
184 let mut guard = WriteGuard { writer: self };
185 while let Some(operation) = guard.writer.op_buffer.pop_front() {
186 guard.get_data_mut().absorb(operation);
187 }
188 guard
189 })
190 }
191}
192
193impl<T: Clone, O> Writer<T, O> {
194 /// Creates a new Writer by cloning the value once to get two values
195 /// `T::clone()` shoulnd't give a different value, as that would make this library pretty useless
196 pub fn new(value: T) -> Self {
197 let (shared, write_ptr) = Shared::new(value, |value_1| value_1.clone());
198 Self {
199 shared,
200 write_ptr,
201 op_buffer: VecDeque::new(),
202 _own: PhantomData,
203 locked: false,
204 }
205 }
206}
207
208impl<T: Default, O> Default for Writer<T, O> {
209 /// Creates a new Writer by calling `T::default()` twice to create the two values
210 ///
211 /// Default impl of T needs to give the same result every time. Not upholding this doens't lead to UB, but turns the library basically useless
212 fn default() -> Self {
213 let (shared, write_ptr) = Shared::new(T::default(), |_| T::default());
214 Self {
215 shared,
216 write_ptr,
217 op_buffer: VecDeque::new(),
218 _own: PhantomData,
219 locked: false,
220 }
221 }
222}
223
224impl<T: Sync, O> Writer<T, O> {
225 /// The Value returned may be newer than the version the reader is currently seeing.
226 /// This value will be written to next.
227 /// If this is called after swapping the write_lock this will return an older value.
228 /// To get the newest value lock the writer and call `Writeguard::read`.
229 ///
230 /// Needs T: Sync because maybe this is the value the reader is curently reading
231 pub fn read(&self) -> &T {
232 // SAFETY: Only the WriteGuard can write to the values / create mut refs to them.
233 // The WriteGuard holds a mut ref to the writer so this function can't be called while a writeguard exists
234 // This means that reading them / creating refs is safe to do
235 unsafe { self.shared_ref().get_value_ref(self.write_ptr) }
236 }
237}
238
239/// SAFETY: owns T and O
240unsafe impl<T: Send, O: Send> Send for Writer<T, O> {}
241/// SAFETY: &self fn can only create a &T and never gives shared access to O
242unsafe impl<T: Sync, O> Sync for Writer<T, O> {}
243
244impl<T, O> Drop for Writer<T, O> {
245 fn drop(&mut self) {
246 // SAFETY: self.shared is valid and not used after this.
247 unsafe { Shared::drop(self.shared) };
248 assert!(!self.locked, "WriteGuard was forgotten");
249 }
250}
251
252// Don't create a WriteGuard directly, as that wouldn't sync with old Operations
253/// Can be used to write to the Data structure.
254///
255/// When this structure exists the Reader already switched to the other value
256///
257/// Dropping this makes all changes available to the Reader.
258#[derive(Debug)]
259pub struct WriteGuard<'a, T, O> {
260 // can't hold a mut ref to T, as then it wouldn't be possible to write to both at the same time,
261 // which is an optimization i want to keep.
262 writer: &'a mut Writer<T, O>,
263}
264
265impl<T, O> WriteGuard<'_, T, O> {
266 /// Makes the changes available to the reader. Equivalent to `std::mem::drop(self)`
267 pub fn swap(self) {}
268
269 /// Gets the value currently being written to.
270 pub fn read(&self) -> &T {
271 // SAFETY: Only the WriteGuard can write to the values / create mut refs to them.
272 // The WriteGuard holds a mut ref to the writer so this function can't be called while a writeguard exists
273 // This means that reading them / creating refs is safe to do
274 unsafe {
275 self.writer
276 .shared_ref()
277 .get_value_ref(self.writer.write_ptr)
278 }
279 }
280
281 /// Isn't public as this could easily create disconnects between the two versions.
282 /// While that wouldn't lead to UB it goes against the purpose of this library
283 fn get_data_mut(&mut self) -> &mut T {
284 // SAFETY: When creating the writeguad it is checked that the reader doesnt have access to the same data
285 // This function requires &mut self so there also isn't any ref created by writeguard.
286 unsafe {
287 &mut *self
288 .writer
289 .shared_ref()
290 .get_value(self.writer.write_ptr)
291 .get()
292 }
293 }
294}
295
296impl<T: Absorb<O>, O: Clone> WriteGuard<'_, T, O> {
297 /// applies operation to the current write Value and stores it to apply to the other later.
298 /// If there is no reader the operation is applied to both values immediately and not stored.
299 pub fn apply_op(&mut self, operation: O) {
300 if let Some(shared) = self.writer.shared_mut() {
301 shared.value_1.get_mut().absorb(operation.clone());
302 shared.value_2.get_mut().absorb(operation);
303 } else {
304 self.writer.op_buffer.push_back(operation.clone());
305 self.get_data_mut().absorb(operation);
306 }
307 }
308}
309
310impl<T, O> Drop for WriteGuard<'_, T, O> {
311 fn drop(&mut self) {
312 self.writer.swap();
313 self.writer.locked = false;
314 }
315}
316
#[cfg(test)]
mod internal_test {
    use core::cell::Cell;

    use crate::{Absorb, Writer};

    /// Operation that adds the wrapped amount to a counter.
    #[derive(Clone, Copy, Debug)]
    pub struct CounterAddOp(i32);

    impl Absorb<CounterAddOp> for i32 {
        fn absorb(&mut self, operation: CounterAddOp) {
            let CounterAddOp(amount) = operation;
            *self += amount;
        }
    }

    impl Absorb<CounterAddOp> for Cell<i32> {
        fn absorb(&mut self, operation: CounterAddOp) {
            let CounterAddOp(amount) = operation;
            self.set(self.get() + amount);
        }
    }

    /// After the Reader is dropped the Writer is the only handle left.
    #[test]
    fn drop_reader() {
        let mut writer = Writer::<i32, CounterAddOp>::default();
        let reader = writer.build_reader().unwrap();

        assert!(!writer.shared_ref().is_unique());
        drop(reader);
        assert!(writer.shared_ref().is_unique());
    }

    /// After the Writer is dropped the Reader is the only handle left.
    #[test]
    fn drop_writer() {
        let mut writer = Writer::<i32, CounterAddOp>::default();
        let reader = writer.build_reader().unwrap();

        assert!(!reader.shared_ref().is_unique());
        drop(writer);
        assert!(reader.shared_ref().is_unique());
    }
}
357}