+18
-8
appview/config/config.go
+18
-8
appview/config/config.go
···
4
4
"context"
5
5
"fmt"
6
6
"net/url"
7
+
"time"
7
8
8
9
"github.com/sethvargo/go-envconfig"
9
10
)
···
22
23
23
24
type JetstreamConfig struct {
24
25
Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"`
26
+
}
27
+
28
+
type KnotstreamConfig struct {
29
+
RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"`
30
+
MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"`
31
+
ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"`
32
+
WorkerCount int `env:"WORKER_COUNT, default=64"`
33
+
QueueSize int `env:"QUEUE_SIZE, default=100"`
25
34
}
26
35
27
36
type ResendConfig struct {
···
65
74
}
66
75
67
76
type Config struct {
68
-
Core CoreConfig `env:",prefix=TANGLED_"`
69
-
Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"`
70
-
Resend ResendConfig `env:",prefix=TANGLED_RESEND_"`
71
-
Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"`
72
-
Camo CamoConfig `env:",prefix=TANGLED_CAMO_"`
73
-
Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"`
74
-
OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"`
75
-
Redis RedisConfig `env:",prefix=TANGLED_REDIS_"`
77
+
Core CoreConfig `env:",prefix=TANGLED_"`
78
+
Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"`
79
+
Knotstream KnotstreamConfig `env:",prefix=TANGLED_KNOTSTREAM_"`
80
+
Resend ResendConfig `env:",prefix=TANGLED_RESEND_"`
81
+
Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"`
82
+
Camo CamoConfig `env:",prefix=TANGLED_CAMO_"`
83
+
Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"`
84
+
OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"`
85
+
Redis RedisConfig `env:",prefix=TANGLED_REDIS_"`
76
86
}
77
87
78
88
func LoadConfig(ctx context.Context) (*Config, error) {
+89
appview/state/knotstream.go
+89
appview/state/knotstream.go
···
1
+
package state
2
+
3
+
import (
4
+
"encoding/json"
5
+
"fmt"
6
+
"slices"
7
+
"time"
8
+
9
+
"tangled.sh/tangled.sh/core/api/tangled"
10
+
"tangled.sh/tangled.sh/core/appview/cache"
11
+
"tangled.sh/tangled.sh/core/appview/config"
12
+
"tangled.sh/tangled.sh/core/appview/db"
13
+
kc "tangled.sh/tangled.sh/core/knotclient"
14
+
"tangled.sh/tangled.sh/core/log"
15
+
"tangled.sh/tangled.sh/core/rbac"
16
+
)
17
+
18
+
func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) {
19
+
knots, err := db.GetCompletedRegistrations(d)
20
+
if err != nil {
21
+
return nil, err
22
+
}
23
+
24
+
srcs := make(map[kc.EventSource]struct{})
25
+
for _, k := range knots {
26
+
s := kc.EventSource{k}
27
+
srcs[s] = struct{}{}
28
+
}
29
+
30
+
logger := log.New("knotstream")
31
+
cache := cache.New(c.Redis.Addr)
32
+
cursorStore := kc.NewRedisCursorStore(cache)
33
+
34
+
cfg := kc.ConsumerConfig{
35
+
Sources: srcs,
36
+
ProcessFunc: knotstreamIngester(d, enforcer),
37
+
RetryInterval: c.Knotstream.RetryInterval,
38
+
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
39
+
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
40
+
WorkerCount: c.Knotstream.WorkerCount,
41
+
QueueSize: c.Knotstream.QueueSize,
42
+
Logger: logger,
43
+
Dev: c.Core.Dev,
44
+
CursorStore: &cursorStore,
45
+
}
46
+
47
+
return kc.NewEventConsumer(cfg), nil
48
+
}
49
+
50
+
func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc {
51
+
return func(source kc.EventSource, msg kc.Message) error {
52
+
switch msg.Nsid {
53
+
case tangled.GitRefUpdateNSID:
54
+
return ingestRefUpdate(d, enforcer, source, msg)
55
+
case tangled.PipelineNSID:
56
+
// TODO
57
+
}
58
+
59
+
return nil
60
+
}
61
+
}
62
+
63
+
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error {
64
+
var record tangled.GitRefUpdate
65
+
err := json.Unmarshal(msg.EventJson, &record)
66
+
if err != nil {
67
+
return err
68
+
}
69
+
70
+
knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
71
+
if err != nil {
72
+
return err
73
+
}
74
+
75
+
if !slices.Contains(knownKnots, source.Knot) {
76
+
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
77
+
}
78
+
79
+
punch := db.Punch{
80
+
Did: record.CommitterDid,
81
+
Date: time.Now(),
82
+
Count: 1,
83
+
}
84
+
if err := db.AddPunch(d, punch); err != nil {
85
+
return err
86
+
}
87
+
88
+
return nil
89
+
}
+11
appview/state/state.go
+11
appview/state/state.go
···
45
45
jc *jetstream.JetstreamClient
46
46
config *config.Config
47
47
repoResolver *reporesolver.RepoResolver
48
+
knotstream *knotclient.EventConsumer
48
49
}
49
50
50
51
func Make(config *config.Config) (*State, error) {
···
108
109
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
109
110
}
110
111
112
+
knotstream, err := KnotstreamConsumer(config, d, enforcer)
113
+
if err != nil {
114
+
return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
115
+
}
116
+
knotstream.Start(context.Background())
117
+
111
118
state := &State{
112
119
d,
113
120
oauth,
···
120
127
jc,
121
128
config,
122
129
repoResolver,
130
+
knotstream,
123
131
}
124
132
125
133
return state, nil
···
356
364
http.Error(w, err.Error(), http.StatusInternalServerError)
357
365
return
358
366
}
367
+
368
+
// add this knot to knotstream
369
+
go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain})
359
370
360
371
w.Write([]byte("check success"))
361
372
}