A map using a lock-free concurrency using left-right algo, written in Go
at main 343 lines 6.7 kB view raw
1package leftright 2 3import ( 4 "fmt" 5 "iter" 6 "runtime" 7 "sync" 8 "sync/atomic" 9 "time" 10 "weak" 11) 12 13type ( 14 LRMap[K comparable, V any] struct { 15 mu sync.Mutex 16 left arena[K, V] 17 right arena[K, V] 18 readMap atomic.Pointer[arena[K, V]] 19 writeMap atomic.Pointer[arena[K, V]] 20 redoLog []operation[K, V] 21 readHandlers map[weak.Pointer[ReadHandler[K, V]]]struct{} 22 readHandlerPool sync.Pool 23 metrics *metrics 24 } 25 26 metrics struct { 27 readHandlersCreated uint64 28 readHandlersClosed uint64 29 readHandlersRecycled uint64 30 readHandlersCleanedUp uint64 31 } 32 33 arena[K comparable, V any] map[K]V 34) 35 36func New[K comparable, V any]() *LRMap[K, V] { 37 m := &LRMap[K, V]{ 38 left: make(arena[K, V]), 39 right: make(arena[K, V]), 40 readHandlers: make(map[weak.Pointer[ReadHandler[K, V]]]struct{}), 41 } 42 43 m.readHandlerPool.New = func() any { return m.newReadHandler() } 44 45 m.swap() 46 47 return m 48} 49 50// recordMetrics enables the collection of metrics for read handlers. This is not thread-safe, you must 51// call this before sharing the LRMap with other goroutines. 52func (m *LRMap[K, V]) recordMetrics() { m.metrics = new(metrics) } 53 54func (m *LRMap[K, V]) Set(key K, value V) { 55 m.mu.Lock() 56 defer m.mu.Unlock() 57 58 (*m.writeMap.Load())[key] = value 59 60 m.redoLog = append(m.redoLog, operation[K, V]{typ: opSet, key: key, value: &value}) 61} 62 63func (m *LRMap[K, V]) Delete(key K) { 64 m.mu.Lock() 65 defer m.mu.Unlock() 66 67 delete(*(m.writeMap.Load()), key) 68 69 m.redoLog = append(m.redoLog, operation[K, V]{typ: opDelete, key: key}) 70} 71 72func (m *LRMap[K, V]) Clear() { 73 m.mu.Lock() 74 defer m.mu.Unlock() 75 76 clear(*(m.writeMap.Load())) 77 78 // allocate a fresh redo log, since any preceding ops are now obsolete 79 m.redoLog = []operation[K, V]{{typ: opClear}} 80} 81 82func (m *LRMap[K, V]) Get(key K) V { 83 value, _ := m.GetOK(key) 84 85 return value 86} 87 88func (m *LRMap[K, V]) GetOK(key K) (V, bool) { 89 m.mu.Lock() 90 defer m.mu.Unlock() 91 92 value, ok := (*m.writeMap.Load())[key] 93 94 return value, ok 95} 96 97func (m *LRMap[K, V]) Commit() { 98 m.mu.Lock() 99 defer m.mu.Unlock() 100 101 m.swap() 102 103 m.waitForReaders() 104 105 // redo all operations from the redo log into the new write map (old read map) to sync up. 106 for _, op := range m.redoLog { 107 switch op.typ { 108 case opSet: 109 (*(m.writeMap.Load()))[op.key] = *(op.value) 110 case opDelete: 111 delete(*(m.writeMap.Load()), op.key) 112 case opClear: 113 clear(*(m.writeMap.Load())) 114 default: 115 panic(fmt.Errorf("operation(%d) not implemented", op.typ)) 116 } 117 } 118 119 // drop the redo log completely and let the GC remove all references to stale keys and values 120 m.redoLog = nil 121} 122 123func (m *LRMap[K, V]) NewReadHandler() *ReadHandler[K, V] { 124 rh := m.readHandlerPool.Get().(*ReadHandler[K, V]) 125 rh.ready = true 126 127 return rh 128} 129 130func (m *LRMap[K, V]) newReadHandler() *ReadHandler[K, V] { 131 // nolint:exhaustivestruct 132 rh := &ReadHandler[K, V]{lrmap: m} 133 134 wp := weak.Make(rh) 135 rh.wp = wp 136 137 m.mu.Lock() 138 m.readHandlers[wp] = struct{}{} 139 m.mu.Unlock() 140 141 runtime.AddCleanup(rh, func(lrmap *LRMap[K, V]) { 142 lrmap.mu.Lock() 143 defer lrmap.mu.Unlock() 144 145 if lrmap.metrics != nil { 146 atomic.AddUint64(&lrmap.metrics.readHandlersCleanedUp, 1) 147 } 148 149 delete(lrmap.readHandlers, wp) 150 }, m) 151 152 if m.metrics != nil { 153 atomic.AddUint64(&m.metrics.readHandlersCreated, 1) 154 } 155 156 return rh 157} 158 159func (m *LRMap[K, V]) swap() { 160 switch m.readMap.Load() { 161 case nil /* initial case */, &(m.left): 162 m.readMap.Store(&(m.right)) 163 m.writeMap.Store(&(m.left)) 164 case &(m.right): 165 m.readMap.Store(&(m.left)) 166 m.writeMap.Store(&(m.right)) 167 default: 168 // nolint:goerr113 169 panic(fmt.Errorf("illegal pointer: %v", m.readMap.Load())) 170 } 171} 172 173func (m *LRMap[K, V]) waitForReaders() { 174 readers := make(map[*ReadHandler[K, V]]uint64) 175 176 for rhWP := range m.readHandlers { 177 rh := rhWP.Value() 178 if rh == nil { 179 panic("illegal reader state: must not have nil reader") 180 } 181 182 if serial := atomic.LoadUint64(&(rh.serial)); serial%2 == 1 { 183 readers[rh] = serial 184 } 185 } 186 187 delay := time.Microsecond 188 189 for { 190 for reader, serial := range readers { 191 if s := atomic.LoadUint64(&(reader.serial)); s != serial { 192 delete(readers, reader) 193 } 194 } 195 196 if len(readers) == 0 { 197 return 198 } 199 200 time.Sleep(delay) 201 202 const maxDelay = 5 * time.Second 203 204 delay *= 10 205 if delay > maxDelay { 206 delay = maxDelay 207 } 208 } 209} 210 211type opType int8 212 213const ( 214 opSet opType = iota 215 opDelete 216 opClear 217) 218 219type operation[K comparable, V any] struct { 220 typ opType 221 key K 222 value *V 223} 224 225type ReadHandler[K comparable, V any] struct { 226 lrmap *LRMap[K, V] 227 live arena[K, V] 228 serial uint64 229 wp weak.Pointer[ReadHandler[K, V]] 230 ready bool 231} 232 233func (rh *ReadHandler[K, V]) Enter() { 234 rh.assertReady() 235 if rh.entered() { 236 panic("reader illegal state: must not Enter() twice") 237 } 238 239 atomic.AddUint64(&rh.serial, 1) 240 rh.live = *rh.lrmap.readMap.Load() 241} 242 243func (rh *ReadHandler[K, V]) Leave() { 244 rh.assertReady() 245 if !rh.entered() { 246 panic("reader illegal state: must not Leave() twice") 247 } 248 249 atomic.AddUint64(&rh.serial, 1) 250} 251 252func (rh *ReadHandler[K, V]) Get(key K) V { 253 rh.assertReady() 254 255 value, _ := rh.GetOK(key) 256 257 return value 258} 259 260func (rh *ReadHandler[K, V]) GetOK(key K) (V, bool) { 261 rh.assertReady() 262 263 if !rh.entered() { 264 panic("reader illegal state: must Enter() before operating on data") 265 } 266 267 value, ok := rh.live[key] 268 269 return value, ok 270} 271 272func (rh *ReadHandler[K, V]) Len() int { 273 rh.assertReady() 274 275 if !rh.entered() { 276 panic("reader illegal state: must Enter() before operation on data") 277 } 278 279 return len(rh.live) 280} 281 282func (rh *ReadHandler[K, V]) Iter() iter.Seq2[K, V] { 283 return func(yield func(K, V) bool) { 284 rh.assertReady() 285 286 if !rh.entered() { 287 panic("reader illegal state: must call Enter() before iterating") 288 } 289 290 for key, value := range rh.live { 291 if ok := yield(key, value); !ok { 292 return 293 } 294 } 295 } 296} 297 298func (rh *ReadHandler[K, V]) Close() { 299 rh.assertReady() 300 301 if rh.lrmap.metrics != nil { 302 atomic.AddUint64(&rh.lrmap.metrics.readHandlersClosed, 1) 303 } 304 305 rh.ready = false 306 307 rh.lrmap.mu.Lock() 308 defer rh.lrmap.mu.Unlock() 309 310 delete(rh.lrmap.readHandlers, rh.wp) 311} 312 313func (rh *ReadHandler[K, V]) Recycle() { 314 if !rh.ready { 315 panic("illegal reader state: must not recycle twice") 316 } 317 318 if rh.entered() { 319 panic("reader illegal state: must call Leave() before recycling") 320 } 321 322 if rh.lrmap.metrics != nil { 323 atomic.AddUint64(&rh.lrmap.metrics.readHandlersRecycled, 1) 324 } 325 326 rh.ready = false 327 328 rh.lrmap.readHandlerPool.Put(rh) 329} 330 331func (rh *ReadHandler[K, V]) entered() bool { 332 return rh.serial%2 == 1 333} 334 335func (rh *ReadHandler[K, V]) assertReady() { 336 if rh == nil { 337 panic("reader illegal state: must create with NewReadHandler()") 338 } 339 340 if !rh.ready { 341 panic(fmt.Errorf("reader illegal state: must not use after Recycle()")) 342 } 343}