1package events
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/bluesky-social/indigo/models"
9)
10
11// Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels
12type EventPersistence interface {
13 // Persist may mutate contents of *XRPCStreamEvent and what it points to
14 Persist(ctx context.Context, e *XRPCStreamEvent) error
15 Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error
16 TakeDownRepo(ctx context.Context, usr models.Uid) error
17 Flush(context.Context) error
18 Shutdown(context.Context) error
19
20 SetEventBroadcaster(func(*XRPCStreamEvent))
21}
22
23// MemPersister is the most naive implementation of event persistence
24// This EventPersistence option works fine with all event types
25// ill do better later
26type MemPersister struct {
27 buf []*XRPCStreamEvent
28 lk sync.Mutex
29 seq int64
30
31 broadcast func(*XRPCStreamEvent)
32}
33
34func NewMemPersister() *MemPersister {
35 return &MemPersister{}
36}
37
38func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error {
39 mp.lk.Lock()
40 defer mp.lk.Unlock()
41 mp.seq++
42 switch {
43 case e.RepoCommit != nil:
44 e.RepoCommit.Seq = mp.seq
45 case e.RepoSync != nil:
46 e.RepoSync.Seq = mp.seq
47 case e.RepoIdentity != nil:
48 e.RepoIdentity.Seq = mp.seq
49 case e.RepoAccount != nil:
50 e.RepoAccount.Seq = mp.seq
51 case e.LabelLabels != nil:
52 e.LabelLabels.Seq = mp.seq
53 default:
54 panic("no event in persist call")
55 }
56 mp.buf = append(mp.buf, e)
57
58 mp.broadcast(e)
59
60 return nil
61}
62
63func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
64 mp.lk.Lock()
65 l := len(mp.buf)
66 mp.lk.Unlock()
67
68 if since >= int64(l) {
69 return nil
70 }
71
72 // TODO: abusing the fact that buf[0].seq is currently always 1
73 for _, e := range mp.buf[since:l] {
74 if err := cb(e); err != nil {
75 return err
76 }
77 }
78
79 return nil
80}
81
82func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error {
83 return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only")
84}
85
86func (mp *MemPersister) Flush(ctx context.Context) error {
87 return nil
88}
89
90func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
91 mp.broadcast = brc
92}
93
94func (mp *MemPersister) Shutdown(context.Context) error {
95 return nil
96}