+66
-7
knotclient/events.go
+66
-7
knotclient/events.go
···
2
2
3
3
import (
4
4
"context"
5
+
"encoding/json"
5
6
"log/slog"
6
7
"math/rand"
7
8
"net/url"
···
13
14
"github.com/gorilla/websocket"
14
15
)
15
16
16
-
type ProcessFunc func(source string, message []byte) error
17
+
type ProcessFunc func(source EventSource, message Message) error
18
+
19
+
type Message struct {
20
+
Rkey string
21
+
Nsid string
22
+
// do not full deserialize this portion of the message, processFunc can do that
23
+
EventJson json.RawMessage
24
+
}
17
25
18
26
type ConsumerConfig struct {
19
-
Sources []string
27
+
Sources map[EventSource]struct{}
20
28
ProcessFunc ProcessFunc
21
29
RetryInterval time.Duration
22
30
MaxRetryInterval time.Duration
···
24
32
WorkerCount int
25
33
QueueSize int
26
34
Logger *slog.Logger
35
+
Dev bool
36
+
}
37
+
38
+
type EventSource struct {
39
+
Knot string
40
+
}
41
+
42
+
func NewEventSource(knot string) EventSource {
43
+
return EventSource{
44
+
Knot: knot,
45
+
}
27
46
}
28
47
29
48
type EventConsumer struct {
···
34
53
jobQueue chan job
35
54
logger *slog.Logger
36
55
randSource *rand.Rand
56
+
57
+
// rw lock over edits to consumer config
58
+
mu sync.RWMutex
59
+
}
60
+
61
+
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
62
+
scheme := "wss"
63
+
if e.cfg.Dev {
64
+
scheme = "ws"
65
+
}
66
+
67
+
u, err := url.Parse(scheme + "://" + s.Knot + "/events")
68
+
if err != nil {
69
+
return nil, err
70
+
}
71
+
72
+
if cursor != "" {
73
+
query := url.Values{}
74
+
query.Add("cursor", cursor)
75
+
u.RawQuery = query.Encode()
76
+
}
77
+
return u, nil
37
78
}
38
79
39
80
type job struct {
40
-
source string
81
+
source EventSource
41
82
message []byte
42
83
}
43
84
···
77
118
}
78
119
79
120
// start streaming
80
-
for _, source := range c.cfg.Sources {
121
+
for source := range c.cfg.Sources {
81
122
c.wg.Add(1)
82
123
go c.startConnectionLoop(ctx, source)
83
124
}
···
94
135
close(c.jobQueue)
95
136
}
96
137
138
+
func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) {
139
+
c.mu.Lock()
140
+
c.cfg.Sources[s] = struct{}{}
141
+
c.wg.Add(1)
142
+
go c.startConnectionLoop(ctx, s)
143
+
c.mu.Unlock()
144
+
}
145
+
97
146
func (c *EventConsumer) worker(ctx context.Context) {
98
147
defer c.wg.Done()
99
148
for {
···
104
153
if !ok {
105
154
return
106
155
}
107
-
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
156
+
157
+
var msg Message
158
+
err := json.Unmarshal(j.message, &msg)
159
+
if err != nil {
160
+
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
161
+
return
162
+
}
163
+
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
108
164
c.logger.Error("error processing message", "source", j.source, "err", err)
109
165
}
110
166
}
111
167
}
112
168
}
113
169
114
-
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
170
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
115
171
defer c.wg.Done()
116
172
retryInterval := c.cfg.RetryInterval
117
173
for {
···
144
200
}
145
201
}
146
202
147
-
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
203
+
func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
148
204
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
149
205
defer cancel()
150
206
151
207
u, err := url.Parse(source)
208
+
209
+
u, err := c.buildUrl(source, cursor)
152
210
if err != nil {
153
211
return err
154
212
}
155
213
214
+
c.logger.Info("connecting", "url", u.String())
156
215
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
157
216
if err != nil {
158
217
return err