// Package leftright implements a "left-right" concurrent map: two arenas
// (left and right) alternate between the reader role and the writer role.
// Mutations go to the write arena and are recorded in a redo log; Commit
// swaps the roles, waits for in-flight readers to drain, and replays the
// log into the new write arena so both sides converge.
package leftright

import (
	"fmt"
	"iter"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
	"weak"
)

type (
	// LRMap is a left-right map. All writer-side methods (Set, Delete,
	// Clear, Get, GetOK, Commit) serialize on mu. Readers obtain a
	// ReadHandler and operate lock-free on the current read arena.
	LRMap[K comparable, V any] struct {
		mu    sync.Mutex
		left  arena[K, V]
		right arena[K, V]
		// readMap/writeMap point at left and right in alternation; they are
		// atomic so readers can load them without taking mu.
		readMap  atomic.Pointer[arena[K, V]]
		writeMap atomic.Pointer[arena[K, V]]
		// redoLog records every mutation since the last Commit so it can be
		// replayed into the other arena after the swap.
		redoLog []operation[K, V]
		// readHandlers tracks live handlers through weak pointers, so a
		// handler that is dropped without Close/Recycle is eventually
		// removed by its runtime cleanup instead of stalling Commit forever.
		readHandlers    map[weak.Pointer[ReadHandler[K, V]]]struct{}
		readHandlerPool sync.Pool
		metrics         *metrics
	}

	// metrics counts read-handler lifecycle events. All fields are updated
	// with atomic.AddUint64.
	metrics struct {
		readHandlersCreated   uint64
		readHandlersClosed    uint64
		readHandlersRecycled  uint64
		readHandlersCleanedUp uint64
	}

	arena[K comparable, V any] map[K]V
)

// New returns an empty LRMap ready for use.
func New[K comparable, V any]() *LRMap[K, V] {
	m := &LRMap[K, V]{
		left:         make(arena[K, V]),
		right:        make(arena[K, V]),
		readHandlers: make(map[weak.Pointer[ReadHandler[K, V]]]struct{}),
	}
	m.readHandlerPool.New = func() any {
		return m.newReadHandler()
	}
	// Perform the initial swap so readMap/writeMap are non-nil before the
	// map is shared with any other goroutine.
	m.swap()

	return m
}

// recordMetrics enables the collection of metrics for read handlers. This is not thread-safe, you must
// call this before sharing the LRMap with other goroutines.
func (m *LRMap[K, V]) recordMetrics() {
	m.metrics = new(metrics)
}

// Set stores value under key in the write arena and logs the operation for
// replay at the next Commit.
func (m *LRMap[K, V]) Set(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()

	(*m.writeMap.Load())[key] = value
	m.redoLog = append(m.redoLog, operation[K, V]{typ: opSet, key: key, value: &value})
}

// Delete removes key from the write arena and logs the operation for replay
// at the next Commit.
func (m *LRMap[K, V]) Delete(key K) {
	m.mu.Lock()
	defer m.mu.Unlock()

	delete(*(m.writeMap.Load()), key)
	m.redoLog = append(m.redoLog, operation[K, V]{typ: opDelete, key: key})
}

// Clear empties the write arena. Since a clear supersedes every earlier
// mutation, the redo log is reset to the single clear operation.
func (m *LRMap[K, V]) Clear() {
	m.mu.Lock()
	defer m.mu.Unlock()

	clear(*(m.writeMap.Load()))

	// allocate a fresh redo log, since any preceding ops are now obsolete
	m.redoLog = []operation[K, V]{{typ: opClear}}
}

// Get returns the value for key from the writer's view (the zero value when
// absent). For concurrent read access use a ReadHandler instead.
func (m *LRMap[K, V]) Get(key K) V {
	value, _ := m.GetOK(key)

	return value
}

// GetOK returns the value for key from the writer's view and whether it was
// present.
func (m *LRMap[K, V]) GetOK(key K) (V, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	value, ok := (*m.writeMap.Load())[key]

	return value, ok
}

