forked from
tangled.org/core
Monorepo for Tangled
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/appview/cloudflare"
12 "tangled.org/core/appview/notify"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview/cache"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/sites"
20 ec "tangled.org/core/eventconsumer"
21 "tangled.org/core/eventconsumer/cursor"
22 "tangled.org/core/log"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25 "tangled.org/core/workflow"
26
27 "github.com/bluesky-social/indigo/atproto/syntax"
28 "github.com/go-git/go-git/v5/plumbing"
29 "github.com/posthog/posthog-go"
30)
31
32func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
33 logger := log.FromContext(ctx)
34 logger = log.SubLogger(logger, "knotstream")
35
36 knots, err := db.GetRegistrations(
37 d,
38 orm.FilterIsNot("registered", "null"),
39 )
40 if err != nil {
41 return nil, err
42 }
43
44 srcs := make(map[ec.Source]struct{})
45 for _, k := range knots {
46 s := ec.NewKnotSource(k.Domain)
47 srcs[s] = struct{}{}
48 }
49
50 cache := cache.New(c.Redis.Addr)
51 cursorStore := cursor.NewRedisCursorStore(cache)
52
53 cfg := ec.ConsumerConfig{
54 Sources: srcs,
55 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
56 RetryInterval: c.Knotstream.RetryInterval,
57 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
58 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
59 WorkerCount: c.Knotstream.WorkerCount,
60 QueueSize: c.Knotstream.QueueSize,
61 Logger: logger,
62 Dev: c.Core.Dev,
63 CursorStore: &cursorStore,
64 }
65
66 return ec.NewConsumer(cfg), nil
67}
68
69func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
70 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
71 switch msg.Nsid {
72 case tangled.GitRefUpdateNSID:
73 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
74 case tangled.PipelineNSID:
75 return ingestPipeline(d, source, msg)
76 }
77
78 return nil
79 }
80}
81
82func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error {
83 logger := log.FromContext(ctx)
84
85 var record tangled.GitRefUpdate
86 err := json.Unmarshal(msg.EventJson, &record)
87 if err != nil {
88 return err
89 }
90
91 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
92 if err != nil {
93 return err
94 }
95 if !slices.Contains(knownKnots, source.Key()) {
96 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
97 }
98
99 repo, err := db.GetRepo(
100 d,
101 orm.FilterEq("did", record.RepoDid),
102 orm.FilterEq("name", record.RepoName),
103 orm.FilterEq("knot", source.Key()),
104 )
105 if err != nil {
106 return fmt.Errorf("repo %s/%s on knot %s not found", record.RepoDid, record.RepoName, source.Key())
107 }
108
109 logger.Info("processing gitRefUpdate event",
110 "repo_did", record.RepoDid,
111 "repo_name", record.RepoName,
112 "ref", record.Ref,
113 "old_sha", record.OldSha,
114 "new_sha", record.NewSha)
115
116 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
117
118 errPunchcard := populatePunchcard(d, record)
119 errLanguages := updateRepoLanguages(d, record)
120
121 var errPosthog error
122 if !dev && record.CommitterDid != "" {
123 errPosthog = pc.Enqueue(posthog.Capture{
124 DistinctId: record.CommitterDid,
125 Event: "git_ref_update",
126 })
127 }
128
129 // Trigger a sites redeploy if this push is to the configured sites branch.
130 if cfClient.Enabled() {
131 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
132 }
133
134 return errors.Join(errPunchcard, errLanguages, errPosthog)
135}
136
137// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
138// branch configured for this repo and, if so, syncs the site to R2
139func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, c *config.Config, record tangled.GitRefUpdate, source ec.Source) {
140 logger := log.FromContext(ctx)
141
142 ref := plumbing.ReferenceName(record.Ref)
143 if !ref.IsBranch() {
144 return
145 }
146 pushedBranch := ref.Short()
147
148 repos, err := db.GetRepos(
149 d,
150 orm.FilterEq("did", record.RepoDid),
151 orm.FilterEq("name", record.RepoName),
152 )
153 if err != nil || len(repos) != 1 {
154 return
155 }
156 repo := repos[0]
157
158 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String())
159 if err != nil || siteConfig == nil {
160 return
161 }
162 if siteConfig.Branch != pushedBranch {
163 return
164 }
165
166 scheme := "https"
167 if c.Core.Dev {
168 scheme = "http"
169 }
170 knotHost := fmt.Sprintf("%s://%s", scheme, source.Key())
171
172 deploy := &models.SiteDeploy{
173 RepoAt: repo.RepoAt().String(),
174 Branch: siteConfig.Branch,
175 Dir: siteConfig.Dir,
176 CommitSHA: record.NewSha,
177 Trigger: models.SiteDeployTriggerPush,
178 }
179
180 deployErr := sites.Deploy(ctx, cfClient, knotHost, record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir)
181 if deployErr != nil {
182 logger.Error("sites: R2 sync failed on push", "repo", record.RepoDid+"/"+record.RepoName, "err", deployErr)
183 deploy.Status = models.SiteDeployStatusFailure
184 deploy.Error = deployErr.Error()
185 } else {
186 deploy.Status = models.SiteDeployStatusSuccess
187 }
188
189 if err := db.AddSiteDeploy(d, deploy); err != nil {
190 logger.Error("sites: failed to record deploy", "repo", record.RepoDid+"/"+record.RepoName, "err", err)
191 }
192
193 if deployErr == nil {
194 logger.Info("site deployed to r2", "repo", record.RepoDid+"/"+record.RepoName)
195 }
196}
197
198func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
199 if record.CommitterDid == "" {
200 return nil
201 }
202
203 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
204 if err != nil {
205 return err
206 }
207
208 count := 0
209 for _, ke := range knownEmails {
210 if record.Meta == nil {
211 continue
212 }
213 if record.Meta.CommitCount == nil {
214 continue
215 }
216 for _, ce := range record.Meta.CommitCount.ByEmail {
217 if ce == nil {
218 continue
219 }
220 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
221 count += int(ce.Count)
222 }
223 }
224 }
225
226 punch := models.Punch{
227 Did: record.CommitterDid,
228 Date: time.Now(),
229 Count: count,
230 }
231 return db.AddPunch(d, punch)
232}
233
234func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
235 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
236 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
237 }
238
239 repos, err := db.GetRepos(
240 d,
241 orm.FilterEq("did", record.RepoDid),
242 orm.FilterEq("name", record.RepoName),
243 )
244 if err != nil {
245 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
246 }
247 if len(repos) != 1 {
248 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
249 }
250 repo := repos[0]
251
252 ref := plumbing.ReferenceName(record.Ref)
253 if !ref.IsBranch() {
254 return fmt.Errorf("%s is not a valid reference name", ref)
255 }
256
257 var langs []models.RepoLanguage
258 for _, l := range record.Meta.LangBreakdown.Inputs {
259 if l == nil {
260 continue
261 }
262
263 langs = append(langs, models.RepoLanguage{
264 RepoAt: repo.RepoAt(),
265 Ref: ref.Short(),
266 IsDefaultRef: record.Meta.IsDefaultRef,
267 Language: l.Lang,
268 Bytes: l.Size,
269 })
270 }
271
272 tx, err := d.Begin()
273 if err != nil {
274 return err
275 }
276 defer tx.Rollback()
277
278 // update appview's cache
279 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
280 if err != nil {
281 fmt.Printf("failed; %s\n", err)
282 // non-fatal
283 }
284
285 return tx.Commit()
286}
287
288func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
289 var record tangled.Pipeline
290 err := json.Unmarshal(msg.EventJson, &record)
291 if err != nil {
292 return err
293 }
294
295 if record.TriggerMetadata == nil {
296 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
297 }
298
299 if record.TriggerMetadata.Repo == nil {
300 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
301 }
302
303 repo, err := db.GetRepo(
304 d,
305 orm.FilterEq("did", record.TriggerMetadata.Repo.Did),
306 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo),
307 orm.FilterEq("knot", source.Key()),
308 )
309 if err != nil {
310 return fmt.Errorf(
311 "failed to look for repo in DB: nsid %s, rkey %s, %s/%s, knot %s, %w",
312 msg.Nsid,
313 msg.Rkey,
314 record.TriggerMetadata.Repo.Did,
315 record.TriggerMetadata.Repo.Did,
316 source.Key(),
317 err,
318 )
319 }
320
321 // does this repo have a spindle configured?
322 if repo.Spindle == "" {
323 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
324 }
325
326 // trigger info
327 var trigger models.Trigger
328 var sha string
329 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind)
330 switch trigger.Kind {
331 case workflow.TriggerKindPush:
332 trigger.PushRef = &record.TriggerMetadata.Push.Ref
333 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
334 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
335 sha = *trigger.PushNewSha
336 case workflow.TriggerKindPullRequest:
337 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
338 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
339 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
340 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
341 sha = *trigger.PRSourceSha
342 }
343
344 tx, err := d.Begin()
345 if err != nil {
346 return fmt.Errorf("failed to start txn: %w", err)
347 }
348
349 triggerId, err := db.AddTrigger(tx, trigger)
350 if err != nil {
351 return fmt.Errorf("failed to add trigger entry: %w", err)
352 }
353
354 pipeline := models.Pipeline{
355 Rkey: msg.Rkey,
356 Knot: source.Key(),
357 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
358 RepoName: record.TriggerMetadata.Repo.Repo,
359 TriggerId: int(triggerId),
360 Sha: sha,
361 }
362
363 err = db.AddPipeline(tx, pipeline)
364 if err != nil {
365 return fmt.Errorf("failed to add pipeline: %w", err)
366 }
367
368 err = tx.Commit()
369 if err != nil {
370 return fmt.Errorf("failed to commit txn: %w", err)
371 }
372
373 return nil
374}