Monorepo for Tangled tangled.org

lexicons,appview,spindle: add workflow cancel

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 6188b80a 76d4bef1

verified
Changed files
+275 -4
api
appview
pages
templates
repo
pipelines
pipelines
state
lexicons
spindle
+34
api/tangled/pipelinecancelPipeline.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package tangled 4 + 5 + // schema: sh.tangled.pipeline.cancelPipeline 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/lex/util" 11 + ) 12 + 13 + const ( 14 + PipelineCancelPipelineNSID = "sh.tangled.pipeline.cancelPipeline" 15 + ) 16 + 17 + // PipelineCancelPipeline_Input is the input argument to a sh.tangled.pipeline.cancelPipeline call. 18 + type PipelineCancelPipeline_Input struct { 19 + // pipeline: pipeline at-uri 20 + Pipeline string `json:"pipeline" cborgen:"pipeline"` 21 + // repo: repo at-uri, spindle can't resolve repo from pipeline at-uri yet 22 + Repo string `json:"repo" cborgen:"repo"` 23 + // workflow: workflow name 24 + Workflow string `json:"workflow" cborgen:"workflow"` 25 + } 26 + 27 + // PipelineCancelPipeline calls the XRPC method "sh.tangled.pipeline.cancelPipeline". 28 + func PipelineCancelPipeline(ctx context.Context, c util.LexClient, input *PipelineCancelPipeline_Input) error { 29 + if err := c.LexDo(ctx, util.Procedure, "application/json", "sh.tangled.pipeline.cancelPipeline", nil, input, nil); err != nil { 30 + return err 31 + } 32 + 33 + return nil 34 + }
+14
appview/pages/templates/repo/pipelines/workflow.html
··· 12 12 {{ block "sidebar" . }} {{ end }} 13 13 </div> 14 14 <div class="col-span-1 md:col-span-3"> 15 + <!-- TODO(boltless): explictly check for pipeline cancel permission --> 16 + {{ if $.RepoInfo.Roles.IsOwner }} 17 + <div class="flex justify-between mb-2"> 18 + <div id="workflow-error" class="text-red-500 dark:text-red-400"></div> 19 + <button 20 + class="btn" 21 + hx-post="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/cancel" 22 + hx-swap="none" 23 + {{ if (index .Pipeline.Statuses .Workflow).Latest.Status.IsFinish -}} 24 + disabled 25 + {{- end }} 26 + >Cancel</button> 27 + </div> 28 + {{ end }} 15 29 {{ block "logs" . }} {{ end }} 16 30 </div> 17 31 </section>
+86 -1
appview/pipelines/pipelines.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 + "fmt" 7 8 "log/slog" 8 9 "net/http" 9 10 "strings" 10 11 "time" 11 12 13 + "tangled.org/core/api/tangled" 12 14 "tangled.org/core/appview/config" 13 15 "tangled.org/core/appview/db" 16 + "tangled.org/core/appview/middleware" 17 + "tangled.org/core/appview/models" 14 18 "tangled.org/core/appview/oauth" 15 19 "tangled.org/core/appview/pages" 16 20 "tangled.org/core/appview/reporesolver" ··· 36 40 logger *slog.Logger 37 41 } 38 42 39 - func (p *Pipelines) Router() http.Handler { 43 + func (p *Pipelines) Router(mw *middleware.Middleware) http.Handler { 40 44 r := chi.NewRouter() 41 45 r.Get("/", p.Index) 42 46 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 43 47 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 48 + r. 49 + With(mw.RepoPermissionMiddleware("repo:owner")). 50 + Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 44 51 45 52 return r 46 53 } ··· 314 321 } 315 322 } 316 323 } 324 + } 325 + 326 + func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 327 + l := p.logger.With("handler", "Cancel") 328 + 329 + var ( 330 + pipelineId = chi.URLParam(r, "pipeline") 331 + workflow = chi.URLParam(r, "workflow") 332 + ) 333 + if pipelineId == "" || workflow == "" { 334 + http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 335 + return 336 + } 337 + 338 + f, err := p.repoResolver.Resolve(r) 339 + if err != nil { 340 + l.Error("failed to get repo and knot", "err", err) 341 + http.Error(w, "bad repo/knot", http.StatusBadRequest) 342 + return 343 + } 344 + 345 + pipeline, err := func() (models.Pipeline, error) { 346 + ps, err := db.GetPipelineStatuses( 347 + p.db, 348 + 1, 349 + orm.FilterEq("repo_owner", f.Did), 350 + orm.FilterEq("repo_name", f.Name), 351 + orm.FilterEq("knot", f.Knot), 352 + orm.FilterEq("id", pipelineId), 353 + ) 354 + if err != nil { 355 + return models.Pipeline{}, err 356 + } 357 + if len(ps) != 1 { 358 + return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 359 + } 360 + return ps[0], nil 361 + }() 362 + if err != nil { 363 + l.Error("pipeline query failed", "err", err) 364 + http.Error(w, "pipeline not found", http.StatusNotFound) 365 + } 366 + var ( 367 + spindle = f.Spindle 368 + knot = f.Knot 369 + rkey = pipeline.Rkey 370 + ) 371 + 372 + if spindle == "" || knot == "" || rkey == "" { 373 + http.Error(w, "invalid repo info", http.StatusBadRequest) 374 + return 375 + } 376 + 377 + spindleClient, err := p.oauth.ServiceClient( 378 + r, 379 + oauth.WithService(f.Spindle), 380 + oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 381 + oauth.WithDev(p.config.Core.Dev), 382 + oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 383 + ) 384 + 385 + err = tangled.PipelineCancelPipeline( 386 + r.Context(), 387 + spindleClient, 388 + &tangled.PipelineCancelPipeline_Input{ 389 + Repo: string(f.RepoAt()), 390 + Pipeline: pipeline.AtUri().String(), 391 + Workflow: workflow, 392 + }, 393 + ) 394 + err = fmt.Errorf("boo! new error") 395 + errorId := "workflow-error" 396 + if err != nil { 397 + l.Error("failed to cancel workflow", "err", err) 398 + p.pages.Notice(w, errorId, "Failed to cancel workflow") 399 + return 400 + } 401 + l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 317 402 } 318 403 319 404 // either a message or an error
+3 -3
appview/state/router.go
··· 96 96 r.Mount("/", s.RepoRouter(mw)) 97 97 r.Mount("/issues", s.IssuesRouter(mw)) 98 98 r.Mount("/pulls", s.PullsRouter(mw)) 99 - r.Mount("/pipelines", s.PipelinesRouter()) 99 + r.Mount("/pipelines", s.PipelinesRouter(mw)) 100 100 r.Mount("/labels", s.LabelsRouter()) 101 101 102 102 // These routes get proxied to the knot ··· 313 313 return repo.Router(mw) 314 314 } 315 315 316 - func (s *State) PipelinesRouter() http.Handler { 316 + func (s *State) PipelinesRouter(mw *middleware.Middleware) http.Handler { 317 317 pipes := pipelines.New( 318 318 s.oauth, 319 319 s.repoResolver, ··· 325 325 s.enforcer, 326 326 log.SubLogger(s.logger, "pipelines"), 327 327 ) 328 - return pipes.Router() 328 + return pipes.Router(mw) 329 329 } 330 330 331 331 func (s *State) LabelsRouter() http.Handler {
+33
lexicons/pipeline/cancelPipeline.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "sh.tangled.pipeline.cancelPipeline", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Cancel a running pipeline", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["repo", "pipeline", "workflow"], 13 + "properties": { 14 + "repo": { 15 + "type": "string", 16 + "format": "at-uri", 17 + "description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet" 18 + }, 19 + "pipeline": { 20 + "type": "string", 21 + "format": "at-uri", 22 + "description": "pipeline at-uri" 23 + }, 24 + "workflow": { 25 + "type": "string", 26 + "description": "workflow name" 27 + } 28 + } 29 + } 30 + } 31 + } 32 + } 33 + }
+4
spindle/db/events.go
··· 150 150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151 151 } 152 152 153 + func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 154 + return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 155 + } 156 + 153 157 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 154 158 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 155 159 }
+1
spindle/server.go
··· 268 268 Config: s.cfg, 269 269 Resolver: s.res, 270 270 Vault: s.vault, 271 + Notifier: s.Notifier(), 271 272 ServiceAuth: serviceAuth, 272 273 } 273 274
+97
spindle/xrpc/pipeline_cancelPipeline.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/bluesky-social/indigo/xrpc" 12 + securejoin "github.com/cyphar/filepath-securejoin" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/rbac" 15 + "tangled.org/core/spindle/models" 16 + xrpcerr "tangled.org/core/xrpc/errors" 17 + ) 18 + 19 + func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 20 + l := x.Logger 21 + fail := func(e xrpcerr.XrpcError) { 22 + l.Error("failed", "kind", e.Tag, "error", e.Message) 23 + writeError(w, e, http.StatusBadRequest) 24 + } 25 + l.Debug("cancel pipeline") 26 + 27 + actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 28 + if !ok { 29 + fail(xrpcerr.MissingActorDidError) 30 + return 31 + } 32 + 33 + var input tangled.PipelineCancelPipeline_Input 34 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 35 + fail(xrpcerr.GenericError(err)) 36 + return 37 + } 38 + 39 + aturi := syntax.ATURI(input.Pipeline) 40 + wid := models.WorkflowId{ 41 + PipelineId: models.PipelineId{ 42 + Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 43 + Rkey: aturi.RecordKey().String(), 44 + }, 45 + Name: input.Workflow, 46 + } 47 + l.Debug("cancel pipeline", "wid", wid) 48 + 49 + // unfortunately we have to resolve repo-at here 50 + repoAt, err := syntax.ParseATURI(input.Repo) 51 + if err != nil { 52 + fail(xrpcerr.InvalidRepoError(input.Repo)) 53 + return 54 + } 55 + 56 + ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) 57 + if err != nil || ident.Handle.IsInvalidHandle() { 58 + fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) 59 + return 60 + } 61 + 62 + xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 63 + resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 64 + if err != nil { 65 + fail(xrpcerr.GenericError(err)) 66 + return 67 + } 68 + 69 + repo := resp.Value.Val.(*tangled.Repo) 70 + didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 71 + if err != nil { 72 + fail(xrpcerr.GenericError(err)) 73 + return 74 + } 75 + 76 + // TODO: fine-grained role based control 77 + isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) 78 + if err != nil || !isRepoOwner { 79 + fail(xrpcerr.AccessControlError(actorDid.String())) 80 + return 81 + } 82 + for _, engine := range x.Engines { 83 + l.Debug("destorying workflow", "wid", wid) 84 + err = engine.DestroyWorkflow(r.Context(), wid) 85 + if err != nil { 86 + fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err))) 87 + return 88 + } 89 + err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 90 + if err != nil { 91 + fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err))) 92 + return 93 + } 94 + } 95 + 96 + w.WriteHeader(http.StatusOK) 97 + }
+3
spindle/xrpc/xrpc.go
··· 10 10 11 11 "tangled.org/core/api/tangled" 12 12 "tangled.org/core/idresolver" 13 + "tangled.org/core/notifier" 13 14 "tangled.org/core/rbac" 14 15 "tangled.org/core/spindle/config" 15 16 "tangled.org/core/spindle/db" ··· 29 30 Config *config.Config 30 31 Resolver *idresolver.Resolver 31 32 Vault secrets.Manager 33 + Notifier *notifier.Notifier 32 34 ServiceAuth *serviceauth.ServiceAuth 33 35 } 34 36 ··· 41 43 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 42 44 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 43 45 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 46 + r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline) 44 47 }) 45 48 46 49 // service query endpoints (no auth required)