Realtime safe, waitfree, concurrency library
at main 12 kB view raw
1// SPDX-FileCopyrightText: 2025 Lucas Baumann 2// SPDX-FileCopyrightText: Lucas Baumann 3// 4// SPDX-License-Identifier: Apache-2.0 5// SPDX-License-Identifier: MIT 6 7//! Simpler version of the left-right from Jon Gjengset library. 8//! 9//! Uses two copies of the value to allow doing small changes, while still allowing non-blocking reading. 10//! Writing can block, while reading doesn't. 11#![no_std] 12 13extern crate alloc; 14 15use core::{marker::PhantomData, ops::Deref, ptr::NonNull}; 16 17use alloc::collections::vec_deque::VecDeque; 18 19mod shared; 20 21use shared::{Ptr, Shared}; 22 23/// Should be implemented on structs that want to be shared with this library 24pub trait Absorb<O> { 25 /// has to be deterministic. Operations will be applied in the same order to both buffers 26 fn absorb(&mut self, operation: O); 27} 28 29/// Dropping the Reader isn't realtime safe, because if dropped after the Writer, it deallocates. 30/// Should only get dropped, when closing the real-time thread 31/// 32/// Reader will be able to read data even if Writer has been dropped. Obviously that data won't change anymore 33/// When there is no Reader the Writer is able to create a new one. The other way around doesn't work. 34/// 35/// Isn't Sync as there is no methos that takes &self, so it is useless anyways. 36#[derive(Debug)] 37pub struct Reader<T> { 38 shared: NonNull<Shared<T>>, 39 locked: bool, 40 /// for drop check 41 _own: PhantomData<Shared<T>>, 42} 43 44impl<T> Reader<T> { 45 fn shared_ref(&self) -> &Shared<T> { 46 // SAFETY: Reader always has a valid Shared<T>, a mut ref to a shared is never created, 47 // only to the UnsafeCell<T>s inside of it 48 unsafe { self.shared.as_ref() } 49 } 50 51 /// this function never blocks. (`fetch_update` loop doesn't count) 52 pub fn lock(&mut self) -> ReadGuard<'_, T> { 53 if self.locked { 54 self.locked = false; 55 panic!("ReadGuard was forgotten"); 56 } 57 self.locked = true; 58 // SAFETY: value just locked 59 let value = unsafe { &*self.shared_ref().lock_read().get() }; 60 ReadGuard { 61 value, 62 reader: self, 63 } 64 } 65} 66 67/// SAFETY: Owns a T 68unsafe impl<T: Send> Send for Reader<T> {} 69 70impl<T> Drop for Reader<T> { 71 fn drop(&mut self) { 72 // SAFETY: self.shared is valid and not used after this. 73 unsafe { Shared::drop(self.shared) }; 74 assert!(!self.locked, "ReadGuard was forgotten"); 75 } 76} 77 78/// Data won't change while holding the Guard. This also means the Writer can only issue one swap, while Guard is being held 79/// If T: !Sync this is guaranteed to be the only ref to this T 80/// 81/// Doesn't implement Clone as that would require refcounting to know when to unlock. 82#[derive(Debug)] 83pub struct ReadGuard<'a, T> { 84 reader: &'a mut Reader<T>, 85 value: &'a T, 86} 87 88impl<T> Deref for ReadGuard<'_, T> { 89 type Target = T; 90 91 fn deref(&self) -> &Self::Target { 92 self.value 93 } 94} 95 96impl<T, E> AsRef<E> for ReadGuard<'_, T> 97where 98 E: ?Sized, 99 T: AsRef<E>, 100{ 101 fn as_ref(&self) -> &E { 102 self.deref().as_ref() 103 } 104} 105 106impl<T> Drop for ReadGuard<'_, T> { 107 fn drop(&mut self) { 108 // release the read lock 109 self.reader.shared_ref().release_read_lock(); 110 self.reader.locked = false; 111 } 112} 113 114/// Not realtime safe object which can change the internal T value. 115#[derive(Debug)] 116pub struct Writer<T, O> { 117 shared: NonNull<Shared<T>>, 118 // sets which buffer the next write is applied to 119 // write_ptr doesn't need to be Atomics as it only changes, when the Writer itself swaps 120 write_ptr: Ptr, 121 // buffer is pushed at the back and popped at the front. 122 op_buffer: VecDeque<O>, 123 locked: bool, 124 // needed for drop_check 125 _own: PhantomData<Shared<T>>, 126} 127 128impl<T, O> Writer<T, O> { 129 fn shared_ref(&self) -> &Shared<T> { 130 // SAFETY: Reader always has a valid Shared<T>, the only possibility to get a &mut Shared requires &mut self 131 unsafe { self.shared.as_ref() } 132 } 133 134 /// if no Reader exists this gives a mut ref to Shared. 135 fn shared_mut(&mut self) -> Option<&mut Shared<T>> { 136 self.shared_ref() 137 .is_unique() 138 // SAFETY: No `Reader` exists, as `is_unique` returns true 139 .then(|| unsafe { &mut *self.shared.as_ptr() }) 140 } 141 142 /// swaps the read and write values. If no changes were made since the last swap nothing happens. Never blocks 143 /// not public as swapping without creating a before `WriteGuard` is pretty useless 144 fn swap(&mut self) { 145 if self.op_buffer.is_empty() { 146 return; 147 } 148 149 self.shared_ref().set_read_ptr(self.write_ptr); 150 151 self.write_ptr.switch(); 152 } 153 154 /// get a Reader if none exists 155 pub fn build_reader(&mut self) -> Option<Reader<T>> { 156 let shared_ref = self.shared_ref(); 157 // SAFETY: all is_unique_with_increase requirements are satisfied. 158 unsafe { 159 shared_ref.is_unique().then(|| { 160 shared_ref.set_shared(); 161 Reader { 162 shared: self.shared, 163 _own: PhantomData, 164 locked: false, 165 } 166 }) 167 } 168 } 169} 170 171impl<T: Absorb<O>, O> Writer<T, O> { 172 /// doesn't block. Returns None if the Reader has a `ReadGuard` pointing to the old value. 173 pub fn try_lock(&mut self) -> Option<WriteGuard<'_, T, O>> { 174 if self.locked { 175 self.locked = false; 176 panic!("WriteGuard was forgotten"); 177 } 178 self.shared_ref() 179 .lock_write(self.write_ptr) 180 .ok() 181 // locking was successful 182 .map(|()| { 183 self.locked = true; 184 let mut guard = WriteGuard { writer: self }; 185 while let Some(operation) = guard.writer.op_buffer.pop_front() { 186 guard.get_data_mut().absorb(operation); 187 } 188 guard 189 }) 190 } 191} 192 193impl<T: Clone, O> Writer<T, O> { 194 /// Creates a new Writer by cloning the value once to get two values 195 /// `T::clone()` shoulnd't give a different value, as that would make this library pretty useless 196 pub fn new(value: T) -> Self { 197 let (shared, write_ptr) = Shared::new(value, |value_1| value_1.clone()); 198 Self { 199 shared, 200 write_ptr, 201 op_buffer: VecDeque::new(), 202 _own: PhantomData, 203 locked: false, 204 } 205 } 206} 207 208impl<T: Default, O> Default for Writer<T, O> { 209 /// Creates a new Writer by calling `T::default()` twice to create the two values 210 /// 211 /// Default impl of T needs to give the same result every time. Not upholding this doens't lead to UB, but turns the library basically useless 212 fn default() -> Self { 213 let (shared, write_ptr) = Shared::new(T::default(), |_| T::default()); 214 Self { 215 shared, 216 write_ptr, 217 op_buffer: VecDeque::new(), 218 _own: PhantomData, 219 locked: false, 220 } 221 } 222} 223 224impl<T: Sync, O> Writer<T, O> { 225 /// The Value returned may be newer than the version the reader is currently seeing. 226 /// This value will be written to next. 227 /// If this is called after swapping the write_lock this will return an older value. 228 /// To get the newest value lock the writer and call `Writeguard::read`. 229 /// 230 /// Needs T: Sync because maybe this is the value the reader is curently reading 231 pub fn read(&self) -> &T { 232 // SAFETY: Only the WriteGuard can write to the values / create mut refs to them. 233 // The WriteGuard holds a mut ref to the writer so this function can't be called while a writeguard exists 234 // This means that reading them / creating refs is safe to do 235 unsafe { self.shared_ref().get_value_ref(self.write_ptr) } 236 } 237} 238 239/// SAFETY: owns T and O 240unsafe impl<T: Send, O: Send> Send for Writer<T, O> {} 241/// SAFETY: &self fn can only create a &T and never gives shared access to O 242unsafe impl<T: Sync, O> Sync for Writer<T, O> {} 243 244impl<T, O> Drop for Writer<T, O> { 245 fn drop(&mut self) { 246 // SAFETY: self.shared is valid and not used after this. 247 unsafe { Shared::drop(self.shared) }; 248 assert!(!self.locked, "WriteGuard was forgotten"); 249 } 250} 251 252// Don't create a WriteGuard directly, as that wouldn't sync with old Operations 253/// Can be used to write to the Data structure. 254/// 255/// When this structure exists the Reader already switched to the other value 256/// 257/// Dropping this makes all changes available to the Reader. 258#[derive(Debug)] 259pub struct WriteGuard<'a, T, O> { 260 // can't hold a mut ref to T, as then it wouldn't be possible to write to both at the same time, 261 // which is an optimization i want to keep. 262 writer: &'a mut Writer<T, O>, 263} 264 265impl<T, O> WriteGuard<'_, T, O> { 266 /// Makes the changes available to the reader. Equivalent to `std::mem::drop(self)` 267 pub fn swap(self) {} 268 269 /// Gets the value currently being written to. 270 pub fn read(&self) -> &T { 271 // SAFETY: Only the WriteGuard can write to the values / create mut refs to them. 272 // The WriteGuard holds a mut ref to the writer so this function can't be called while a writeguard exists 273 // This means that reading them / creating refs is safe to do 274 unsafe { 275 self.writer 276 .shared_ref() 277 .get_value_ref(self.writer.write_ptr) 278 } 279 } 280 281 /// Isn't public as this could easily create disconnects between the two versions. 282 /// While that wouldn't lead to UB it goes against the purpose of this library 283 fn get_data_mut(&mut self) -> &mut T { 284 // SAFETY: When creating the writeguad it is checked that the reader doesnt have access to the same data 285 // This function requires &mut self so there also isn't any ref created by writeguard. 286 unsafe { 287 &mut *self 288 .writer 289 .shared_ref() 290 .get_value(self.writer.write_ptr) 291 .get() 292 } 293 } 294} 295 296impl<T: Absorb<O>, O: Clone> WriteGuard<'_, T, O> { 297 /// applies operation to the current write Value and stores it to apply to the other later. 298 /// If there is no reader the operation is applied to both values immediately and not stored. 299 pub fn apply_op(&mut self, operation: O) { 300 if let Some(shared) = self.writer.shared_mut() { 301 shared.value_1.get_mut().absorb(operation.clone()); 302 shared.value_2.get_mut().absorb(operation); 303 } else { 304 self.writer.op_buffer.push_back(operation.clone()); 305 self.get_data_mut().absorb(operation); 306 } 307 } 308} 309 310impl<T, O> Drop for WriteGuard<'_, T, O> { 311 fn drop(&mut self) { 312 self.writer.swap(); 313 self.writer.locked = false; 314 } 315} 316 317#[cfg(test)] 318mod internal_test { 319 use core::cell::Cell; 320 321 use crate::{Absorb, Writer}; 322 323 #[derive(Clone, Copy, Debug)] 324 pub struct CounterAddOp(i32); 325 326 impl Absorb<CounterAddOp> for i32 { 327 fn absorb(&mut self, operation: CounterAddOp) { 328 *self += operation.0; 329 } 330 } 331 332 impl Absorb<CounterAddOp> for Cell<i32> { 333 fn absorb(&mut self, operation: CounterAddOp) { 334 self.set(self.get() + operation.0); 335 } 336 } 337 338 #[test] 339 fn drop_reader() { 340 let mut writer: Writer<i32, CounterAddOp> = Writer::default(); 341 let reader = writer.build_reader().unwrap(); 342 343 assert!(!writer.shared_ref().is_unique()); 344 drop(reader); 345 assert!(writer.shared_ref().is_unique()); 346 } 347 348 #[test] 349 fn drop_writer() { 350 let mut writer: Writer<i32, CounterAddOp> = Writer::default(); 351 let reader = writer.build_reader().unwrap(); 352 353 assert!(!reader.shared_ref().is_unique()); 354 drop(writer); 355 assert!(reader.shared_ref().is_unique()); 356 } 357}