+17
-5
appview/state/knotstream.go
+17
-5
appview/state/knotstream.go
···
13
kc "tangled.sh/tangled.sh/core/knotclient"
14
"tangled.sh/tangled.sh/core/log"
15
"tangled.sh/tangled.sh/core/rbac"
16
)
17
18
-
func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) {
19
knots, err := db.GetCompletedRegistrations(d)
20
if err != nil {
21
return nil, err
···
33
34
cfg := kc.ConsumerConfig{
35
Sources: srcs,
36
-
ProcessFunc: knotstreamIngester(d, enforcer),
37
RetryInterval: c.Knotstream.RetryInterval,
38
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
39
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
47
return kc.NewEventConsumer(cfg), nil
48
}
49
50
-
func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc {
51
return func(source kc.EventSource, msg kc.Message) error {
52
switch msg.Nsid {
53
case tangled.GitRefUpdateNSID:
54
-
return ingestRefUpdate(d, enforcer, source, msg)
55
case tangled.PipelineNSID:
56
// TODO
57
}
···
60
}
61
}
62
63
-
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error {
64
var record tangled.GitRefUpdate
65
err := json.Unmarshal(msg.EventJson, &record)
66
if err != nil {
···
104
}
105
if err := db.AddPunch(d, punch); err != nil {
106
return err
107
}
108
109
return nil
···
13
kc "tangled.sh/tangled.sh/core/knotclient"
14
"tangled.sh/tangled.sh/core/log"
15
"tangled.sh/tangled.sh/core/rbac"
16
+
17
+
"github.com/posthog/posthog-go"
18
)
19
20
+
func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) {
21
knots, err := db.GetCompletedRegistrations(d)
22
if err != nil {
23
return nil, err
···
35
36
cfg := kc.ConsumerConfig{
37
Sources: srcs,
38
+
ProcessFunc: knotstreamIngester(d, enforcer, posthog, c.Core.Dev),
39
RetryInterval: c.Knotstream.RetryInterval,
40
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
41
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
49
return kc.NewEventConsumer(cfg), nil
50
}
51
52
+
func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc {
53
return func(source kc.EventSource, msg kc.Message) error {
54
switch msg.Nsid {
55
case tangled.GitRefUpdateNSID:
56
+
return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
57
case tangled.PipelineNSID:
58
// TODO
59
}
···
62
}
63
}
64
65
+
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error {
66
var record tangled.GitRefUpdate
67
err := json.Unmarshal(msg.EventJson, &record)
68
if err != nil {
···
106
}
107
if err := db.AddPunch(d, punch); err != nil {
108
return err
109
+
}
110
+
111
+
if !dev {
112
+
err = pc.Enqueue(posthog.Capture{
113
+
DistinctId: record.CommitterDid,
114
+
Event: "git_ref_update",
115
+
})
116
+
if err != nil {
117
+
// non-fatal, TODO: log this
118
+
}
119
}
120
121
return nil
+1
-1
appview/state/state.go
+1
-1
appview/state/state.go