Monorepo for Tangled tangled.org

appview/state: integrate webhooks with gitRefUpdate events #1068

merged opened by anirudh.fi targeting master from icy/qlyxxp

Trigger webhooks on git push events from knotstream.

Signed-off-by: Anirudh Oppiliappan anirudh@tangled.org

Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:hwevmowznbiukdf6uk5dwrrq/sh.tangled.repo.pull/3menyebs7t622
+46 -25
Diff #1
+30 -13
appview/state/knotstream.go
··· 5 "encoding/json" 6 "errors" 7 "fmt" 8 - "slices" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/appview/cache" 13 "tangled.org/core/appview/config" ··· 25 "github.com/posthog/posthog-go" 26 ) 27 28 - func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) { 29 logger := log.FromContext(ctx) 30 logger = log.SubLogger(logger, "knotstream") 31 ··· 48 49 cfg := ec.ConsumerConfig{ 50 Sources: srcs, 51 - ProcessFunc: knotIngester(d, enforcer, posthog, c.Core.Dev), 52 RetryInterval: c.Knotstream.RetryInterval, 53 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 54 ConnectionTimeout: c.Knotstream.ConnectionTimeout, ··· 62 return ec.NewConsumer(cfg), nil 63 } 64 65 - func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc { 66 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 67 switch msg.Nsid { 68 case tangled.GitRefUpdateNSID: 69 - return ingestRefUpdate(d, enforcer, posthog, dev, source, msg) 70 case tangled.PipelineNSID: 71 return ingestPipeline(d, source, msg) 72 } ··· 75 } 76 } 77 78 - func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error { 79 var record tangled.GitRefUpdate 80 err := json.Unmarshal(msg.EventJson, &record) 81 if err != nil { 82 return err 83 } 84 85 - knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 86 - if err != nil { 87 - return err 88 - } 89 - if !slices.Contains(knownKnots, source.Key()) { 90 - return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 91 } 92 93 err1 := populatePunchcard(d, record) 94 err2 := updateRepoLanguages(d, record) 95 96 var err3 error 97 - if !dev { 98 err3 = pc.Enqueue(posthog.Capture{ 99 DistinctId: record.CommitterDid, 100 Event: "git_ref_update", ··· 105 } 106 107 func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 108 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 109 if err != nil { 110 return err
··· 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 + "tangled.org/core/appview/notify" 11 + 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/cache" 14 "tangled.org/core/appview/config" ··· 26 "github.com/posthog/posthog-go" 27 ) 28 29 + func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) { 30 logger := log.FromContext(ctx) 31 logger = log.SubLogger(logger, "knotstream") 32 ··· 49 50 cfg := ec.ConsumerConfig{ 51 Sources: srcs, 52 + ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev), 53 RetryInterval: c.Knotstream.RetryInterval, 54 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 55 ConnectionTimeout: c.Knotstream.ConnectionTimeout, ··· 63 return ec.NewConsumer(cfg), nil 64 } 65 66 + func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc { 67 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 68 switch msg.Nsid { 69 case tangled.GitRefUpdateNSID: 70 + return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 71 case tangled.PipelineNSID: 72 return ingestPipeline(d, source, msg) 73 } ··· 76 } 77 } 78 79 + func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error { 80 + logger := log.FromContext(ctx) 81 + 82 var record tangled.GitRefUpdate 83 err := json.Unmarshal(msg.EventJson, &record) 84 if err != nil { 85 return err 86 } 87 88 + logger.Info("processing gitRefUpdate event", 89 + "repo_did", record.RepoDid, 90 + "repo_name", record.RepoName, 91 + "ref", record.Ref, 92 + "old_sha", record.OldSha, 93 + "new_sha", record.NewSha) 94 + 95 + // trigger webhook notifications first (before other ops that might fail) 96 + repos, err := db.GetRepos( 97 + d, 98 + 0, 99 + orm.FilterEq("did", record.RepoDid), 100 + orm.FilterEq("name", record.RepoName), 101 + ) 102 + if err == nil && len(repos) == 1 { 103 + notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 104 } 105 106 err1 := populatePunchcard(d, record) 107 err2 := updateRepoLanguages(d, record) 108 109 var err3 error 110 + if !dev && record.CommitterDid != "" { 111 err3 = pc.Enqueue(posthog.Capture{ 112 DistinctId: record.CommitterDid, 113 Event: "git_ref_update", ··· 118 } 119 120 func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 121 + if record.CommitterDid == "" { 122 + return nil 123 + } 124 + 125 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 126 if err != nil { 127 return err
+16 -12
appview/state/state.go
··· 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 - knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 155 - if err != nil { 156 - return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 157 - } 158 - knotstream.Start(ctx) 159 - 160 - spindlestream, err := Spindlestream(ctx, config, d, enforcer) 161 - if err != nil { 162 - return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 163 - } 164 - spindlestream.Start(ctx) 165 - 166 var notifiers []notify.Notifier 167 168 // Always add the database notifier ··· 173 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 174 } 175 notifiers = append(notifiers, indexer) 176 notifier := notify.NewMergedNotifier(notifiers) 177 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 178 179 state := &State{ 180 d, 181 notifier,
··· 151 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 152 } 153 154 var notifiers []notify.Notifier 155 156 // Always add the database notifier ··· 161 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 162 } 163 notifiers = append(notifiers, indexer) 164 + 165 + // Add webhook notifier 166 + notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 167 + 168 notifier := notify.NewMergedNotifier(notifiers) 169 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 170 171 + knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 172 + if err != nil { 173 + return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 174 + } 175 + knotstream.Start(ctx) 176 + 177 + spindlestream, err := Spindlestream(ctx, config, d, enforcer) 178 + if err != nil { 179 + return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 180 + } 181 + spindlestream.Start(ctx) 182 + 183 state := &State{ 184 d, 185 notifier,

History

6 rounds 1 comment
sign up or login to add to the discussion
1 commit
expand
appview/state: integrate webhooks with gitRefUpdate events
3/3 success
expand
expand 0 comments
pull request successfully merged
1 commit
expand
appview/state: integrate webhooks with gitRefUpdate events
3/3 success
expand
expand 0 comments
1 commit
expand
appview/state: integrate webhooks with gitRefUpdate events
3/3 success
expand
expand 0 comments
1 commit
expand
appview/state: integrate webhooks with gitRefUpdate events
3/3 success
expand
expand 0 comments
1 commit
expand
appview/state: integrate webhooks with gitRefUpdate events
3/3 success
expand
expand 1 comment
  • here why was the knownKnots check removed?
  • the other ops can fail, but they are non-returning fails, the errors are Joined, we can do the same here (collect the error and join in the final statement

changeset lgtm otherwise!

1 commit
expand
appview/state: integrate webhooks with gitRefUpdate events
3/3 success
expand
expand 0 comments