// Commit publishes all pending mutations to readers: it swaps the read and
// write arenas, waits until every reader has left the previous read arena,
// and then replays the redo log so the new write arena catches up.
func (m *LRMap[K, V]) Commit() {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.swap()
	m.waitForReaders()

	// redo all operations from the redo log into the new write map (old
	// read map) to sync up.
	for _, op := range m.redoLog {
		switch op.typ {
		case opSet:
			(*(m.writeMap.Load()))[op.key] = *(op.value)
		case opDelete:
			delete(*(m.writeMap.Load()), op.key)
		case opClear:
			clear(*(m.writeMap.Load()))
		default:
			panic(fmt.Errorf("operation(%d) not implemented", op.typ))
		}
	}

	// drop the redo log completely and let the GC remove all references to stale keys and values
	m.redoLog = nil
}

// NewReadHandler returns a ready-to-use read handler, reusing a recycled one
// from the pool when available.
func (m *LRMap[K, V]) NewReadHandler() *ReadHandler[K, V] {
	rh := m.readHandlerPool.Get().(*ReadHandler[K, V])
	rh.ready = true

	return rh
}

// newReadHandler allocates a handler, registers it under a weak pointer, and
// attaches a runtime cleanup that deregisters it if the handler is garbage
// collected without being closed.
func (m *LRMap[K, V]) newReadHandler() *ReadHandler[K, V] {
	// nolint:exhaustivestruct
	rh := &ReadHandler[K, V]{lrmap: m}
	wp := weak.Make(rh)
	rh.wp = wp

	m.mu.Lock()
	m.readHandlers[wp] = struct{}{}
	m.mu.Unlock()

	runtime.AddCleanup(rh, func(lrmap *LRMap[K, V]) {
		lrmap.mu.Lock()
		defer lrmap.mu.Unlock()

		if lrmap.metrics != nil {
			atomic.AddUint64(&lrmap.metrics.readHandlersCleanedUp, 1)
		}

		delete(lrmap.readHandlers, wp)
	}, m)

	if m.metrics != nil {
		atomic.AddUint64(&m.metrics.readHandlersCreated, 1)
	}

	return rh
}

// swap exchanges the reader and writer roles of the two arenas. Callers must
// hold m.mu (except during construction, before the map is shared).
func (m *LRMap[K, V]) swap() {
	switch m.readMap.Load() {
	case nil /* initial case */, &(m.left):
		m.readMap.Store(&(m.right))
		m.writeMap.Store(&(m.left))
	case &(m.right):
		m.readMap.Store(&(m.left))
		m.writeMap.Store(&(m.right))
	default:
		// nolint:goerr113
		panic(fmt.Errorf("illegal pointer: %v", m.readMap.Load()))
	}
}

