1package splitter
2
3import (
4 "context"
5 "sync"
6
7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/models"
9)
10
11func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer {
12 return &EventRingBuffer{
13 chunkSize: chunkSize,
14 maxChunkCount: nchunks,
15 }
16}
17
18type EventRingBuffer struct {
19 lk sync.Mutex
20 chunks []*ringChunk
21 chunkSize int
22 maxChunkCount int
23
24 broadcast func(*events.XRPCStreamEvent)
25}
26
27type ringChunk struct {
28 lk sync.Mutex
29 buf []*events.XRPCStreamEvent
30}
31
32func (rc *ringChunk) append(evt *events.XRPCStreamEvent) {
33 rc.lk.Lock()
34 defer rc.lk.Unlock()
35 rc.buf = append(rc.buf, evt)
36}
37
38func (rc *ringChunk) events() []*events.XRPCStreamEvent {
39 rc.lk.Lock()
40 defer rc.lk.Unlock()
41 return rc.buf
42}
43
44func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error {
45 er.lk.Lock()
46 defer er.lk.Unlock()
47
48 if len(er.chunks) == 0 {
49 er.chunks = []*ringChunk{new(ringChunk)}
50 }
51
52 last := er.chunks[len(er.chunks)-1]
53 if len(last.buf) >= er.chunkSize {
54 last = new(ringChunk)
55 er.chunks = append(er.chunks, last)
56 if len(er.chunks) > er.maxChunkCount {
57 er.chunks = er.chunks[1:]
58 }
59 }
60
61 last.append(evt)
62
63 er.broadcast(evt)
64 return nil
65}
66
67func (er *EventRingBuffer) Flush(context.Context) error {
68 return nil
69}
70
71func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
72 // run playback a few times to get as close to 'live' as possible before returning
73 for i := 0; i < 10; i++ {
74 n, err := er.playbackRound(ctx, since, cb)
75 if err != nil {
76 return err
77 }
78
79 // playback had no new events
80 if n-since == 0 {
81 return nil
82 }
83 since = n
84 }
85
86 return nil
87}
88
89func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) {
90 // grab a snapshot of the current chunks
91 er.lk.Lock()
92 chunks := er.chunks
93 er.lk.Unlock()
94
95 i := len(chunks) - 1
96 for ; i >= 0; i-- {
97 c := chunks[i]
98 evts := c.events()
99 if since > events.SequenceForEvent(evts[len(evts)-1]) {
100 i++
101 break
102 }
103 }
104 if i < 0 {
105 i = 0
106 }
107
108 var lastSeq int64 = since
109 for _, c := range chunks[i:] {
110 var nread int
111 evts := c.events()
112 for nread < len(evts) {
113 for _, e := range evts[nread:] {
114 nread++
115 seq := events.SequenceForEvent(e)
116 if seq <= since {
117 continue
118 }
119
120 if err := cb(e); err != nil {
121 return 0, err
122 }
123 lastSeq = seq
124 }
125
126 // recheck evts buffer to see if more were added while we were here
127 evts = c.events()
128 }
129 }
130
131 return lastSeq, nil
132}
133
134func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
135 er.broadcast = brc
136}
137
138func (er *EventRingBuffer) Shutdown(context.Context) error {
139 return nil
140}
141
142func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error {
143 return nil
144}