+39
eventconsumer/knot.go
+39
eventconsumer/knot.go
···
1
+
package eventconsumer
2
+
3
+
import (
4
+
"fmt"
5
+
"net/url"
6
+
)
7
+
8
+
type KnotSource struct {
9
+
Knot string
10
+
}
11
+
12
+
func (k KnotSource) Key() string {
13
+
return k.Knot
14
+
}
15
+
16
+
func (k KnotSource) Url(cursor int64, dev bool) (*url.URL, error) {
17
+
scheme := "wss"
18
+
if dev {
19
+
scheme = "ws"
20
+
}
21
+
22
+
u, err := url.Parse(scheme + "://" + k.Knot + "/events")
23
+
if err != nil {
24
+
return nil, err
25
+
}
26
+
27
+
if cursor != 0 {
28
+
query := url.Values{}
29
+
query.Add("cursor", fmt.Sprintf("%d", cursor))
30
+
u.RawQuery = query.Encode()
31
+
}
32
+
return u, nil
33
+
}
34
+
35
+
func NewKnotSource(knot string) KnotSource {
36
+
return KnotSource{
37
+
Knot: knot,
38
+
}
39
+
}
+39
eventconsumer/spindle.go
+39
eventconsumer/spindle.go
···
1
+
package eventconsumer
2
+
3
+
import (
4
+
"fmt"
5
+
"net/url"
6
+
)
7
+
8
+
type SpindleSource struct {
9
+
Spindle string
10
+
}
11
+
12
+
func (s SpindleSource) Key() string {
13
+
return s.Spindle
14
+
}
15
+
16
+
func (s SpindleSource) Url(cursor int64, dev bool) (*url.URL, error) {
17
+
scheme := "wss"
18
+
if dev {
19
+
scheme = "ws"
20
+
}
21
+
22
+
u, err := url.Parse(scheme + "://" + s.Spindle + "/events")
23
+
if err != nil {
24
+
return nil, err
25
+
}
26
+
27
+
if cursor != 0 {
28
+
query := url.Values{}
29
+
query.Add("cursor", fmt.Sprintf("%d", cursor))
30
+
u.RawQuery = query.Encode()
31
+
}
32
+
return u, nil
33
+
}
34
+
35
+
func NewSpindleSource(spindle string) SpindleSource {
36
+
return SpindleSource{
37
+
Spindle: spindle,
38
+
}
39
+
}
knotclient/cursor/memory.go
eventconsumer/cursor/memory.go
knotclient/cursor/memory.go
eventconsumer/cursor/memory.go
knotclient/cursor/redis.go
eventconsumer/cursor/redis.go
knotclient/cursor/redis.go
eventconsumer/cursor/redis.go
knotclient/cursor/sqlite.go
eventconsumer/cursor/sqlite.go
knotclient/cursor/sqlite.go
eventconsumer/cursor/sqlite.go
knotclient/cursor/store.go
eventconsumer/cursor/store.go
knotclient/cursor/store.go
eventconsumer/cursor/store.go
+25
-48
knotclient/events.go
eventconsumer/consumer.go
+25
-48
knotclient/events.go
eventconsumer/consumer.go
···
1
-
package knotclient
1
+
package eventconsumer
2
2
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
-
"fmt"
7
6
"log/slog"
8
7
"math/rand"
9
8
"net/url"
10
9
"sync"
11
10
"time"
12
11
13
-
"tangled.sh/tangled.sh/core/knotclient/cursor"
12
+
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
14
13
"tangled.sh/tangled.sh/core/log"
15
14
16
15
"github.com/gorilla/websocket"
17
16
)
18
17
19
-
type ProcessFunc func(ctx context.Context, source EventSource, message Message) error
18
+
type ProcessFunc func(ctx context.Context, source Source, message Message) error
20
19
21
20
type Message struct {
22
21
Rkey string
···
26
25
}
27
26
28
27
type ConsumerConfig struct {
29
-
Sources map[EventSource]struct{}
28
+
Sources map[Source]struct{}
30
29
ProcessFunc ProcessFunc
31
30
RetryInterval time.Duration
32
31
MaxRetryInterval time.Duration
···
40
39
41
40
func NewConsumerConfig() *ConsumerConfig {
42
41
return &ConsumerConfig{
43
-
Sources: make(map[EventSource]struct{}),
42
+
Sources: make(map[Source]struct{}),
44
43
}
45
44
}
46
45
47
-
type EventSource struct {
48
-
Knot string
46
+
type Source interface {
47
+
// url to start streaming events from
48
+
Url(cursor int64, dev bool) (*url.URL, error)
49
+
// cache key for cursor storage
50
+
Key() string
49
51
}
50
52
51
-
func NewEventSource(knot string) EventSource {
52
-
return EventSource{
53
-
Knot: knot,
54
-
}
55
-
}
56
-
57
-
type EventConsumer struct {
53
+
type Consumer struct {
58
54
wg sync.WaitGroup
59
55
dialer *websocket.Dialer
60
56
connMap sync.Map
···
67
63
cfg ConsumerConfig
68
64
}
69
65
70
-
func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) {
71
-
scheme := "wss"
72
-
if e.cfg.Dev {
73
-
scheme = "ws"
74
-
}
75
-
76
-
u, err := url.Parse(scheme + "://" + s.Knot + "/events")
77
-
if err != nil {
78
-
return nil, err
79
-
}
80
-
81
-
if cursor != 0 {
82
-
query := url.Values{}
83
-
query.Add("cursor", fmt.Sprintf("%d", cursor))
84
-
u.RawQuery = query.Encode()
85
-
}
86
-
return u, nil
87
-
}
88
-
89
66
type job struct {
90
-
source EventSource
67
+
source Source
91
68
message []byte
92
69
}
93
70
94
-
func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
71
+
func NewConsumer(cfg ConsumerConfig) *Consumer {
95
72
if cfg.RetryInterval == 0 {
96
73
cfg.RetryInterval = 15 * time.Minute
97
74
}
···
105
82
cfg.MaxRetryInterval = 1 * time.Hour
106
83
}
107
84
if cfg.Logger == nil {
108
-
cfg.Logger = log.New("eventconsumer")
85
+
cfg.Logger = log.New("consumer")
109
86
}
110
87
if cfg.QueueSize == 0 {
111
88
cfg.QueueSize = 100
···
113
90
if cfg.CursorStore == nil {
114
91
cfg.CursorStore = &cursor.MemoryStore{}
115
92
}
116
-
return &EventConsumer{
93
+
return &Consumer{
117
94
cfg: cfg,
118
95
dialer: websocket.DefaultDialer,
119
96
jobQueue: make(chan job, cfg.QueueSize), // buffered job queue
···
122
99
}
123
100
}
124
101
125
-
func (c *EventConsumer) Start(ctx context.Context) {
102
+
func (c *Consumer) Start(ctx context.Context) {
126
103
c.cfg.Logger.Info("starting consumer", "config", c.cfg)
127
104
128
105
// start workers
···
138
115
}
139
116
}
140
117
141
-
func (c *EventConsumer) Stop() {
118
+
func (c *Consumer) Stop() {
142
119
c.connMap.Range(func(_, val any) bool {
143
120
if conn, ok := val.(*websocket.Conn); ok {
144
121
conn.Close()
···
149
126
close(c.jobQueue)
150
127
}
151
128
152
-
func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) {
129
+
func (c *Consumer) AddSource(ctx context.Context, s Source) {
153
130
// we are already listening to this source
154
131
if _, ok := c.cfg.Sources[s]; ok {
155
132
c.logger.Info("source already present", "source", s)
···
163
140
c.cfgMu.Unlock()
164
141
}
165
142
166
-
func (c *EventConsumer) worker(ctx context.Context) {
143
+
func (c *Consumer) worker(ctx context.Context) {
167
144
defer c.wg.Done()
168
145
for {
169
146
select {
···
177
154
var msg Message
178
155
err := json.Unmarshal(j.message, &msg)
179
156
if err != nil {
180
-
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
157
+
c.logger.Error("error deserializing message", "source", j.source.Key(), "err", err)
181
158
return
182
159
}
183
160
184
161
// update cursor
185
-
c.cfg.CursorStore.Set(j.source.Knot, time.Now().UnixNano())
162
+
c.cfg.CursorStore.Set(j.source.Key(), time.Now().UnixNano())
186
163
187
164
if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil {
188
165
c.logger.Error("error processing message", "source", j.source, "err", err)
···
191
168
}
192
169
}
193
170
194
-
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
171
+
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
195
172
defer c.wg.Done()
196
173
retryInterval := c.cfg.RetryInterval
197
174
for {
···
224
201
}
225
202
}
226
203
227
-
func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
204
+
func (c *Consumer) runConnection(ctx context.Context, source Source) error {
228
205
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
229
206
defer cancel()
230
207
231
-
cursor := c.cfg.CursorStore.Get(source.Knot)
208
+
cursor := c.cfg.CursorStore.Get(source.Key())
232
209
233
-
u, err := c.buildUrl(source, cursor)
210
+
u, err := source.Url(cursor, c.cfg.Dev)
234
211
if err != nil {
235
212
return err
236
213
}
+2
-2
spindle/ingester.go
+2
-2
spindle/ingester.go
···
6
6
"fmt"
7
7
8
8
"tangled.sh/tangled.sh/core/api/tangled"
9
-
"tangled.sh/tangled.sh/core/knotclient"
9
+
"tangled.sh/tangled.sh/core/eventconsumer"
10
10
11
11
"github.com/bluesky-social/jetstream/pkg/models"
12
12
)
···
128
128
}
129
129
130
130
// add this knot to the event consumer
131
-
src := knotclient.NewEventSource(record.Knot)
131
+
src := eventconsumer.NewKnotSource(record.Knot)
132
132
s.ks.AddSource(context.Background(), src)
133
133
134
134
return nil
+8
-8
spindle/server.go
+8
-8
spindle/server.go
···
9
9
10
10
"github.com/go-chi/chi/v5"
11
11
"tangled.sh/tangled.sh/core/api/tangled"
12
+
"tangled.sh/tangled.sh/core/eventconsumer"
13
+
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
12
14
"tangled.sh/tangled.sh/core/jetstream"
13
-
"tangled.sh/tangled.sh/core/knotclient"
14
-
"tangled.sh/tangled.sh/core/knotclient/cursor"
15
15
"tangled.sh/tangled.sh/core/log"
16
16
"tangled.sh/tangled.sh/core/notifier"
17
17
"tangled.sh/tangled.sh/core/rbac"
···
35
35
eng *engine.Engine
36
36
jq *queue.Queue
37
37
cfg *config.Config
38
-
ks *knotclient.EventConsumer
38
+
ks *eventconsumer.Consumer
39
39
}
40
40
41
41
func Run(ctx context.Context) error {
···
114
114
// for each incoming sh.tangled.pipeline, we execute
115
115
// spindle.processPipeline, which in turn enqueues the pipeline
116
116
// job in the above registered queue.
117
-
ccfg := knotclient.NewConsumerConfig()
117
+
ccfg := eventconsumer.NewConsumerConfig()
118
118
ccfg.Logger = logger
119
119
ccfg.Dev = cfg.Server.Dev
120
120
ccfg.ProcessFunc = spindle.processPipeline
···
125
125
}
126
126
for _, knot := range knownKnots {
127
127
logger.Info("adding source start", "knot", knot)
128
-
ccfg.Sources[knotclient.EventSource{knot}] = struct{}{}
128
+
ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
129
129
}
130
-
spindle.ks = knotclient.NewEventConsumer(*ccfg)
130
+
spindle.ks = eventconsumer.NewConsumer(*ccfg)
131
131
132
132
go func() {
133
133
logger.Info("starting knot event consumer")
···
151
151
return mux
152
152
}
153
153
154
-
func (s *Spindle) processPipeline(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
154
+
func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
155
155
if msg.Nsid == tangled.PipelineNSID {
156
156
pipeline := tangled.Pipeline{}
157
157
err := json.Unmarshal(msg.EventJson, &pipeline)
···
179
179
}
180
180
181
181
pipelineId := models.PipelineId{
182
-
Knot: src.Knot,
182
+
Knot: src.Key(),
183
183
Rkey: msg.Rkey,
184
184
}
185
185
+24
-5
spindle/stream.go
+24
-5
spindle/stream.go
···
2
2
3
3
import (
4
4
"context"
5
+
"encoding/json"
5
6
"fmt"
6
7
"net/http"
7
8
"strconv"
···
206
207
}
207
208
208
209
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *int64) error {
209
-
ops, err := s.db.GetEvents(*cursor)
210
+
events, err := s.db.GetEvents(*cursor)
210
211
if err != nil {
211
212
s.l.Debug("err", "err", err)
212
213
return err
213
214
}
214
-
s.l.Debug("ops", "ops", ops)
215
+
s.l.Debug("ops", "ops", events)
215
216
216
-
for _, op := range ops {
217
-
if err := conn.WriteJSON(op); err != nil {
217
+
for _, event := range events {
218
+
// first extract the inner json into a map
219
+
var eventJson map[string]any
220
+
err := json.Unmarshal([]byte(event.EventJson), &eventJson)
221
+
if err != nil {
222
+
s.l.Error("failed to unmarshal event", "err", err)
223
+
return err
224
+
}
225
+
226
+
jsonMsg, err := json.Marshal(map[string]any{
227
+
"rkey": event.Rkey,
228
+
"nsid": event.Nsid,
229
+
"event": eventJson,
230
+
})
231
+
if err != nil {
232
+
s.l.Error("failed to marshal record", "err", err)
233
+
return err
234
+
}
235
+
236
+
if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
218
237
s.l.Debug("err", "err", err)
219
238
return err
220
239
}
221
-
*cursor = op.Created
240
+
*cursor = event.Created
222
241
}
223
242
224
243
return nil