// waitForReaders blocks until every reader that was inside an Enter/Leave
// critical section at swap time has left it. A reader's serial is odd while
// it is inside the section, so any change of an odd serial means the reader
// has moved on. Callers must hold m.mu.
func (m *LRMap[K, V]) waitForReaders() {
	readers := make(map[*ReadHandler[K, V]]uint64)

	for rhWP := range m.readHandlers {
		rh := rhWP.Value()
		if rh == nil {
			// The handler has been garbage collected but its AddCleanup
			// callback has not removed the entry yet. A collected handler
			// cannot be inside Enter/Leave, so it is safe to skip.
			continue
		}

		if serial := atomic.LoadUint64(&(rh.serial)); serial%2 == 1 {
			readers[rh] = serial
		}
	}

	delay := time.Microsecond

	for {
		for reader, serial := range readers {
			if s := atomic.LoadUint64(&(reader.serial)); s != serial {
				delete(readers, reader)
			}
		}

		if len(readers) == 0 {
			return
		}

		time.Sleep(delay)

		// exponential backoff, capped so a stuck reader is polled at least
		// every maxDelay
		const maxDelay = 5 * time.Second

		delay *= 10
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

type opType int8

const (
	opSet opType = iota
	opDelete
	opClear
)

// operation is a single redo-log entry. value is only set for opSet.
type operation[K comparable, V any] struct {
	typ   opType
	key   K
	value *V
}

// ReadHandler provides lock-free read access to an LRMap. A handler is used
// by one goroutine at a time: Enter pins the current read arena, the Get/
// Len/Iter methods operate on it, and Leave releases it. Recycle returns the
// handler to the pool; Close retires it permanently.
type ReadHandler[K comparable, V any] struct {
	lrmap *LRMap[K, V]
	// live is the arena pinned by the most recent Enter.
	live arena[K, V]
	// serial is incremented on Enter and Leave; odd means "inside the
	// critical section". Written only by the owning goroutine, read
	// atomically by the writer in waitForReaders.
	serial uint64
	wp     weak.Pointer[ReadHandler[K, V]]
	ready  bool
}

// Enter pins the current read arena for subsequent Get/Len/Iter calls.
func (rh *ReadHandler[K, V]) Enter() {
	rh.assertReady()

	if rh.entered() {
		panic("reader illegal state: must not Enter() twice")
	}

	atomic.AddUint64(&rh.serial, 1)
	rh.live = *rh.lrmap.readMap.Load()
}

// Leave releases the pinned arena, allowing a pending Commit to proceed.
func (rh *ReadHandler[K, V]) Leave() {
	rh.assertReady()

	if !rh.entered() {
		panic("reader illegal state: must not Leave() twice")
	}

	atomic.AddUint64(&rh.serial, 1)
}

// Get returns the value for key from the pinned arena (zero value when
// absent).
func (rh *ReadHandler[K, V]) Get(key K) V {
	rh.assertReady()

	value, _ := rh.GetOK(key)

	return value
}

// GetOK returns the value for key from the pinned arena and whether it was
// present.
func (rh *ReadHandler[K, V]) GetOK(key K) (V, bool) {
	rh.assertReady()

	if !rh.entered() {
		panic("reader illegal state: must Enter() before operating on data")
	}

	value, ok := rh.live[key]

	return value, ok
}

// Len returns the number of entries in the pinned arena.
func (rh *ReadHandler[K, V]) Len() int {
	rh.assertReady()

	if !rh.entered() {
		panic("reader illegal state: must Enter() before operating on data")
	}

	return len(rh.live)
}

// Iter returns an iterator over the pinned arena. Enter must have been
// called before iterating.
func (rh *ReadHandler[K, V]) Iter() iter.Seq2[K, V] {
	return func(yield func(K, V) bool) {
		rh.assertReady()

		if !rh.entered() {
			panic("reader illegal state: must call Enter() before iterating")
		}

		for key, value := range rh.live {
			if ok := yield(key, value); !ok {
				return
			}
		}
	}
}

// Close retires the handler permanently, deregistering it from the map. It
// must not be used afterwards.
func (rh *ReadHandler[K, V]) Close() {
	rh.assertReady()

	if rh.lrmap.metrics != nil {
		atomic.AddUint64(&rh.lrmap.metrics.readHandlersClosed, 1)
	}

	rh.ready = false

	rh.lrmap.mu.Lock()
	defer rh.lrmap.mu.Unlock()

	delete(rh.lrmap.readHandlers, rh.wp)
}

// Recycle returns the handler to the pool for reuse by a later
// NewReadHandler call. The handler must have left its critical section.
func (rh *ReadHandler[K, V]) Recycle() {
	if !rh.ready {
		panic("illegal reader state: must not recycle twice")
	}

	if rh.entered() {
		panic("reader illegal state: must call Leave() before recycling")
	}

	if rh.lrmap.metrics != nil {
		atomic.AddUint64(&rh.lrmap.metrics.readHandlersRecycled, 1)
	}

	rh.ready = false
	rh.lrmap.readHandlerPool.Put(rh)
}

// entered reports whether the handler is inside an Enter/Leave section.
// Only the owning goroutine calls this, so a plain read of serial is fine.
func (rh *ReadHandler[K, V]) entered() bool {
	return rh.serial%2 == 1
}

// assertReady panics if the handler was never created via NewReadHandler or
// has already been recycled or closed.
func (rh *ReadHandler[K, V]) assertReady() {
	if rh == nil {
		panic("reader illegal state: must create with NewReadHandler()")
	}

	if !rh.ready {
		panic(fmt.Errorf("reader illegal state: must not use after Recycle()"))
	}
}