Monorepo for Tangled — tangled.org

spindle: create pipeline events from spindle #985

Open — opened by boltless.me, targeting master from sl/spindle-rewrite

spindle will emit sh.tangled.pipeline event on:

  • sh.tangled.git.refUpdate events from knot stream
  • live create/update events of sh.tangled.repo.pull records

Signed-off-by: Seongmin Lee <git@boltless.me>

Labels

None yet.

Assignee

None yet.

Participants 1
AT URI
at://did:plc:xasnlahkri4ewmbuzly2rlc5/sh.tangled.repo.pull/3mckguakffh22
+281 -74
Diff #2
+1
knotserver/internal.go
··· 176 176 } 177 177 178 178 for _, line := range lines { 179 + // TODO: pass pushOptions to refUpdate 179 180 err := h.insertRefUpdate(line, gitUserDid, repoDid, repoName) 180 181 if err != nil { 181 182 l.Error("failed to insert op", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir)
+3
nix/modules/spindle.nix
··· 136 136 "sh.tangled.repo" 137 137 "sh.tangled.repo.collaborator" 138 138 "sh.tangled.spindle.member" 139 + "sh.tangled.repo.pull" 139 140 ]}" 141 + # temporary hack to listen for repo.pull from non-tangled users 142 + "TAP_SIGNAL_COLLECTION=sh.tangled.repo.pull" 140 143 ]; 141 144 ExecStart = "${getExe cfg.tap-package} run"; 142 145 };
+14
spindle/db/events.go
··· 70 70 return evts, nil 71 71 } 72 72 73 + func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error { 74 + eventJson, err := json.Marshal(pipeline) 75 + if err != nil { 76 + return err 77 + } 78 + event := Event{ 79 + Rkey: rkey, 80 + Nsid: tangled.PipelineNSID, 81 + Created: time.Now().UnixNano(), 82 + EventJson: string(eventJson), 83 + } 84 + return d.insertEvent(event, n) 85 + } 86 + 73 87 func (d *DB) createStatusEvent( 74 88 workflowId models.WorkflowId, 75 89 statusKind models.StatusKind,
+174 -72
spindle/server.go
··· 4 4 "context" 5 5 _ "embed" 6 6 "encoding/json" 7 + "errors" 7 8 "fmt" 8 9 "log/slog" 9 10 "maps" ··· 13 14 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 15 16 "github.com/go-chi/chi/v5" 17 + "github.com/go-git/go-git/v5/plumbing/object" 16 18 "github.com/hashicorp/go-version" 17 19 "tangled.org/core/api/tangled" 18 20 "tangled.org/core/eventconsumer" 19 21 "tangled.org/core/eventconsumer/cursor" 20 22 "tangled.org/core/idresolver" 23 + kgit "tangled.org/core/knotserver/git" 21 24 "tangled.org/core/log" 22 25 "tangled.org/core/notifier" 23 26 "tangled.org/core/rbac2" ··· 31 34 "tangled.org/core/spindle/secrets" 32 35 "tangled.org/core/spindle/xrpc" 33 36 "tangled.org/core/tap" 37 + "tangled.org/core/tid" 38 + "tangled.org/core/workflow" 34 39 "tangled.org/core/xrpc/serviceauth" 35 40 ) 36 41 ··· 130 135 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 131 136 } 132 137 133 - // for each incoming sh.tangled.pipeline, we execute 134 - // spindle.processPipeline, which in turn enqueues the pipeline 135 - // job in the above registered queue. 
138 + // spindle listen to knot stream for sh.tangled.git.refUpdate 139 + // which will sync the local workflow files in spindle and enqueues the 140 + // pipeline job for on-push workflows 136 141 ccfg := eventconsumer.NewConsumerConfig() 137 142 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 138 143 ccfg.Dev = cfg.Server.Dev 139 - ccfg.ProcessFunc = spindle.processPipeline 144 + ccfg.ProcessFunc = spindle.processKnotStream 140 145 ccfg.CursorStore = cursorStore 141 146 knownKnots, err := d.Knots() 142 147 if err != nil { ··· 281 286 return x.Router() 282 287 } 283 288 284 - func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 289 + func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 285 290 l := log.FromContext(ctx).With("handler", "processKnotStream") 286 291 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 287 292 if msg.Nsid == tangled.PipelineNSID { ··· 319 324 Rkey: msg.Rkey, 320 325 } 321 326 322 - workflows := make(map[models.Engine][]models.Workflow) 323 - 324 - // Build pipeline environment variables once for all workflows 325 - pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 326 - 327 - for _, w := range tpl.Workflows { 328 - if w != nil { 329 - if _, ok := s.engs[w.Engine]; !ok { 330 - err = s.db.StatusFailed(models.WorkflowId{ 331 - PipelineId: pipelineId, 332 - Name: w.Name, 333 - }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 334 - if err != nil { 335 - return fmt.Errorf("db.StatusFailed: %w", err) 336 - } 337 - 338 - continue 339 - } 340 - 341 - eng := s.engs[w.Engine] 342 - 343 - if _, ok := workflows[eng]; !ok { 344 - workflows[eng] = []models.Workflow{} 345 - } 346 - 347 - ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 348 - if err != nil { 349 - return fmt.Errorf("init workflow: %w", err) 350 - } 351 - 352 - // inject TANGLED_* env 
vars after InitWorkflow 353 - // This prevents user-defined env vars from overriding them 354 - if ewf.Environment == nil { 355 - ewf.Environment = make(map[string]string) 356 - } 357 - maps.Copy(ewf.Environment, pipelineEnv) 358 - 359 - workflows[eng] = append(workflows[eng], *ewf) 360 - 361 - err = s.db.StatusPending(models.WorkflowId{ 362 - PipelineId: pipelineId, 363 - Name: w.Name, 364 - }, s.n) 365 - if err != nil { 366 - return fmt.Errorf("db.StatusPending: %w", err) 367 - } 368 - } 369 - } 370 - 371 - ok := s.jq.Enqueue(queue.Job{ 372 - Run: func() error { 373 - engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 374 - RepoOwner: tpl.TriggerMetadata.Repo.Did, 375 - RepoName: tpl.TriggerMetadata.Repo.Repo, 376 - Workflows: workflows, 377 - }, pipelineId) 378 - return nil 379 - }, 380 - OnFail: func(jobError error) { 381 - s.l.Error("pipeline run failed", "error", jobError) 382 - }, 383 - }) 384 - if ok { 385 - s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 386 - } else { 387 - s.l.Error("failed to enqueue pipeline: queue is full") 327 + err = s.processPipeline(ctx, tpl, pipelineId) 328 + if err != nil { 329 + return err 388 330 } 389 331 } else if msg.Nsid == tangled.GitRefUpdateNSID { 390 332 event := tangled.GitRefUpdate{} ··· 409 351 } 410 352 l.Info("synced git repo") 411 353 412 - // TODO: plan the pipeline 354 + compiler := workflow.Compiler{ 355 + Trigger: tangled.Pipeline_TriggerMetadata{ 356 + Kind: string(workflow.TriggerKindPush), 357 + Push: &tangled.Pipeline_PushTriggerData{ 358 + Ref: event.Ref, 359 + OldSha: event.OldSha, 360 + NewSha: event.NewSha, 361 + }, 362 + Repo: &tangled.Pipeline_TriggerRepo{ 363 + Did: repo.Did.String(), 364 + Knot: repo.Knot, 365 + Repo: repo.Name, 366 + }, 367 + }, 368 + } 369 + 370 + // load workflow definitions from rev (without spindle context) 371 + rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 372 + if err != nil { 373 
+ return fmt.Errorf("loading pipeline: %w", err) 374 + } 375 + if len(rawPipeline) == 0 { 376 + l.Info("no workflow definition find for the repo. skipping the event") 377 + return nil 378 + } 379 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 380 + // TODO: pass compile error to workflow log 381 + for _, w := range compiler.Diagnostics.Errors { 382 + l.Error(w.String()) 383 + } 384 + for _, w := range compiler.Diagnostics.Warnings { 385 + l.Warn(w.String()) 386 + } 387 + 388 + pipelineId := models.PipelineId{ 389 + Knot: tpl.TriggerMetadata.Repo.Knot, 390 + Rkey: tid.TID(), 391 + } 392 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 393 + l.Error("failed to create pipeline event", "err", err) 394 + return nil 395 + } 396 + err = s.processPipeline(ctx, tpl, pipelineId) 397 + if err != nil { 398 + return err 399 + } 400 + } 401 + 402 + return nil 403 + } 404 + 405 + func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 406 + if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 407 + return nil, fmt.Errorf("syncing git repo: %w", err) 408 + } 409 + gr, err := kgit.Open(repoPath, rev) 410 + if err != nil { 411 + return nil, fmt.Errorf("opening git repo: %w", err) 412 + } 413 + 414 + workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 415 + if errors.Is(err, object.ErrDirectoryNotFound) { 416 + // return empty RawPipeline when directory doesn't exist 417 + return nil, nil 418 + } else if err != nil { 419 + return nil, fmt.Errorf("loading file tree: %w", err) 413 420 } 414 421 422 + var rawPipeline workflow.RawPipeline 423 + for _, e := range workflowDir { 424 + if !e.IsFile() { 425 + continue 426 + } 427 + 428 + fpath := filepath.Join(workflow.WorkflowDir, e.Name) 429 + contents, err := gr.RawContent(fpath) 430 + if err != nil { 431 + return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 432 + } 433 + 434 + rawPipeline = 
append(rawPipeline, workflow.RawWorkflow{ 435 + Name: e.Name, 436 + Contents: contents, 437 + }) 438 + } 439 + 440 + return rawPipeline, nil 441 + } 442 + 443 + func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 444 + // Build pipeline environment variables once for all workflows 445 + pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 446 + 447 + // filter & init workflows 448 + workflows := make(map[models.Engine][]models.Workflow) 449 + for _, w := range tpl.Workflows { 450 + if w == nil { 451 + continue 452 + } 453 + if _, ok := s.engs[w.Engine]; !ok { 454 + err := s.db.StatusFailed(models.WorkflowId{ 455 + PipelineId: pipelineId, 456 + Name: w.Name, 457 + }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 458 + if err != nil { 459 + return fmt.Errorf("db.StatusFailed: %w", err) 460 + } 461 + 462 + continue 463 + } 464 + 465 + eng := s.engs[w.Engine] 466 + 467 + if _, ok := workflows[eng]; !ok { 468 + workflows[eng] = []models.Workflow{} 469 + } 470 + 471 + ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 472 + if err != nil { 473 + return fmt.Errorf("init workflow: %w", err) 474 + } 475 + 476 + // inject TANGLED_* env vars after InitWorkflow 477 + // This prevents user-defined env vars from overriding them 478 + if ewf.Environment == nil { 479 + ewf.Environment = make(map[string]string) 480 + } 481 + maps.Copy(ewf.Environment, pipelineEnv) 482 + 483 + workflows[eng] = append(workflows[eng], *ewf) 484 + } 485 + 486 + // enqueue pipeline 487 + ok := s.jq.Enqueue(queue.Job{ 488 + Run: func() error { 489 + engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 490 + RepoOwner: tpl.TriggerMetadata.Repo.Did, 491 + RepoName: tpl.TriggerMetadata.Repo.Repo, 492 + Workflows: workflows, 493 + }, pipelineId) 494 + return nil 495 + }, 496 + OnFail: func(jobError error) { 497 + s.l.Error("pipeline run failed", "error", 
jobError) 498 + }, 499 + }) 500 + if !ok { 501 + return fmt.Errorf("failed to enqueue pipeline: queue is full") 502 + } 503 + s.l.Info("pipeline enqueued successfully", "id", pipelineId) 504 + 505 + // emit StatusPending for all workflows here (after successful enqueue) 506 + for _, ewfs := range workflows { 507 + for _, ewf := range ewfs { 508 + err := s.db.StatusPending(models.WorkflowId{ 509 + PipelineId: pipelineId, 510 + Name: ewf.Name, 511 + }, s.n) 512 + if err != nil { 513 + return fmt.Errorf("db.StatusPending: %w", err) 514 + } 515 + } 516 + } 415 517 return nil 416 518 } 417 519
+89 -2
spindle/tap.go
··· 11 11 "tangled.org/core/eventconsumer" 12 12 "tangled.org/core/spindle/db" 13 13 "tangled.org/core/spindle/git" 14 + "tangled.org/core/spindle/models" 14 15 "tangled.org/core/tap" 16 + "tangled.org/core/tid" 17 + "tangled.org/core/workflow" 15 18 ) 16 19 17 20 func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { ··· 281 284 282 285 l.Info("processing pull record") 283 286 287 + // only listen to live events 288 + if !evt.Record.Live { 289 + l.Info("skipping backfill event", "event", evt.Record.AtUri()) 290 + return nil 291 + } 292 + 284 293 switch evt.Record.Action { 285 294 case tap.RecordCreateAction, tap.RecordUpdateAction: 286 - // TODO 295 + record := tangled.RepoPull{} 296 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 297 + l.Error("invalid record", "err", err) 298 + return fmt.Errorf("parsing record: %w", err) 299 + } 300 + 301 + // ignore legacy records 302 + if record.Target == nil { 303 + l.Info("ignoring pull record: target repo is nil") 304 + return nil 305 + } 306 + 307 + // ignore patch-based and fork-based PRs 308 + if record.Source == nil || record.Source.Repo != nil { 309 + l.Info("ignoring pull record: not a branch-based pull request") 310 + return nil 311 + } 312 + 313 + // skip if target repo is unknown 314 + repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo)) 315 + if err != nil { 316 + l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) 317 + return fmt.Errorf("target repo is unknown") 318 + } 319 + 320 + compiler := workflow.Compiler{ 321 + Trigger: tangled.Pipeline_TriggerMetadata{ 322 + Kind: string(workflow.TriggerKindPullRequest), 323 + PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 324 + Action: "create", 325 + SourceBranch: record.Source.Branch, 326 + SourceSha: record.Source.Sha, 327 + TargetBranch: record.Target.Branch, 328 + }, 329 + Repo: &tangled.Pipeline_TriggerRepo{ 330 + Did: repo.Did.String(), 331 + Knot: repo.Knot, 332 + Repo: 
repo.Name, 333 + }, 334 + }, 335 + } 336 + 337 + repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 338 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 339 + 340 + // load workflow definitions from rev (without spindle context) 341 + rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha) 342 + if err != nil { 343 + // don't retry 344 + l.Error("failed loading pipeline", "err", err) 345 + return nil 346 + } 347 + if len(rawPipeline) == 0 { 348 + l.Info("no workflow definition find for the repo. skipping the event") 349 + return nil 350 + } 351 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 352 + // TODO: pass compile error to workflow log 353 + for _, w := range compiler.Diagnostics.Errors { 354 + l.Error(w.String()) 355 + } 356 + for _, w := range compiler.Diagnostics.Warnings { 357 + l.Warn(w.String()) 358 + } 359 + 360 + pipelineId := models.PipelineId{ 361 + Knot: tpl.TriggerMetadata.Repo.Knot, 362 + Rkey: tid.TID(), 363 + } 364 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 365 + l.Error("failed to create pipeline event", "err", err) 366 + return nil 367 + } 368 + err = s.processPipeline(ctx, tpl, pipelineId) 369 + if err != nil { 370 + // don't retry 371 + l.Error("failed processing pipeline", "err", err) 372 + return nil 373 + } 287 374 case tap.RecordDeleteAction: 288 - // TODO 375 + // no-op 289 376 } 290 377 return nil 291 378 }

History

3 rounds 0 comments
sign up or login to add to the discussion
1 commit
expand
spindle: create pipeline events from spindle
1/3 failed, 2/3 success
expand
no conflicts, ready to merge
expand 0 comments
1 commit
expand
spindle: create pipeline events from spindle
1/3 failed, 2/3 success
expand
expand 0 comments
1 commit
expand
spindle: create pipeline events from spindle
1/3 failed, 2/3 success
expand
expand 0 comments