+11
-7
cmd/eventconsumer/main.go
+11
-7
cmd/eventconsumer/main.go
···
11
)
12
13
func main() {
14
-
sourcesFlag := flag.String("sources", "", "list of wss sources")
15
retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval")
16
maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval")
17
workerCount := flag.Int("workers", 10, "goroutine pool size")
18
19
flag.Parse()
20
21
-
if *sourcesFlag == "" {
22
-
fmt.Println("error: -sources is required")
23
flag.Usage()
24
return
25
}
26
27
-
sources := strings.Split(*sourcesFlag, ",")
28
29
consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
30
-
Sources: sources,
31
ProcessFunc: processEvent,
32
RetryInterval: *retryFlag,
33
MaxRetryInterval: *maxRetryFlag,
34
WorkerCount: *workerCount,
35
})
36
37
ctx, cancel := context.WithCancel(context.Background())
···
41
consumer.Stop()
42
}
43
44
-
func processEvent(source string, msg []byte) error {
45
-
fmt.Printf("From %s: %s\n", source, string(msg))
46
return nil
47
}
···
11
)
12
13
func main() {
14
+
knots := flag.String("knots", "", "list of knots to connect to")
15
retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval")
16
maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval")
17
workerCount := flag.Int("workers", 10, "goroutine pool size")
18
19
flag.Parse()
20
21
+
if *knots == "" {
22
+
fmt.Println("error: -knots is required")
23
flag.Usage()
24
return
25
}
26
27
+
var srcs []knotclient.EventSource
28
+
for k := range strings.SplitSeq(*knots, ",") {
29
+
srcs = append(srcs, knotclient.EventSource{k})
30
+
}
31
32
consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
33
+
Sources: srcs,
34
ProcessFunc: processEvent,
35
RetryInterval: *retryFlag,
36
MaxRetryInterval: *maxRetryFlag,
37
WorkerCount: *workerCount,
38
+
Dev: true,
39
})
40
41
ctx, cancel := context.WithCancel(context.Background())
···
45
consumer.Stop()
46
}
47
48
+
func processEvent(source knotclient.EventSource, msg knotclient.Message) error {
49
+
fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson))
50
return nil
51
}
+66
-2
knotclient/events.go
+66
-2
knotclient/events.go
···
3
import (
4
"context"
5
"encoding/json"
6
"log/slog"
7
"math/rand"
8
"net/url"
9
"sync"
10
"time"
11
12
"tangled.sh/tangled.sh/core/log"
13
14
"github.com/gorilla/websocket"
···
20
Rkey string
21
Nsid string
22
// do not full deserialize this portion of the message, processFunc can do that
23
-
EventJson json.RawMessage
24
}
25
26
type ConsumerConfig struct {
···
33
QueueSize int
34
Logger *slog.Logger
35
Dev bool
36
}
37
38
type EventSource struct {
···
58
mu sync.RWMutex
59
}
60
61
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
62
scheme := "wss"
63
if e.cfg.Dev {
···
100
}
101
if cfg.QueueSize == 0 {
102
cfg.QueueSize = 100
103
}
104
return &EventConsumer{
105
cfg: cfg,
···
111
}
112
113
func (c *EventConsumer) Start(ctx context.Context) {
114
// start workers
115
for range c.cfg.WorkerCount {
116
c.wg.Add(1)
···
160
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
161
return
162
}
163
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
164
c.logger.Error("error processing message", "source", j.source, "err", err)
165
}
···
204
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
205
defer cancel()
206
207
-
u, err := url.Parse(source)
208
209
u, err := c.buildUrl(source, cursor)
210
if err != nil {
···
3
import (
4
"context"
5
"encoding/json"
6
+
"fmt"
7
"log/slog"
8
"math/rand"
9
"net/url"
10
"sync"
11
"time"
12
13
+
"tangled.sh/tangled.sh/core/appview/cache"
14
"tangled.sh/tangled.sh/core/log"
15
16
"github.com/gorilla/websocket"
···
22
Rkey string
23
Nsid string
24
// do not full deserialize this portion of the message, processFunc can do that
25
+
EventJson json.RawMessage `json:"event"`
26
}
27
28
type ConsumerConfig struct {
···
35
QueueSize int
36
Logger *slog.Logger
37
Dev bool
38
+
CursorStore CursorStore
39
}
40
41
type EventSource struct {
···
61
mu sync.RWMutex
62
}
63
64
+
type CursorStore interface {
65
+
Set(knot, cursor string)
66
+
Get(knot string) (cursor string)
67
+
}
68
+
69
+
type RedisCursorStore struct {
70
+
rdb *cache.Cache
71
+
}
72
+
73
+
func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore {
74
+
return RedisCursorStore{
75
+
rdb: cache,
76
+
}
77
+
}
78
+
79
+
const (
80
+
cursorKey = "cursor:%s"
81
+
)
82
+
83
+
func (r *RedisCursorStore) Set(knot, cursor string) {
84
+
key := fmt.Sprintf(cursorKey, knot)
85
+
r.rdb.Set(context.Background(), key, cursor, 0)
86
+
}
87
+
88
+
func (r *RedisCursorStore) Get(knot string) (cursor string) {
89
+
key := fmt.Sprintf(cursorKey, knot)
90
+
val, err := r.rdb.Get(context.Background(), key).Result()
91
+
if err != nil {
92
+
return ""
93
+
}
94
+
95
+
return val
96
+
}
97
+
98
+
type MemoryCursorStore struct {
99
+
store sync.Map
100
+
}
101
+
102
+
func (m *MemoryCursorStore) Set(knot, cursor string) {
103
+
m.store.Store(knot, cursor)
104
+
}
105
+
106
+
func (m *MemoryCursorStore) Get(knot string) (cursor string) {
107
+
if result, ok := m.store.Load(knot); ok {
108
+
if val, ok := result.(string); ok {
109
+
return val
110
+
}
111
+
}
112
+
113
+
return ""
114
+
}
115
+
116
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
117
scheme := "wss"
118
if e.cfg.Dev {
···
155
}
156
if cfg.QueueSize == 0 {
157
cfg.QueueSize = 100
158
+
}
159
+
if cfg.CursorStore == nil {
160
+
cfg.CursorStore = &MemoryCursorStore{}
161
}
162
return &EventConsumer{
163
cfg: cfg,
···
169
}
170
171
func (c *EventConsumer) Start(ctx context.Context) {
172
+
c.cfg.Logger.Info("starting consumer", "config", c.cfg)
173
+
174
// start workers
175
for range c.cfg.WorkerCount {
176
c.wg.Add(1)
···
220
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
221
return
222
}
223
+
224
+
// update cursor
225
+
c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey)
226
+
227
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
228
c.logger.Error("error processing message", "source", j.source, "err", err)
229
}
···
268
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
269
defer cancel()
270
271
+
cursor := c.cfg.CursorStore.Get(source.Knot)
272
273
u, err := c.buildUrl(source, cursor)
274
if err != nil {