forked from tangled.org/core
Monorepo for Tangled

lexicons,appview,spindle: add workflow cancel

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 3a99e385 27fef613

verified
Changed files
+264
api
appview
pages
templates
repo
pipelines
pipelines
lexicons
spindle
+34
api/tangled/pipelinecancelPipeline.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + package tangled 4 + 5 + // schema: sh.tangled.pipeline.cancelPipeline 6 + 7 + import ( 8 + "context" 9 + 10 + "github.com/bluesky-social/indigo/lex/util" 11 + ) 12 + 13 + const ( 14 + PipelineCancelPipelineNSID = "sh.tangled.pipeline.cancelPipeline" 15 + ) 16 + 17 + // PipelineCancelPipeline_Input is the input argument to a sh.tangled.pipeline.cancelPipeline call. 18 + type PipelineCancelPipeline_Input struct { 19 + // pipeline: pipeline at-uri 20 + Pipeline string `json:"pipeline" cborgen:"pipeline"` 21 + // repo: repo at-uri, spindle can't resolve repo from pipeline at-uri yet 22 + Repo string `json:"repo" cborgen:"repo"` 23 + // workflow: workflow name 24 + Workflow string `json:"workflow" cborgen:"workflow"` 25 + } 26 + 27 + // PipelineCancelPipeline calls the XRPC method "sh.tangled.pipeline.cancelPipeline". 28 + func PipelineCancelPipeline(ctx context.Context, c util.LexClient, input *PipelineCancelPipeline_Input) error { 29 + if err := c.LexDo(ctx, util.Procedure, "application/json", "sh.tangled.pipeline.cancelPipeline", nil, input, nil); err != nil { 30 + return err 31 + } 32 + 33 + return nil 34 + }
+10
appview/pages/templates/repo/pipelines/workflow.html
··· 12 12 {{ block "sidebar" . }} {{ end }} 13 13 </div> 14 14 <div class="col-span-1 md:col-span-3"> 15 + <div class="flex justify-end mb-2"> 16 + <button 17 + class="btn" 18 + hx-post="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/cancel" 19 + hx-swap="none" 20 + {{ if (index .Pipeline.Statuses .Workflow).Latest.Status.IsFinish -}} 21 + disabled 22 + {{- end }} 23 + >Cancel</button> 24 + </div> 15 25 {{ block "logs" . }} {{ end }} 16 26 </div> 17 27 </section>
+82
appview/pipelines/pipelines.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 + "fmt" 7 8 "log/slog" 8 9 "net/http" 9 10 "strings" 10 11 "time" 11 12 13 + "tangled.org/core/api/tangled" 12 14 "tangled.org/core/appview/config" 13 15 "tangled.org/core/appview/db" 16 + "tangled.org/core/appview/models" 14 17 "tangled.org/core/appview/oauth" 15 18 "tangled.org/core/appview/pages" 16 19 "tangled.org/core/appview/reporesolver" ··· 41 44 r.Get("/", p.Index) 42 45 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 43 46 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 47 + r.Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 44 48 45 49 return r 46 50 } ··· 314 318 } 315 319 } 316 320 } 321 + } 322 + 323 + func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 324 + l := p.logger.With("handler", "Cancel") 325 + 326 + var ( 327 + pipelineId = chi.URLParam(r, "pipeline") 328 + workflow = chi.URLParam(r, "workflow") 329 + ) 330 + if pipelineId == "" || workflow == "" { 331 + http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 332 + return 333 + } 334 + 335 + f, err := p.repoResolver.Resolve(r) 336 + if err != nil { 337 + l.Error("failed to get repo and knot", "err", err) 338 + http.Error(w, "bad repo/knot", http.StatusBadRequest) 339 + return 340 + } 341 + 342 + pipeline, err := func() (models.Pipeline, error) { 343 + ps, err := db.GetPipelineStatuses( 344 + p.db, 345 + 1, 346 + orm.FilterEq("repo_owner", f.Did), 347 + orm.FilterEq("repo_name", f.Name), 348 + orm.FilterEq("knot", f.Knot), 349 + orm.FilterEq("id", pipelineId), 350 + ) 351 + if err != nil { 352 + return models.Pipeline{}, err 353 + } 354 + if len(ps) != 1 { 355 + return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 356 + } 357 + return ps[0], nil 358 + }() 359 + if err != nil { 360 + l.Error("pipeline query failed", "err", err) 361 + http.Error(w, "pipeline not found", http.StatusNotFound) 362 + } 363 + var ( 364 + spindle = f.Spindle 365 + knot = f.Knot 366 + rkey = pipeline.Rkey 367 + ) 368 + 369 + if spindle == "" || knot == "" || rkey == "" { 370 + http.Error(w, "invalid repo info", http.StatusBadRequest) 371 + return 372 + } 373 + 374 + spindleClient, err := p.oauth.ServiceClient( 375 + r, 376 + oauth.WithService(f.Spindle), 377 + oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 378 + oauth.WithExp(60), 379 + oauth.WithDev(p.config.Core.Dev), 380 + oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 381 + ) 382 + 383 + err = tangled.PipelineCancelPipeline( 384 + r.Context(), 385 + spindleClient, 386 + &tangled.PipelineCancelPipeline_Input{ 387 + Repo: string(f.RepoAt()), 388 + Pipeline: pipeline.AtUri().String(), 389 + Workflow: workflow, 390 + }, 391 + ) 392 + errorId := "pipeline-action" 393 + if err != nil { 394 + l.Error("failed to cancel pipeline", "err", err) 395 + p.pages.Notice(w, errorId, "Failed to add secret.") 396 + return 397 + } 398 + l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 317 399 } 318 400 319 401 // either a message or an error
+33
lexicons/pipeline/cancelPipeline.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "sh.tangled.pipeline.cancelPipeline", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Cancel a running pipeline", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["repo", "pipeline", "workflow"], 13 + "properties": { 14 + "repo": { 15 + "type": "string", 16 + "format": "at-uri", 17 + "description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet" 18 + }, 19 + "pipeline": { 20 + "type": "string", 21 + "format": "at-uri", 22 + "description": "pipeline at-uri" 23 + }, 24 + "workflow": { 25 + "type": "string", 26 + "description": "workflow name" 27 + } 28 + } 29 + } 30 + } 31 + } 32 + } 33 + }
+4
spindle/db/events.go
··· 150 150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151 151 } 152 152 153 + func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 154 + return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 155 + } 156 + 153 157 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 154 158 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 155 159 }
+1
spindle/server.go
··· 268 268 Config: s.cfg, 269 269 Resolver: s.res, 270 270 Vault: s.vault, 271 + Notifier: s.Notifier(), 271 272 ServiceAuth: serviceAuth, 272 273 } 273 274
+97
spindle/xrpc/pipeline_cancelPipeline.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/bluesky-social/indigo/xrpc" 12 + securejoin "github.com/cyphar/filepath-securejoin" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/rbac" 15 + "tangled.org/core/spindle/models" 16 + xrpcerr "tangled.org/core/xrpc/errors" 17 + ) 18 + 19 + func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 20 + l := x.Logger 21 + fail := func(e xrpcerr.XrpcError) { 22 + l.Error("failed", "kind", e.Tag, "error", e.Message) 23 + writeError(w, e, http.StatusBadRequest) 24 + } 25 + l.Debug("cancel pipeline") 26 + 27 + actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 28 + if !ok { 29 + fail(xrpcerr.MissingActorDidError) 30 + return 31 + } 32 + 33 + var input tangled.PipelineCancelPipeline_Input 34 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 35 + fail(xrpcerr.GenericError(err)) 36 + return 37 + } 38 + 39 + aturi := syntax.ATURI(input.Pipeline) 40 + wid := models.WorkflowId{ 41 + PipelineId: models.PipelineId{ 42 + Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 43 + Rkey: aturi.RecordKey().String(), 44 + }, 45 + Name: input.Workflow, 46 + } 47 + l.Debug("cancel pipeline", "wid", wid) 48 + 49 + // unfortunately we have to resolve repo-at here 50 + repoAt, err := syntax.ParseATURI(input.Repo) 51 + if err != nil { 52 + fail(xrpcerr.InvalidRepoError(input.Repo)) 53 + return 54 + } 55 + 56 + ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) 57 + if err != nil || ident.Handle.IsInvalidHandle() { 58 + fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) 59 + return 60 + } 61 + 62 + xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 63 + resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 64 + if err != nil { 65 + fail(xrpcerr.GenericError(err)) 66 + return 67 + } 68 + 69 + repo := resp.Value.Val.(*tangled.Repo) 70 + didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 71 + if err != nil { 72 + fail(xrpcerr.GenericError(err)) 73 + return 74 + } 75 + 76 + // TODO: fine-grained role based control 77 + isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) 78 + if err != nil || !isRepoOwner { 79 + fail(xrpcerr.AccessControlError(actorDid.String())) 80 + return 81 + } 82 + for _, engine := range x.Engines { 83 + l.Debug("destorying workflow", "wid", wid) 84 + err = engine.DestroyWorkflow(r.Context(), wid) 85 + if err != nil { 86 + fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err))) 87 + return 88 + } 89 + err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 90 + if err != nil { 91 + fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err))) 92 + return 93 + } 94 + } 95 + 96 + w.WriteHeader(http.StatusOK) 97 + }
+3
spindle/xrpc/xrpc.go
··· 10 10 11 11 "tangled.org/core/api/tangled" 12 12 "tangled.org/core/idresolver" 13 + "tangled.org/core/notifier" 13 14 "tangled.org/core/rbac" 14 15 "tangled.org/core/spindle/config" 15 16 "tangled.org/core/spindle/db" ··· 29 30 Config *config.Config 30 31 Resolver *idresolver.Resolver 31 32 Vault secrets.Manager 33 + Notifier *notifier.Notifier 32 34 ServiceAuth *serviceauth.ServiceAuth 33 35 } 34 36 ··· 41 43 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 42 44 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 43 45 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 46 + r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline) 44 47 }) 45 48 46 49 // service query endpoints (no auth required)