tangled
alpha
login
or
join now
joh.dev
/
leftright
1
fork
atom
A map using a lock-free concurrency using left-right algo, written in Go
1
fork
atom
overview
issues
pulls
pipelines
Initial commit
authored by
Johannes Kohnen
and committed by
joh.dev
9 months ago
d4f85921
+342
3 changed files
expand all
collapse all
unified
split
go.mod
lrmap.go
lrmap_test.go
+3
go.mod
···
1
1
+
module github.com/jwkohnen/lrmap
2
2
+
3
3
+
go 1.16
+215
lrmap.go
···
1
1
+
package lrmap
2
2
+
3
3
+
import (
4
4
+
"fmt"
5
5
+
"runtime"
6
6
+
"sync"
7
7
+
"sync/atomic"
8
8
+
"time"
9
9
+
"unsafe"
10
10
+
)
11
11
+
12
12
+
type (
13
13
+
LRMap struct {
14
14
+
mu sync.Mutex
15
15
+
left mapT
16
16
+
right mapT
17
17
+
readMap unsafe.Pointer
18
18
+
writeMap *mapT
19
19
+
opLog []op
20
20
+
readHandlers map[*ReadHandler]struct{}
21
21
+
}
22
22
+
23
23
+
mapT map[Key]Value
24
24
+
25
25
+
Key int
26
26
+
Value int
27
27
+
)
28
28
+
29
29
+
func New() *LRMap {
30
30
+
m := &LRMap{
31
31
+
left: make(map[Key]Value),
32
32
+
right: make(map[Key]Value),
33
33
+
readHandlers: make(map[*ReadHandler]struct{}),
34
34
+
}
35
35
+
36
36
+
m.swap()
37
37
+
38
38
+
return m
39
39
+
}
40
40
+
41
41
+
func (m *LRMap) Set(k Key, v Value) {
42
42
+
m.mu.Lock()
43
43
+
defer m.mu.Unlock()
44
44
+
45
45
+
(*(m.writeMap))[k] = v
46
46
+
47
47
+
m.opLog = append(m.opLog, op{t: opSet, k: k, v: &v})
48
48
+
}
49
49
+
50
50
+
func (m *LRMap) Delete(k Key) {
51
51
+
m.mu.Lock()
52
52
+
defer m.mu.Unlock()
53
53
+
54
54
+
delete(*(m.writeMap), k)
55
55
+
56
56
+
m.opLog = append(m.opLog, op{t: opDelete, k: k})
57
57
+
}
58
58
+
59
59
+
func (m *LRMap) Flush() {
60
60
+
m.mu.Lock()
61
61
+
defer m.mu.Unlock()
62
62
+
63
63
+
m.swap()
64
64
+
65
65
+
m.waitForReaders()
66
66
+
67
67
+
// redo all operations from the op log into the new write map (old read map) to sync up.
68
68
+
for _, o := range m.opLog {
69
69
+
switch o.t {
70
70
+
case opSet:
71
71
+
(*(m.writeMap))[o.k] = *(o.v)
72
72
+
case opDelete:
73
73
+
delete(*(m.writeMap), o.k)
74
74
+
default:
75
75
+
panic(fmt.Errorf("op(%d) not implemented", o.t))
76
76
+
}
77
77
+
}
78
78
+
79
79
+
// drop the op log completely and let the GC remove all references to stale keys and values
80
80
+
m.opLog = nil
81
81
+
}
82
82
+
83
83
+
func (m *LRMap) NewReadHandler() *ReadHandler {
84
84
+
m.mu.Lock()
85
85
+
defer m.mu.Unlock()
86
86
+
87
87
+
rh := &ReadHandler{lrm: m}
88
88
+
89
89
+
m.readHandlers[rh] = struct{}{}
90
90
+
91
91
+
// sharp-edged finalizer: https://crawshaw.io/blog/sharp-edged-finalizers
92
92
+
_, file, line, _ := runtime.Caller(1)
93
93
+
runtime.SetFinalizer(rh, func(*ReadHandler) {
94
94
+
panic(fmt.Errorf("reader illegal state: must call Close() (created at %s:%d)", file, line))
95
95
+
})
96
96
+
97
97
+
return rh
98
98
+
}
99
99
+
100
100
+
func (m *LRMap) swap() {
101
101
+
var (
102
102
+
r = (*mapT)(m.readMap)
103
103
+
w *mapT
104
104
+
)
105
105
+
106
106
+
switch r {
107
107
+
case nil:
108
108
+
// initial case: start with left map as reader, right map as writer
109
109
+
r = &(m.left)
110
110
+
w = &(m.right)
111
111
+
case &(m.left):
112
112
+
r = &(m.right)
113
113
+
w = &(m.left)
114
114
+
case &(m.right):
115
115
+
r = &(m.left)
116
116
+
w = &(m.right)
117
117
+
default:
118
118
+
panic(fmt.Errorf("illegal pointer: %v", r))
119
119
+
}
120
120
+
121
121
+
atomic.StorePointer(&(m.readMap), unsafe.Pointer(r))
122
122
+
m.writeMap = w
123
123
+
}
124
124
+
125
125
+
func (m *LRMap) waitForReaders() {
126
126
+
readers := make(map[*ReadHandler]uint64)
127
127
+
for rh := range m.readHandlers {
128
128
+
epoch := atomic.LoadUint64(&(rh.epoch))
129
129
+
if epoch%2 == 1 {
130
130
+
readers[rh] = epoch
131
131
+
}
132
132
+
}
133
133
+
134
134
+
delay := time.Microsecond
135
135
+
136
136
+
for {
137
137
+
for reader, epoch := range readers {
138
138
+
if e := atomic.LoadUint64(&(reader.epoch)); e != epoch {
139
139
+
delete(readers, reader)
140
140
+
}
141
141
+
}
142
142
+
143
143
+
if len(readers) == 0 {
144
144
+
return
145
145
+
}
146
146
+
147
147
+
time.Sleep(delay)
148
148
+
149
149
+
delay *= 10
150
150
+
if delay > 5*time.Second {
151
151
+
delay = 5 * time.Second
152
152
+
}
153
153
+
154
154
+
}
155
155
+
}
156
156
+
157
157
+
func (m *LRMap) getMapAtomic() mapT {
158
158
+
return *(*mapT)(atomic.LoadPointer(&(m.readMap)))
159
159
+
}
160
160
+
161
161
+
type opType int8
162
162
+
163
163
+
const (
164
164
+
opSet opType = iota
165
165
+
opDelete
166
166
+
)
167
167
+
168
168
+
type op struct {
169
169
+
t opType
170
170
+
k Key
171
171
+
v *Value
172
172
+
}
173
173
+
174
174
+
type ReadHandler struct {
175
175
+
lrm *LRMap
176
176
+
live mapT
177
177
+
epoch uint64
178
178
+
closed bool
179
179
+
}
180
180
+
181
181
+
func (r *ReadHandler) Enter() {
182
182
+
atomic.AddUint64(&r.epoch, 1)
183
183
+
r.live = r.lrm.getMapAtomic()
184
184
+
}
185
185
+
186
186
+
func (r *ReadHandler) Leave() {
187
187
+
atomic.AddUint64(&r.epoch, 1)
188
188
+
}
189
189
+
190
190
+
func (r *ReadHandler) Get(k Key) Value {
191
191
+
v, _ := r.GetOK(k)
192
192
+
return v
193
193
+
}
194
194
+
195
195
+
func (r *ReadHandler) GetOK(k Key) (Value, bool) {
196
196
+
if r.epoch%2 == 0 {
197
197
+
panic("reader illegal state: must Enter() before operating on data")
198
198
+
}
199
199
+
200
200
+
v, ok := r.live[k]
201
201
+
return v, ok
202
202
+
}
203
203
+
204
204
+
func (r *ReadHandler) Close() {
205
205
+
r.lrm.mu.Lock()
206
206
+
defer r.lrm.mu.Unlock()
207
207
+
208
208
+
if r.closed {
209
209
+
panic("reader illegal state: must not Close() more than once")
210
210
+
}
211
211
+
212
212
+
delete(r.lrm.readHandlers, r)
213
213
+
runtime.SetFinalizer(r, nil)
214
214
+
r.closed = true
215
215
+
}
+124
lrmap_test.go
···
1
1
+
package lrmap
2
2
+
3
3
+
import (
4
4
+
"math"
5
5
+
"testing"
6
6
+
)
7
7
+
8
8
+
func TestHundred(t *testing.T) {
9
9
+
lrm := New()
10
10
+
11
11
+
rh := lrm.NewReadHandler()
12
12
+
defer rh.Close()
13
13
+
14
14
+
for i := 1; i < 100; i += 2 {
15
15
+
k, v := Key(i), Value(i)
16
16
+
17
17
+
lrm.Set(k, v)
18
18
+
}
19
19
+
20
20
+
lrm.Flush()
21
21
+
22
22
+
for i := 1; i < 100; i += 2 {
23
23
+
k, v := Key(i), Value(i)
24
24
+
25
25
+
rh.Enter()
26
26
+
if _v := rh.Get(k); _v != v {
27
27
+
28
28
+
t.Errorf("Get(%d) want %d, got %d", k, v, _v)
29
29
+
}
30
30
+
rh.Leave()
31
31
+
}
32
32
+
33
33
+
for i := 0; i < 100; i += 2 {
34
34
+
k, v := Key(i), Value(i)
35
35
+
lrm.Set(k, v)
36
36
+
}
37
37
+
38
38
+
lrm.Flush()
39
39
+
40
40
+
for i := 0; i < 100; i++ {
41
41
+
rh.Enter()
42
42
+
k, v := Key(i), Value(i)
43
43
+
44
44
+
if _v := rh.Get(k); _v != v {
45
45
+
t.Errorf("Get(%d) want %d, got %d", k, v, _v)
46
46
+
}
47
47
+
rh.Leave()
48
48
+
}
49
49
+
50
50
+
lrm.Flush()
51
51
+
52
52
+
rh.Enter()
53
53
+
for i := 0; i < 100; i++ {
54
54
+
k, v := Key(i), Value(i)
55
55
+
56
56
+
if _v := rh.Get(k); _v != v {
57
57
+
t.Errorf("Get(%d) want %d, got %d", k, v, _v)
58
58
+
}
59
59
+
}
60
60
+
rh.Leave()
61
61
+
62
62
+
for i := 0; i < 100; i += 2 {
63
63
+
k := Key(i)
64
64
+
lrm.Delete(k)
65
65
+
}
66
66
+
67
67
+
lrm.Flush()
68
68
+
69
69
+
rh.Enter()
70
70
+
for i := 0; i < 100; i++ {
71
71
+
k, v := Key(i), Value(i)
72
72
+
73
73
+
_v, ok := rh.GetOK(k)
74
74
+
if i%2 == 0 {
75
75
+
if ok || _v != 0 {
76
76
+
t.Errorf("GetOK(%d), want (0, false), got (%d, %t)", i, _v, ok)
77
77
+
}
78
78
+
} else {
79
79
+
if !ok || _v != v {
80
80
+
t.Errorf("GetOK(%d), want (%d, true), got (%d, %t)", i, v, _v, ok)
81
81
+
}
82
82
+
}
83
83
+
}
84
84
+
rh.Leave()
85
85
+
86
86
+
lrm.Flush()
87
87
+
88
88
+
rh.Enter()
89
89
+
for i := 0; i < 100; i++ {
90
90
+
k, v := Key(i), Value(i)
91
91
+
92
92
+
_v, ok := rh.GetOK(k)
93
93
+
if i%2 == 0 {
94
94
+
if ok || _v != 0 {
95
95
+
t.Errorf("GetOK(%d), want (0, false), got (%d, %t)", i, _v, ok)
96
96
+
}
97
97
+
} else {
98
98
+
if !ok || _v != v {
99
99
+
t.Errorf("GetOK(%d), want (%d, true), got (%d, %t)", i, v, _v, ok)
100
100
+
}
101
101
+
}
102
102
+
}
103
103
+
rh.Leave()
104
104
+
}
105
105
+
106
106
+
func TestEpochOverflow(t *testing.T) {
107
107
+
var maxEven uint64 = math.MaxUint64 - 1
108
108
+
t.Logf("maxEven(%d)", maxEven)
109
109
+
if maxEven%2 == 1 {
110
110
+
t.Errorf("maxEven(%d) is odd", maxEven)
111
111
+
}
112
112
+
113
113
+
maxOdd := maxEven + 1
114
114
+
t.Logf("maxOdd(%d)", maxOdd)
115
115
+
if maxOdd%2 == 0 {
116
116
+
t.Errorf("maxOdd(%d) is even", maxOdd)
117
117
+
}
118
118
+
119
119
+
overflow := maxOdd + 1
120
120
+
t.Logf("overflow(%d)", overflow)
121
121
+
if overflow%2 == 1 {
122
122
+
t.Errorf("overflow(%d) is odd", overflow)
123
123
+
}
124
124
+
}