A B+Tree Storage Engine
at main 181 lines 3.9 kB view raw
1package buffer 2 3import ( 4 "fmt" 5 "sync" 6 "sync/atomic" 7 8 "github.com/jobala/petro/storage/disk" 9) 10 11const BUFFER_CAPACITY = 20 12 13func NewBufferpoolManager(size int, replacer *lrukReplacer, diskScheduler *disk.DiskScheduler) *BufferpoolManager { 14 frames := make([]*frame, size) 15 freeFrames := make([]int, size) 16 17 for i := range size { 18 f := &frame{ 19 id: i, 20 data: make([]byte, disk.PAGE_SIZE), 21 } 22 23 frames[i] = f 24 freeFrames[i] = i 25 } 26 27 bpm := &BufferpoolManager{ 28 mu: sync.Mutex{}, 29 frames: frames, 30 pageTable: make(map[int64]int), 31 replacer: replacer, 32 diskScheduler: diskScheduler, 33 freeFrames: freeFrames, 34 } 35 bpm.cond = *sync.NewCond(&bpm.mu) 36 return bpm 37} 38 39func (b *BufferpoolManager) ReadPage(pageId int64) (*ReadPageGuard, error) { 40 b.mu.Lock() 41 defer b.mu.Unlock() 42 var frame *frame 43 44 for { 45 if id, ok := b.pageTable[pageId]; ok { 46 frame := b.frames[id] 47 48 b.replacer.recordAccess(frame.id) 49 b.replacer.setEvictable(frame.id, false) 50 frame.mu.RLock() 51 frame.pin() 52 53 return NewReadPageGuard(frame, b), nil 54 } 55 56 // try to get a frame 57 if len(b.freeFrames) > 0 { 58 id := b.freeFrames[0] 59 frame = b.frames[id] 60 b.freeFrames = b.freeFrames[1:] 61 } else { 62 if id, _ := b.replacer.evict(); id != disk.INVALID_PAGE_ID { 63 frame = b.frames[id] 64 b.flush(frame) 65 } 66 } 67 68 // got a frame 69 if frame != nil { 70 delete(b.pageTable, frame.pageId) 71 b.pageTable[pageId] = frame.id 72 73 b.replacer.recordAccess(frame.id) 74 b.replacer.setEvictable(frame.id, false) 75 76 frame.mu.RLock() 77 frame.reset() 78 frame.pin() 79 frame.pageId = pageId 80 diskReq := disk.NewRequest(pageId, nil, false) 81 respCh := b.diskScheduler.Schedule(diskReq) 82 resp := <-respCh 83 copy(frame.data, resp.Data) 84 85 return NewReadPageGuard(frame, b), nil 86 } 87 88 // failed to get a frame, wait for a frame to become available 89 b.cond.Wait() 90 } 91} 92 93func (b *BufferpoolManager) WritePage(pageId int64) (*WritePageGuard, error) { 94 b.mu.Lock() 95 defer b.mu.Unlock() 96 97 var frame *frame 98 99 for { 100 if id, ok := b.pageTable[pageId]; ok { 101 frame := b.frames[id] 102 103 b.replacer.recordAccess(frame.id) 104 b.replacer.setEvictable(frame.id, false) 105 frame.mu.Lock() 106 frame.pin() 107 frame.dirty = true 108 109 return NewWritePageGuard(frame, b), nil 110 } 111 112 // try getting a frame 113 if len(b.freeFrames) > 0 { 114 id := b.freeFrames[0] 115 frame = b.frames[id] 116 b.freeFrames = b.freeFrames[1:] 117 } else { 118 if id, _ := b.replacer.evict(); id != disk.INVALID_PAGE_ID { 119 frame = b.frames[id] 120 b.flush(frame) 121 } 122 } 123 124 // got the frame, return a page guard 125 if frame != nil { 126 delete(b.pageTable, frame.pageId) 127 b.pageTable[pageId] = frame.id 128 129 b.replacer.recordAccess(frame.id) 130 b.replacer.setEvictable(frame.id, false) 131 132 frame.mu.Lock() 133 frame.reset() 134 frame.pin() 135 frame.dirty = true 136 frame.pageId = pageId 137 138 diskReq := disk.NewRequest(pageId, nil, false) 139 respCh := b.diskScheduler.Schedule(diskReq) 140 resp := <-respCh 141 copy(frame.data, resp.Data) 142 return NewWritePageGuard(frame, b), nil 143 } 144 145 // failed to get a frame, wait for a frame to become available 146 // pageGuard.Drop will send a signal 147 fmt.Println("waiting for a frame to become available") 148 b.cond.Wait() 149 } 150} 151 152func (b *BufferpoolManager) NewPageId() int64 { 153 return b.nextPageId.Add(1) 154} 155 156func (b *BufferpoolManager) FlushAll() { 157 for _, frame := range b.frames { 158 b.flush(frame) 159 } 160} 161 162func (b *BufferpoolManager) flush(frame *frame) { 163 if frame.dirty { 164 writeReq := disk.NewRequest(frame.pageId, frame.data, true) 165 respCh := b.diskScheduler.Schedule(writeReq) 166 167 // block until data is written to disk 168 <-respCh 169 } 170} 171 172type BufferpoolManager struct { 173 mu sync.Mutex 174 frames []*frame 175 pageTable map[int64]int 176 nextPageId atomic.Int64 177 diskScheduler *disk.DiskScheduler 178 replacer *lrukReplacer 179 freeFrames []int 180 cond sync.Cond 181}