A map offering lock-free reads via the left-right concurrency algorithm, written in Go
1package leftright
2
3import (
4 "fmt"
5 "iter"
6 "runtime"
7 "sync"
8 "sync/atomic"
9 "time"
10 "weak"
11)
12
type (
	// LRMap is a map based on the left-right concurrency algorithm: the two
	// arenas (left and right) alternate between the reader and writer roles.
	// Writers mutate the write map under mu and record every mutation in
	// redoLog; Commit swaps the roles and replays the log so both arenas
	// converge. Readers access the read map lock-free through ReadHandlers.
	LRMap[K comparable, V any] struct {
		mu              sync.Mutex                  // serializes all writer-side operations
		left            arena[K, V]                 // one of the two backing maps
		right           arena[K, V]                 // the other backing map
		readMap         atomic.Pointer[arena[K, V]] // points at left or right; loaded lock-free by readers
		writeMap        atomic.Pointer[arena[K, V]] // the other arena; mutated only under mu
		redoLog         []operation[K, V]           // mutations since the last Commit, replayed after the swap
		readHandlers    map[weak.Pointer[ReadHandler[K, V]]]struct{} // registry of handlers; weak so GC can reclaim abandoned ones
		readHandlerPool sync.Pool                   // recycled ReadHandlers to reduce allocations
		metrics         *metrics                    // optional counters; nil unless recordMetrics was called
	}

	// metrics counts read-handler lifecycle events; fields are updated atomically.
	metrics struct {
		readHandlersCreated   uint64
		readHandlersClosed    uint64
		readHandlersRecycled  uint64
		readHandlersCleanedUp uint64
	}

	// arena is one of the two backing maps of an LRMap.
	arena[K comparable, V any] map[K]V
)
35
36func New[K comparable, V any]() *LRMap[K, V] {
37 m := &LRMap[K, V]{
38 left: make(arena[K, V]),
39 right: make(arena[K, V]),
40 readHandlers: make(map[weak.Pointer[ReadHandler[K, V]]]struct{}),
41 }
42
43 m.readHandlerPool.New = func() any { return m.newReadHandler() }
44
45 m.swap()
46
47 return m
48}
49
50// recordMetrics enables the collection of metrics for read handlers. This is not thread-safe, you must
51// call this before sharing the LRMap with other goroutines.
52func (m *LRMap[K, V]) recordMetrics() { m.metrics = new(metrics) }
53
54func (m *LRMap[K, V]) Set(key K, value V) {
55 m.mu.Lock()
56 defer m.mu.Unlock()
57
58 (*m.writeMap.Load())[key] = value
59
60 m.redoLog = append(m.redoLog, operation[K, V]{typ: opSet, key: key, value: &value})
61}
62
63func (m *LRMap[K, V]) Delete(key K) {
64 m.mu.Lock()
65 defer m.mu.Unlock()
66
67 delete(*(m.writeMap.Load()), key)
68
69 m.redoLog = append(m.redoLog, operation[K, V]{typ: opDelete, key: key})
70}
71
72func (m *LRMap[K, V]) Clear() {
73 m.mu.Lock()
74 defer m.mu.Unlock()
75
76 clear(*(m.writeMap.Load()))
77
78 // allocate a fresh redo log, since any preceding ops are now obsolete
79 m.redoLog = []operation[K, V]{{typ: opClear}}
80}
81
82func (m *LRMap[K, V]) Get(key K) V {
83 value, _ := m.GetOK(key)
84
85 return value
86}
87
88func (m *LRMap[K, V]) GetOK(key K) (V, bool) {
89 m.mu.Lock()
90 defer m.mu.Unlock()
91
92 value, ok := (*m.writeMap.Load())[key]
93
94 return value, ok
95}
96
// Commit publishes all pending writes to readers: it swaps the reader/writer
// roles of the two arenas, waits until every reader that entered before the
// swap has left the old read map, and then replays the redo log into the new
// write map (the old read map) so both arenas hold the same data again.
func (m *LRMap[K, V]) Commit() {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.swap()

	// Readers that entered before the swap may still be reading the old read
	// map (now the write map); it must not be mutated until they have left.
	m.waitForReaders()

	// redo all operations from the redo log into the new write map (old read map) to sync up.
	for _, op := range m.redoLog {
		switch op.typ {
		case opSet:
			(*(m.writeMap.Load()))[op.key] = *(op.value)
		case opDelete:
			delete(*(m.writeMap.Load()), op.key)
		case opClear:
			clear(*(m.writeMap.Load()))
		default:
			panic(fmt.Errorf("operation(%d) not implemented", op.typ))
		}
	}

	// drop the redo log completely and let the GC remove all references to stale keys and values
	m.redoLog = nil
}
122
123func (m *LRMap[K, V]) NewReadHandler() *ReadHandler[K, V] {
124 rh := m.readHandlerPool.Get().(*ReadHandler[K, V])
125 rh.ready = true
126
127 return rh
128}
129
// newReadHandler allocates a fresh ReadHandler, registers it in the
// readHandlers set under a weak pointer, and attaches a GC cleanup that
// unregisters it if the handler becomes unreachable without being Closed.
// It is used as the pool's New function.
func (m *LRMap[K, V]) newReadHandler() *ReadHandler[K, V] {
	// nolint:exhaustivestruct
	rh := &ReadHandler[K, V]{lrmap: m}

	wp := weak.Make(rh)
	rh.wp = wp

	m.mu.Lock()
	m.readHandlers[wp] = struct{}{}
	m.mu.Unlock()

	// The cleanup closure captures only the weak pointer (capturing rh
	// strongly would keep it alive forever); the map is passed as the
	// cleanup argument instead.
	runtime.AddCleanup(rh, func(lrmap *LRMap[K, V]) {
		lrmap.mu.Lock()
		defer lrmap.mu.Unlock()

		if lrmap.metrics != nil {
			atomic.AddUint64(&lrmap.metrics.readHandlersCleanedUp, 1)
		}

		delete(lrmap.readHandlers, wp)
	}, m)

	if m.metrics != nil {
		atomic.AddUint64(&m.metrics.readHandlersCreated, 1)
	}

	return rh
}
158
159func (m *LRMap[K, V]) swap() {
160 switch m.readMap.Load() {
161 case nil /* initial case */, &(m.left):
162 m.readMap.Store(&(m.right))
163 m.writeMap.Store(&(m.left))
164 case &(m.right):
165 m.readMap.Store(&(m.left))
166 m.writeMap.Store(&(m.right))
167 default:
168 // nolint:goerr113
169 panic(fmt.Errorf("illegal pointer: %v", m.readMap.Load()))
170 }
171}
172
173func (m *LRMap[K, V]) waitForReaders() {
174 readers := make(map[*ReadHandler[K, V]]uint64)
175
176 for rhWP := range m.readHandlers {
177 rh := rhWP.Value()
178 if rh == nil {
179 panic("illegal reader state: must not have nil reader")
180 }
181
182 if serial := atomic.LoadUint64(&(rh.serial)); serial%2 == 1 {
183 readers[rh] = serial
184 }
185 }
186
187 delay := time.Microsecond
188
189 for {
190 for reader, serial := range readers {
191 if s := atomic.LoadUint64(&(reader.serial)); s != serial {
192 delete(readers, reader)
193 }
194 }
195
196 if len(readers) == 0 {
197 return
198 }
199
200 time.Sleep(delay)
201
202 const maxDelay = 5 * time.Second
203
204 delay *= 10
205 if delay > maxDelay {
206 delay = maxDelay
207 }
208 }
209}
210
// opType identifies the kind of mutation recorded in the redo log.
type opType int8

