1package events
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "sync"
11 "time"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 lexutil "github.com/bluesky-social/indigo/lex/util"
15 "github.com/bluesky-social/indigo/models"
16 "github.com/prometheus/client_golang/prometheus"
17
18 cbg "github.com/whyrusleeping/cbor-gen"
19 "go.opentelemetry.io/otel"
20)
21
// log is a package-level logger tagged for the events subsystem.
// NOTE(review): EventManager instances carry their own em.log with the same
// attributes; this package-level logger is not referenced in this portion of
// the file — confirm other files in the package use it before removing.
var log = slog.Default().With("system", "events")
23
// Scheduler accepts stream events for processing, keyed by repo, and can be
// shut down when its consumer is finished.
type Scheduler interface {
	// AddWork enqueues val for processing under the given repo key.
	AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error
	// Shutdown stops the scheduler; drain semantics are implementation-defined.
	Shutdown()
}
28
// EventManager persists XRPCStreamEvents and fans them out to subscribers.
// Broadcast is driven by the persister via the callback registered in
// NewEventManager.
type EventManager struct {
	subs   []*Subscriber // active subscribers; guarded by subsLk
	subsLk sync.Mutex

	bufferSize          int // capacity of each subscriber's outgoing channel
	crossoverBufferSize int // capacity of the playback-to-live crossover channel in Subscribe

	persister EventPersistence

	log *slog.Logger
}
40
41func NewEventManager(persister EventPersistence) *EventManager {
42 em := &EventManager{
43 bufferSize: 16 << 10,
44 crossoverBufferSize: 512,
45 persister: persister,
46 log: slog.Default().With("system", "events"),
47 }
48
49 persister.SetEventBroadcaster(em.broadcastEvent)
50
51 return em
52}
53
// Operation codes for subscriber-set management (see Operation below).
const (
	opSubscribe = iota // register a subscriber
	opUnsubscribe      // remove a subscriber
	opSend             // deliver an event
)
59
// Operation pairs an op code with the subscriber and/or event it concerns.
// NOTE(review): not referenced elsewhere in this chunk — possibly a leftover
// from a channel-based subscriber-management design; confirm before removing.
type Operation struct {
	op  int
	sub *Subscriber
	evt *XRPCStreamEvent
}
65
// Shutdown stops the underlying persister. It does not tear down
// subscribers; callers cancel those via the cleanup func from Subscribe.
func (em *EventManager) Shutdown(ctx context.Context) error {
	return em.persister.Shutdown(ctx)
}
69
// broadcastEvent delivers evt to every subscriber whose filter accepts it.
// A subscriber whose outgoing buffer is full is treated as too slow: its
// filter is swapped for a reject-all func (so it receives nothing further)
// and a goroutine makes a best-effort attempt to send it a ConsumerTooSlow
// error frame before running its cleanup.
func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
	// the main thing we do is send it out, so MarshalCBOR once
	if err := evt.Preserialize(); err != nil {
		em.log.Error("broadcast serialize failed", "err", err)
		// serialize isn't going to go better later, this event is cursed
		return
	}

	em.subsLk.Lock()
	defer em.subsLk.Unlock()

	// TODO: for a larger fanout we should probably have dedicated goroutines
	// for subsets of the subscriber set, and tiered channels to distribute
	// events out to them, or some similar architecture
	// Alternatively, we might just want to not allow too many subscribers
	// directly to the bgs, and have rebroadcasting proxies instead
	for _, s := range em.subs {
		if s.filter(evt) {
			s.enqueuedCounter.Inc()
			select {
			case s.outgoing <- evt:
			case <-s.done:
				// subscriber is shutting down; drop the event silently
			default:
				// buffer full: filter out all future messages that would be
				// sent to this subscriber, but wait for it to
				// actually be removed by the correct bit of
				// code
				s.filter = func(*XRPCStreamEvent) bool { return false }

				em.log.Warn("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident)
				// done in a goroutine so the 5s error-frame send cannot stall
				// the broadcast loop (we still hold subsLk here); cleanup()
				// re-acquires subsLk via rmSubscriber, so it must not run inline
				go func(torem *Subscriber) {
					torem.lk.Lock()
					if !torem.cleanedUp {
						select {
						case torem.outgoing <- &XRPCStreamEvent{
							Error: &ErrorFrame{
								Error: "ConsumerTooSlow",
							},
						}:
						case <-time.After(time.Second * 5):
							em.log.Warn("failed to send error frame to backed up consumer", "ident", torem.ident)
						}
					}
					torem.lk.Unlock()
					torem.cleanup()
				}(s)
			}
			// NOTE(review): broadcastCounter increments even when the event
			// was dropped via the done/default branches above — confirm that
			// is the intended metric semantics.
			s.broadcastCounter.Inc()
		}
	}
}
121
122func (em *EventManager) persistAndSendEvent(ctx context.Context, evt *XRPCStreamEvent) {
123 // TODO: can cut 5-10% off of disk persister benchmarks by making this function
124 // accept a uid. The lookup inside the persister is notably expensive (despite
125 // being an lru cache?)
126 if err := em.persister.Persist(ctx, evt); err != nil {
127 em.log.Error("failed to persist outbound event", "err", err)
128 }
129}
130
// Subscriber is one consumer of the event stream, registered via Subscribe.
type Subscriber struct {
	// outgoing buffers events awaiting delivery to this consumer.
	outgoing chan *XRPCStreamEvent

	// filter reports whether an event should be delivered; broadcastEvent
	// replaces it with a reject-all func when the consumer is too slow.
	filter func(*XRPCStreamEvent) bool

	// done is closed by cleanup to signal shutdown to senders.
	done chan struct{}

	// cleanup tears the subscription down; built with sync.OnceFunc in
	// Subscribe, so it is safe to call repeatedly.
	cleanup func()

	lk        sync.Mutex // guards cleanedUp against the error-frame send in broadcastEvent
	cleanedUp bool

	ident            string             // consumer identity used in logs and metric labels
	enqueuedCounter  prometheus.Counter // counts events accepted by filter
	broadcastCounter prometheus.Counter // counts events processed by the broadcast loop
}
147
// Wire-level frame kinds carried in EventHeader.Op.
const (
	EvtKindErrorFrame = -1 // payload is an ErrorFrame
	EvtKindMessage    = 1  // payload is a message, identified by EventHeader.MsgType
)
152
// EventHeader is the CBOR header frame written before every stream payload.
type EventHeader struct {
	Op      int64  `cborgen:"op"`          // EvtKindMessage or EvtKindErrorFrame
	MsgType string `cborgen:"t,omitempty"` // message type, e.g. "#commit"; empty for error frames
}
157
var (
	// AccountStatusActive is not in the spec but used internally
	// the alternative would be an additional SQL column for "active" or status="" to imply active
	AccountStatusActive = "active"

	AccountStatusDeactivated    = "deactivated"
	AccountStatusDeleted        = "deleted"
	AccountStatusDesynchronized = "desynchronized"
	AccountStatusSuspended      = "suspended"
	AccountStatusTakendown      = "takendown"
	AccountStatusThrottled      = "throttled"
)

