A B+Tree Storage Engine
1package buffer
2
3import (
4 "fmt"
5 "sync"
6 "sync/atomic"
7
8 "github.com/jobala/petro/storage/disk"
9)
10
const (
	// BUFFER_CAPACITY is an untyped integer constant with the value 20.
	//
	// NOTE(review): nothing in the visible part of this file references it;
	// presumably it is the default number of frames handed to
	// NewBufferpoolManager — confirm against callers.
	BUFFER_CAPACITY = 20
)
12
13func NewBufferpoolManager(size int, replacer *lrukReplacer, diskScheduler *disk.DiskScheduler) *BufferpoolManager {
14 frames := make([]*frame, size)
15 freeFrames := make([]int, size)
16
17 for i := range size {
18 f := &frame{
19 id: i,
20 data: make([]byte, disk.PAGE_SIZE),
21 }
22
23 frames[i] = f
24 freeFrames[i] = i
25 }
26
27 bpm := &BufferpoolManager{
28 mu: sync.Mutex{},
29 frames: frames,
30 pageTable: make(map[int64]int),
31 replacer: replacer,
32 diskScheduler: diskScheduler,
33 freeFrames: freeFrames,
34 }
35 bpm.cond = *sync.NewCond(&bpm.mu)
36 return bpm
37}
38
39func (b *BufferpoolManager) ReadPage(pageId int64) (*ReadPageGuard, error) {
40 b.mu.Lock()
41 defer b.mu.Unlock()
42 var frame *frame
43
44 for {
45 if id, ok := b.pageTable[pageId]; ok {
46 frame := b.frames[id]
47
48 b.replacer.recordAccess(frame.id)
49 b.replacer.setEvictable(frame.id, false)
50 frame.mu.RLock()
51 frame.pin()
52
53 return NewReadPageGuard(frame, b), nil
54 }
55
56 // try to get a frame
57 if len(b.freeFrames) > 0 {
58 id := b.freeFrames[0]
59 frame = b.frames[id]
60 b.freeFrames = b.freeFrames[1:]
61 } else {
62 if id, _ := b.replacer.evict(); id != disk.INVALID_PAGE_ID {
63 frame = b.frames[id]
64 b.flush(frame)
65 }
66 }
67
68 // got a frame
69 if frame != nil {
70 delete(b.pageTable, frame.pageId)
71 b.pageTable[pageId] = frame.id
72
73 b.replacer.recordAccess(frame.id)
74 b.replacer.setEvictable(frame.id, false)
75
76 frame.mu.RLock()
77 frame.reset()
78 frame.pin()
79 frame.pageId = pageId
80 diskReq := disk.NewRequest(pageId, nil, false)
81 respCh := b.diskScheduler.Schedule(diskReq)
82 resp := <-respCh
83 copy(frame.data, resp.Data)
84
85 return NewReadPageGuard(frame, b), nil
86 }
87
88 // failed to get a frame, wait for a frame to become available
89 b.cond.Wait()
90 }
91}
92
93func (b *BufferpoolManager) WritePage(pageId int64) (*WritePageGuard, error) {
94 b.mu.Lock()
95 defer b.mu.Unlock()
96
97 var frame *frame
98
99 for {
100 if id, ok := b.pageTable[pageId]; ok {
101 frame := b.frames[id]
102
103 b.replacer.recordAccess(frame.id)
104 b.replacer.setEvictable(frame.id, false)
105 frame.mu.Lock()
106 frame.pin()
107 frame.dirty = true
108
109 return NewWritePageGuard(frame, b), nil
110 }
111
112 // try getting a frame
113 if len(b.freeFrames) > 0 {
114 id := b.freeFrames[0]
115 frame = b.frames[id]
116 b.freeFrames = b.freeFrames[1:]
117 } else {
118 if id, _ := b.replacer.evict(); id != disk.INVALID_PAGE_ID {
119 frame = b.frames[id]
120 b.flush(frame)
121 }
122 }
123
124 // got the frame, return a page guard
125 if frame != nil {
126 delete(b.pageTable, frame.pageId)
127 b.pageTable[pageId] = frame.id
128
129 b.replacer.recordAccess(frame.id)
130 b.replacer.setEvictable(frame.id, false)
131
132 frame.mu.Lock()
133 frame.reset()
134 frame.pin()
135 frame.dirty = true
136 frame.pageId = pageId
137
138 diskReq := disk.NewRequest(pageId, nil, false)
139 respCh := b.diskScheduler.Schedule(diskReq)
140 resp := <-respCh
141 copy(frame.data, resp.Data)
142 return NewWritePageGuard(frame, b), nil
143 }
144
145 // failed to get a frame, wait for a frame to become available
146 // pageGuard.Drop will send a signal
147 fmt.Println("waiting for a frame to become available")
148 b.cond.Wait()
149 }
150}
151
152func (b *BufferpoolManager) NewPageId() int64 {
153 return b.nextPageId.Add(1)
154}
155
156func (b *BufferpoolManager) FlushAll() {
157 for _, frame := range b.frames {
158 b.flush(frame)
159 }
160}
161
162func (b *BufferpoolManager) flush(frame *frame) {
163 if frame.dirty {
164 writeReq := disk.NewRequest(frame.pageId, frame.data, true)
165 respCh := b.diskScheduler.Schedule(writeReq)
166
167 // block until data is written to disk
168 <-respCh
169 }
170}
171
172type BufferpoolManager struct {
173 mu sync.Mutex
174 frames []*frame
175 pageTable map[int64]int
176 nextPageId atomic.Int64
177 diskScheduler *disk.DiskScheduler
178 replacer *lrukReplacer
179 freeFrames []int
180 cond sync.Cond
181}