Monorepo for Tangled tangled.org
1package spindle 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/eventconsumer" 12 "tangled.org/core/spindle/db" 13 "tangled.org/core/spindle/git" 14 "tangled.org/core/spindle/models" 15 "tangled.org/core/tap" 16 "tangled.org/core/tid" 17 "tangled.org/core/workflow" 18) 19 20func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 21 l := s.l.With("component", "tapIndexer") 22 23 var err error 24 switch evt.Type { 25 case tap.EvtRecord: 26 switch evt.Record.Collection.String() { 27 case tangled.SpindleMemberNSID: 28 err = s.processMember(ctx, evt) 29 case tangled.RepoNSID: 30 err = s.processRepo(ctx, evt) 31 case tangled.RepoCollaboratorNSID: 32 err = s.processCollaborator(ctx, evt) 33 case tangled.RepoPullNSID: 34 err = s.processPull(ctx, evt) 35 } 36 case tap.EvtIdentity: 37 // no-op 38 } 39 40 if err != nil { 41 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 42 return err 43 } 44 return nil 45} 46 47// NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 48 49func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 50 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 51 52 l.Info("processing spindle.member record") 53 54 // only listen to members 55 if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 56 l.Warn("forbidden request: member invite not allowed", "did", evt.Record.Did, "error", err) 57 return nil 58 } 59 60 switch evt.Record.Action { 61 case tap.RecordCreateAction, tap.RecordUpdateAction: 62 record := tangled.SpindleMember{} 63 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 64 return fmt.Errorf("parsing record: %w", err) 65 } 66 67 domain := s.cfg.Server.Hostname 68 if record.Instance != domain { 69 l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 70 return nil 71 } 72 73 created, err := time.Parse(record.CreatedAt, time.RFC3339) 74 if err != nil { 75 created = time.Now() 76 } 77 if err := db.AddSpindleMember(s.db, db.SpindleMember{ 78 Did: evt.Record.Did, 79 Rkey: evt.Record.Rkey.String(), 80 Instance: record.Instance, 81 Subject: syntax.DID(record.Subject), 82 Created: created, 83 }); err != nil { 84 l.Error("failed to add member", "error", err) 85 return fmt.Errorf("adding member to db: %w", err) 86 } 87 if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 88 return fmt.Errorf("adding member to rbac: %w", err) 89 } 90 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 91 return fmt.Errorf("adding did to tap", err) 92 } 93 94 l.Info("added member", "member", record.Subject) 95 return nil 96 97 case tap.RecordDeleteAction: 98 var ( 99 did = evt.Record.Did.String() 100 rkey = evt.Record.Rkey.String() 101 ) 102 member, err := db.GetSpindleMember(s.db, did, rkey) 103 if err != nil { 104 return fmt.Errorf("finding member: %w", err) 105 } 106 107 if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 108 return fmt.Errorf("removing member from db: %w", err) 109 } 110 if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 111 return fmt.Errorf("removing member from rbac: %w", err) 112 } 113 if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 114 return fmt.Errorf("removing did from tap: %w", err) 115 } 116 117 l.Info("removed member", "member", member.Subject) 118 return nil 119 } 120 return nil 121} 122 123func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 124 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 125 126 l.Info("processing repo.collaborator record") 127 128 // only listen to members 129 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 130 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 131 return nil 132 } 133 134 switch evt.Record.Action { 135 case tap.RecordCreateAction, tap.RecordUpdateAction: 136 record := tangled.RepoCollaborator{} 137 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 138 l.Error("invalid record", "err", err) 139 return fmt.Errorf("parsing record: %w", err) 140 } 141 142 // retry later if target repo is not ingested yet 143 if _, err := s.db.GetRepo(syntax.ATURI(record.Repo)); err != nil { 144 l.Warn("target repo is not ingested yet", "repo", record.Repo, "err", err) 145 return fmt.Errorf("target repo is unknown") 146 } 147 148 // check perms for this user 149 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 150 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 151 return nil 152 } 153 154 if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 155 Did: evt.Record.Did, 156 Rkey: evt.Record.Rkey, 157 Repo: syntax.ATURI(record.Repo), 158 Subject: syntax.DID(record.Subject), 159 }); err != nil { 160 return fmt.Errorf("adding collaborator to db: %w", err) 161 } 162 if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 163 return fmt.Errorf("adding collaborator to rbac: %w", err) 164 } 165 if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 166 return fmt.Errorf("adding did to tap: %w", err) 167 } 168 169 l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 170 return nil 171 172 case tap.RecordDeleteAction: 173 // get existing collaborator 174 collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 175 if err != nil { 176 return fmt.Errorf("failed to get existing collaborator info: %w", err) 177 } 178 179 // check perms for this user 180 if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 181 l.Warn("forbidden request collaborator invite not allowed", "did", evt.Record.Did, "err", err) 182 return nil 183 } 184 185 if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 186 return fmt.Errorf("removing collaborator from db: %w", err) 187 } 188 if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 189 return fmt.Errorf("removing collaborator from rbac: %w", err) 190 } 191 if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 192 return fmt.Errorf("removing did from tap: %w", err) 193 } 194 195 l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 196 return nil 197 } 198 return nil 199} 200 201func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 202 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 203 204 l.Info("processing repo record") 205 206 // only listen to members 207 if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 208 l.Warn("forbidden request: not spindle member", "did", evt.Record.Did, "err", err) 209 return nil 210 } 211 212 switch evt.Record.Action { 213 case tap.RecordCreateAction, tap.RecordUpdateAction: 214 record := tangled.Repo{} 215 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 216 return fmt.Errorf("parsing record: %w", err) 217 } 218 219 domain := s.cfg.Server.Hostname 220 if record.Spindle == nil || *record.Spindle != domain { 221 if record.Spindle == nil { 222 l.Info("spindle isn't configured", "name", record.Name) 223 } else { 224 l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 225 } 226 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 227 return fmt.Errorf("deleting repo from db: %w", err) 228 } 229 return nil 230 } 231 232 repo := &db.Repo{ 233 Did: evt.Record.Did, 234 Rkey: evt.Record.Rkey, 235 Name: record.Name, 236 Knot: record.Knot, 237 } 238 239 if err := s.db.PutRepo(repo); err != nil { 240 return fmt.Errorf("adding repo to db: %w", err) 241 } 242 243 if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 244 return fmt.Errorf("adding repo to rbac") 245 } 246 247 // add this knot to the event consumer 248 src := eventconsumer.NewKnotSource(record.Knot) 249 s.ks.AddSource(context.Background(), src) 250 251 // setup sparse sync 252 repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 253 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 254 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil { 255 return fmt.Errorf("setting up sparse-clone git repo: %w", err) 256 } 257 258 l.Info("added repo", "repo", evt.Record.AtUri()) 259 return nil 260 261 case tap.RecordDeleteAction: 262 // check perms for this user 263 if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 264 l.Warn("forbidden request: not repo owner", "did", evt.Record.Did, "err", err) 265 return nil 266 } 267 268 if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 269 return fmt.Errorf("deleting repo from db: %w", err) 270 } 271 272 if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 273 return fmt.Errorf("deleting repo from rbac: %w", err) 274 } 275 276 l.Info("deleted repo", "repo", evt.Record.AtUri()) 277 return nil 278 } 279 return nil 280} 281 282func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 283 l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 284 285 l.Info("processing pull record") 286 287 // only listen to live events 288 if !evt.Record.Live { 289 l.Info("skipping backfill event", "event", evt.Record.AtUri()) 290 return nil 291 } 292 293 switch evt.Record.Action { 294 case tap.RecordCreateAction, tap.RecordUpdateAction: 295 record := tangled.RepoPull{} 296 if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 297 l.Error("invalid record", "err", err) 298 return fmt.Errorf("parsing record: %w", err) 299 } 300 301 // ignore legacy records 302 if record.Target == nil { 303 l.Info("ignoring pull record: target repo is nil") 304 return nil 305 } 306 307 // ignore patch-based and fork-based PRs 308 if record.Source == nil || record.Source.Repo != nil { 309 l.Info("ignoring pull record: not a branch-based pull request") 310 return nil 311 } 312 313 // skip if target repo is unknown 314 repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo)) 315 if err != nil { 316 l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) 317 return fmt.Errorf("target repo is unknown") 318 } 319 320 compiler := workflow.Compiler{ 321 Trigger: tangled.Pipeline_TriggerMetadata{ 322 Kind: string(workflow.TriggerKindPullRequest), 323 PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 324 Action: "create", 325 SourceBranch: record.Source.Branch, 326 SourceSha: record.Source.Sha, 327 TargetBranch: record.Target.Branch, 328 }, 329 Repo: &tangled.Pipeline_TriggerRepo{ 330 Did: repo.Did.String(), 331 Knot: repo.Knot, 332 Repo: repo.Name, 333 }, 334 }, 335 } 336 337 repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 338 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 339 340 // load workflow definitions from rev (without spindle context) 341 rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha) 342 if err != nil { 343 // don't retry 344 l.Error("failed loading pipeline", "err", err) 345 return nil 346 } 347 if len(rawPipeline) == 0 { 348 l.Info("no workflow definition find for the repo. skipping the event") 349 return nil 350 } 351 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 352 // TODO: pass compile error to workflow log 353 for _, w := range compiler.Diagnostics.Errors { 354 l.Error(w.String()) 355 } 356 for _, w := range compiler.Diagnostics.Warnings { 357 l.Warn(w.String()) 358 } 359 360 pipelineId := models.PipelineId{ 361 Knot: tpl.TriggerMetadata.Repo.Knot, 362 Rkey: tid.TID(), 363 } 364 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 365 l.Error("failed to create pipeline event", "err", err) 366 return nil 367 } 368 err = s.processPipeline(ctx, tpl, pipelineId) 369 if err != nil { 370 // don't retry 371 l.Error("failed processing pipeline", "err", err) 372 return nil 373 } 374 case tap.RecordDeleteAction: 375 // no-op 376 } 377 return nil 378} 379 380func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 381 known, err := s.db.IsKnownDid(syntax.DID(did)) 382 if err != nil { 383 return fmt.Errorf("ensuring did known state: %w", err) 384 } 385 if !known { 386 if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 387 return fmt.Errorf("removing did from tap: %w", err) 388 } 389 } 390 return nil 391}