fork of indigo with slightly nicer lexgen
at main 13 kB view raw
1package events 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "sync" 11 "time" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 lexutil "github.com/bluesky-social/indigo/lex/util" 15 "github.com/bluesky-social/indigo/models" 16 "github.com/prometheus/client_golang/prometheus" 17 18 cbg "github.com/whyrusleeping/cbor-gen" 19 "go.opentelemetry.io/otel" 20) 21 22var log = slog.Default().With("system", "events") 23 24type Scheduler interface { 25 AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error 26 Shutdown() 27} 28 29type EventManager struct { 30 subs []*Subscriber 31 subsLk sync.Mutex 32 33 bufferSize int 34 crossoverBufferSize int 35 36 persister EventPersistence 37 38 log *slog.Logger 39} 40 41func NewEventManager(persister EventPersistence) *EventManager { 42 em := &EventManager{ 43 bufferSize: 16 << 10, 44 crossoverBufferSize: 512, 45 persister: persister, 46 log: slog.Default().With("system", "events"), 47 } 48 49 persister.SetEventBroadcaster(em.broadcastEvent) 50 51 return em 52} 53 54const ( 55 opSubscribe = iota 56 opUnsubscribe 57 opSend 58) 59 60type Operation struct { 61 op int 62 sub *Subscriber 63 evt *XRPCStreamEvent 64} 65 66func (em *EventManager) Shutdown(ctx context.Context) error { 67 return em.persister.Shutdown(ctx) 68} 69 70func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { 71 // the main thing we do is send it out, so MarshalCBOR once 72 if err := evt.Preserialize(); err != nil { 73 em.log.Error("broadcast serialize failed", "err", err) 74 // serialize isn't going to go better later, this event is cursed 75 return 76 } 77 78 em.subsLk.Lock() 79 defer em.subsLk.Unlock() 80 81 // TODO: for a larger fanout we should probably have dedicated goroutines 82 // for subsets of the subscriber set, and tiered channels to distribute 83 // events out to them, or some similar architecture 84 // Alternatively, we might just want to not allow too many subscribers 85 // directly to the bgs, and have rebroadcasting proxies instead 86 for _, s := range em.subs { 87 if s.filter(evt) { 88 s.enqueuedCounter.Inc() 89 select { 90 case s.outgoing <- evt: 91 case <-s.done: 92 default: 93 // filter out all future messages that would be 94 // sent to this subscriber, but wait for it to 95 // actually be removed by the correct bit of 96 // code 97 s.filter = func(*XRPCStreamEvent) bool { return false } 98 99 em.log.Warn("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) 100 go func(torem *Subscriber) { 101 torem.lk.Lock() 102 if !torem.cleanedUp { 103 select { 104 case torem.outgoing <- &XRPCStreamEvent{ 105 Error: &ErrorFrame{ 106 Error: "ConsumerTooSlow", 107 }, 108 }: 109 case <-time.After(time.Second * 5): 110 em.log.Warn("failed to send error frame to backed up consumer", "ident", torem.ident) 111 } 112 } 113 torem.lk.Unlock() 114 torem.cleanup() 115 }(s) 116 } 117 s.broadcastCounter.Inc() 118 } 119 } 120} 121 122func (em *EventManager) persistAndSendEvent(ctx context.Context, evt *XRPCStreamEvent) { 123 // TODO: can cut 5-10% off of disk persister benchmarks by making this function 124 // accept a uid. The lookup inside the persister is notably expensive (despite 125 // being an lru cache?) 126 if err := em.persister.Persist(ctx, evt); err != nil { 127 em.log.Error("failed to persist outbound event", "err", err) 128 } 129} 130 131type Subscriber struct { 132 outgoing chan *XRPCStreamEvent 133 134 filter func(*XRPCStreamEvent) bool 135 136 done chan struct{} 137 138 cleanup func() 139 140 lk sync.Mutex 141 cleanedUp bool 142 143 ident string 144 enqueuedCounter prometheus.Counter 145 broadcastCounter prometheus.Counter 146} 147 148const ( 149 EvtKindErrorFrame = -1 150 EvtKindMessage = 1 151) 152 153type EventHeader struct { 154 Op int64 `cborgen:"op"` 155 MsgType string `cborgen:"t,omitempty"` 156} 157 158var ( 159 // AccountStatusActive is not in the spec but used internally 160 // the alternative would be an additional SQL column for "active" or status="" to imply active 161 AccountStatusActive = "active" 162 163 AccountStatusDeactivated = "deactivated" 164 AccountStatusDeleted = "deleted" 165 AccountStatusDesynchronized = "desynchronized" 166 AccountStatusSuspended = "suspended" 167 AccountStatusTakendown = "takendown" 168 AccountStatusThrottled = "throttled" 169) 170 171var AccountStatusList = []string{ 172 AccountStatusActive, 173 AccountStatusDeactivated, 174 AccountStatusDeleted, 175 AccountStatusDesynchronized, 176 AccountStatusSuspended, 177 AccountStatusTakendown, 178 AccountStatusThrottled, 179} 180var AccountStatuses map[string]bool 181 182func init() { 183 AccountStatuses = make(map[string]bool, len(AccountStatusList)) 184 for _, status := range AccountStatusList { 185 AccountStatuses[status] = true 186 } 187} 188 189type XRPCStreamEvent struct { 190 Error *ErrorFrame 191 RepoCommit *comatproto.SyncSubscribeRepos_Commit 192 RepoSync *comatproto.SyncSubscribeRepos_Sync 193 RepoIdentity *comatproto.SyncSubscribeRepos_Identity 194 RepoInfo *comatproto.SyncSubscribeRepos_Info 195 RepoAccount *comatproto.SyncSubscribeRepos_Account 196 LabelLabels *comatproto.LabelSubscribeLabels_Labels 197 LabelInfo *comatproto.LabelSubscribeLabels_Info 198 199 // some private fields for internal routing perf 200 PrivUid models.Uid `json:"-" cborgen:"-"` 201 PrivPdsId uint `json:"-" cborgen:"-"` 202 PrivRelevantPds []uint `json:"-" cborgen:"-"` 203 Preserialized []byte `json:"-" cborgen:"-"` 204} 205 206func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error { 207 header := EventHeader{Op: EvtKindMessage} 208 var obj lexutil.CBOR 209 210 switch { 211 case evt.Error != nil: 212 header.Op = EvtKindErrorFrame 213 obj = evt.Error 214 case evt.RepoCommit != nil: 215 header.MsgType = "#commit" 216 obj = evt.RepoCommit 217 case evt.RepoSync != nil: 218 header.MsgType = "#sync" 219 obj = evt.RepoSync 220 case evt.RepoIdentity != nil: 221 header.MsgType = "#identity" 222 obj = evt.RepoIdentity 223 case evt.RepoAccount != nil: 224 header.MsgType = "#account" 225 obj = evt.RepoAccount 226 case evt.RepoInfo != nil: 227 header.MsgType = "#info" 228 obj = evt.RepoInfo 229 default: 230 return fmt.Errorf("unrecognized event kind") 231 } 232 233 cborWriter := cbg.NewCborWriter(wc) 234 if err := header.MarshalCBOR(cborWriter); err != nil { 235 return fmt.Errorf("failed to write header: %w", err) 236 } 237 return obj.MarshalCBOR(cborWriter) 238} 239 240func (xevt *XRPCStreamEvent) Deserialize(r io.Reader) error { 241 var header EventHeader 242 if err := header.UnmarshalCBOR(r); err != nil { 243 return fmt.Errorf("reading header: %w", err) 244 } 245 switch header.Op { 246 case EvtKindMessage: 247 switch header.MsgType { 248 case "#commit": 249 var evt comatproto.SyncSubscribeRepos_Commit 250 if err := evt.UnmarshalCBOR(r); err != nil { 251 return fmt.Errorf("reading repoCommit event: %w", err) 252 } 253 xevt.RepoCommit = &evt 254 case "#sync": 255 var evt comatproto.SyncSubscribeRepos_Sync 256 if err := evt.UnmarshalCBOR(r); err != nil { 257 return fmt.Errorf("reading repoSync event: %w", err) 258 } 259 xevt.RepoSync = &evt 260 case "#identity": 261 var evt comatproto.SyncSubscribeRepos_Identity 262 if err := evt.UnmarshalCBOR(r); err != nil { 263 return err 264 } 265 xevt.RepoIdentity = &evt 266 case "#account": 267 var evt comatproto.SyncSubscribeRepos_Account 268 if err := evt.UnmarshalCBOR(r); err != nil { 269 return err 270 } 271 xevt.RepoAccount = &evt 272 case "#info": 273 // TODO: this might also be a LabelInfo (as opposed to RepoInfo) 274 var evt comatproto.SyncSubscribeRepos_Info 275 if err := evt.UnmarshalCBOR(r); err != nil { 276 return err 277 } 278 xevt.RepoInfo = &evt 279 case "#labels": 280 var evt comatproto.LabelSubscribeLabels_Labels 281 if err := evt.UnmarshalCBOR(r); err != nil { 282 return fmt.Errorf("reading Labels event: %w", err) 283 } 284 xevt.LabelLabels = &evt 285 } 286 case EvtKindErrorFrame: 287 var errframe ErrorFrame 288 if err := errframe.UnmarshalCBOR(r); err != nil { 289 return err 290 } 291 xevt.Error = &errframe 292 default: 293 return fmt.Errorf("unrecognized event stream type: %d", header.Op) 294 } 295 return nil 296} 297 298var ErrNoSeq = errors.New("event has no sequence number") 299 300// serialize content into Preserialized cache 301func (evt *XRPCStreamEvent) Preserialize() error { 302 if evt.Preserialized != nil { 303 return nil 304 } 305 var buf bytes.Buffer 306 err := evt.Serialize(&buf) 307 if err != nil { 308 return err 309 } 310 evt.Preserialized = buf.Bytes() 311 return nil 312} 313 314type ErrorFrame struct { 315 Error string `cborgen:"error"` 316 Message string `cborgen:"message"` 317} 318 319func (em *EventManager) AddEvent(ctx context.Context, ev *XRPCStreamEvent) error { 320 ctx, span := otel.Tracer("events").Start(ctx, "AddEvent") 321 defer span.End() 322 323 em.persistAndSendEvent(ctx, ev) 324 return nil 325} 326 327var ( 328 ErrPlaybackShutdown = fmt.Errorf("playback shutting down") 329 ErrCaughtUp = fmt.Errorf("caught up") 330) 331 332func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func(*XRPCStreamEvent) bool, since *int64) (<-chan *XRPCStreamEvent, func(), error) { 333 if filter == nil { 334 filter = func(*XRPCStreamEvent) bool { return true } 335 } 336 337 done := make(chan struct{}) 338 sub := &Subscriber{ 339 ident: ident, 340 outgoing: make(chan *XRPCStreamEvent, em.bufferSize), 341 filter: filter, 342 done: done, 343 enqueuedCounter: eventsEnqueued.WithLabelValues(ident), 344 broadcastCounter: eventsBroadcast.WithLabelValues(ident), 345 } 346 347 sub.cleanup = sync.OnceFunc(func() { 348 sub.lk.Lock() 349 defer sub.lk.Unlock() 350 close(done) 351 em.rmSubscriber(sub) 352 close(sub.outgoing) 353 sub.cleanedUp = true 354 }) 355 356 if since == nil { 357 em.addSubscriber(sub) 358 return sub.outgoing, sub.cleanup, nil 359 } 360 361 out := make(chan *XRPCStreamEvent, em.crossoverBufferSize) 362 363 go func() { 364 lastSeq := *since 365 // run playback to get through *most* of the events, getting our current cursor close to realtime 366 if err := em.persister.Playback(ctx, *since, func(e *XRPCStreamEvent) error { 367 select { 368 case <-done: 369 return ErrPlaybackShutdown 370 case out <- e: 371 seq := SequenceForEvent(e) 372 if seq > 0 { 373 lastSeq = seq 374 } 375 return nil 376 } 377 }); err != nil { 378 if errors.Is(err, ErrPlaybackShutdown) { 379 em.log.Warn("events playback", "err", err) 380 } else { 381 em.log.Error("events playback", "err", err) 382 } 383 384 // TODO: send an error frame or something? 385 // NOTE: not doing em.rmSubscriber(sub) here because it hasn't been added yet 386 close(out) 387 return 388 } 389 390 // now, start buffering events from the live stream 391 em.addSubscriber(sub) 392 393 // ensure that we clean up any return paths from here out, after having added the subscriber. Note that `out` is not `sub.output`, so needs to be closed separately. 394 defer func() { 395 close(out) 396 em.rmSubscriber(sub) 397 }() 398 399 first := <-sub.outgoing 400 401 // run playback again to get us to the events that have started buffering 402 if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error { 403 seq := SequenceForEvent(e) 404 if seq > SequenceForEvent(first) { 405 return ErrCaughtUp 406 } 407 408 select { 409 case <-done: 410 return ErrPlaybackShutdown 411 case out <- e: 412 return nil 413 } 414 }); err != nil { 415 if !errors.Is(err, ErrCaughtUp) { 416 em.log.Error("events playback", "err", err) 417 return 418 } 419 } 420 421 // now that we are caught up, just copy events from the channel over 422 for evt := range sub.outgoing { 423 select { 424 case out <- evt: 425 case <-done: 426 return 427 } 428 } 429 }() 430 431 return out, sub.cleanup, nil 432} 433 434func SequenceForEvent(evt *XRPCStreamEvent) int64 { 435 return evt.Sequence() 436} 437 438func (evt *XRPCStreamEvent) Sequence() int64 { 439 switch { 440 case evt == nil: 441 return -1 442 case evt.RepoCommit != nil: 443 return evt.RepoCommit.Seq 444 case evt.RepoSync != nil: 445 return evt.RepoSync.Seq 446 case evt.RepoIdentity != nil: 447 return evt.RepoIdentity.Seq 448 case evt.RepoAccount != nil: 449 return evt.RepoAccount.Seq 450 case evt.RepoInfo != nil: 451 return -1 452 case evt.Error != nil: 453 return -1 454 default: 455 return -1 456 } 457} 458 459func (evt *XRPCStreamEvent) GetSequence() (int64, bool) { 460 switch { 461 case evt == nil: 462 return -1, false 463 case evt.RepoCommit != nil: 464 return evt.RepoCommit.Seq, true 465 case evt.RepoSync != nil: 466 return evt.RepoSync.Seq, true 467 case evt.RepoIdentity != nil: 468 return evt.RepoIdentity.Seq, true 469 case evt.RepoAccount != nil: 470 return evt.RepoAccount.Seq, true 471 case evt.RepoInfo != nil: 472 return -1, false 473 case evt.Error != nil: 474 return -1, false 475 default: 476 return -1, false 477 } 478} 479 480func (em *EventManager) rmSubscriber(sub *Subscriber) { 481 em.subsLk.Lock() 482 defer em.subsLk.Unlock() 483 484 for i, s := range em.subs { 485 if s == sub { 486 em.subs[i] = em.subs[len(em.subs)-1] 487 em.subs = em.subs[:len(em.subs)-1] 488 break 489 } 490 } 491} 492 493func (em *EventManager) addSubscriber(sub *Subscriber) { 494 em.subsLk.Lock() 495 defer em.subsLk.Unlock() 496 497 em.subs = append(em.subs, sub) 498} 499 500func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error { 501 return em.persister.TakeDownRepo(ctx, user) 502}