//! A concurrent, realtime write data structure.
1use core::{
2 cell::UnsafeCell,
3 ptr::NonNull,
4 sync::atomic::{self, AtomicU8, Ordering},
5};
6
7use alloc::boxed::Box;
8
/// Identifies one of the two value slots of [`Shared`] (`value_1` / `value_2`).
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum Ptr {
    /// The first slot (`value_1`).
    Value1,
    /// The second slot (`value_2`).
    Value2,
}
14
15impl Ptr {
16 fn other(self) -> Self {
17 match self {
18 Ptr::Value1 => Ptr::Value2,
19 Ptr::Value2 => Ptr::Value1,
20 }
21 }
22}
23
/// Packed state byte; the bit layout is documented on the `state` field of
/// `Shared`. Only the lower 5 bits are used.
#[derive(Clone, Copy)]
struct State(u8);
26
27impl State {
28 /// - Unique
29 /// - no read
30 /// - new read allowed
31 /// - should read value 0
32 const INITIAL: u8 = 0b01000;
33
34 const CURRENT_READ_MASK: u8 = 0b00110;
35 const INV_CURRENT_READ_MASK: u8 = 0b11001;
36 const UNIQUE_MASK: u8 = 0b00001;
37 const INV_UNIQUE_MASK: u8 = 0b11110;
38
39 const NEXT_READ: u8 = 0b10000;
40 const INV_NEXT_READ: u8 = 0b01111;
41 const READ_ALLOWED: u8 = 0b01000;
42 const INV_READ_ALLOWED: u8 = 0b10111;
43
44 fn new(value: u8) -> Self {
45 // could also be assert unchecked
46 debug_assert_eq!(
47 value & 0b1110_0000,
48 0,
49 "only the lower 5 bits should be used. value: {value:b}"
50 );
51 debug_assert!(
52 (value & Self::CURRENT_READ_MASK).count_ones() <= 1,
53 "max 1 read. value: {value:b}"
54 );
55 Self(value)
56 }
57
58 // returns None if no read is allowed
59 fn get_next_read(self) -> Option<Ptr> {
60 // is reading allowed
61 if self.0 & Self::READ_ALLOWED == 0 {
62 None
63 } else {
64 // which value should be read
65 if self.0 & Self::NEXT_READ == 0 {
66 Some(Ptr::Value1)
67 } else {
68 Some(Ptr::Value2)
69 }
70 }
71 }
72
73 fn set_current_read(self, ptr: Ptr) -> Self {
74 debug_assert!(
75 self.0 & Self::CURRENT_READ_MASK == 0,
76 "no read should be active. value: {:b}",
77 self.0
78 );
79
80 let mask = match ptr {
81 Ptr::Value1 => 0b10,
82 Ptr::Value2 => 0b100,
83 };
84
85 Self(self.0 | mask)
86 }
87
88 // none if there is no current read
89 fn get_current_read(self) -> Option<Ptr> {
90 match self.0 & Self::CURRENT_READ_MASK {
91 0b000 => None,
92 0b010 => Some(Ptr::Value1),
93 0b100 => Some(Ptr::Value2),
94 _ => unreachable!(),
95 }
96 }
97
98 fn is_unique(self) -> bool {
99 self.0 & Self::UNIQUE_MASK == 0
100 }
101
102 fn disallow_read(self) -> Self {
103 Self(self.0 & Self::INV_READ_ALLOWED)
104 }
105}
106
/// Double-buffered shared storage: one slot can be written while the other is
/// read. All coordination goes through the single `state` byte; the state
/// machine tracks at most one active read at a time (see `State::new`).
pub struct Shared<T> {
    /// # Bits from low to high
    /// | bit | meaning |
    /// |---|---|
    /// | 0 | 0: unique, 1: second object exists |
    /// | 1 | is value 1 being read (never both) |
    /// | 2 | is value 2 being read (never both) |
    /// | 3 | new read allowed |
    /// | 4 | which value should be read (0: value1, 1: value2) |
    state: AtomicU8,
    /// First value slot (`Ptr::Value1`); access mediated by `state`.
    value_1: UnsafeCell<T>,
    /// Second value slot (`Ptr::Value2`); access mediated by `state`.
    value_2: UnsafeCell<T>,
}
120
121impl<T: Clone> Shared<T> {
122 pub fn new(value: T) -> NonNull<Shared<T>> {
123 NonNull::new(Box::into_raw(Box::new(Shared {
124 state: AtomicU8::new(State::INITIAL),
125 value_1: UnsafeCell::new(value.clone()),
126 value_2: UnsafeCell::new(value),
127 })))
128 .unwrap()
129 }
130}
131
132impl<T: Default> Shared<T> {
133 pub fn new_default() -> NonNull<Shared<T>> {
134 NonNull::new(Box::into_raw(Box::new(Shared {
135 state: AtomicU8::new(State::INITIAL),
136 value_1: UnsafeCell::new(T::default()),
137 value_2: UnsafeCell::new(T::default()),
138 })))
139 .unwrap()
140 }
141}
142
143impl<T> Shared<T> {
144 // returns a unsafecell to allow some lifetime stuff
145 pub fn try_read(&self) -> Option<&UnsafeCell<T>> {
146 let result = self
147 .state
148 .fetch_update(Ordering::Acquire, Ordering::Relaxed, |value| {
149 let state = State::new(value);
150 let read = state.get_next_read();
151 read.map(|p| state.set_current_read(p).0)
152 });
153 match result {
154 // read is allowed. could be unwrap unchecked
155 Ok(v) => {
156 // unwrap ok, because Ok returned. could also be unchecked
157 let ptr = State::new(v).get_next_read().unwrap();
158 // SAFETY: atomic was just locked and the read state was set.
159 Some(match ptr {
160 Ptr::Value1 => &self.value_1,
161 Ptr::Value2 => &self.value_2,
162 })
163 }
164 // read isn't allowed
165 Err(_) => None,
166 }
167 }
168
169 /// The Option is None if the reader is blocked by this write. This can happen if the reader is keeping
170 /// a readlock for a long time. If it is None the writelock doesn't need to set the next_read bit when
171 /// unlocking. It only needs to remove the no_read bit.
172 ///
173 /// If this is Some the reader isn't blocked. The writelock needs to set the next_read bit to the Ptr value
174 pub unsafe fn write(&self) -> (&UnsafeCell<T>, Option<Ptr>) {
175 let result = self
176 .state
177 .fetch_update(Ordering::Acquire, Ordering::Acquire, |v| {
178 let state = State::new(v);
179 let current_read = state.get_current_read();
180 // this can't be None because if WriteLock was forgotten instead of dropped the outer API layer panics,
181 // before calling this function
182 let next_read = unsafe { state.get_next_read().unwrap_unchecked() };
183
184 match (current_read, next_read) {
185 (Some(c), n) if c != n => Some(state.disallow_read().0),
186 _ => {
187 // one of the following situations:
188 // - no read currently
189 // - should read and current read are the same
190 // both mean that the reader isn't behind, so no need to block it
191 //
192 // This is the branch that should be taken in most of the cases, so this fetch_update should be
193 // a single atomic load most of the time
194 None
195 }
196 }
197 });
198
199 // this basically duplicates the logic from the closure. I am not sure if this is a good idea, but i want to
200 // keep the closure as small as possible to not make the cmp exchange loop harder than necessary
201 let pre_res = match result {
202 Ok(v) => {
203 let state = State::new(v);
204 // SAFETY: when locking for writing reading is always allowed. This operates on the state before the
205 // cmp exchange loop, so this is still true.
206 let next_read = unsafe { state.get_next_read().unwrap_unchecked() };
207
208 (next_read, None)
209 }
210 Err(v) => {
211 let state = State::new(v);
212 // SAFETY: when locking for writing reading is always allowed. This operates on the state before the
213 // cmp exchange loop, so this is still true.
214 let next_read = unsafe { state.get_next_read().unwrap_unchecked() };
215 let write = next_read.other();
216
217 (write, Some(write))
218 }
219 };
220
221 let value = match pre_res.0 {
222 Ptr::Value1 => &self.value_1,
223 Ptr::Value2 => &self.value_2,
224 };
225
226 (value, pre_res.1)
227 }
228
229 pub fn remove_current_read(&self) {
230 // relaxed because modifivations are not possible from the read side
231 let old_state = self
232 .state
233 .fetch_and(State::INV_CURRENT_READ_MASK, Ordering::Relaxed);
234 debug_assert!(
235 (old_state & State::CURRENT_READ_MASK).count_ones() == 1,
236 "don't call this method if there is no active read. value: {old_state:b}"
237 )
238 }
239
240 pub fn unlock_write(&self, read_state: Option<Ptr>) {
241 match read_state {
242 // set the next read bit to the correct value
243 // value1 so remove the bit
244 Some(Ptr::Value1) => self
245 .state
246 .fetch_and(State::INV_NEXT_READ, Ordering::Release),
247 // value2 so set the bit
248 Some(Ptr::Value2) => self.state.fetch_or(State::NEXT_READ, Ordering::Release),
249 // only set the allow read bit
250 None => self.state.fetch_or(State::READ_ALLOWED, Ordering::Release),
251 };
252 }
253
254 /// SAFETY: this has to be valid and gets invalidated by the function
255 pub unsafe fn drop(this: NonNull<Self>) {
256 // SAFETY: function preconditions
257 let old_state = unsafe { this.as_ref() }
258 .state
259 .fetch_and(State::INV_UNIQUE_MASK, Ordering::Release);
260
261 if State::new(old_state).is_unique() {
262 // see std::arc
263 atomic::fence(Ordering::Acquire);
264
265 // SAFETY: was alloced with box and is unique
266 core::mem::drop(unsafe { Box::from_raw(this.as_ptr()) })
267 }
268 }
269
270 /// this allows then duplicating the pointer to it
271 pub fn try_create_other(&self) -> bool {
272 // relaxed, because it is singlethreaded if this succeeds
273 let old_state = State::new(self.state.fetch_or(State::UNIQUE_MASK, Ordering::Relaxed));
274 // if it was unique before creating another is successfull
275 old_state.is_unique()
276 }
277
278 #[cfg(test)]
279 pub fn is_unique(&self) -> bool {
280 State::new(self.state.load(Ordering::Relaxed)).is_unique()
281 }
282}
283
#[cfg(test)]
mod state_tests {
    use crate::shared::{Ptr, State};

    #[test]
    fn unique() {
        // fresh state is unique
        let initial = State::new(State::INITIAL);
        assert!(initial.is_unique());

        // setting the unique bit marks a second handle
        let shared = State::new(initial.0 | State::UNIQUE_MASK);
        assert!(!shared.is_unique());

        // clearing it restores uniqueness
        let unique_again = State::new(shared.0 & State::INV_UNIQUE_MASK);
        assert!(unique_again.is_unique());
    }

    #[test]
    fn read_state() {
        // fresh state has no active read
        let initial = State::new(State::INITIAL);
        assert!(initial.get_current_read().is_none());

        // mark slot 1 as being read
        let reading_1 = initial.set_current_read(Ptr::Value1);
        assert!(reading_1.get_current_read() == Some(Ptr::Value1));

        // mark slot 2 as being read (again starting from the no-read state)
        let reading_2 = initial.set_current_read(Ptr::Value2);
        assert!(reading_2.get_current_read() == Some(Ptr::Value2));
    }
}
309}