Monorepo for Tangled
at master 374 lines 10 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.org/core/appview/cloudflare" 12 "tangled.org/core/appview/notify" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/appview/cache" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/sites" 20 ec "tangled.org/core/eventconsumer" 21 "tangled.org/core/eventconsumer/cursor" 22 "tangled.org/core/log" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25 "tangled.org/core/workflow" 26 27 "github.com/bluesky-social/indigo/atproto/syntax" 28 "github.com/go-git/go-git/v5/plumbing" 29 "github.com/posthog/posthog-go" 30) 31 32func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 33 logger := log.FromContext(ctx) 34 logger = log.SubLogger(logger, "knotstream") 35 36 knots, err := db.GetRegistrations( 37 d, 38 orm.FilterIsNot("registered", "null"), 39 ) 40 if err != nil { 41 return nil, err 42 } 43 44 srcs := make(map[ec.Source]struct{}) 45 for _, k := range knots { 46 s := ec.NewKnotSource(k.Domain) 47 srcs[s] = struct{}{} 48 } 49 50 cache := cache.New(c.Redis.Addr) 51 cursorStore := cursor.NewRedisCursorStore(cache) 52 53 cfg := ec.ConsumerConfig{ 54 Sources: srcs, 55 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 56 RetryInterval: c.Knotstream.RetryInterval, 57 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 58 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 59 WorkerCount: c.Knotstream.WorkerCount, 60 QueueSize: c.Knotstream.QueueSize, 61 Logger: logger, 62 Dev: c.Core.Dev, 63 CursorStore: &cursorStore, 64 } 65 66 return ec.NewConsumer(cfg), nil 67} 68 69func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 70 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 71 switch msg.Nsid { 72 case tangled.GitRefUpdateNSID: 73 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 74 case tangled.PipelineNSID: 75 return ingestPipeline(d, source, msg) 76 } 77 78 return nil 79 } 80} 81 82func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error { 83 logger := log.FromContext(ctx) 84 85 var record tangled.GitRefUpdate 86 err := json.Unmarshal(msg.EventJson, &record) 87 if err != nil { 88 return err 89 } 90 91 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 92 if err != nil { 93 return err 94 } 95 if !slices.Contains(knownKnots, source.Key()) { 96 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 97 } 98 99 repo, err := db.GetRepo( 100 d, 101 orm.FilterEq("did", record.RepoDid), 102 orm.FilterEq("name", record.RepoName), 103 orm.FilterEq("knot", source.Key()), 104 ) 105 if err != nil { 106 return fmt.Errorf("repo %s/%s on knot %s not found", record.RepoDid, record.RepoName, source.Key()) 107 } 108 109 logger.Info("processing gitRefUpdate event", 110 "repo_did", record.RepoDid, 111 "repo_name", record.RepoName, 112 "ref", record.Ref, 113 "old_sha", record.OldSha, 114 "new_sha", record.NewSha) 115 116 notifier.Push(ctx, repo, record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 117 118 errPunchcard := populatePunchcard(d, record) 119 errLanguages := updateRepoLanguages(d, record) 120 121 var errPosthog error 122 if !dev && record.CommitterDid != "" { 123 errPosthog = pc.Enqueue(posthog.Capture{ 124 DistinctId: record.CommitterDid, 125 Event: "git_ref_update", 126 }) 127 } 128 129 // Trigger a sites redeploy if this push is to the configured sites branch. 130 if cfClient.Enabled() { 131 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 132 } 133 134 return errors.Join(errPunchcard, errLanguages, errPosthog) 135} 136 137// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites 138// branch configured for this repo and, if so, syncs the site to R2 139func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, c *config.Config, record tangled.GitRefUpdate, source ec.Source) { 140 logger := log.FromContext(ctx) 141 142 ref := plumbing.ReferenceName(record.Ref) 143 if !ref.IsBranch() { 144 return 145 } 146 pushedBranch := ref.Short() 147 148 repos, err := db.GetRepos( 149 d, 150 orm.FilterEq("did", record.RepoDid), 151 orm.FilterEq("name", record.RepoName), 152 ) 153 if err != nil || len(repos) != 1 { 154 return 155 } 156 repo := repos[0] 157 158 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String()) 159 if err != nil || siteConfig == nil { 160 return 161 } 162 if siteConfig.Branch != pushedBranch { 163 return 164 } 165 166 scheme := "https" 167 if c.Core.Dev { 168 scheme = "http" 169 } 170 knotHost := fmt.Sprintf("%s://%s", scheme, source.Key()) 171 172 deploy := &models.SiteDeploy{ 173 RepoAt: repo.RepoAt().String(), 174 Branch: siteConfig.Branch, 175 Dir: siteConfig.Dir, 176 CommitSHA: record.NewSha, 177 Trigger: models.SiteDeployTriggerPush, 178 } 179 180 deployErr := sites.Deploy(ctx, cfClient, knotHost, record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir) 181 if deployErr != nil { 182 logger.Error("sites: R2 sync failed on push", "repo", record.RepoDid+"/"+record.RepoName, "err", deployErr) 183 deploy.Status = models.SiteDeployStatusFailure 184 deploy.Error = deployErr.Error() 185 } else { 186 deploy.Status = models.SiteDeployStatusSuccess 187 } 188 189 if err := db.AddSiteDeploy(d, deploy); err != nil { 190 logger.Error("sites: failed to record deploy", "repo", record.RepoDid+"/"+record.RepoName, "err", err) 191 } 192 193 if deployErr == nil { 194 logger.Info("site deployed to r2", "repo", record.RepoDid+"/"+record.RepoName) 195 } 196} 197 198func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 199 if record.CommitterDid == "" { 200 return nil 201 } 202 203 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 204 if err != nil { 205 return err 206 } 207 208 count := 0 209 for _, ke := range knownEmails { 210 if record.Meta == nil { 211 continue 212 } 213 if record.Meta.CommitCount == nil { 214 continue 215 } 216 for _, ce := range record.Meta.CommitCount.ByEmail { 217 if ce == nil { 218 continue 219 } 220 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 221 count += int(ce.Count) 222 } 223 } 224 } 225 226 punch := models.Punch{ 227 Did: record.CommitterDid, 228 Date: time.Now(), 229 Count: count, 230 } 231 return db.AddPunch(d, punch) 232} 233 234func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 235 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 236 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 237 } 238 239 repos, err := db.GetRepos( 240 d, 241 orm.FilterEq("did", record.RepoDid), 242 orm.FilterEq("name", record.RepoName), 243 ) 244 if err != nil { 245 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 246 } 247 if len(repos) != 1 { 248 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 249 } 250 repo := repos[0] 251 252 ref := plumbing.ReferenceName(record.Ref) 253 if !ref.IsBranch() { 254 return fmt.Errorf("%s is not a valid reference name", ref) 255 } 256 257 var langs []models.RepoLanguage 258 for _, l := range record.Meta.LangBreakdown.Inputs { 259 if l == nil { 260 continue 261 } 262 263 langs = append(langs, models.RepoLanguage{ 264 RepoAt: repo.RepoAt(), 265 Ref: ref.Short(), 266 IsDefaultRef: record.Meta.IsDefaultRef, 267 Language: l.Lang, 268 Bytes: l.Size, 269 }) 270 } 271 272 tx, err := d.Begin() 273 if err != nil { 274 return err 275 } 276 defer tx.Rollback() 277 278 // update appview's cache 279 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 280 if err != nil { 281 fmt.Printf("failed; %s\n", err) 282 // non-fatal 283 } 284 285 return tx.Commit() 286} 287 288func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error { 289 var record tangled.Pipeline 290 err := json.Unmarshal(msg.EventJson, &record) 291 if err != nil { 292 return err 293 } 294 295 if record.TriggerMetadata == nil { 296 return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 297 } 298 299 if record.TriggerMetadata.Repo == nil { 300 return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 301 } 302 303 repo, err := db.GetRepo( 304 d, 305 orm.FilterEq("did", record.TriggerMetadata.Repo.Did), 306 orm.FilterEq("name", record.TriggerMetadata.Repo.Repo), 307 orm.FilterEq("knot", source.Key()), 308 ) 309 if err != nil { 310 return fmt.Errorf( 311 "failed to look for repo in DB: nsid %s, rkey %s, %s/%s, knot %s, %w", 312 msg.Nsid, 313 msg.Rkey, 314 record.TriggerMetadata.Repo.Did, 315 record.TriggerMetadata.Repo.Did, 316 source.Key(), 317 err, 318 ) 319 } 320 321 // does this repo have a spindle configured? 322 if repo.Spindle == "" { 323 return fmt.Errorf("repo does not have a spindle configured yet: nsid %s, rkey %s", msg.Nsid, msg.Rkey) 324 } 325 326 // trigger info 327 var trigger models.Trigger 328 var sha string 329 trigger.Kind = workflow.TriggerKind(record.TriggerMetadata.Kind) 330 switch trigger.Kind { 331 case workflow.TriggerKindPush: 332 trigger.PushRef = &record.TriggerMetadata.Push.Ref 333 trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha 334 trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha 335 sha = *trigger.PushNewSha 336 case workflow.TriggerKindPullRequest: 337 trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch 338 trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch 339 trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha 340 trigger.PRAction = &record.TriggerMetadata.PullRequest.Action 341 sha = *trigger.PRSourceSha 342 } 343 344 tx, err := d.Begin() 345 if err != nil { 346 return fmt.Errorf("failed to start txn: %w", err) 347 } 348 349 triggerId, err := db.AddTrigger(tx, trigger) 350 if err != nil { 351 return fmt.Errorf("failed to add trigger entry: %w", err) 352 } 353 354 pipeline := models.Pipeline{ 355 Rkey: msg.Rkey, 356 Knot: source.Key(), 357 RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did), 358 RepoName: record.TriggerMetadata.Repo.Repo, 359 TriggerId: int(triggerId), 360 Sha: sha, 361 } 362 363 err = db.AddPipeline(tx, pipeline) 364 if err != nil { 365 return fmt.Errorf("failed to add pipeline: %w", err) 366 } 367 368 err = tx.Commit() 369 if err != nil { 370 return fmt.Errorf("failed to commit txn: %w", err) 371 } 372 373 return nil 374}