concurrent, realtime write data structure.
at main 10 kB view raw
1use core::{ 2 cell::UnsafeCell, 3 ptr::NonNull, 4 sync::atomic::{self, AtomicU8, Ordering}, 5}; 6 7use alloc::boxed::Box; 8 9#[derive(Clone, Copy, PartialEq, Eq)] 10pub enum Ptr { 11 Value1, 12 Value2, 13} 14 15impl Ptr { 16 fn other(self) -> Self { 17 match self { 18 Ptr::Value1 => Ptr::Value2, 19 Ptr::Value2 => Ptr::Value1, 20 } 21 } 22} 23 24#[derive(Clone, Copy)] 25struct State(u8); 26 27impl State { 28 /// - Unique 29 /// - no read 30 /// - new read allowed 31 /// - should read value 0 32 const INITIAL: u8 = 0b01000; 33 34 const CURRENT_READ_MASK: u8 = 0b00110; 35 const INV_CURRENT_READ_MASK: u8 = 0b11001; 36 const UNIQUE_MASK: u8 = 0b00001; 37 const INV_UNIQUE_MASK: u8 = 0b11110; 38 39 const NEXT_READ: u8 = 0b10000; 40 const INV_NEXT_READ: u8 = 0b01111; 41 const READ_ALLOWED: u8 = 0b01000; 42 const INV_READ_ALLOWED: u8 = 0b10111; 43 44 fn new(value: u8) -> Self { 45 // could also be assert unchecked 46 debug_assert_eq!( 47 value & 0b1110_0000, 48 0, 49 "only the lower 5 bits should be used. value: {value:b}" 50 ); 51 debug_assert!( 52 (value & Self::CURRENT_READ_MASK).count_ones() <= 1, 53 "max 1 read. value: {value:b}" 54 ); 55 Self(value) 56 } 57 58 // returns None if no read is allowed 59 fn get_next_read(self) -> Option<Ptr> { 60 // is reading allowed 61 if self.0 & Self::READ_ALLOWED == 0 { 62 None 63 } else { 64 // which value should be read 65 if self.0 & Self::NEXT_READ == 0 { 66 Some(Ptr::Value1) 67 } else { 68 Some(Ptr::Value2) 69 } 70 } 71 } 72 73 fn set_current_read(self, ptr: Ptr) -> Self { 74 debug_assert!( 75 self.0 & Self::CURRENT_READ_MASK == 0, 76 "no read should be active. value: {:b}", 77 self.0 78 ); 79 80 let mask = match ptr { 81 Ptr::Value1 => 0b10, 82 Ptr::Value2 => 0b100, 83 }; 84 85 Self(self.0 | mask) 86 } 87 88 // none if there is no current read 89 fn get_current_read(self) -> Option<Ptr> { 90 match self.0 & Self::CURRENT_READ_MASK { 91 0b000 => None, 92 0b010 => Some(Ptr::Value1), 93 0b100 => Some(Ptr::Value2), 94 _ => unreachable!(), 95 } 96 } 97 98 fn is_unique(self) -> bool { 99 self.0 & Self::UNIQUE_MASK == 0 100 } 101 102 fn disallow_read(self) -> Self { 103 Self(self.0 & Self::INV_READ_ALLOWED) 104 } 105} 106 107pub struct Shared<T> { 108 /// # Bits from low to high 109 /// | bit | meaning | 110 /// |---|---| 111 /// | 0 | 0: unique, 1: second object exists | 112 /// | 1 | is value 1 being read (never both) | 113 /// | 2 | is value 2 being read (never both) | 114 /// | 3 | new read allowed | 115 /// | 4 | which value should be read (0: value1, 1: value2) | 116 state: AtomicU8, 117 value_1: UnsafeCell<T>, 118 value_2: UnsafeCell<T>, 119} 120 121impl<T: Clone> Shared<T> { 122 pub fn new(value: T) -> NonNull<Shared<T>> { 123 NonNull::new(Box::into_raw(Box::new(Shared { 124 state: AtomicU8::new(State::INITIAL), 125 value_1: UnsafeCell::new(value.clone()), 126 value_2: UnsafeCell::new(value), 127 }))) 128 .unwrap() 129 } 130} 131 132impl<T: Default> Shared<T> { 133 pub fn new_default() -> NonNull<Shared<T>> { 134 NonNull::new(Box::into_raw(Box::new(Shared { 135 state: AtomicU8::new(State::INITIAL), 136 value_1: UnsafeCell::new(T::default()), 137 value_2: UnsafeCell::new(T::default()), 138 }))) 139 .unwrap() 140 } 141} 142 143impl<T> Shared<T> { 144 // returns a unsafecell to allow some lifetime stuff 145 pub fn try_read(&self) -> Option<&UnsafeCell<T>> { 146 let result = self 147 .state 148 .fetch_update(Ordering::Acquire, Ordering::Relaxed, |value| { 149 let state = State::new(value); 150 let read = state.get_next_read(); 151 read.map(|p| state.set_current_read(p).0) 152 }); 153 match result { 154 // read is allowed. could be unwrap unchecked 155 Ok(v) => { 156 // unwrap ok, because Ok returned. could also be unchecked 157 let ptr = State::new(v).get_next_read().unwrap(); 158 // SAFETY: atomic was just locked and the read state was set. 159 Some(match ptr { 160 Ptr::Value1 => &self.value_1, 161 Ptr::Value2 => &self.value_2, 162 }) 163 } 164 // read isn't allowed 165 Err(_) => None, 166 } 167 } 168 169 /// The Option is None if the reader is blocked by this write. This can happen if the reader is keeping 170 /// a readlock for a long time. If it is None the writelock doesn't need to set the next_read bit when 171 /// unlocking. It only needs to remove the no_read bit. 172 /// 173 /// If this is Some the reader isn't blocked. The writelock needs to set the next_read bit to the Ptr value 174 pub unsafe fn write(&self) -> (&UnsafeCell<T>, Option<Ptr>) { 175 let result = self 176 .state 177 .fetch_update(Ordering::Acquire, Ordering::Acquire, |v| { 178 let state = State::new(v); 179 let current_read = state.get_current_read(); 180 // this can't be None because if WriteLock was forgotten instead of dropped the outer API layer panics, 181 // before calling this function 182 let next_read = unsafe { state.get_next_read().unwrap_unchecked() }; 183 184 match (current_read, next_read) { 185 (Some(c), n) if c != n => Some(state.disallow_read().0), 186 _ => { 187 // one of the following situations: 188 // - no read currently 189 // - should read and current read are the same 190 // both mean that the reader isn't behind, so no need to block it 191 // 192 // This is the branch that should be taken in most of the cases, so this fetch_update should be 193 // a single atomic load most of the time 194 None 195 } 196 } 197 }); 198 199 // this basically duplicates the logic from the closure. I am not sure if this is a good idea, but i want to 200 // keep the closure as small as possible to not make the cmp exchange loop harder than necessary 201 let pre_res = match result { 202 Ok(v) => { 203 let state = State::new(v); 204 // SAFETY: when locking for writing reading is always allowed. This operates on the state before the 205 // cmp exchange loop, so this is still true. 206 let next_read = unsafe { state.get_next_read().unwrap_unchecked() }; 207 208 (next_read, None) 209 } 210 Err(v) => { 211 let state = State::new(v); 212 // SAFETY: when locking for writing reading is always allowed. This operates on the state before the 213 // cmp exchange loop, so this is still true. 214 let next_read = unsafe { state.get_next_read().unwrap_unchecked() }; 215 let write = next_read.other(); 216 217 (write, Some(write)) 218 } 219 }; 220 221 let value = match pre_res.0 { 222 Ptr::Value1 => &self.value_1, 223 Ptr::Value2 => &self.value_2, 224 }; 225 226 (value, pre_res.1) 227 } 228 229 pub fn remove_current_read(&self) { 230 // relaxed because modifivations are not possible from the read side 231 let old_state = self 232 .state 233 .fetch_and(State::INV_CURRENT_READ_MASK, Ordering::Relaxed); 234 debug_assert!( 235 (old_state & State::CURRENT_READ_MASK).count_ones() == 1, 236 "don't call this method if there is no active read. value: {old_state:b}" 237 ) 238 } 239 240 pub fn unlock_write(&self, read_state: Option<Ptr>) { 241 match read_state { 242 // set the next read bit to the correct value 243 // value1 so remove the bit 244 Some(Ptr::Value1) => self 245 .state 246 .fetch_and(State::INV_NEXT_READ, Ordering::Release), 247 // value2 so set the bit 248 Some(Ptr::Value2) => self.state.fetch_or(State::NEXT_READ, Ordering::Release), 249 // only set the allow read bit 250 None => self.state.fetch_or(State::READ_ALLOWED, Ordering::Release), 251 }; 252 } 253 254 /// SAFETY: this has to be valid and gets invalidated by the function 255 pub unsafe fn drop(this: NonNull<Self>) { 256 // SAFETY: function preconditions 257 let old_state = unsafe { this.as_ref() } 258 .state 259 .fetch_and(State::INV_UNIQUE_MASK, Ordering::Release); 260 261 if State::new(old_state).is_unique() { 262 // see std::arc 263 atomic::fence(Ordering::Acquire); 264 265 // SAFETY: was alloced with box and is unique 266 core::mem::drop(unsafe { Box::from_raw(this.as_ptr()) }) 267 } 268 } 269 270 /// this allows then duplicating the pointer to it 271 pub fn try_create_other(&self) -> bool { 272 // relaxed, because it is singlethreaded if this succeeds 273 let old_state = State::new(self.state.fetch_or(State::UNIQUE_MASK, Ordering::Relaxed)); 274 // if it was unique before creating another is successfull 275 old_state.is_unique() 276 } 277 278 #[cfg(test)] 279 pub fn is_unique(&self) -> bool { 280 State::new(self.state.load(Ordering::Relaxed)).is_unique() 281 } 282} 283 284#[cfg(test)] 285mod state_tests { 286 use crate::shared::{Ptr, State}; 287 288 #[test] 289 fn unique() { 290 let state = State::new(State::INITIAL); 291 assert!(state.is_unique()); 292 let non_unique = State::new(state.0 | State::UNIQUE_MASK); 293 assert!(!non_unique.is_unique()); 294 let unique_again = State::new(non_unique.0 & State::INV_UNIQUE_MASK); 295 assert!(unique_again.is_unique()); 296 } 297 298 #[test] 299 fn read_state() { 300 let state = State::new(State::INITIAL); 301 assert!(state.get_current_read().is_none()); 302 // ptr 1 read 303 let read1 = state.set_current_read(Ptr::Value1); 304 assert!(read1.get_current_read() == Some(Ptr::Value1)); 305 // ptr 2 read 306 let read2 = state.set_current_read(Ptr::Value2); 307 assert!(read2.get_current_read() == Some(Ptr::Value2)); 308 } 309}