 	Help: "Total bytes received from the stream",
 }, []string{"remote_addr"})
 
-var workItemsAdded = promauto.NewCounterVec(prometheus.CounterOpts{
-	Name: "indigo_work_items_added_total",
-	Help: "Total number of work items added to the consumer pool",
-}, []string{"pool"})
-
-var workItemsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
-	Name: "indigo_work_items_processed_total",
-	Help: "Total number of work items processed by the consumer pool",
-}, []string{"pool"})
-
-var workItemsActive = promauto.NewCounterVec(prometheus.CounterOpts{
-	Name: "indigo_work_items_active_total",
-	Help: "Total number of work items passed into a worker",
-}, []string{"pool"})
-
 var eventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
 	Name: "indigo_events_enqueued_for_broadcast_total",
 	Help: "Total number of events enqueued to broadcast to subscribers",
events/parallel.go (-110)
-package events
-
-import (
-	"context"
-	"sync"
-)
-
-type Scheduler interface {
-	AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error
-}
-
-type SequentialScheduler struct {
-	Do func(context.Context, *XRPCStreamEvent) error
-}
-
-func (s *SequentialScheduler) AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error {
-	return s.Do(ctx, val)
-}
-
-type ParallelConsumerPool struct {
-	maxConcurrency int
-	maxQueue       int
-
-	do func(context.Context, *XRPCStreamEvent) error
-
-	feeder chan *consumerTask
-
-	lk     sync.Mutex
-	active map[string][]*consumerTask
-
-	ident string
-}
-
-func NewConsumerPool(maxC, maxQ int, ident string, do func(context.Context, *XRPCStreamEvent) error) *ParallelConsumerPool {
-	p := &ParallelConsumerPool{
-		maxConcurrency: maxC,
-		maxQueue:       maxQ,
-
-		do: do,
-
-		feeder: make(chan *consumerTask),
-		active: make(map[string][]*consumerTask),
-
-		ident: ident,
-	}
-
-	for i := 0; i < maxC; i++ {
-		go p.worker()
-	}
-
-	return p
-}
-
-type consumerTask struct {
-	repo string
-	val  *XRPCStreamEvent
-}
-
-func (p *ParallelConsumerPool) AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error {
-	workItemsAdded.WithLabelValues(p.ident).Inc()
-	t := &consumerTask{
-		repo: repo,
-		val:  val,
-	}
-	p.lk.Lock()
-
-	a, ok := p.active[repo]
-	if ok {
-		p.active[repo] = append(a, t)
-		p.lk.Unlock()
-		return nil
-	}
-
-	p.active[repo] = []*consumerTask{}
-	p.lk.Unlock()
-
-	select {
-	case p.feeder <- t:
-		return nil
-	case <-ctx.Done():
-		return ctx.Err()
-	}
-}
-
-func (p *ParallelConsumerPool) worker() {
-	for work := range p.feeder {
-		for work != nil {
-			workItemsActive.WithLabelValues(p.ident).Inc()
-			if err := p.do(context.TODO(), work.val); err != nil {
-				log.Errorf("event handler failed: %s", err)
-			}
-			workItemsProcessed.WithLabelValues(p.ident).Inc()
-
-			p.lk.Lock()
-			rem, ok := p.active[work.repo]
-			if !ok {
-				log.Errorf("should always have an 'active' entry if a worker is processing a job")
-			}
-
-			if len(rem) == 0 {
-				delete(p.active, work.repo)
-				work = nil
-			} else {
-				work = rem[0]
-				p.active[work.repo] = rem[1:]
-			}
-			p.lk.Unlock()
-		}
-	}
-}
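
Note: the ParallelConsumerPool removed above reappears below as parallel.Scheduler with an equivalent constructor, so migrating a call site is largely an import change and a rename. A minimal sketch under that assumption; the handler and pool label here are illustrative, not taken from this diff:

package main

import (
	"context"

	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/parallel"
)

// handle is a placeholder event handler with the signature that both the
// old pool and the new scheduler expect.
func handle(ctx context.Context, evt *events.XRPCStreamEvent) error {
	return nil
}

func main() {
	// Previously: pool := events.NewConsumerPool(16, 100, "repo-stream", handle)
	sched := parallel.NewScheduler(16, 100, "repo-stream", handle)
	_ = sched
}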
events/schedulers/metrics.go (+26)
+package schedulers
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var WorkItemsAdded = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "indigo_scheduler_work_items_added_total",
+	Help: "Total number of work items added to the consumer pool",
+}, []string{"pool", "scheduler_type"})
+
+var WorkItemsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "indigo_scheduler_work_items_processed_total",
+	Help: "Total number of work items processed by the consumer pool",
+}, []string{"pool", "scheduler_type"})
+
+var WorkItemsActive = promauto.NewCounterVec(prometheus.CounterOpts{
+	Name: "indigo_scheduler_work_items_active_total",
+	Help: "Total number of work items passed into a worker",
+}, []string{"pool", "scheduler_type"})
+
+var WorkersActive = promauto.NewGaugeVec(prometheus.GaugeOpts{
+	Name: "indigo_scheduler_workers_active",
+	Help: "Number of workers currently active",
+}, []string{"pool", "scheduler_type"})
events/schedulers/parallel/pool.go (+117)
+package parallel
+
+import (
+	"context"
+	"sync"
+
+	"github.com/bluesky-social/indigo/events"
+	"github.com/bluesky-social/indigo/events/schedulers"
+	"github.com/labstack/gommon/log"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Scheduler is a parallel scheduler that will run work on a fixed number of workers
+type Scheduler struct {
+	maxConcurrency int
+	maxQueue       int
+
+	do func(context.Context, *events.XRPCStreamEvent) error
+
+	feeder chan *consumerTask
+
+	lk     sync.Mutex
+	active map[string][]*consumerTask
+
+	ident string
+
+	// metrics
+	itemsAdded     prometheus.Counter
+	itemsProcessed prometheus.Counter
+	itemsActive    prometheus.Counter
+	workersActive  prometheus.Gauge
+}
+
+func NewScheduler(maxC, maxQ int, ident string, do func(context.Context, *events.XRPCStreamEvent) error) *Scheduler {
+	p := &Scheduler{
+		maxConcurrency: maxC,
+		maxQueue:       maxQ,
+
+		do: do,
+
+		feeder: make(chan *consumerTask),
+		active: make(map[string][]*consumerTask),
+
+		ident: ident,
+
+		itemsAdded:     schedulers.WorkItemsAdded.WithLabelValues(ident, "parallel"),
+		itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "parallel"),
+		itemsActive:    schedulers.WorkItemsActive.WithLabelValues(ident, "parallel"),
+		workersActive:  schedulers.WorkersActive.WithLabelValues(ident, "parallel"),
+	}
+
+	for i := 0; i < maxC; i++ {
+		go p.worker()
+	}
+
+	p.workersActive.Set(float64(maxC))
+
+	return p
+}
+
+type consumerTask struct {
+	repo string
+	val  *events.XRPCStreamEvent
+}
+
+func (p *Scheduler) AddWork(ctx context.Context, repo string, val *events.XRPCStreamEvent) error {
+	p.itemsAdded.Inc()
+	t := &consumerTask{
+		repo: repo,
+		val:  val,
+	}
+	p.lk.Lock()
+
+	a, ok := p.active[repo]
+	if ok {
+		p.active[repo] = append(a, t)
+		p.lk.Unlock()
+		return nil
+	}
+
+	p.active[repo] = []*consumerTask{}
+	p.lk.Unlock()
+
+	select {
+	case p.feeder <- t:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+func (p *Scheduler) worker() {
+	for work := range p.feeder {
+		for work != nil {
+			p.itemsActive.Inc()
+			if err := p.do(context.TODO(), work.val); err != nil {
+				log.Errorf("event handler failed: %s", err)
+			}
+			p.itemsProcessed.Inc()
+
+			p.lk.Lock()
+			rem, ok := p.active[work.repo]
+			if !ok {
+				log.Errorf("should always have an 'active' entry if a worker is processing a job")
+			}
+
+			if len(rem) == 0 {
+				delete(p.active, work.repo)
+				work = nil
+			} else {
+				work = rem[0]
+				p.active[work.repo] = rem[1:]
+			}
+			p.lk.Unlock()
+		}
+	}
+}
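
For context on how the new scheduler behaves: AddWork chains work for a repo through the active map, so events for one repo are handled in submission order while different repos fan out across the maxC workers (maxQueue is stored but not yet enforced in the code shown here). A rough usage sketch with assumed names, not part of the diff:

package main

import (
	"context"
	"log"

	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/parallel"
)

func main() {
	ctx := context.Background()

	// Eight workers share the feeder channel; the handler runs once per event.
	sched := parallel.NewScheduler(8, 100, "example-pool", func(ctx context.Context, evt *events.XRPCStreamEvent) error {
		// process the event here; errors are logged by the worker loop
		return nil
	})

	// Events enqueued under the same repo key run sequentially relative to
	// each other; events for different repos may run concurrently.
	if err := sched.AddWork(ctx, "did:plc:example", &events.XRPCStreamEvent{}); err != nil {
		log.Fatalf("enqueue failed: %s", err)
	}
}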