const (
	opSet    opType = iota // set key to value
	opDelete               // remove key
	opClear                // clear the whole map
)
218
// operation is a single recorded mutation in the redo log.
type operation[K comparable, V any] struct {
	typ   opType
	key   K
	value *V // set only for opSet; nil for opDelete/opClear
}
224
// ReadHandler is a handle for lock-free reads of an LRMap. Typical use:
// NewReadHandler → Enter → Get/GetOK/Len/Iter → Leave → Recycle (or Close).
type ReadHandler[K comparable, V any] struct {
	lrmap  *LRMap[K, V]
	live   arena[K, V] // snapshot of the read map taken by Enter
	serial uint64      // odd while between Enter and Leave; polled by waitForReaders
	wp     weak.Pointer[ReadHandler[K, V]] // registry key in lrmap.readHandlers
	ready  bool        // false after Close/Recycle; checked by assertReady
}
232
233func (rh *ReadHandler[K, V]) Enter() {
234 rh.assertReady()
235 if rh.entered() {
236 panic("reader illegal state: must not Enter() twice")
237 }
238
239 atomic.AddUint64(&rh.serial, 1)
240 rh.live = *rh.lrmap.readMap.Load()
241}
242
243func (rh *ReadHandler[K, V]) Leave() {
244 rh.assertReady()
245 if !rh.entered() {
246 panic("reader illegal state: must not Leave() twice")
247 }
248
249 atomic.AddUint64(&rh.serial, 1)
250}
251
252func (rh *ReadHandler[K, V]) Get(key K) V {
253 rh.assertReady()
254
255 value, _ := rh.GetOK(key)
256
257 return value
258}
259
260func (rh *ReadHandler[K, V]) GetOK(key K) (V, bool) {
261 rh.assertReady()
262
263 if !rh.entered() {
264 panic("reader illegal state: must Enter() before operating on data")
265 }
266
267 value, ok := rh.live[key]
268
269 return value, ok
270}
271
272func (rh *ReadHandler[K, V]) Len() int {
273 rh.assertReady()
274
275 if !rh.entered() {
276 panic("reader illegal state: must Enter() before operation on data")
277 }
278
279 return len(rh.live)
280}
281
282func (rh *ReadHandler[K, V]) Iter() iter.Seq2[K, V] {
283 return func(yield func(K, V) bool) {
284 rh.assertReady()
285
286 if !rh.entered() {
287 panic("reader illegal state: must call Enter() before iterating")
288 }
289
290 for key, value := range rh.live {
291 if ok := yield(key, value); !ok {
292 return
293 }
294 }
295 }
296}
297
298func (rh *ReadHandler[K, V]) Close() {
299 rh.assertReady()
300
301 if rh.lrmap.metrics != nil {
302 atomic.AddUint64(&rh.lrmap.metrics.readHandlersClosed, 1)
303 }
304
305 rh.ready = false
306
307 rh.lrmap.mu.Lock()
308 defer rh.lrmap.mu.Unlock()
309
310 delete(rh.lrmap.readHandlers, rh.wp)
311}
312
313func (rh *ReadHandler[K, V]) Recycle() {
314 if !rh.ready {
315 panic("illegal reader state: must not recycle twice")
316 }
317
318 if rh.entered() {
319 panic("reader illegal state: must call Leave() before recycling")
320 }
321
322 if rh.lrmap.metrics != nil {
323 atomic.AddUint64(&rh.lrmap.metrics.readHandlersRecycled, 1)
324 }
325
326 rh.ready = false
327
328 rh.lrmap.readHandlerPool.Put(rh)
329}
330
331func (rh *ReadHandler[K, V]) entered() bool {
332 return rh.serial%2 == 1
333}
334
335func (rh *ReadHandler[K, V]) assertReady() {
336 if rh == nil {
337 panic("reader illegal state: must create with NewReadHandler()")
338 }
339
340 if !rh.ready {
341 panic(fmt.Errorf("reader illegal state: must not use after Recycle()"))
342 }
343}