+66
-7
knotclient/events.go
+66
-7
knotclient/events.go
···
2
3
import (
4
"context"
5
"log/slog"
6
"math/rand"
7
"net/url"
···
13
"github.com/gorilla/websocket"
14
)
15
16
-
type ProcessFunc func(source string, message []byte) error
17
18
type ConsumerConfig struct {
19
-
Sources []string
20
ProcessFunc ProcessFunc
21
RetryInterval time.Duration
22
MaxRetryInterval time.Duration
···
24
WorkerCount int
25
QueueSize int
26
Logger *slog.Logger
27
}
28
29
type EventConsumer struct {
···
34
jobQueue chan job
35
logger *slog.Logger
36
randSource *rand.Rand
37
}
38
39
type job struct {
40
-
source string
41
message []byte
42
}
43
···
77
}
78
79
// start streaming
80
-
for _, source := range c.cfg.Sources {
81
c.wg.Add(1)
82
go c.startConnectionLoop(ctx, source)
83
}
···
94
close(c.jobQueue)
95
}
96
97
func (c *EventConsumer) worker(ctx context.Context) {
98
defer c.wg.Done()
99
for {
···
104
if !ok {
105
return
106
}
107
-
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
108
c.logger.Error("error processing message", "source", j.source, "err", err)
109
}
110
}
111
}
112
}
113
114
-
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
115
defer c.wg.Done()
116
retryInterval := c.cfg.RetryInterval
117
for {
···
144
}
145
}
146
147
-
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
148
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
149
defer cancel()
150
151
u, err := url.Parse(source)
152
if err != nil {
153
return err
154
}
155
156
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
157
if err != nil {
158
return err
···
2
3
import (
4
"context"
5
+
"encoding/json"
6
"log/slog"
7
"math/rand"
8
"net/url"
···
14
"github.com/gorilla/websocket"
15
)
16
17
+
type ProcessFunc func(source EventSource, message Message) error
18
+
19
+
type Message struct {
20
+
Rkey string
21
+
Nsid string
22
+
// do not full deserialize this portion of the message, processFunc can do that
23
+
EventJson json.RawMessage
24
+
}
25
26
type ConsumerConfig struct {
27
+
Sources map[EventSource]struct{}
28
ProcessFunc ProcessFunc
29
RetryInterval time.Duration
30
MaxRetryInterval time.Duration
···
32
WorkerCount int
33
QueueSize int
34
Logger *slog.Logger
35
+
Dev bool
36
+
}
37
+
38
+
type EventSource struct {
39
+
Knot string
40
+
}
41
+
42
+
func NewEventSource(knot string) EventSource {
43
+
return EventSource{
44
+
Knot: knot,
45
+
}
46
}
47
48
type EventConsumer struct {
···
53
jobQueue chan job
54
logger *slog.Logger
55
randSource *rand.Rand
56
+
57
+
// rw lock over edits to consumer config
58
+
mu sync.RWMutex
59
+
}
60
+
61
+
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
62
+
scheme := "wss"
63
+
if e.cfg.Dev {
64
+
scheme = "ws"
65
+
}
66
+
67
+
u, err := url.Parse(scheme + "://" + s.Knot + "/events")
68
+
if err != nil {
69
+
return nil, err
70
+
}
71
+
72
+
if cursor != "" {
73
+
query := url.Values{}
74
+
query.Add("cursor", cursor)
75
+
u.RawQuery = query.Encode()
76
+
}
77
+
return u, nil
78
}
79
80
type job struct {
81
+
source EventSource
82
message []byte
83
}
84
···
118
}
119
120
// start streaming
121
+
for source := range c.cfg.Sources {
122
c.wg.Add(1)
123
go c.startConnectionLoop(ctx, source)
124
}
···
135
close(c.jobQueue)
136
}
137
138
+
func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) {
139
+
c.mu.Lock()
140
+
c.cfg.Sources[s] = struct{}{}
141
+
c.wg.Add(1)
142
+
go c.startConnectionLoop(ctx, s)
143
+
c.mu.Unlock()
144
+
}
145
+
146
func (c *EventConsumer) worker(ctx context.Context) {
147
defer c.wg.Done()
148
for {
···
153
if !ok {
154
return
155
}
156
+
157
+
var msg Message
158
+
err := json.Unmarshal(j.message, &msg)
159
+
if err != nil {
160
+
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
161
+
return
162
+
}
163
+
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
164
c.logger.Error("error processing message", "source", j.source, "err", err)
165
}
166
}
167
}
168
}
169
170
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
171
defer c.wg.Done()
172
retryInterval := c.cfg.RetryInterval
173
for {
···
200
}
201
}
202
203
+
func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
204
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
205
defer cancel()
206
207
u, err := url.Parse(source)
208
+
209
+
u, err := c.buildUrl(source, cursor)
210
if err != nil {
211
return err
212
}
213
214
+
c.logger.Info("connecting", "url", u.String())
215
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
216
if err != nil {
217
return err