fork of indigo with slightly nicer lexgen
at main 2.3 kB view raw
1package events 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 8 "github.com/bluesky-social/indigo/models" 9) 10 11// Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels 12type EventPersistence interface { 13 // Persist may mutate contents of *XRPCStreamEvent and what it points to 14 Persist(ctx context.Context, e *XRPCStreamEvent) error 15 Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error 16 TakeDownRepo(ctx context.Context, usr models.Uid) error 17 Flush(context.Context) error 18 Shutdown(context.Context) error 19 20 SetEventBroadcaster(func(*XRPCStreamEvent)) 21} 22 23// MemPersister is the most naive implementation of event persistence 24// This EventPersistence option works fine with all event types 25// ill do better later 26type MemPersister struct { 27 buf []*XRPCStreamEvent 28 lk sync.Mutex 29 seq int64 30 31 broadcast func(*XRPCStreamEvent) 32} 33 34func NewMemPersister() *MemPersister { 35 return &MemPersister{} 36} 37 38func (mp *MemPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error { 39 mp.lk.Lock() 40 defer mp.lk.Unlock() 41 mp.seq++ 42 switch { 43 case e.RepoCommit != nil: 44 e.RepoCommit.Seq = mp.seq 45 case e.RepoSync != nil: 46 e.RepoSync.Seq = mp.seq 47 case e.RepoIdentity != nil: 48 e.RepoIdentity.Seq = mp.seq 49 case e.RepoAccount != nil: 50 e.RepoAccount.Seq = mp.seq 51 case e.LabelLabels != nil: 52 e.LabelLabels.Seq = mp.seq 53 default: 54 panic("no event in persist call") 55 } 56 mp.buf = append(mp.buf, e) 57 58 mp.broadcast(e) 59 60 return nil 61} 62 63func (mp *MemPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 64 mp.lk.Lock() 65 l := len(mp.buf) 66 mp.lk.Unlock() 67 68 if since >= int64(l) { 69 return nil 70 } 71 72 // TODO: abusing the fact that buf[0].seq is currently always 1 73 for _, e := range mp.buf[since:l] { 74 if err := cb(e); err != nil { 75 return err 76 } 77 } 78 79 return nil 80} 81 82func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid models.Uid) error { 83 return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only") 84} 85 86func (mp *MemPersister) Flush(ctx context.Context) error { 87 return nil 88} 89 90func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { 91 mp.broadcast = brc 92} 93 94func (mp *MemPersister) Shutdown(context.Context) error { 95 return nil 96}