+5
-4
appview/state/knotstream.go
+5
-4
appview/state/knotstream.go
···
1
1
package state
2
2
3
3
import (
4
+
"context"
4
5
"encoding/json"
5
6
"fmt"
6
7
"slices"
···
17
18
"github.com/posthog/posthog-go"
18
19
)
19
20
20
-
func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) {
21
+
func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) {
21
22
knots, err := db.GetCompletedRegistrations(d)
22
23
if err != nil {
23
24
return nil, err
···
35
36
36
37
cfg := kc.ConsumerConfig{
37
38
Sources: srcs,
38
-
ProcessFunc: knotstreamIngester(d, enforcer, posthog, c.Core.Dev),
39
+
ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev),
39
40
RetryInterval: c.Knotstream.RetryInterval,
40
41
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
41
42
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
49
50
return kc.NewEventConsumer(cfg), nil
50
51
}
51
52
52
-
func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc {
53
-
return func(source kc.EventSource, msg kc.Message) error {
53
+
func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc {
54
+
return func(ctx context.Context, source kc.EventSource, msg kc.Message) error {
54
55
switch msg.Nsid {
55
56
case tangled.GitRefUpdateNSID:
56
57
return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
+4
-4
appview/state/state.go
+4
-4
appview/state/state.go
···
48
48
knotstream *knotclient.EventConsumer
49
49
}
50
50
51
-
func Make(config *config.Config) (*State, error) {
51
+
func Make(ctx context.Context, config *config.Config) (*State, error) {
52
52
d, err := db.Make(config.Core.DbPath)
53
53
if err != nil {
54
54
return nil, err
···
104
104
if err != nil {
105
105
return nil, fmt.Errorf("failed to create jetstream client: %w", err)
106
106
}
107
-
err = jc.StartJetstream(context.Background(), appview.Ingest(wrapper, enforcer))
107
+
err = jc.StartJetstream(ctx, appview.Ingest(wrapper, enforcer))
108
108
if err != nil {
109
109
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
110
110
}
111
111
112
-
knotstream, err := KnotstreamConsumer(config, d, enforcer, posthog)
112
+
knotstream, err := KnotstreamConsumer(ctx, config, d, enforcer, posthog)
113
113
if err != nil {
114
114
return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
115
115
}
116
-
knotstream.Start(context.Background())
116
+
knotstream.Start(ctx)
117
117
118
118
state := &State{
119
119
d,
+4
-2
cmd/appview/main.go
+4
-2
cmd/appview/main.go
···
14
14
func main() {
15
15
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, nil)))
16
16
17
-
c, err := config.LoadConfig(context.Background())
17
+
ctx := context.Background()
18
+
19
+
c, err := config.LoadConfig(ctx)
18
20
if err != nil {
19
21
log.Println("failed to load config", "error", err)
20
22
return
21
23
}
22
24
23
-
state, err := state.Make(c)
25
+
state, err := state.Make(ctx, c)
24
26
25
27
if err != nil {
26
28
log.Fatal(err)
+8
-9
cmd/eventconsumer/main.go
+8
-9
cmd/eventconsumer/main.go
···
24
24
return
25
25
}
26
26
27
-
var srcs []knotclient.EventSource
28
-
for k := range strings.SplitSeq(*knots, ",") {
29
-
srcs = append(srcs, knotclient.EventSource{k})
30
-
}
31
-
32
-
consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
33
-
Sources: srcs,
27
+
ccfg := knotclient.ConsumerConfig{
34
28
ProcessFunc: processEvent,
35
29
RetryInterval: *retryFlag,
36
30
MaxRetryInterval: *maxRetryFlag,
37
31
WorkerCount: *workerCount,
38
32
Dev: true,
39
-
})
33
+
}
34
+
for k := range strings.SplitSeq(*knots, ",") {
35
+
ccfg.AddEventSource(knotclient.NewEventSource(k))
36
+
}
37
+
38
+
consumer := knotclient.NewEventConsumer(ccfg)
40
39
41
40
ctx, cancel := context.WithCancel(context.Background())
42
41
consumer.Start(ctx)
···
45
44
consumer.Stop()
46
45
}
47
46
48
-
func processEvent(source knotclient.EventSource, msg knotclient.Message) error {
47
+
func processEvent(_ context.Context, source knotclient.EventSource, msg knotclient.Message) error {
49
48
fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson))
50
49
return nil
51
50
}
+1
-1
knotserver/events.go
+1
-1
knotserver/events.go
···
50
50
}
51
51
52
52
// complete backfill first before going to live data
53
+
l.Info("going through backfill", "cursor", cursor)
53
54
l.Debug("going through backfill", "cursor", cursor)
54
55
if err := h.streamOps(conn, &cursor); err != nil {
55
56
l.Error("failed to backfill", "err", err)
56
57
return
57
58
}
58
-
59
59
for {
60
60
// wait for new data or timeout
61
61
select {
-54
spindle/exec.go
-54
spindle/exec.go
···
1
-
package spindle
2
-
3
-
import (
4
-
"context"
5
-
"encoding/json"
6
-
"fmt"
7
-
8
-
"tangled.sh/tangled.sh/core/api/tangled"
9
-
)
10
-
11
-
func (s *Spindle) exec(ctx context.Context, src string, msg []byte) error {
12
-
pipeline := tangled.Pipeline{}
13
-
data := map[string]any{}
14
-
err := json.Unmarshal(msg, &data)
15
-
if err != nil {
16
-
fmt.Println("error unmarshalling", err)
17
-
return err
18
-
}
19
-
20
-
if data["nsid"] == tangled.PipelineNSID {
21
-
event, ok := data["event"]
22
-
if !ok {
23
-
s.l.Error("no event in message")
24
-
return nil
25
-
}
26
-
27
-
rawEvent, err := json.Marshal(event)
28
-
if err != nil {
29
-
return err
30
-
}
31
-
32
-
err = json.Unmarshal(rawEvent, &pipeline)
33
-
if err != nil {
34
-
return err
35
-
}
36
-
37
-
rkey, ok := data["rkey"].(string)
38
-
if !ok {
39
-
s.l.Error("no rkey in message")
40
-
return nil
41
-
}
42
-
43
-
err = s.eng.SetupPipeline(ctx, &pipeline, rkey)
44
-
if err != nil {
45
-
return err
46
-
}
47
-
err = s.eng.StartWorkflows(ctx, &pipeline, rkey)
48
-
if err != nil {
49
-
return err
50
-
}
51
-
}
52
-
53
-
return nil
54
-
}
+29
-3
spindle/server.go
+29
-3
spindle/server.go
···
1
1
package spindle
2
2
3
3
import (
4
+
"encoding/json"
4
5
"fmt"
5
6
"log/slog"
6
7
"net/http"
···
68
69
69
70
go func() {
70
71
logger.Info("starting event consumer")
71
-
ec := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
72
-
Sources: []string{"ws://localhost:5555/events"},
72
+
knotEventSource := knotclient.NewEventSource("localhost:5555")
73
+
ccfg := knotclient.ConsumerConfig{
73
74
Logger: logger,
74
75
ProcessFunc: spindle.exec,
75
-
})
76
+
}
77
+
ccfg.AddEventSource(knotEventSource)
78
+
79
+
ec := knotclient.NewEventConsumer(ccfg)
76
80
77
81
ec.Start(ctx)
78
82
}()
···
89
93
mux.HandleFunc("/events", s.Events)
90
94
return mux
91
95
}
96
+
97
+
func (s *Spindle) exec(ctx context.Context, src knotclient.EventSource, msg knotclient.Message) error {
98
+
pipeline := tangled.Pipeline{}
99
+
err := json.Unmarshal(msg.EventJson, &pipeline)
100
+
if err != nil {
101
+
fmt.Println("error unmarshalling", err)
102
+
return err
103
+
}
104
+
105
+
if msg.Nsid == tangled.PipelineNSID {
106
+
err = s.eng.SetupPipeline(ctx, &pipeline, msg.Rkey)
107
+
if err != nil {
108
+
return err
109
+
}
110
+
err = s.eng.StartWorkflows(ctx, &pipeline, msg.Rkey)
111
+
if err != nil {
112
+
return err
113
+
}
114
+
}
115
+
116
+
return nil
117
+
}