Monorepo for Tangled tangled.org

spindle: create pipeline events from spindle

spindle will emit a `sh.tangled.pipeline` event on:
- `sh.tangled.git.refUpdate` events from knot stream
- live create/update events of `sh.tangled.repo.pull` records

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me dd7b57ae 794e403e

verified
Changed files
+281 -74
knotserver
nix
modules
spindle
+1
knotserver/internal.go
··· 176 176 } 177 177 178 178 for _, line := range lines { 179 + // TODO: pass pushOptions to refUpdate 179 180 err := h.insertRefUpdate(line, gitUserDid, repoDid, repoName) 180 181 if err != nil { 181 182 l.Error("failed to insert op", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir)
+3
nix/modules/spindle.nix
··· 136 136 "sh.tangled.repo" 137 137 "sh.tangled.repo.collaborator" 138 138 "sh.tangled.spindle.member" 139 + "sh.tangled.repo.pull" 139 140 ]}" 141 + # temporary hack to listen for repo.pull from non-tangled users 142 + "TAP_SIGNAL_COLLECTION=sh.tangled.repo.pull" 140 143 ]; 141 144 ExecStart = "${getExe cfg.tap-package} run"; 142 145 };
+14
spindle/db/events.go
··· 70 70 return evts, nil 71 71 } 72 72 73 + func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error { 74 + eventJson, err := json.Marshal(pipeline) 75 + if err != nil { 76 + return err 77 + } 78 + event := Event{ 79 + Rkey: rkey, 80 + Nsid: tangled.PipelineNSID, 81 + Created: time.Now().UnixNano(), 82 + EventJson: string(eventJson), 83 + } 84 + return d.insertEvent(event, n) 85 + } 86 + 73 87 func (d *DB) createStatusEvent( 74 88 workflowId models.WorkflowId, 75 89 statusKind models.StatusKind,
+174 -72
spindle/server.go
··· 4 4 "context" 5 5 _ "embed" 6 6 "encoding/json" 7 + "errors" 7 8 "fmt" 8 9 "log/slog" 9 10 "maps" ··· 12 13 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "github.com/go-chi/chi/v5" 16 + "github.com/go-git/go-git/v5/plumbing/object" 15 17 "github.com/hashicorp/go-version" 16 18 "tangled.org/core/api/tangled" 17 19 "tangled.org/core/eventconsumer" 18 20 "tangled.org/core/eventconsumer/cursor" 19 21 "tangled.org/core/idresolver" 22 + kgit "tangled.org/core/knotserver/git" 20 23 "tangled.org/core/log" 21 24 "tangled.org/core/notifier" 22 25 "tangled.org/core/rbac2" ··· 30 33 "tangled.org/core/spindle/secrets" 31 34 "tangled.org/core/spindle/xrpc" 32 35 "tangled.org/core/tap" 36 + "tangled.org/core/tid" 37 + "tangled.org/core/workflow" 33 38 "tangled.org/core/xrpc/serviceauth" 34 39 ) 35 40 ··· 126 131 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 127 132 } 128 133 129 - // for each incoming sh.tangled.pipeline, we execute 130 - // spindle.processPipeline, which in turn enqueues the pipeline 131 - // job in the above registered queue. 
134 + // spindle listen to knot stream for sh.tangled.git.refUpdate 135 + // which will sync the local workflow files in spindle and enqueues the 136 + // pipeline job for on-push workflows 132 137 ccfg := eventconsumer.NewConsumerConfig() 133 138 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 134 139 ccfg.Dev = cfg.Server.Dev 135 - ccfg.ProcessFunc = spindle.processPipeline 140 + ccfg.ProcessFunc = spindle.processKnotStream 136 141 ccfg.CursorStore = cursorStore 137 142 knownKnots, err := d.Knots() 138 143 if err != nil { ··· 263 268 return x.Router() 264 269 } 265 270 266 - func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 271 + func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 267 272 l := log.FromContext(ctx).With("handler", "processKnotStream") 268 273 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 269 274 if msg.Nsid == tangled.PipelineNSID { ··· 301 306 Rkey: msg.Rkey, 302 307 } 303 308 304 - workflows := make(map[models.Engine][]models.Workflow) 305 - 306 - // Build pipeline environment variables once for all workflows 307 - pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 308 - 309 - for _, w := range tpl.Workflows { 310 - if w != nil { 311 - if _, ok := s.engs[w.Engine]; !ok { 312 - err = s.db.StatusFailed(models.WorkflowId{ 313 - PipelineId: pipelineId, 314 - Name: w.Name, 315 - }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 316 - if err != nil { 317 - return fmt.Errorf("db.StatusFailed: %w", err) 318 - } 319 - 320 - continue 321 - } 322 - 323 - eng := s.engs[w.Engine] 324 - 325 - if _, ok := workflows[eng]; !ok { 326 - workflows[eng] = []models.Workflow{} 327 - } 328 - 329 - ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 330 - if err != nil { 331 - return fmt.Errorf("init workflow: %w", err) 332 - } 333 - 334 - // inject TANGLED_* env 
vars after InitWorkflow 335 - // This prevents user-defined env vars from overriding them 336 - if ewf.Environment == nil { 337 - ewf.Environment = make(map[string]string) 338 - } 339 - maps.Copy(ewf.Environment, pipelineEnv) 340 - 341 - workflows[eng] = append(workflows[eng], *ewf) 342 - 343 - err = s.db.StatusPending(models.WorkflowId{ 344 - PipelineId: pipelineId, 345 - Name: w.Name, 346 - }, s.n) 347 - if err != nil { 348 - return fmt.Errorf("db.StatusPending: %w", err) 349 - } 350 - } 351 - } 352 - 353 - ok := s.jq.Enqueue(queue.Job{ 354 - Run: func() error { 355 - engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 356 - RepoOwner: tpl.TriggerMetadata.Repo.Did, 357 - RepoName: tpl.TriggerMetadata.Repo.Repo, 358 - Workflows: workflows, 359 - }, pipelineId) 360 - return nil 361 - }, 362 - OnFail: func(jobError error) { 363 - s.l.Error("pipeline run failed", "error", jobError) 364 - }, 365 - }) 366 - if ok { 367 - s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 368 - } else { 369 - s.l.Error("failed to enqueue pipeline: queue is full") 309 + err = s.processPipeline(ctx, tpl, pipelineId) 310 + if err != nil { 311 + return err 370 312 } 371 313 } else if msg.Nsid == tangled.GitRefUpdateNSID { 372 314 event := tangled.GitRefUpdate{} ··· 391 333 } 392 334 l.Info("synced git repo") 393 335 394 - // TODO: plan the pipeline 336 + compiler := workflow.Compiler{ 337 + Trigger: tangled.Pipeline_TriggerMetadata{ 338 + Kind: string(workflow.TriggerKindPush), 339 + Push: &tangled.Pipeline_PushTriggerData{ 340 + Ref: event.Ref, 341 + OldSha: event.OldSha, 342 + NewSha: event.NewSha, 343 + }, 344 + Repo: &tangled.Pipeline_TriggerRepo{ 345 + Did: repo.Did.String(), 346 + Knot: repo.Knot, 347 + Repo: repo.Name, 348 + }, 349 + }, 350 + } 351 + 352 + // load workflow definitions from rev (without spindle context) 353 + rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 354 + if err != nil { 355 
+ return fmt.Errorf("loading pipeline: %w", err) 356 + } 357 + if len(rawPipeline) == 0 { 358 + l.Info("no workflow definition find for the repo. skipping the event") 359 + return nil 360 + } 361 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 362 + // TODO: pass compile error to workflow log 363 + for _, w := range compiler.Diagnostics.Errors { 364 + l.Error(w.String()) 365 + } 366 + for _, w := range compiler.Diagnostics.Warnings { 367 + l.Warn(w.String()) 368 + } 369 + 370 + pipelineId := models.PipelineId{ 371 + Knot: tpl.TriggerMetadata.Repo.Knot, 372 + Rkey: tid.TID(), 373 + } 374 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 375 + l.Error("failed to create pipeline event", "err", err) 376 + return nil 377 + } 378 + err = s.processPipeline(ctx, tpl, pipelineId) 379 + if err != nil { 380 + return err 381 + } 395 382 } 396 383 384 + return nil 385 + } 386 + 387 + func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 388 + if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 389 + return nil, fmt.Errorf("syncing git repo: %w", err) 390 + } 391 + gr, err := kgit.Open(repoPath, rev) 392 + if err != nil { 393 + return nil, fmt.Errorf("opening git repo: %w", err) 394 + } 395 + 396 + workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 397 + if errors.Is(err, object.ErrDirectoryNotFound) { 398 + // return empty RawPipeline when directory doesn't exist 399 + return nil, nil 400 + } else if err != nil { 401 + return nil, fmt.Errorf("loading file tree: %w", err) 402 + } 403 + 404 + var rawPipeline workflow.RawPipeline 405 + for _, e := range workflowDir { 406 + if !e.IsFile() { 407 + continue 408 + } 409 + 410 + fpath := filepath.Join(workflow.WorkflowDir, e.Name) 411 + contents, err := gr.RawContent(fpath) 412 + if err != nil { 413 + return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 414 + } 415 + 416 + rawPipeline = 
append(rawPipeline, workflow.RawWorkflow{ 417 + Name: e.Name, 418 + Contents: contents, 419 + }) 420 + } 421 + 422 + return rawPipeline, nil 423 + } 424 + 425 + func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 426 + // Build pipeline environment variables once for all workflows 427 + pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 428 + 429 + // filter & init workflows 430 + workflows := make(map[models.Engine][]models.Workflow) 431 + for _, w := range tpl.Workflows { 432 + if w == nil { 433 + continue 434 + } 435 + if _, ok := s.engs[w.Engine]; !ok { 436 + err := s.db.StatusFailed(models.WorkflowId{ 437 + PipelineId: pipelineId, 438 + Name: w.Name, 439 + }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 440 + if err != nil { 441 + return fmt.Errorf("db.StatusFailed: %w", err) 442 + } 443 + 444 + continue 445 + } 446 + 447 + eng := s.engs[w.Engine] 448 + 449 + if _, ok := workflows[eng]; !ok { 450 + workflows[eng] = []models.Workflow{} 451 + } 452 + 453 + ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 454 + if err != nil { 455 + return fmt.Errorf("init workflow: %w", err) 456 + } 457 + 458 + // inject TANGLED_* env vars after InitWorkflow 459 + // This prevents user-defined env vars from overriding them 460 + if ewf.Environment == nil { 461 + ewf.Environment = make(map[string]string) 462 + } 463 + maps.Copy(ewf.Environment, pipelineEnv) 464 + 465 + workflows[eng] = append(workflows[eng], *ewf) 466 + } 467 + 468 + // enqueue pipeline 469 + ok := s.jq.Enqueue(queue.Job{ 470 + Run: func() error { 471 + engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 472 + RepoOwner: tpl.TriggerMetadata.Repo.Did, 473 + RepoName: tpl.TriggerMetadata.Repo.Repo, 474 + Workflows: workflows, 475 + }, pipelineId) 476 + return nil 477 + }, 478 + OnFail: func(jobError error) { 479 + s.l.Error("pipeline run failed", "error", 
jobError) 480 + }, 481 + }) 482 + if !ok { 483 + return fmt.Errorf("failed to enqueue pipeline: queue is full") 484 + } 485 + s.l.Info("pipeline enqueued successfully", "id", pipelineId) 486 + 487 + // emit StatusPending for all workflows here (after successful enqueue) 488 + for _, ewfs := range workflows { 489 + for _, ewf := range ewfs { 490 + err := s.db.StatusPending(models.WorkflowId{ 491 + PipelineId: pipelineId, 492 + Name: ewf.Name, 493 + }, s.n) 494 + if err != nil { 495 + return fmt.Errorf("db.StatusPending: %w", err) 496 + } 497 + } 498 + } 397 499 return nil 398 500 } 399 501
+89 -2
spindle/tap.go
··· 11 11 "tangled.org/core/eventconsumer" 12 12 "tangled.org/core/spindle/db" 13 13 "tangled.org/core/spindle/git" 14 + "tangled.org/core/spindle/models" 14 15 "tangled.org/core/tap" 16 + "tangled.org/core/tid" 17 + "tangled.org/core/workflow" 15 18 ) 16 19 17 20 func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { ··· 281 284 282 285 l.Info("processing pull record") 283 286 287 + // only listen to live events 288 + if !evt.Record.Live { 289 + l.Info("skipping backfill event", "event", evt.Record.AtUri()) 290 + return nil 291 + } 292 + 284 293 switch evt.Record.Action { 285 294 case tap.RecordCreateAction, tap.RecordUpdateAction: 286 - // TODO 295 + record := tangled.RepoPull{} 296 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 297 + l.Error("invalid record", "err", err) 298 + return fmt.Errorf("parsing record: %w", err) 299 + } 300 + 301 + // ignore legacy records 302 + if record.Target == nil { 303 + l.Info("ignoring pull record: target repo is nil") 304 + return nil 305 + } 306 + 307 + // ignore patch-based and fork-based PRs 308 + if record.Source == nil || record.Source.Repo != nil { 309 + l.Info("ignoring pull record: not a branch-based pull request") 310 + return nil 311 + } 312 + 313 + // skip if target repo is unknown 314 + repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo)) 315 + if err != nil { 316 + l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) 317 + return fmt.Errorf("target repo is unknown") 318 + } 319 + 320 + compiler := workflow.Compiler{ 321 + Trigger: tangled.Pipeline_TriggerMetadata{ 322 + Kind: string(workflow.TriggerKindPullRequest), 323 + PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 324 + Action: "create", 325 + SourceBranch: record.Source.Branch, 326 + SourceSha: record.Source.Sha, 327 + TargetBranch: record.Target.Branch, 328 + }, 329 + Repo: &tangled.Pipeline_TriggerRepo{ 330 + Did: repo.Did.String(), 331 + Knot: repo.Knot, 332 + Repo: 
repo.Name, 333 + }, 334 + }, 335 + } 336 + 337 + repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 338 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 339 + 340 + // load workflow definitions from rev (without spindle context) 341 + rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha) 342 + if err != nil { 343 + // don't retry 344 + l.Error("failed loading pipeline", "err", err) 345 + return nil 346 + } 347 + if len(rawPipeline) == 0 { 348 + l.Info("no workflow definition find for the repo. skipping the event") 349 + return nil 350 + } 351 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 352 + // TODO: pass compile error to workflow log 353 + for _, w := range compiler.Diagnostics.Errors { 354 + l.Error(w.String()) 355 + } 356 + for _, w := range compiler.Diagnostics.Warnings { 357 + l.Warn(w.String()) 358 + } 359 + 360 + pipelineId := models.PipelineId{ 361 + Knot: tpl.TriggerMetadata.Repo.Knot, 362 + Rkey: tid.TID(), 363 + } 364 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 365 + l.Error("failed to create pipeline event", "err", err) 366 + return nil 367 + } 368 + err = s.processPipeline(ctx, tpl, pipelineId) 369 + if err != nil { 370 + // don't retry 371 + l.Error("failed processing pipeline", "err", err) 372 + return nil 373 + } 287 374 case tap.RecordDeleteAction: 288 - // TODO 375 + // no-op 289 376 } 290 377 return nil 291 378 }