// AccountStatusList enumerates every account status this package understands.
var AccountStatusList = []string{
	AccountStatusActive,
	AccountStatusDeactivated,
	AccountStatusDeleted,
	AccountStatusDesynchronized,
	AccountStatusSuspended,
	AccountStatusTakendown,
	AccountStatusThrottled,
}

// AccountStatuses is the set form of AccountStatusList for O(1) membership checks.
var AccountStatuses = accountStatusSet()

// accountStatusSet builds the membership set from AccountStatusList.
func accountStatusSet() map[string]bool {
	set := make(map[string]bool, len(AccountStatusList))
	for _, status := range AccountStatusList {
		set[status] = true
	}
	return set
}
188
// XRPCStreamEvent is a single firehose frame. Normally exactly one of the
// payload pointers is set, mirroring the union of stream message types
// (Serialize errors when none is set; Error takes precedence when several are).
type XRPCStreamEvent struct {
	Error        *ErrorFrame
	RepoCommit   *comatproto.SyncSubscribeRepos_Commit
	RepoSync     *comatproto.SyncSubscribeRepos_Sync
	RepoIdentity *comatproto.SyncSubscribeRepos_Identity
	RepoInfo     *comatproto.SyncSubscribeRepos_Info
	RepoAccount  *comatproto.SyncSubscribeRepos_Account
	LabelLabels  *comatproto.LabelSubscribeLabels_Labels
	LabelInfo    *comatproto.LabelSubscribeLabels_Info

	// some private fields for internal routing perf; excluded from JSON/CBOR.
	// PrivUid/PrivPdsId/PrivRelevantPds are presumably routing hints set by
	// callers elsewhere — not populated in this file; verify against producers.
	PrivUid         models.Uid `json:"-" cborgen:"-"`
	PrivPdsId       uint       `json:"-" cborgen:"-"`
	PrivRelevantPds []uint     `json:"-" cborgen:"-"`
	Preserialized   []byte     `json:"-" cborgen:"-"` // cached wire form, see Preserialize
}
205
206func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error {
207 header := EventHeader{Op: EvtKindMessage}
208 var obj lexutil.CBOR
209
210 switch {
211 case evt.Error != nil:
212 header.Op = EvtKindErrorFrame
213 obj = evt.Error
214 case evt.RepoCommit != nil:
215 header.MsgType = "#commit"
216 obj = evt.RepoCommit
217 case evt.RepoSync != nil:
218 header.MsgType = "#sync"
219 obj = evt.RepoSync
220 case evt.RepoIdentity != nil:
221 header.MsgType = "#identity"
222 obj = evt.RepoIdentity
223 case evt.RepoAccount != nil:
224 header.MsgType = "#account"
225 obj = evt.RepoAccount
226 case evt.RepoInfo != nil:
227 header.MsgType = "#info"
228 obj = evt.RepoInfo
229 default:
230 return fmt.Errorf("unrecognized event kind")
231 }
232
233 cborWriter := cbg.NewCborWriter(wc)
234 if err := header.MarshalCBOR(cborWriter); err != nil {
235 return fmt.Errorf("failed to write header: %w", err)
236 }
237 return obj.MarshalCBOR(cborWriter)
238}
239
240func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error {
241 var header EventHeader
242 if err := header.UnmarshalCBOR(r); err != nil {
243 return fmt.Errorf("reading header: %w", err)
244 }
245 switch header.Op {
246 case EvtKindMessage:
247 switch header.MsgType {
248 case "#commit":
249 var evt comatproto.SyncSubscribeRepos_Commit
250 if err := evt.UnmarshalCBOR(r); err != nil {
251 return fmt.Errorf("reading repoCommit event: %w", err)
252 }
253 xevt.RepoCommit = &evt
254 case "#sync":
255 var evt comatproto.SyncSubscribeRepos_Sync
256 if err := evt.UnmarshalCBOR(r); err != nil {
257 return fmt.Errorf("reading repoSync event: %w", err)
258 }
259 xevt.RepoSync = &evt
260 case "#identity":
261 var evt comatproto.SyncSubscribeRepos_Identity
262 if err := evt.UnmarshalCBOR(r); err != nil {
263 return err
264 }
265 xevt.RepoIdentity = &evt
266 case "#account":
267 var evt comatproto.SyncSubscribeRepos_Account
268 if err := evt.UnmarshalCBOR(r); err != nil {
269 return err
270 }
271 xevt.RepoAccount = &evt
272 case "#info":
273 // TODO: this might also be a LabelInfo (as opposed to RepoInfo)
274 var evt comatproto.SyncSubscribeRepos_Info
275 if err := evt.UnmarshalCBOR(r); err != nil {
276 return err
277 }
278 xevt.RepoInfo = &evt
279 case "#labels":
280 var evt comatproto.LabelSubscribeLabels_Labels
281 if err := evt.UnmarshalCBOR(r); err != nil {
282 return fmt.Errorf("reading Labels event: %w", err)
283 }
284 xevt.LabelLabels = &evt
285 }
286 case EvtKindErrorFrame:
287 var errframe ErrorFrame
288 if err := errframe.UnmarshalCBOR(r); err != nil {
289 return err
290 }
291 xevt.Error = &errframe
292 default:
293 return fmt.Errorf("unrecognized event stream type: %d", header.Op)
294 }
295 return nil
296}
297
// ErrNoSeq indicates an event that carries no sequence number
// (info frames, error frames — see Sequence/GetSequence).
var ErrNoSeq = errors.New("event has no sequence number")
299
300// serialize content into Preserialized cache
301func (evt *XRPCStreamEvent) Preserialize() error {
302 if evt.Preserialized != nil {
303 return nil
304 }
305 var buf bytes.Buffer
306 err := evt.Serialize(&buf)
307 if err != nil {
308 return err
309 }
310 evt.Preserialized = buf.Bytes()
311 return nil
312}
313
// ErrorFrame is the payload of an EvtKindErrorFrame message, carrying an
// error code (e.g. "ConsumerTooSlow") and an optional human-readable message.
type ErrorFrame struct {
	Error   string `cborgen:"error"`
	Message string `cborgen:"message"`
}
318
// AddEvent persists ev (and, via the persister's broadcast callback, fans it
// out to subscribers) under a tracing span. Persistence errors are logged
// inside persistAndSendEvent rather than returned, so this currently always
// returns nil.
func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error {
	ctx, span := otel.Tracer("events").Start(ctx, "AddEvent")
	defer span.End()

	em.persistAndSendEvent(ctx, ev)
	return nil
}
326
// Sentinel errors returned from playback callbacks in Subscribe to stop
// persister playback early. Compared with errors.Is.
// Fix: use errors.New instead of fmt.Errorf — there are no formatting verbs,
// so fmt adds nothing (flagged by static analyzers).
var (
	// ErrPlaybackShutdown signals the subscriber went away mid-playback.
	ErrPlaybackShutdown = errors.New("playback shutting down")
	// ErrCaughtUp signals playback has reached the live buffer.
	ErrCaughtUp = errors.New("caught up")
)
331
// Subscribe registers a new consumer of the event stream.
//
// ident labels logs and metrics for this consumer. A nil filter delivers
// everything. If since is nil the consumer attaches directly to the live
// tail; otherwise persisted events are played back from *since and the
// returned channel crosses over from historical playback to live events.
//
// Returns the event channel, a cleanup func the caller must invoke when done
// (idempotent via sync.OnceFunc), and an error.
func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error) {
	if filter == nil {
		filter = func(*XRPCStreamEvent) bool { return true }
	}

	done := make(chan struct{})
	sub := &Subscriber{
		ident:            ident,
		outgoing:         make(chan *XRPCStreamEvent, em.bufferSize),
		filter:           filter,
		done:             done,
		enqueuedCounter:  eventsEnqueued.WithLabelValues(ident),
		broadcastCounter: eventsBroadcast.WithLabelValues(ident),
	}

	// cleanup unregisters the subscriber and closes its channels; safe to
	// call from any goroutine, any number of times.
	sub.cleanup = sync.OnceFunc(func() {
		sub.lk.Lock()
		defer sub.lk.Unlock()
		close(done)
		em.rmSubscriber(sub)
		close(sub.outgoing)
		sub.cleanedUp = true
	})

	// No cursor: live tail only.
	if since == nil {
		em.addSubscriber(sub)
		return sub.outgoing, sub.cleanup, nil
	}

	// With a cursor we hand back a crossover channel fed by the goroutine
	// below: playback first, then live events copied from sub.outgoing.
	out := make(chan *XRPCStreamEvent, em.crossoverBufferSize)

	go func() {
		lastSeq := *since
		// run playback to get through *most* of the events, getting our current cursor close to realtime
		if err := em.persister.Playback(ctx, *since, func(e *XRPCStreamEvent) error {
			select {
			case <-done:
				return ErrPlaybackShutdown
			case out <- e:
				seq := SequenceForEvent(e)
				if seq > 0 {
					lastSeq = seq
				}
				return nil
			}
		}); err != nil {
			if errors.Is(err, ErrPlaybackShutdown) {
				em.log.Warn("events playback", "err", err)
			} else {
				em.log.Error("events playback", "err", err)
			}

			// TODO: send an error frame or something?
			// NOTE: not doing em.rmSubscriber(sub) here because it hasn't been added yet
			close(out)
			return
		}

		// now, start buffering events from the live stream
		em.addSubscriber(sub)

		// ensure that we clean up any return paths from here out, after having added the subscriber. Note that `out` is not `sub.output`, so needs to be closed separately.
		defer func() {
			close(out)
			em.rmSubscriber(sub)
		}()

		// block until the first live event arrives; the second playback pass
		// below stops once it reaches this event's sequence number
		first := <-sub.outgoing

		// run playback again to get us to the events that have started buffering
		if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error {
			seq := SequenceForEvent(e)
			if seq > SequenceForEvent(first) {
				return ErrCaughtUp
			}

			select {
			case <-done:
				return ErrPlaybackShutdown
			case out <- e:
				return nil
			}
		}); err != nil {
			// ErrCaughtUp is the expected way out of the second pass
			if !errors.Is(err, ErrCaughtUp) {
				em.log.Error("events playback", "err", err)
				return
			}
		}

		// now that we are caught up, just copy events from the channel over
		for evt := range sub.outgoing {
			select {
			case out <- evt:
			case <-done:
				return
			}
		}
	}()

	return out, sub.cleanup, nil
}
433
// SequenceForEvent returns evt's sequence number, or -1 if it has none.
// Thin wrapper around (*XRPCStreamEvent).Sequence, retained for existing
// callers; prefer calling Sequence directly.
func SequenceForEvent(evt *XRPCStreamEvent) int64 {
	return evt.Sequence()
}
437
438func (evt *XRPCStreamEvent) Sequence() int64 {
439 switch {
440 case evt == nil:
441 return -1
442 case evt.RepoCommit != nil:
443 return evt.RepoCommit.Seq
444 case evt.RepoSync != nil:
445 return evt.RepoSync.Seq
446 case evt.RepoIdentity != nil:
447 return evt.RepoIdentity.Seq
448 case evt.RepoAccount != nil:
449 return evt.RepoAccount.Seq
450 case evt.RepoInfo != nil:
451 return -1
452 case evt.Error != nil:
453 return -1
454 default:
455 return -1
456 }
457}
458
459func (evt *XRPCStreamEvent) GetSequence() (int64, bool) {
460 switch {
461 case evt == nil:
462 return -1, false
463 case evt.RepoCommit != nil:
464 return evt.RepoCommit.Seq, true
465 case evt.RepoSync != nil:
466 return evt.RepoSync.Seq, true
467 case evt.RepoIdentity != nil:
468 return evt.RepoIdentity.Seq, true
469 case evt.RepoAccount != nil:
470 return evt.RepoAccount.Seq, true
471 case evt.RepoInfo != nil:
472 return -1, false
473 case evt.Error != nil:
474 return -1, false
475 default:
476 return -1, false
477 }
478}
479
480func (em *EventManager) rmSubscriber(sub *Subscriber) {
481 em.subsLk.Lock()
482 defer em.subsLk.Unlock()
483
484 for i, s := range em.subs {
485 if s == sub {
486 em.subs[i] = em.subs[len(em.subs)-1]
487 em.subs = em.subs[:len(em.subs)-1]
488 break
489 }
490 }
491}
492
493func (em *EventManager) addSubscriber(sub *Subscriber) {
494 em.subsLk.Lock()
495 defer em.subsLk.Unlock()
496
497 em.subs = append(em.subs, sub)
498}
499
// TakeDownRepo delegates a takedown of the given user's repo to the persister.
func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error {
	return em.persister.TakeDownRepo(ctx, user)
}