+17
-7
knotclient/events.go
+17
-7
knotclient/events.go
···
17
17
"github.com/gorilla/websocket"
18
18
)
19
19
20
-
type ProcessFunc func(source EventSource, message Message) error
20
+
type ProcessFunc func(ctx context.Context, source EventSource, message Message) error
21
21
22
22
type Message struct {
23
23
Rkey string
···
39
39
CursorStore CursorStore
40
40
}
41
41
42
+
func NewConsumerConfig() *ConsumerConfig {
43
+
return &ConsumerConfig{
44
+
Sources: make(map[EventSource]struct{}),
45
+
}
46
+
}
47
+
48
+
func (cc *ConsumerConfig) AddEventSource(es EventSource) {
49
+
cc.Sources[es] = struct{}{}
50
+
}
51
+
42
52
type EventSource struct {
43
53
Knot string
44
54
}
···
50
60
}
51
61
52
62
type EventConsumer struct {
53
-
cfg ConsumerConfig
54
63
wg sync.WaitGroup
55
64
dialer *websocket.Dialer
56
65
connMap sync.Map
···
58
67
logger *slog.Logger
59
68
randSource *rand.Rand
60
69
61
-
// rw lock over edits to consumer config
62
-
mu sync.RWMutex
70
+
// rw lock over edits to ConsumerConfig
71
+
cfgMu sync.RWMutex
72
+
cfg ConsumerConfig
63
73
}
64
74
65
75
type CursorStore interface {
···
202
212
}
203
213
204
214
func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) {
205
-
c.mu.Lock()
215
+
c.cfgMu.Lock()
206
216
c.cfg.Sources[s] = struct{}{}
207
217
c.wg.Add(1)
208
218
go c.startConnectionLoop(ctx, s)
209
-
c.mu.Unlock()
219
+
c.cfgMu.Unlock()
210
220
}
211
221
212
222
func (c *EventConsumer) worker(ctx context.Context) {
···
230
240
// update cursor
231
241
c.cfg.CursorStore.Set(j.source.Knot, time.Now().Unix())
232
242
233
-
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
243
+
if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil {
234
244
c.logger.Error("error processing message", "source", j.source, "err", err)
235
245
}
236
246
}