forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

appview: consumer events from spindles

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li e259dbc9 03269e90

verified
Changed files
+215 -41
appview
config
pages
templates
spindles
state
+11 -10
appview/config/config.go
··· 25 25 Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"` 26 26 } 27 27 28 - type KnotstreamConfig struct { 28 + type ConsumerConfig struct { 29 29 RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"` 30 30 MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"` 31 31 ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"` ··· 74 74 } 75 75 76 76 type Config struct { 77 - Core CoreConfig `env:",prefix=TANGLED_"` 78 - Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` 79 - Knotstream KnotstreamConfig `env:",prefix=TANGLED_KNOTSTREAM_"` 80 - Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` 81 - Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` 82 - Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` 83 - Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` 84 - OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` 85 - Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` 77 + Core CoreConfig `env:",prefix=TANGLED_"` 78 + Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` 79 + Knotstream ConsumerConfig `env:",prefix=TANGLED_KNOTSTREAM_"` 80 + Spindlestream ConsumerConfig `env:",prefix=TANGLED_SPINDLESTREAM_"` 81 + Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` 82 + Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` 83 + Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` 84 + Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` 85 + OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` 86 + Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` 86 87 } 87 88 88 89 func LoadConfig(ctx context.Context) (*Config, error) {
+3 -3
appview/pages/templates/spindles/index.html
··· 14 14 {{ end }} 15 15 16 16 {{ define "all" }} 17 - <section class="rounded bg-white dark:bg-gray-800 drop-shadow-sm w-full flex flex-col gap-2"> 17 + <section class="rounded w-full flex flex-col gap-2"> 18 18 <h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">your spindles</h2> 19 19 <div class="flex flex-col rounded border border-gray-200 dark:border-gray-700 w-full"> 20 20 {{ range $spindle := .Spindles }} ··· 30 30 {{ end }} 31 31 32 32 {{ define "register" }} 33 - <section class="rounded bg-white dark:bg-gray-800 drop-shadow-sm w-full lg:w-fit flex flex-col gap-2"> 33 + <section class="rounded w-full lg:w-fit flex flex-col gap-2"> 34 34 <h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">register a spindle</h2> 35 - <p class="mb-2 dark:text-gray-300">Enter the hostname of your spindle to get started</p> 35 + <p class="mb-2 dark:text-gray-300">Enter the hostname of your spindle to get started.</p> 36 36 <form 37 37 hx-post="/spindles/register" 38 38 class="max-w-2xl mb-2 space-y-4"
+82 -14
appview/state/knotstream.go
··· 11 11 "tangled.sh/tangled.sh/core/appview/cache" 12 12 "tangled.sh/tangled.sh/core/appview/config" 13 13 "tangled.sh/tangled.sh/core/appview/db" 14 - kc "tangled.sh/tangled.sh/core/knotclient" 15 - "tangled.sh/tangled.sh/core/knotclient/cursor" 14 + ec "tangled.sh/tangled.sh/core/eventconsumer" 15 + "tangled.sh/tangled.sh/core/eventconsumer/cursor" 16 16 "tangled.sh/tangled.sh/core/log" 17 17 "tangled.sh/tangled.sh/core/rbac" 18 + "tangled.sh/tangled.sh/core/workflow" 18 19 20 + "github.com/bluesky-social/indigo/atproto/syntax" 19 21 "github.com/posthog/posthog-go" 20 22 ) 21 23 22 - func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) { 24 + func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 23 25 knots, err := db.GetCompletedRegistrations(d) 24 26 if err != nil { 25 27 return nil, err 26 28 } 27 29 28 - srcs := make(map[kc.EventSource]struct{}) 30 + srcs := make(map[ec.Source]struct{}) 29 31 for _, k := range knots { 30 - s := kc.EventSource{k} 32 + s := ec.NewKnotSource(k) 31 33 srcs[s] = struct{}{} 32 34 } 33 35 ··· 35 37 cache := cache.New(c.Redis.Addr) 36 38 cursorStore := cursor.NewRedisCursorStore(cache) 37 39 38 - cfg := kc.ConsumerConfig{ 40 + cfg := ec.ConsumerConfig{ 39 41 Sources: srcs, 40 - ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev), 42 + ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev), 41 43 RetryInterval: c.Knotstream.RetryInterval, 42 44 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 43 45 ConnectionTimeout: c.Knotstream.ConnectionTimeout, ··· 48 50 CursorStore: &cursorStore, 49 51 } 50 52 51 - return kc.NewEventConsumer(cfg), nil 53 + return ec.NewConsumer(cfg), nil 52 54 } 53 55 54 - func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc { 55 - return func(ctx context.Context, source kc.EventSource, msg kc.Message) error { 56 + func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 57 + return func(ctx context.Context, source ec.Source, msg ec.Message) error { 56 58 switch msg.Nsid { 57 59 case tangled.GitRefUpdateNSID: 58 60 return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 59 61 case tangled.PipelineNSID: 60 - // TODO 62 + return ingestPipeline(d, source, msg) 61 63 } 62 64 63 65 return nil 64 66 } 65 67 } 66 68 67 - func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error { 69 + func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 68 70 var record tangled.GitRefUpdate 69 71 err := json.Unmarshal(msg.EventJson, &record) 70 72 if err != nil { ··· 75 77 if err != nil { 76 78 return err 77 79 } 78 - if !slices.Contains(knownKnots, source.Knot) { 79 - return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) 80 + if !slices.Contains(knownKnots, source.Key()) { 81 + return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 80 82 } 81 83 82 84 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) ··· 122 124 123 125 return nil 124 126 } 127 + 128 + func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 129 + var record tangled.Pipeline 130 + err := json.Unmarshal(msg.EventJson, &record) 131 + if err != nil { 132 + return err 133 + } 134 + 135 + if record.TriggerMetadata == nil { 136 + return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 137 + } 138 + 139 + if record.TriggerMetadata.Repo == nil { 140 + return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 141 + } 142 + 143 + // trigger info 144 + var trigger db.Trigger 145 + var sha string 146 + switch record.TriggerMetadata.Kind { 147 + case workflow.TriggerKindPush: 148 + trigger.Kind = workflow.TriggerKindPush 149 + trigger.PushRef = &record.TriggerMetadata.Push.Ref 150 + trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 151 + trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 152 + sha = *trigger.PushNewSha 153 + case workflow.TriggerKindPullRequest: 154 + trigger.Kind = workflow.TriggerKindPush 155 + trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 156 + trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 157 + trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 158 + trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 159 + sha = *trigger.PRSourceSha 160 + } 161 + 162 + tx, err := d.Begin() 163 + if err != nil { 164 + return err 165 + } 166 + 167 + triggerId, err := db.AddTrigger(tx, trigger) 168 + if err != nil { 169 + return err 170 + } 171 + 172 + pipeline := db.Pipeline{ 173 + Rkey: msg.Rkey, 174 + Knot: source.Key(), 175 + RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 176 + RepoName: record.TriggerMetadata.Repo.Repo, 177 + TriggerId: int(triggerId), 178 + Sha: sha, 179 + } 180 + 181 + err = db.AddPipeline(tx, pipeline) 182 + if err != nil { 183 + return err 184 + } 185 + 186 + err = tx.Commit() 187 + if err != nil { 188 + return err 189 + } 190 + 191 + return err 192 + }
+93
appview/state/spindlestream.go
··· 1 + package state 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "log/slog" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.sh/tangled.sh/core/api/tangled" 11 + "tangled.sh/tangled.sh/core/appview/cache" 12 + "tangled.sh/tangled.sh/core/appview/config" 13 + "tangled.sh/tangled.sh/core/appview/db" 14 + ec "tangled.sh/tangled.sh/core/eventconsumer" 15 + "tangled.sh/tangled.sh/core/eventconsumer/cursor" 16 + "tangled.sh/tangled.sh/core/log" 17 + "tangled.sh/tangled.sh/core/rbac" 18 + ) 19 + 20 + func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 21 + spindles, err := db.GetSpindles(d) 22 + if err != nil { 23 + return nil, err 24 + } 25 + 26 + srcs := make(map[ec.Source]struct{}) 27 + for _, s := range spindles { 28 + src := ec.NewSpindleSource(s.Instance) 29 + srcs[src] = struct{}{} 30 + } 31 + 32 + logger := log.New("spindlestream") 33 + cache := cache.New(c.Redis.Addr) 34 + cursorStore := cursor.NewRedisCursorStore(cache) 35 + 36 + cfg := ec.ConsumerConfig{ 37 + Sources: srcs, 38 + ProcessFunc: spindleIngester(ctx, logger, d), 39 + RetryInterval: c.Spindlestream.RetryInterval, 40 + MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 41 + ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 42 + WorkerCount: c.Spindlestream.WorkerCount, 43 + QueueSize: c.Spindlestream.QueueSize, 44 + Logger: logger, 45 + Dev: c.Core.Dev, 46 + CursorStore: &cursorStore, 47 + } 48 + 49 + return ec.NewConsumer(cfg), nil 50 + } 51 + 52 + func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 53 + return func(ctx context.Context, source ec.Source, msg ec.Message) error { 54 + switch msg.Nsid { 55 + case tangled.PipelineStatusNSID: 56 + return ingestPipelineStatus(ctx, logger, d, source, msg) 57 + } 58 + 59 + return nil 60 + } 61 + } 62 + 63 + func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 64 + var record tangled.PipelineStatus 65 + err := json.Unmarshal(msg.EventJson, &record) 66 + if err != nil { 67 + return err 68 + } 69 + 70 + pipelineUri, err := syntax.ParseATURI(record.Pipeline) 71 + if err != nil { 72 + return err 73 + } 74 + 75 + exitCode := 0 76 + if record.ExitCode != nil { 77 + exitCode = int(*record.ExitCode) 78 + } 79 + 80 + status := db.PipelineStatus{ 81 + Spindle: source.Key(), 82 + Rkey: msg.Rkey, 83 + PipelineKnot: pipelineUri.Authority().String(), 84 + PipelineRkey: pipelineUri.RecordKey().String(), 85 + Created: time.Now(), 86 + Workflow: record.Workflow, 87 + Status: record.Status, 88 + Error: record.Error, 89 + ExitCode: exitCode, 90 + } 91 + 92 + return db.AddPipelineStatus(d, status) 93 + }
+26 -14
appview/state/state.go
··· 28 28 "tangled.sh/tangled.sh/core/appview/oauth" 29 29 "tangled.sh/tangled.sh/core/appview/pages" 30 30 "tangled.sh/tangled.sh/core/appview/reporesolver" 31 + "tangled.sh/tangled.sh/core/eventconsumer" 31 32 "tangled.sh/tangled.sh/core/jetstream" 32 33 "tangled.sh/tangled.sh/core/knotclient" 33 34 "tangled.sh/tangled.sh/core/rbac" 34 35 ) 35 36 36 37 type State struct { 37 - db *db.DB 38 - oauth *oauth.OAuth 39 - enforcer *rbac.Enforcer 40 - tidClock syntax.TIDClock 41 - pages *pages.Pages 42 - sess *session.SessionStore 43 - idResolver *idresolver.Resolver 44 - posthog posthog.Client 45 - jc *jetstream.JetstreamClient 46 - config *config.Config 47 - repoResolver *reporesolver.RepoResolver 48 - knotstream *knotclient.EventConsumer 38 + db *db.DB 39 + oauth *oauth.OAuth 40 + enforcer *rbac.Enforcer 41 + tidClock syntax.TIDClock 42 + pages *pages.Pages 43 + sess *session.SessionStore 44 + idResolver *idresolver.Resolver 45 + posthog posthog.Client 46 + jc *jetstream.JetstreamClient 47 + config *config.Config 48 + repoResolver *reporesolver.RepoResolver 49 + knotstream *eventconsumer.Consumer 50 + spindlestream *eventconsumer.Consumer 49 51 } 50 52 51 53 func Make(ctx context.Context, config *config.Config) (*State, error) { ··· 109 111 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 110 112 } 111 113 112 - knotstream, err := KnotstreamConsumer(ctx, config, d, enforcer, posthog) 114 + knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 113 115 if err != nil { 114 116 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 115 117 } 116 118 knotstream.Start(ctx) 119 + 120 + spindlestream, err := Spindlestream(ctx, config, d, enforcer) 121 + if err != nil { 122 + return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 123 + } 124 + spindlestream.Start(ctx) 117 125 118 126 state := &State{ 119 127 d, ··· 128 136 config, 129 137 repoResolver, 130 138 knotstream, 139 + spindlestream, 131 140 } 132 141 133 142 return state, nil ··· 366 375 } 367 376 368 377 // add this knot to knotstream 369 - go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain}) 378 + go s.knotstream.AddSource( 379 + context.Background(), 380 + eventconsumer.NewKnotSource(domain), 381 + ) 370 382 371 383 w.Write([]byte("check success")) 372 384 }