From 6188b80a26b7afe305196e20a44baa5fce7e185c Mon Sep 17 00:00:00 2001 From: Seongmin Lee Date: Fri, 12 Dec 2025 17:04:15 +0900 Subject: [PATCH] lexicons,appview,spindle: add workflow cancel Change-Id: okmkyytolvkotutokmkqvkrkmvympntz Signed-off-by: Seongmin Lee --- api/tangled/pipelinecancelPipeline.go | 34 +++++++ .../templates/repo/pipelines/workflow.html | 14 +++ appview/pipelines/pipelines.go | 87 ++++++++++++++++- appview/state/router.go | 6 +- lexicons/pipeline/cancelPipeline.json | 33 +++++++ spindle/db/events.go | 4 + spindle/server.go | 1 + spindle/xrpc/pipeline_cancelPipeline.go | 97 +++++++++++++++++++ spindle/xrpc/xrpc.go | 3 + 9 files changed, 275 insertions(+), 4 deletions(-) create mode 100644 api/tangled/pipelinecancelPipeline.go create mode 100644 lexicons/pipeline/cancelPipeline.json create mode 100644 spindle/xrpc/pipeline_cancelPipeline.go diff --git a/api/tangled/pipelinecancelPipeline.go b/api/tangled/pipelinecancelPipeline.go new file mode 100644 index 00000000..975550d0 --- /dev/null +++ b/api/tangled/pipelinecancelPipeline.go @@ -0,0 +1,34 @@ +// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. + +package tangled + +// schema: sh.tangled.pipeline.cancelPipeline + +import ( + "context" + + "github.com/bluesky-social/indigo/lex/util" +) + +const ( + PipelineCancelPipelineNSID = "sh.tangled.pipeline.cancelPipeline" +) + +// PipelineCancelPipeline_Input is the input argument to a sh.tangled.pipeline.cancelPipeline call. +type PipelineCancelPipeline_Input struct { + // pipeline: pipeline at-uri + Pipeline string `json:"pipeline" cborgen:"pipeline"` + // repo: repo at-uri, spindle can't resolve repo from pipeline at-uri yet + Repo string `json:"repo" cborgen:"repo"` + // workflow: workflow name + Workflow string `json:"workflow" cborgen:"workflow"` +} + +// PipelineCancelPipeline calls the XRPC method "sh.tangled.pipeline.cancelPipeline". +func PipelineCancelPipeline(ctx context.Context, c util.LexClient, input *PipelineCancelPipeline_Input) error { + if err := c.LexDo(ctx, util.Procedure, "application/json", "sh.tangled.pipeline.cancelPipeline", nil, input, nil); err != nil { + return err + } + + return nil +} diff --git a/appview/pages/templates/repo/pipelines/workflow.html b/appview/pages/templates/repo/pipelines/workflow.html index 3cf562c4..8e69ba06 100644 --- a/appview/pages/templates/repo/pipelines/workflow.html +++ b/appview/pages/templates/repo/pipelines/workflow.html @@ -12,6 +12,20 @@ {{ block "sidebar" . }} {{ end }}
+ + {{ if $.RepoInfo.Roles.IsOwner }} +
+
+ +
+ {{ end }} {{ block "logs" . }} {{ end }}
diff --git a/appview/pipelines/pipelines.go b/appview/pipelines/pipelines.go index d28ca623..8280992d 100644 --- a/appview/pipelines/pipelines.go +++ b/appview/pipelines/pipelines.go @@ -4,13 +4,17 @@ import ( "bytes" "context" "encoding/json" + "fmt" "log/slog" "net/http" "strings" "time" + "tangled.org/core/api/tangled" "tangled.org/core/appview/config" "tangled.org/core/appview/db" + "tangled.org/core/appview/middleware" + "tangled.org/core/appview/models" "tangled.org/core/appview/oauth" "tangled.org/core/appview/pages" "tangled.org/core/appview/reporesolver" @@ -36,11 +40,14 @@ type Pipelines struct { logger *slog.Logger } -func (p *Pipelines) Router() http.Handler { +func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { r := chi.NewRouter() r.Get("/", p.Index) r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) + r. + With(mw.RepoPermissionMiddleware("repo:owner")). + Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) return r } @@ -316,6 +323,84 @@ func (p *Pipelines) Logs(w http.ResponseWriter, r *http.Request) { } } +func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { + l := p.logger.With("handler", "Cancel") + + var ( + pipelineId = chi.URLParam(r, "pipeline") + workflow = chi.URLParam(r, "workflow") + ) + if pipelineId == "" || workflow == "" { + http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) + return + } + + f, err := p.repoResolver.Resolve(r) + if err != nil { + l.Error("failed to get repo and knot", "err", err) + http.Error(w, "bad repo/knot", http.StatusBadRequest) + return + } + + pipeline, err := func() (models.Pipeline, error) { + ps, err := db.GetPipelineStatuses( + p.db, + 1, + orm.FilterEq("repo_owner", f.Did), + orm.FilterEq("repo_name", f.Name), + orm.FilterEq("knot", f.Knot), + orm.FilterEq("id", pipelineId), + ) + if err != nil { + return models.Pipeline{}, err + } + if len(ps) != 1 { + return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) + } + return ps[0], nil + }() + if err != nil { + l.Error("pipeline query failed", "err", err) + http.Error(w, "pipeline not found", http.StatusNotFound) + } + var ( + spindle = f.Spindle + knot = f.Knot + rkey = pipeline.Rkey + ) + + if spindle == "" || knot == "" || rkey == "" { + http.Error(w, "invalid repo info", http.StatusBadRequest) + return + } + + spindleClient, err := p.oauth.ServiceClient( + r, + oauth.WithService(f.Spindle), + oauth.WithLxm(tangled.PipelineCancelPipelineNSID), + oauth.WithDev(p.config.Core.Dev), + oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time + ) + + err = tangled.PipelineCancelPipeline( + r.Context(), + spindleClient, + &tangled.PipelineCancelPipeline_Input{ + Repo: string(f.RepoAt()), + Pipeline: pipeline.AtUri().String(), + Workflow: workflow, + }, + ) + err = fmt.Errorf("boo! new error") + errorId := "workflow-error" + if err != nil { + l.Error("failed to cancel workflow", "err", err) + p.pages.Notice(w, errorId, "Failed to cancel workflow") + return + } + l.Debug("canceled pipeline", "uri", pipeline.AtUri()) +} + // either a message or an error type logEvent struct { msg []byte diff --git a/appview/state/router.go b/appview/state/router.go index b11f12c2..d1dc53a9 100644 --- a/appview/state/router.go +++ b/appview/state/router.go @@ -96,7 +96,7 @@ func (s *State) UserRouter(mw *middleware.Middleware) http.Handler { r.Mount("/", s.RepoRouter(mw)) r.Mount("/issues", s.IssuesRouter(mw)) r.Mount("/pulls", s.PullsRouter(mw)) - r.Mount("/pipelines", s.PipelinesRouter()) + r.Mount("/pipelines", s.PipelinesRouter(mw)) r.Mount("/labels", s.LabelsRouter()) // These routes get proxied to the knot @@ -313,7 +313,7 @@ func (s *State) RepoRouter(mw *middleware.Middleware) http.Handler { return repo.Router(mw) } -func (s *State) PipelinesRouter() http.Handler { +func (s *State) PipelinesRouter(mw *middleware.Middleware) http.Handler { pipes := pipelines.New( s.oauth, s.repoResolver, @@ -325,7 +325,7 @@ func (s *State) PipelinesRouter() http.Handler { s.enforcer, log.SubLogger(s.logger, "pipelines"), ) - return pipes.Router() + return pipes.Router(mw) } func (s *State) LabelsRouter() http.Handler { diff --git a/lexicons/pipeline/cancelPipeline.json b/lexicons/pipeline/cancelPipeline.json new file mode 100644 index 00000000..f1ecf48f --- /dev/null +++ b/lexicons/pipeline/cancelPipeline.json @@ -0,0 +1,33 @@ +{ + "lexicon": 1, + "id": "sh.tangled.pipeline.cancelPipeline", + "defs": { + "main": { + "type": "procedure", + "description": "Cancel a running pipeline", + "input": { + "encoding": "application/json", + "schema": { + "type": "object", + "required": ["repo", "pipeline", "workflow"], + "properties": { + "repo": { + "type": "string", + "format": "at-uri", + "description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet" + }, + "pipeline": { + "type": "string", + "format": "at-uri", + "description": "pipeline at-uri" + }, + "workflow": { + "type": "string", + "description": "workflow name" + } + } + } + } + } + } +} diff --git a/spindle/db/events.go b/spindle/db/events.go index ca2dee45..06f34554 100644 --- a/spindle/db/events.go +++ b/spindle/db/events.go @@ -150,6 +150,10 @@ func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, ex return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) } +func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { + return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) +} + func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) } diff --git a/spindle/server.go b/spindle/server.go index bf51ccf1..e0d0b1ca 100644 --- a/spindle/server.go +++ b/spindle/server.go @@ -268,6 +268,7 @@ func (s *Spindle) XrpcRouter() http.Handler { Config: s.cfg, Resolver: s.res, Vault: s.vault, + Notifier: s.Notifier(), ServiceAuth: serviceAuth, } diff --git a/spindle/xrpc/pipeline_cancelPipeline.go b/spindle/xrpc/pipeline_cancelPipeline.go new file mode 100644 index 00000000..37b72868 --- /dev/null +++ b/spindle/xrpc/pipeline_cancelPipeline.go @@ -0,0 +1,97 @@ +package xrpc + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/bluesky-social/indigo/api/atproto" + "github.com/bluesky-social/indigo/atproto/syntax" + "github.com/bluesky-social/indigo/xrpc" + securejoin "github.com/cyphar/filepath-securejoin" + "tangled.org/core/api/tangled" + "tangled.org/core/rbac" + "tangled.org/core/spindle/models" + xrpcerr "tangled.org/core/xrpc/errors" +) + +func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { + l := x.Logger + fail := func(e xrpcerr.XrpcError) { + l.Error("failed", "kind", e.Tag, "error", e.Message) + writeError(w, e, http.StatusBadRequest) + } + l.Debug("cancel pipeline") + + actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) + if !ok { + fail(xrpcerr.MissingActorDidError) + return + } + + var input tangled.PipelineCancelPipeline_Input + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + fail(xrpcerr.GenericError(err)) + return + } + + aturi := syntax.ATURI(input.Pipeline) + wid := models.WorkflowId{ + PipelineId: models.PipelineId{ + Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), + Rkey: aturi.RecordKey().String(), + }, + Name: input.Workflow, + } + l.Debug("cancel pipeline", "wid", wid) + + // unfortunately we have to resolve repo-at here + repoAt, err := syntax.ParseATURI(input.Repo) + if err != nil { + fail(xrpcerr.InvalidRepoError(input.Repo)) + return + } + + ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) + if err != nil || ident.Handle.IsInvalidHandle() { + fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) + return + } + + xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} + resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) + if err != nil { + fail(xrpcerr.GenericError(err)) + return + } + + repo := resp.Value.Val.(*tangled.Repo) + didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) + if err != nil { + fail(xrpcerr.GenericError(err)) + return + } + + // TODO: fine-grained role based control + isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) + if err != nil || !isRepoOwner { + fail(xrpcerr.AccessControlError(actorDid.String())) + return + } + for _, engine := range x.Engines { + l.Debug("destorying workflow", "wid", wid) + err = engine.DestroyWorkflow(r.Context(), wid) + if err != nil { + fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err))) + return + } + err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) + if err != nil { + fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err))) + return + } + } + + w.WriteHeader(http.StatusOK) +} diff --git a/spindle/xrpc/xrpc.go b/spindle/xrpc/xrpc.go index 46caed92..1adb2862 100644 --- a/spindle/xrpc/xrpc.go +++ b/spindle/xrpc/xrpc.go @@ -10,6 +10,7 @@ import ( "tangled.org/core/api/tangled" "tangled.org/core/idresolver" + "tangled.org/core/notifier" "tangled.org/core/rbac" "tangled.org/core/spindle/config" "tangled.org/core/spindle/db" @@ -29,6 +30,7 @@ type Xrpc struct { Config *config.Config Resolver *idresolver.Resolver Vault secrets.Manager + Notifier *notifier.Notifier ServiceAuth *serviceauth.ServiceAuth } @@ -41,6 +43,7 @@ func (x *Xrpc) Router() http.Handler { r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) + r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline) }) // service query endpoints (no auth required) -- 2.43.0