forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

appview: consumer events from spindles

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled 6ced1c38 85720a2c

Changed files
+215 -41
appview
config
pages
templates
spindles
state
+11 -10
appview/config/config.go
··· 25 Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"` 26 } 27 28 - type KnotstreamConfig struct { 29 RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"` 30 MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"` 31 ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"` ··· 74 } 75 76 type Config struct { 77 - Core CoreConfig `env:",prefix=TANGLED_"` 78 - Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` 79 - Knotstream KnotstreamConfig `env:",prefix=TANGLED_KNOTSTREAM_"` 80 - Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` 81 - Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` 82 - Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` 83 - Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` 84 - OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` 85 - Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` 86 } 87 88 func LoadConfig(ctx context.Context) (*Config, error) {
··· 25 Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"` 26 } 27 28 + type ConsumerConfig struct { 29 RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"` 30 MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"` 31 ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"` ··· 74 } 75 76 type Config struct { 77 + Core CoreConfig `env:",prefix=TANGLED_"` 78 + Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` 79 + Knotstream ConsumerConfig `env:",prefix=TANGLED_KNOTSTREAM_"` 80 + Spindlestream ConsumerConfig `env:",prefix=TANGLED_SPINDLESTREAM_"` 81 + Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` 82 + Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` 83 + Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` 84 + Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` 85 + OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` 86 + Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` 87 } 88 89 func LoadConfig(ctx context.Context) (*Config, error) {
+3 -3
appview/pages/templates/spindles/index.html
··· 14 {{ end }} 15 16 {{ define "all" }} 17 - <section class="rounded bg-white dark:bg-gray-800 drop-shadow-sm w-full flex flex-col gap-2"> 18 <h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">your spindles</h2> 19 <div class="flex flex-col rounded border border-gray-200 dark:border-gray-700 w-full"> 20 {{ range $spindle := .Spindles }} ··· 30 {{ end }} 31 32 {{ define "register" }} 33 - <section class="rounded bg-white dark:bg-gray-800 drop-shadow-sm w-full lg:w-fit flex flex-col gap-2"> 34 <h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">register a spindle</h2> 35 - <p class="mb-2 dark:text-gray-300">Enter the hostname of your spindle to get started</p> 36 <form 37 hx-post="/spindles/register" 38 class="max-w-2xl mb-2 space-y-4"
··· 14 {{ end }} 15 16 {{ define "all" }} 17 + <section class="rounded w-full flex flex-col gap-2"> 18 <h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">your spindles</h2> 19 <div class="flex flex-col rounded border border-gray-200 dark:border-gray-700 w-full"> 20 {{ range $spindle := .Spindles }} ··· 30 {{ end }} 31 32 {{ define "register" }} 33 + <section class="rounded w-full lg:w-fit flex flex-col gap-2"> 34 <h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">register a spindle</h2> 35 + <p class="mb-2 dark:text-gray-300">Enter the hostname of your spindle to get started.</p> 36 <form 37 hx-post="/spindles/register" 38 class="max-w-2xl mb-2 space-y-4"
+82 -14
appview/state/knotstream.go
··· 11 "tangled.sh/tangled.sh/core/appview/cache" 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 - kc "tangled.sh/tangled.sh/core/knotclient" 15 - "tangled.sh/tangled.sh/core/knotclient/cursor" 16 "tangled.sh/tangled.sh/core/log" 17 "tangled.sh/tangled.sh/core/rbac" 18 19 "github.com/posthog/posthog-go" 20 ) 21 22 - func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) { 23 knots, err := db.GetCompletedRegistrations(d) 24 if err != nil { 25 return nil, err 26 } 27 28 - srcs := make(map[kc.EventSource]struct{}) 29 for _, k := range knots { 30 - s := kc.EventSource{k} 31 srcs[s] = struct{}{} 32 } 33 ··· 35 cache := cache.New(c.Redis.Addr) 36 cursorStore := cursor.NewRedisCursorStore(cache) 37 38 - cfg := kc.ConsumerConfig{ 39 Sources: srcs, 40 - ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev), 41 RetryInterval: c.Knotstream.RetryInterval, 42 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 43 ConnectionTimeout: c.Knotstream.ConnectionTimeout, ··· 48 CursorStore: &cursorStore, 49 } 50 51 - return kc.NewEventConsumer(cfg), nil 52 } 53 54 - func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc { 55 - return func(ctx context.Context, source kc.EventSource, msg kc.Message) error { 56 switch msg.Nsid { 57 case tangled.GitRefUpdateNSID: 58 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 59 case tangled.PipelineNSID: 60 - // TODO 61 } 62 63 return nil 64 } 65 } 66 67 - func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error { 68 var record tangled.GitRefUpdate 69 err := json.Unmarshal(msg.EventJson, &record) 70 if err != nil { ··· 75 if err != nil { 76 return err 77 } 78 - if !slices.Contains(knownKnots, source.Knot) { 79 - return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) 80 } 81 82 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) ··· 122 123 return nil 124 }
··· 11 "tangled.sh/tangled.sh/core/appview/cache" 12 "tangled.sh/tangled.sh/core/appview/config" 13 "tangled.sh/tangled.sh/core/appview/db" 14 + ec "tangled.sh/tangled.sh/core/eventconsumer" 15 + "tangled.sh/tangled.sh/core/eventconsumer/cursor" 16 "tangled.sh/tangled.sh/core/log" 17 "tangled.sh/tangled.sh/core/rbac" 18 + "tangled.sh/tangled.sh/core/workflow" 19 20 + "github.com/bluesky-social/indigo/atproto/syntax" 21 "github.com/posthog/posthog-go" 22 ) 23 24 + func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 25 knots, err := db.GetCompletedRegistrations(d) 26 if err != nil { 27 return nil, err 28 } 29 30 + srcs := make(map[ec.Source]struct{}) 31 for _, k := range knots { 32 + s := ec.NewKnotSource(k) 33 srcs[s] = struct{}{} 34 } 35 ··· 37 cache := cache.New(c.Redis.Addr) 38 cursorStore := cursor.NewRedisCursorStore(cache) 39 40 + cfg := ec.ConsumerConfig{ 41 Sources: srcs, 42 + ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev), 43 RetryInterval: c.Knotstream.RetryInterval, 44 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 45 ConnectionTimeout: c.Knotstream.ConnectionTimeout, ··· 50 CursorStore: &cursorStore, 51 } 52 53 + return ec.NewConsumer(cfg), nil 54 } 55 56 + func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 57 + return func(ctx context.Context, source ec.Source, msg ec.Message) error { 58 switch msg.Nsid { 59 case tangled.GitRefUpdateNSID: 60 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 61 case tangled.PipelineNSID: 62 + return ingestPipeline(d, source, msg) 63 } 64 65 return nil 66 } 67 } 68 69 + func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 70 var record tangled.GitRefUpdate 71 err := json.Unmarshal(msg.EventJson, &record) 72 if err != nil { ··· 77 if err != nil { 78 return err 79 } 80 + if !slices.Contains(knownKnots, source.Key()) { 81 + return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 82 } 83 84 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) ··· 124 125 return nil 126 } 127 + 128 + func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 129 + var record tangled.Pipeline 130 + err := json.Unmarshal(msg.EventJson, &record) 131 + if err != nil { 132 + return err 133 + } 134 + 135 + if record.TriggerMetadata == nil { 136 + return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 137 + } 138 + 139 + if record.TriggerMetadata.Repo == nil { 140 + return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 141 + } 142 + 143 + // trigger info 144 + var trigger db.Trigger 145 + var sha string 146 + switch record.TriggerMetadata.Kind { 147 + case workflow.TriggerKindPush: 148 + trigger.Kind = workflow.TriggerKindPush 149 + trigger.PushRef = &record.TriggerMetadata.Push.Ref 150 + trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 151 + trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 152 + sha = *trigger.PushNewSha 153 + case workflow.TriggerKindPullRequest: 154 + trigger.Kind = workflow.TriggerKindPush 155 + trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 156 + trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 157 + trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 158 + trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 159 + sha = *trigger.PRSourceSha 160 + } 161 + 162 + tx, err := d.Begin() 163 + if err != nil { 164 + return err 165 + } 166 + 167 + triggerId, err := db.AddTrigger(tx, trigger) 168 + if err != nil { 169 + return err 170 + } 171 + 172 + pipeline := db.Pipeline{ 173 + Rkey: msg.Rkey, 174 + Knot: source.Key(), 175 + RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 176 + RepoName: record.TriggerMetadata.Repo.Repo, 177 + TriggerId: int(triggerId), 178 + Sha: sha, 179 + } 180 + 181 + err = db.AddPipeline(tx, pipeline) 182 + if err != nil { 183 + return err 184 + } 185 + 186 + err = tx.Commit() 187 + if err != nil { 188 + return err 189 + } 190 + 191 + return err 192 + }
+93
appview/state/spindlestream.go
···
··· 1 + package state 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "log/slog" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.sh/tangled.sh/core/api/tangled" 11 + "tangled.sh/tangled.sh/core/appview/cache" 12 + "tangled.sh/tangled.sh/core/appview/config" 13 + "tangled.sh/tangled.sh/core/appview/db" 14 + ec "tangled.sh/tangled.sh/core/eventconsumer" 15 + "tangled.sh/tangled.sh/core/eventconsumer/cursor" 16 + "tangled.sh/tangled.sh/core/log" 17 + "tangled.sh/tangled.sh/core/rbac" 18 + ) 19 + 20 + func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 21 + spindles, err := db.GetSpindles(d) 22 + if err != nil { 23 + return nil, err 24 + } 25 + 26 + srcs := make(map[ec.Source]struct{}) 27 + for _, s := range spindles { 28 + src := ec.NewSpindleSource(s.Instance) 29 + srcs[src] = struct{}{} 30 + } 31 + 32 + logger := log.New("spindlestream") 33 + cache := cache.New(c.Redis.Addr) 34 + cursorStore := cursor.NewRedisCursorStore(cache) 35 + 36 + cfg := ec.ConsumerConfig{ 37 + Sources: srcs, 38 + ProcessFunc: spindleIngester(ctx, logger, d), 39 + RetryInterval: c.Spindlestream.RetryInterval, 40 + MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 41 + ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 42 + WorkerCount: c.Spindlestream.WorkerCount, 43 + QueueSize: c.Spindlestream.QueueSize, 44 + Logger: logger, 45 + Dev: c.Core.Dev, 46 + CursorStore: &cursorStore, 47 + } 48 + 49 + return ec.NewConsumer(cfg), nil 50 + } 51 + 52 + func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 53 + return func(ctx context.Context, source ec.Source, msg ec.Message) error { 54 + switch msg.Nsid { 55 + case tangled.PipelineStatusNSID: 56 + return ingestPipelineStatus(ctx, logger, d, source, msg) 57 + } 58 + 59 + return nil 60 + } 61 + } 62 + 63 + func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 64 + var record tangled.PipelineStatus 65 + err := json.Unmarshal(msg.EventJson, &record) 66 + if err != nil { 67 + return err 68 + } 69 + 70 + pipelineUri, err := syntax.ParseATURI(record.Pipeline) 71 + if err != nil { 72 + return err 73 + } 74 + 75 + exitCode := 0 76 + if record.ExitCode != nil { 77 + exitCode = int(*record.ExitCode) 78 + } 79 + 80 + status := db.PipelineStatus{ 81 + Spindle: source.Key(), 82 + Rkey: msg.Rkey, 83 + PipelineKnot: pipelineUri.Authority().String(), 84 + PipelineRkey: pipelineUri.RecordKey().String(), 85 + Created: time.Now(), 86 + Workflow: record.Workflow, 87 + Status: record.Status, 88 + Error: record.Error, 89 + ExitCode: exitCode, 90 + } 91 + 92 + return db.AddPipelineStatus(d, status) 93 + }
+26 -14
appview/state/state.go
··· 28 "tangled.sh/tangled.sh/core/appview/oauth" 29 "tangled.sh/tangled.sh/core/appview/pages" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 "tangled.sh/tangled.sh/core/jetstream" 32 "tangled.sh/tangled.sh/core/knotclient" 33 "tangled.sh/tangled.sh/core/rbac" 34 ) 35 36 type State struct { 37 - db *db.DB 38 - oauth *oauth.OAuth 39 - enforcer *rbac.Enforcer 40 - tidClock syntax.TIDClock 41 - pages *pages.Pages 42 - sess *session.SessionStore 43 - idResolver *idresolver.Resolver 44 - posthog posthog.Client 45 - jc *jetstream.JetstreamClient 46 - config *config.Config 47 - repoResolver *reporesolver.RepoResolver 48 - knotstream *knotclient.EventConsumer 49 } 50 51 func Make(ctx context.Context, config *config.Config) (*State, error) { ··· 109 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 110 } 111 112 - knotstream, err := KnotstreamConsumer(ctx, config, d, enforcer, posthog) 113 if err != nil { 114 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 115 } 116 knotstream.Start(ctx) 117 118 state := &State{ 119 d, ··· 128 config, 129 repoResolver, 130 knotstream, 131 } 132 133 return state, nil ··· 366 } 367 368 // add this knot to knotstream 369 - go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain}) 370 371 w.Write([]byte("check success")) 372 }
··· 28 "tangled.sh/tangled.sh/core/appview/oauth" 29 "tangled.sh/tangled.sh/core/appview/pages" 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 + "tangled.sh/tangled.sh/core/eventconsumer" 32 "tangled.sh/tangled.sh/core/jetstream" 33 "tangled.sh/tangled.sh/core/knotclient" 34 "tangled.sh/tangled.sh/core/rbac" 35 ) 36 37 type State struct { 38 + db *db.DB 39 + oauth *oauth.OAuth 40 + enforcer *rbac.Enforcer 41 + tidClock syntax.TIDClock 42 + pages *pages.Pages 43 + sess *session.SessionStore 44 + idResolver *idresolver.Resolver 45 + posthog posthog.Client 46 + jc *jetstream.JetstreamClient 47 + config *config.Config 48 + repoResolver *reporesolver.RepoResolver 49 + knotstream *eventconsumer.Consumer 50 + spindlestream *eventconsumer.Consumer 51 } 52 53 func Make(ctx context.Context, config *config.Config) (*State, error) { ··· 111 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 112 } 113 114 + knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 115 if err != nil { 116 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 117 } 118 knotstream.Start(ctx) 119 + 120 + spindlestream, err := Spindlestream(ctx, config, d, enforcer) 121 + if err != nil { 122 + return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 123 + } 124 + spindlestream.Start(ctx) 125 126 state := &State{ 127 d, ··· 136 config, 137 repoResolver, 138 knotstream, 139 + spindlestream, 140 } 141 142 return state, nil ··· 375 } 376 377 // add this knot to knotstream 378 + go s.knotstream.AddSource( 379 + context.Background(), 380 + eventconsumer.NewKnotSource(domain), 381 + ) 382 383 w.Write([]byte("check success")) 384 }