fork of indigo with slightly nicer lexgen
at main 2.9 kB view raw
1package splitter 2 3import ( 4 "context" 5 "sync" 6 7 "github.com/bluesky-social/indigo/events" 8 "github.com/bluesky-social/indigo/models" 9) 10 11func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer { 12 return &EventRingBuffer{ 13 chunkSize: chunkSize, 14 maxChunkCount: nchunks, 15 } 16} 17 18type EventRingBuffer struct { 19 lk sync.Mutex 20 chunks []*ringChunk 21 chunkSize int 22 maxChunkCount int 23 24 broadcast func(*events.XRPCStreamEvent) 25} 26 27type ringChunk struct { 28 lk sync.Mutex 29 buf []*events.XRPCStreamEvent 30} 31 32func (rc *ringChunk) append(evt *events.XRPCStreamEvent) { 33 rc.lk.Lock() 34 defer rc.lk.Unlock() 35 rc.buf = append(rc.buf, evt) 36} 37 38func (rc *ringChunk) events() []*events.XRPCStreamEvent { 39 rc.lk.Lock() 40 defer rc.lk.Unlock() 41 return rc.buf 42} 43 44func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error { 45 er.lk.Lock() 46 defer er.lk.Unlock() 47 48 if len(er.chunks) == 0 { 49 er.chunks = []*ringChunk{new(ringChunk)} 50 } 51 52 last := er.chunks[len(er.chunks)-1] 53 if len(last.buf) >= er.chunkSize { 54 last = new(ringChunk) 55 er.chunks = append(er.chunks, last) 56 if len(er.chunks) > er.maxChunkCount { 57 er.chunks = er.chunks[1:] 58 } 59 } 60 61 last.append(evt) 62 63 er.broadcast(evt) 64 return nil 65} 66 67func (er *EventRingBuffer) Flush(context.Context) error { 68 return nil 69} 70 71func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 72 // run playback a few times to get as close to 'live' as possible before returning 73 for i := 0; i < 10; i++ { 74 n, err := er.playbackRound(ctx, since, cb) 75 if err != nil { 76 return err 77 } 78 79 // playback had no new events 80 if n-since == 0 { 81 return nil 82 } 83 since = n 84 } 85 86 return nil 87} 88 89func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) { 90 // grab a snapshot of the current chunks 91 er.lk.Lock() 92 chunks := er.chunks 93 er.lk.Unlock() 94 95 i := len(chunks) - 1 96 for ; i >= 0; i-- { 97 c := chunks[i] 98 evts := c.events() 99 if since > events.SequenceForEvent(evts[len(evts)-1]) { 100 i++ 101 break 102 } 103 } 104 if i < 0 { 105 i = 0 106 } 107 108 var lastSeq int64 = since 109 for _, c := range chunks[i:] { 110 var nread int 111 evts := c.events() 112 for nread < len(evts) { 113 for _, e := range evts[nread:] { 114 nread++ 115 seq := events.SequenceForEvent(e) 116 if seq <= since { 117 continue 118 } 119 120 if err := cb(e); err != nil { 121 return 0, err 122 } 123 lastSeq = seq 124 } 125 126 // recheck evts buffer to see if more were added while we were here 127 evts = c.events() 128 } 129 } 130 131 return lastSeq, nil 132} 133 134func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 135 er.broadcast = brc 136} 137 138func (er *EventRingBuffer) Shutdown(context.Context) error { 139 return nil 140} 141 142func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error { 143 return nil 144}