forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

appview: introduce knotstream consumer

similar to jetstream consumer, we now ingest events from every known
knot.

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 4b01dc65 3e5d2f1c

verified
Changed files
+118 -8
appview
+18 -8
appview/config/config.go
··· 4 4 "context" 5 5 "fmt" 6 6 "net/url" 7 + "time" 7 8 8 9 "github.com/sethvargo/go-envconfig" 9 10 ) ··· 22 23 23 24 type JetstreamConfig struct { 24 25 Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"` 26 + } 27 + 28 + type KnotstreamConfig struct { 29 + RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"` 30 + MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"` 31 + ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"` 32 + WorkerCount int `env:"WORKER_COUNT, default=64"` 33 + QueueSize int `env:"QUEUE_SIZE, default=100"` 25 34 } 26 35 27 36 type ResendConfig struct { ··· 65 74 } 66 75 67 76 type Config struct { 68 - Core CoreConfig `env:",prefix=TANGLED_"` 69 - Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` 70 - Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` 71 - Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` 72 - Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` 73 - Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` 74 - OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` 75 - Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` 77 + Core CoreConfig `env:",prefix=TANGLED_"` 78 + Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` 79 + Knotstream KnotstreamConfig `env:",prefix=TANGLED_KNOTSTREAM_"` 80 + Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` 81 + Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` 82 + Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` 83 + Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` 84 + OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` 85 + Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` 76 86 } 77 87 78 88 func LoadConfig(ctx context.Context) (*Config, error) {
+89
appview/state/knotstream.go
··· 1 + package state 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "slices" 7 + "time" 8 + 9 + "tangled.sh/tangled.sh/core/api/tangled" 10 + "tangled.sh/tangled.sh/core/appview/cache" 11 + "tangled.sh/tangled.sh/core/appview/config" 12 + "tangled.sh/tangled.sh/core/appview/db" 13 + kc "tangled.sh/tangled.sh/core/knotclient" 14 + "tangled.sh/tangled.sh/core/log" 15 + "tangled.sh/tangled.sh/core/rbac" 16 + ) 17 + 18 + func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) { 19 + knots, err := db.GetCompletedRegistrations(d) 20 + if err != nil { 21 + return nil, err 22 + } 23 + 24 + srcs := make(map[kc.EventSource]struct{}) 25 + for _, k := range knots { 26 + s := kc.EventSource{k} 27 + srcs[s] = struct{}{} 28 + } 29 + 30 + logger := log.New("knotstream") 31 + cache := cache.New(c.Redis.Addr) 32 + cursorStore := kc.NewRedisCursorStore(cache) 33 + 34 + cfg := kc.ConsumerConfig{ 35 + Sources: srcs, 36 + ProcessFunc: knotstreamIngester(d, enforcer), 37 + RetryInterval: c.Knotstream.RetryInterval, 38 + MaxRetryInterval: c.Knotstream.MaxRetryInterval, 39 + ConnectionTimeout: c.Knotstream.ConnectionTimeout, 40 + WorkerCount: c.Knotstream.WorkerCount, 41 + QueueSize: c.Knotstream.QueueSize, 42 + Logger: logger, 43 + Dev: c.Core.Dev, 44 + CursorStore: &cursorStore, 45 + } 46 + 47 + return kc.NewEventConsumer(cfg), nil 48 + } 49 + 50 + func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc { 51 + return func(source kc.EventSource, msg kc.Message) error { 52 + switch msg.Nsid { 53 + case tangled.GitRefUpdateNSID: 54 + return ingestRefUpdate(d, enforcer, source, msg) 55 + case tangled.PipelineNSID: 56 + // TODO 57 + } 58 + 59 + return nil 60 + } 61 + } 62 + 63 + func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error { 64 + var record tangled.GitRefUpdate 65 + err := json.Unmarshal(msg.EventJson, &record) 66 + if err != nil { 67 + return err 68 + } 69 + 70 + knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid) 71 + if err != nil { 72 + return err 73 + } 74 + 75 + if !slices.Contains(knownKnots, source.Knot) { 76 + return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) 77 + } 78 + 79 + punch := db.Punch{ 80 + Did: record.CommitterDid, 81 + Date: time.Now(), 82 + Count: 1, 83 + } 84 + if err := db.AddPunch(d, punch); err != nil { 85 + return err 86 + } 87 + 88 + return nil 89 + }
+11
appview/state/state.go
··· 45 45 jc *jetstream.JetstreamClient 46 46 config *config.Config 47 47 repoResolver *reporesolver.RepoResolver 48 + knotstream *knotclient.EventConsumer 48 49 } 49 50 50 51 func Make(config *config.Config) (*State, error) { ··· 108 109 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 109 110 } 110 111 112 + knotstream, err := KnotstreamConsumer(config, d, enforcer) 113 + if err != nil { 114 + return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 115 + } 116 + knotstream.Start(context.Background()) 117 + 111 118 state := &State{ 112 119 d, 113 120 oauth, ··· 120 127 jc, 121 128 config, 122 129 repoResolver, 130 + knotstream, 123 131 } 124 132 125 133 return state, nil ··· 356 364 http.Error(w, err.Error(), http.StatusInternalServerError) 357 365 return 358 366 } 367 + 368 + // add this knot to knotstream 369 + go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain}) 359 370 360 371 w.Write([]byte("check success")) 361 372 }