forked from tangled.org/core
Monorepo for Tangled

Compare changes

Choose any two refs to compare.

Changed files
+98 -325
api
appview
lexicons
spindle
-34
api/tangled/pipelinecancelPipeline.go
··· 1 - // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 - 3 - package tangled 4 - 5 - // schema: sh.tangled.pipeline.cancelPipeline 6 - 7 - import ( 8 - "context" 9 - 10 - "github.com/bluesky-social/indigo/lex/util" 11 - ) 12 - 13 - const ( 14 - PipelineCancelPipelineNSID = "sh.tangled.pipeline.cancelPipeline" 15 - ) 16 - 17 - // PipelineCancelPipeline_Input is the input argument to a sh.tangled.pipeline.cancelPipeline call. 18 - type PipelineCancelPipeline_Input struct { 19 - // pipeline: pipeline at-uri 20 - Pipeline string `json:"pipeline" cborgen:"pipeline"` 21 - // repo: repo at-uri, spindle can't resolve repo from pipeline at-uri yet 22 - Repo string `json:"repo" cborgen:"repo"` 23 - // workflow: workflow name 24 - Workflow string `json:"workflow" cborgen:"workflow"` 25 - } 26 - 27 - // PipelineCancelPipeline calls the XRPC method "sh.tangled.pipeline.cancelPipeline". 28 - func PipelineCancelPipeline(ctx context.Context, c util.LexClient, input *PipelineCancelPipeline_Input) error { 29 - if err := c.LexDo(ctx, util.Procedure, "application/json", "sh.tangled.pipeline.cancelPipeline", nil, input, nil); err != nil { 30 - return err 31 - } 32 - 33 - return nil 34 - }
+2
appview/db/follow.go
··· 167 167 if err != nil { 168 168 return nil, err 169 169 } 170 + defer rows.Close() 171 + 170 172 for rows.Next() { 171 173 var follow models.Follow 172 174 var followedAt string
+1
appview/db/issues.go
··· 452 452 if err != nil { 453 453 return nil, err 454 454 } 455 + defer rows.Close() 455 456 456 457 for rows.Next() { 457 458 var comment models.IssueComment
+1 -1
appview/db/language.go
··· 28 28 whereClause, 29 29 ) 30 30 rows, err := e.Query(query, args...) 31 - 32 31 if err != nil { 33 32 return nil, fmt.Errorf("failed to execute query: %w ", err) 34 33 } 34 + defer rows.Close() 35 35 36 36 var langs []models.RepoLanguage 37 37 for rows.Next() {
+6 -6
appview/db/pipeline.go
··· 6 6 "strings" 7 7 "time" 8 8 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 9 "tangled.org/core/appview/models" 11 10 "tangled.org/core/orm" 12 11 ) ··· 217 216 } 218 217 defer rows.Close() 219 218 220 - pipelines := make(map[syntax.ATURI]models.Pipeline) 219 + pipelines := make(map[string]models.Pipeline) 221 220 for rows.Next() { 222 221 var p models.Pipeline 223 222 var t models.Trigger ··· 254 253 p.Trigger = &t 255 254 p.Statuses = make(map[string]models.WorkflowStatus) 256 255 257 - pipelines[p.AtUri()] = p 256 + k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 257 + pipelines[k] = p 258 258 } 259 259 260 260 // get all statuses ··· 314 314 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 315 315 } 316 316 317 - pipelineAt := ps.PipelineAt() 317 + key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 318 318 319 319 // extract 320 - pipeline, ok := pipelines[pipelineAt] 320 + pipeline, ok := pipelines[key] 321 321 if !ok { 322 322 continue 323 323 } ··· 331 331 332 332 // reassign 333 333 pipeline.Statuses[ps.Workflow] = statuses 334 - pipelines[pipelineAt] = pipeline 334 + pipelines[key] = pipeline 335 335 } 336 336 337 337 var all []models.Pipeline
+5
appview/db/profile.go
··· 230 230 if err != nil { 231 231 return nil, err 232 232 } 233 + defer rows.Close() 233 234 234 235 profileMap := make(map[string]*models.Profile) 235 236 for rows.Next() { ··· 270 271 if err != nil { 271 272 return nil, err 272 273 } 274 + defer rows.Close() 275 + 273 276 idxs := make(map[string]int) 274 277 for did := range profileMap { 275 278 idxs[did] = 0 ··· 290 293 if err != nil { 291 294 return nil, err 292 295 } 296 + defer rows.Close() 297 + 293 298 idxs = make(map[string]int) 294 299 for did := range profileMap { 295 300 idxs[did] = 0
+1
appview/db/registration.go
··· 38 38 if err != nil { 39 39 return nil, err 40 40 } 41 + defer rows.Close() 41 42 42 43 for rows.Next() { 43 44 var createdAt string
+11 -1
appview/db/repos.go
··· 56 56 limitClause, 57 57 ) 58 58 rows, err := e.Query(repoQuery, args...) 59 - 60 59 if err != nil { 61 60 return nil, fmt.Errorf("failed to execute repo query: %w ", err) 62 61 } 62 + defer rows.Close() 63 63 64 64 for rows.Next() { 65 65 var repo models.Repo ··· 128 128 if err != nil { 129 129 return nil, fmt.Errorf("failed to execute labels query: %w ", err) 130 130 } 131 + defer rows.Close() 132 + 131 133 for rows.Next() { 132 134 var repoat, labelat string 133 135 if err := rows.Scan(&repoat, &labelat); err != nil { ··· 165 167 if err != nil { 166 168 return nil, fmt.Errorf("failed to execute lang query: %w ", err) 167 169 } 170 + defer rows.Close() 171 + 168 172 for rows.Next() { 169 173 var repoat, lang string 170 174 if err := rows.Scan(&repoat, &lang); err != nil { ··· 191 195 if err != nil { 192 196 return nil, fmt.Errorf("failed to execute star-count query: %w ", err) 193 197 } 198 + defer rows.Close() 199 + 194 200 for rows.Next() { 195 201 var repoat string 196 202 var count int ··· 220 226 if err != nil { 221 227 return nil, fmt.Errorf("failed to execute issue-count query: %w ", err) 222 228 } 229 + defer rows.Close() 230 + 223 231 for rows.Next() { 224 232 var repoat string 225 233 var open, closed int ··· 261 269 if err != nil { 262 270 return nil, fmt.Errorf("failed to execute pulls-count query: %w ", err) 263 271 } 272 + defer rows.Close() 273 + 264 274 for rows.Next() { 265 275 var repoat string 266 276 var open, merged, closed, deleted int
+1
appview/db/star.go
··· 165 165 if err != nil { 166 166 return nil, err 167 167 } 168 + defer rows.Close() 168 169 169 170 starMap := make(map[string][]models.Star) 170 171 for rows.Next() {
-10
appview/models/pipeline.go
··· 1 1 package models 2 2 3 3 import ( 4 - "fmt" 5 4 "slices" 6 5 "time" 7 6 8 7 "github.com/bluesky-social/indigo/atproto/syntax" 9 8 "github.com/go-git/go-git/v5/plumbing" 10 - "tangled.org/core/api/tangled" 11 9 spindle "tangled.org/core/spindle/models" 12 10 "tangled.org/core/workflow" 13 11 ) ··· 25 23 // populate when querying for reverse mappings 26 24 Trigger *Trigger 27 25 Statuses map[string]WorkflowStatus 28 - } 29 - 30 - func (p *Pipeline) AtUri() syntax.ATURI { 31 - return syntax.ATURI(fmt.Sprintf("at://did:web:%s/%s/%s", p.Knot, tangled.PipelineNSID, p.Rkey)) 32 26 } 33 27 34 28 type WorkflowStatus struct { ··· 134 128 Error *string 135 129 ExitCode int 136 130 } 137 - 138 - func (ps *PipelineStatus) PipelineAt() syntax.ATURI { 139 - return syntax.ATURI(fmt.Sprintf("at://did:web:%s/%s/%s", ps.PipelineKnot, tangled.PipelineNSID, ps.PipelineRkey)) 140 - }
-1
appview/notify/merged_notifier.go
··· 39 39 v.Call(in) 40 40 }(n) 41 41 } 42 - wg.Wait() 43 42 } 44 43 45 44 func (m *mergedNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
+14
appview/pages/markup/extension/atlink.go
··· 36 36 } 37 37 38 38 var atRegexp = regexp.MustCompile(`(^|\s|\()(@)([a-zA-Z0-9.-]+)(\b)`) 39 + var markdownLinkRegexp = regexp.MustCompile(`(?ms)\[.*\]\(.*\)`) 39 40 40 41 type atParser struct{} 41 42 ··· 55 56 if m == nil { 56 57 return nil 57 58 } 59 + 60 + if !util.IsSpaceRune(block.PrecendingCharacter()) { 61 + return nil 62 + } 63 + 64 + // Check for all links in the markdown to see if the handle found is inside one 65 + linksIndexes := markdownLinkRegexp.FindAllIndex(block.Source(), -1) 66 + for _, linkMatch := range linksIndexes { 67 + if linkMatch[0] < segment.Start && segment.Start < linkMatch[1] { 68 + return nil 69 + } 70 + } 71 + 58 72 atSegment := text.NewSegment(segment.Start, segment.Start+m[1]) 59 73 block.Advance(m[1]) 60 74 node := &AtNode{}
+1 -1
appview/pages/pages.go
··· 640 640 } 641 641 642 642 func (p *Pages) StarBtnFragment(w io.Writer, params StarBtnFragmentParams) error { 643 - return p.executePlain("fragments/starBtn", w, params) 643 + return p.executePlain("fragments/starBtn-oob", w, params) 644 644 } 645 645 646 646 type RepoIndexParams struct {
+5
appview/pages/templates/fragments/starBtn-oob.html
··· 1 + {{ define "fragments/starBtn-oob" }} 2 + <div hx-swap-oob='outerHTML:#starBtn[data-star-subject-at="{{ .SubjectAt }}"]'> 3 + {{ template "fragments/starBtn" . }} 4 + </div> 5 + {{ end }}
+1 -3
appview/pages/templates/fragments/starBtn.html
··· 1 1 {{ define "fragments/starBtn" }} 2 + {{/* NOTE: this fragment is always replaced with hx-swap-oob */}} 2 3 <button 3 4 id="starBtn" 4 5 class="btn disabled:opacity-50 disabled:cursor-not-allowed flex gap-2 items-center group" ··· 10 11 {{ end }} 11 12 12 13 hx-trigger="click" 13 - hx-target="this" 14 - hx-swap="outerHTML" 15 - hx-swap-oob='outerHTML:#starBtn[data-star-subject-at="{{ .SubjectAt }}"]' 16 14 hx-disabled-elt="#starBtn" 17 15 > 18 16 {{ if .IsStarred }}
+6 -6
appview/pages/templates/repo/fragments/backlinks.html
··· 14 14 <div class="flex gap-2 items-center"> 15 15 {{ if .State.IsClosed }} 16 16 <span class="text-gray-500 dark:text-gray-400"> 17 - {{ i "ban" "w-4 h-4" }} 17 + {{ i "ban" "size-3" }} 18 18 </span> 19 19 {{ else if eq .Kind.String "issues" }} 20 20 <span class="text-green-600 dark:text-green-500"> 21 - {{ i "circle-dot" "w-4 h-4" }} 21 + {{ i "circle-dot" "size-3" }} 22 22 </span> 23 23 {{ else if .State.IsOpen }} 24 24 <span class="text-green-600 dark:text-green-500"> 25 - {{ i "git-pull-request" "w-4 h-4" }} 25 + {{ i "git-pull-request" "size-3" }} 26 26 </span> 27 27 {{ else if .State.IsMerged }} 28 28 <span class="text-purple-600 dark:text-purple-500"> 29 - {{ i "git-merge" "w-4 h-4" }} 29 + {{ i "git-merge" "size-3" }} 30 30 </span> 31 31 {{ else }} 32 32 <span class="text-gray-600 dark:text-gray-300"> 33 - {{ i "git-pull-request-closed" "w-4 h-4" }} 33 + {{ i "git-pull-request-closed" "size-3" }} 34 34 </span> 35 35 {{ end }} 36 - <a href="{{ . }}"><span class="text-gray-500 dark:text-gray-400">#{{ .SubjectId }}</span> {{ .Title }}</a> 36 + <a href="{{ . }}" class="line-clamp-1 text-sm"><span class="text-gray-500 dark:text-gray-400">#{{ .SubjectId }}</span> {{ .Title }}</a> 37 37 </div> 38 38 {{ if not (eq $.RepoInfo.FullName $repoUrl) }} 39 39 <div>
-10
appview/pages/templates/repo/pipelines/workflow.html
··· 12 12 {{ block "sidebar" . }} {{ end }} 13 13 </div> 14 14 <div class="col-span-1 md:col-span-3"> 15 - <div class="flex justify-end mb-2"> 16 - <button 17 - class="btn" 18 - hx-post="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/cancel" 19 - hx-swap="none" 20 - {{ if (index .Pipeline.Statuses .Workflow).Latest.Status.IsFinish -}} 21 - disabled 22 - {{- end }} 23 - >Cancel</button> 24 - </div> 25 15 {{ block "logs" . }} {{ end }} 26 16 </div> 27 17 </section>
+1 -1
appview/pages/templates/strings/string.html
··· 17 17 <span class="select-none">/</span> 18 18 <a href="/strings/{{ $ownerId }}/{{ .String.Rkey }}" class="font-bold">{{ .String.Filename }}</a> 19 19 </div> 20 - <div class="flex gap-2 text-base"> 20 + <div class="flex gap-2 items-stretch text-base"> 21 21 {{ if and .LoggedInUser (eq .LoggedInUser.Did .String.Did) }} 22 22 <a class="btn flex items-center gap-2 no-underline hover:no-underline p-2 group" 23 23 hx-boost="true"
-82
appview/pipelines/pipelines.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 - "fmt" 8 7 "log/slog" 9 8 "net/http" 10 9 "strings" 11 10 "time" 12 11 13 - "tangled.org/core/api/tangled" 14 12 "tangled.org/core/appview/config" 15 13 "tangled.org/core/appview/db" 16 - "tangled.org/core/appview/models" 17 14 "tangled.org/core/appview/oauth" 18 15 "tangled.org/core/appview/pages" 19 16 "tangled.org/core/appview/reporesolver" ··· 44 41 r.Get("/", p.Index) 45 42 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 46 43 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 47 - r.Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 48 44 49 45 return r 50 46 } ··· 318 314 } 319 315 } 320 316 } 321 - } 322 - 323 - func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 324 - l := p.logger.With("handler", "Cancel") 325 - 326 - var ( 327 - pipelineId = chi.URLParam(r, "pipeline") 328 - workflow = chi.URLParam(r, "workflow") 329 - ) 330 - if pipelineId == "" || workflow == "" { 331 - http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 332 - return 333 - } 334 - 335 - f, err := p.repoResolver.Resolve(r) 336 - if err != nil { 337 - l.Error("failed to get repo and knot", "err", err) 338 - http.Error(w, "bad repo/knot", http.StatusBadRequest) 339 - return 340 - } 341 - 342 - pipeline, err := func() (models.Pipeline, error) { 343 - ps, err := db.GetPipelineStatuses( 344 - p.db, 345 - 1, 346 - orm.FilterEq("repo_owner", f.Did), 347 - orm.FilterEq("repo_name", f.Name), 348 - orm.FilterEq("knot", f.Knot), 349 - orm.FilterEq("id", pipelineId), 350 - ) 351 - if err != nil { 352 - return models.Pipeline{}, err 353 - } 354 - if len(ps) != 1 { 355 - return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 356 - } 357 - return ps[0], nil 358 - }() 359 - if err != nil { 360 - l.Error("pipeline query failed", "err", err) 361 - http.Error(w, "pipeline not found", http.StatusNotFound) 362 - } 363 - var ( 364 - spindle = f.Spindle 365 - knot = f.Knot 366 - rkey = pipeline.Rkey 367 - ) 368 - 369 - if spindle == "" || knot == "" || rkey == "" { 370 - http.Error(w, "invalid repo info", http.StatusBadRequest) 371 - return 372 - } 373 - 374 - spindleClient, err := p.oauth.ServiceClient( 375 - r, 376 - oauth.WithService(f.Spindle), 377 - oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 378 - oauth.WithExp(60), 379 - oauth.WithDev(false), 380 - oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 381 - ) 382 - 383 - err = tangled.PipelineCancelPipeline( 384 - r.Context(), 385 - spindleClient, 386 - &tangled.PipelineCancelPipeline_Input{ 387 - Repo: string(f.RepoAt()), 388 - Pipeline: pipeline.AtUri().String(), 389 - Workflow: workflow, 390 - }, 391 - ) 392 - errorId := "pipeline-action" 393 - if err != nil { 394 - l.Error("failed to cancel pipeline", "err", err) 395 - p.pages.Notice(w, errorId, "Failed to add secret.") 396 - return 397 - } 398 - l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 399 317 } 400 318 401 319 // either a message or an error
+8
appview/pulls/pulls.go
··· 1366 1366 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1367 1367 return 1368 1368 } 1369 + 1369 1370 } 1370 1371 1371 1372 if err = tx.Commit(); err != nil { 1372 1373 log.Println("failed to create pull request", err) 1373 1374 s.pages.Notice(w, "pull", "Failed to create pull request. Try again later.") 1374 1375 return 1376 + } 1377 + 1378 + // notify about each pull 1379 + // 1380 + // this is performed after tx.Commit, because it could result in a locked DB otherwise 1381 + for _, p := range stack { 1382 + s.notifier.NewPull(r.Context(), p) 1375 1383 } 1376 1384 1377 1385 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, repo)
-33
lexicons/pipeline/cancelPipeline.json
··· 1 - { 2 - "lexicon": 1, 3 - "id": "sh.tangled.pipeline.cancelPipeline", 4 - "defs": { 5 - "main": { 6 - "type": "procedure", 7 - "description": "Cancel a running pipeline", 8 - "input": { 9 - "encoding": "application/json", 10 - "schema": { 11 - "type": "object", 12 - "required": ["repo", "pipeline", "workflow"], 13 - "properties": { 14 - "repo": { 15 - "type": "string", 16 - "format": "at-uri", 17 - "description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet" 18 - }, 19 - "pipeline": { 20 - "type": "string", 21 - "format": "at-uri", 22 - "description": "pipeline at-uri" 23 - }, 24 - "workflow": { 25 - "type": "string", 26 - "description": "workflow name" 27 - } 28 - } 29 - } 30 - } 31 - } 32 - } 33 - }
+18 -6
spindle/db/events.go
··· 18 18 EventJson string `json:"event"` 19 19 } 20 20 21 - func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error { 21 + func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 22 22 _, err := d.Exec( 23 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 24 event.Rkey, ··· 70 70 return evts, nil 71 71 } 72 72 73 + func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error { 74 + eventJson, err := json.Marshal(s) 75 + if err != nil { 76 + return err 77 + } 78 + 79 + event := Event{ 80 + Rkey: rkey, 81 + Nsid: tangled.PipelineStatusNSID, 82 + Created: time.Now().UnixNano(), 83 + EventJson: string(eventJson), 84 + } 85 + 86 + return d.InsertEvent(event, n) 87 + } 88 + 73 89 func (d *DB) createStatusEvent( 74 90 workflowId models.WorkflowId, 75 91 statusKind models.StatusKind, ··· 100 116 EventJson: string(eventJson), 101 117 } 102 118 103 - return d.insertEvent(event, n) 119 + return d.InsertEvent(event, n) 104 120 105 121 } 106 122 ··· 148 164 149 165 func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 150 166 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151 - } 152 - 153 - func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 154 - return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 155 167 } 156 168 157 169 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
+1
spindle/db/repos.go
··· 16 16 if err != nil { 17 17 return nil, err 18 18 } 19 + defer rows.Close() 19 20 20 21 var knots []string 21 22 for rows.Next() {
+10 -24
spindle/engines/nixery/engine.go
··· 179 179 return err 180 180 } 181 181 e.registerCleanup(wid, func(ctx context.Context) error { 182 - err := e.docker.NetworkRemove(ctx, networkName(wid)) 183 - if err != nil { 184 - return fmt.Errorf("removing network: %w", err) 185 - } 186 - return nil 182 + return e.docker.NetworkRemove(ctx, networkName(wid)) 187 183 }) 188 184 189 185 addl := wf.Data.(addlFields) ··· 233 229 return fmt.Errorf("creating container: %w", err) 234 230 } 235 231 e.registerCleanup(wid, func(ctx context.Context) error { 236 - err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 232 + err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 237 233 if err != nil { 238 - return fmt.Errorf("stopping container: %w", err) 234 + return err 239 235 } 240 236 241 - err = e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 237 + return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 242 238 RemoveVolumes: true, 243 239 RemoveLinks: false, 244 240 Force: false, 245 241 }) 246 - if err != nil { 247 - return fmt.Errorf("removing container: %w", err) 248 - } 249 - return nil 250 242 }) 251 243 252 244 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) ··· 402 394 } 403 395 404 396 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 405 - fns := e.drainCleanups(wid) 397 + e.cleanupMu.Lock() 398 + key := wid.String() 399 + 400 + fns := e.cleanup[key] 401 + delete(e.cleanup, key) 402 + e.cleanupMu.Unlock() 406 403 407 404 for _, fn := range fns { 408 405 if err := fn(ctx); err != nil { ··· 418 415 419 416 key := wid.String() 420 417 e.cleanup[key] = append(e.cleanup[key], fn) 421 - } 422 - 423 - func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 424 - e.cleanupMu.Lock() 425 - key := wid.String() 426 - 427 - fns := e.cleanup[key] 428 - delete(e.cleanup, key) 429 - e.cleanupMu.Unlock() 430 - 431 - return fns 432 418 } 433 419 434 420 func networkName(wid models.WorkflowId) string {
+1 -1
spindle/models/pipeline_env.go
··· 20 20 // Standard CI environment variable 21 21 env["CI"] = "true" 22 22 23 - env["TANGLED_PIPELINE_ID"] = pipelineId.AtUri().String() 23 + env["TANGLED_PIPELINE_ID"] = pipelineId.Rkey 24 24 25 25 // Repo info 26 26 if tr.Repo != nil {
+4 -5
spindle/server.go
··· 268 268 Config: s.cfg, 269 269 Resolver: s.res, 270 270 Vault: s.vault, 271 - Notifier: s.Notifier(), 272 271 ServiceAuth: serviceAuth, 273 272 } 274 273 ··· 303 302 tpl.TriggerMetadata.Repo.Repo, 304 303 ) 305 304 if err != nil { 306 - return fmt.Errorf("failed to get repo: %w", err) 305 + return err 307 306 } 308 307 309 308 pipelineId := models.PipelineId{ ··· 324 323 Name: w.Name, 325 324 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 326 325 if err != nil { 327 - return fmt.Errorf("db.StatusFailed: %w", err) 326 + return err 328 327 } 329 328 330 329 continue ··· 338 337 339 338 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 340 339 if err != nil { 341 - return fmt.Errorf("init workflow: %w", err) 340 + return err 342 341 } 343 342 344 343 // inject TANGLED_* env vars after InitWorkflow ··· 355 354 Name: w.Name, 356 355 }, s.n) 357 356 if err != nil { 358 - return fmt.Errorf("db.StatusPending: %w", err) 357 + return err 359 358 } 360 359 } 361 360 }
-97
spindle/xrpc/pipeline_cancelPipeline.go
··· 1 - package xrpc 2 - 3 - import ( 4 - "encoding/json" 5 - "fmt" 6 - "net/http" 7 - "strings" 8 - 9 - "github.com/bluesky-social/indigo/api/atproto" 10 - "github.com/bluesky-social/indigo/atproto/syntax" 11 - "github.com/bluesky-social/indigo/xrpc" 12 - securejoin "github.com/cyphar/filepath-securejoin" 13 - "tangled.org/core/api/tangled" 14 - "tangled.org/core/rbac" 15 - "tangled.org/core/spindle/models" 16 - xrpcerr "tangled.org/core/xrpc/errors" 17 - ) 18 - 19 - func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 20 - l := x.Logger 21 - fail := func(e xrpcerr.XrpcError) { 22 - l.Error("failed", "kind", e.Tag, "error", e.Message) 23 - writeError(w, e, http.StatusBadRequest) 24 - } 25 - l.Debug("cancel pipeline") 26 - 27 - actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 28 - if !ok { 29 - fail(xrpcerr.MissingActorDidError) 30 - return 31 - } 32 - 33 - var input tangled.PipelineCancelPipeline_Input 34 - if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 35 - fail(xrpcerr.GenericError(err)) 36 - return 37 - } 38 - 39 - aturi := syntax.ATURI(input.Pipeline) 40 - wid := models.WorkflowId{ 41 - PipelineId: models.PipelineId{ 42 - Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 43 - Rkey: aturi.RecordKey().String(), 44 - }, 45 - Name: input.Workflow, 46 - } 47 - l.Debug("cancel pipeline", "wid", wid) 48 - 49 - // unfortunately we have to resolve repo-at here 50 - repoAt, err := syntax.ParseATURI(input.Repo) 51 - if err != nil { 52 - fail(xrpcerr.InvalidRepoError(input.Repo)) 53 - return 54 - } 55 - 56 - ident, err := x.Resolver.ResolveIdent(r.Context(), repoAt.Authority().String()) 57 - if err != nil || ident.Handle.IsInvalidHandle() { 58 - fail(xrpcerr.GenericError(fmt.Errorf("failed to resolve handle: %w", err))) 59 - return 60 - } 61 - 62 - xrpcc := xrpc.Client{Host: ident.PDSEndpoint()} 63 - resp, err := atproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 64 - if err != nil { 65 - fail(xrpcerr.GenericError(err)) 66 - return 67 - } 68 - 69 - repo := resp.Value.Val.(*tangled.Repo) 70 - didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 71 - if err != nil { 72 - fail(xrpcerr.GenericError(err)) 73 - return 74 - } 75 - 76 - // TODO: fine-grained role based control 77 - isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid.String(), rbac.ThisServer, didSlashRepo) 78 - if err != nil || !isRepoOwner { 79 - fail(xrpcerr.AccessControlError(actorDid.String())) 80 - return 81 - } 82 - for _, engine := range x.Engines { 83 - l.Debug("destorying workflow", "wid", wid) 84 - err = engine.DestroyWorkflow(r.Context(), wid) 85 - if err != nil { 86 - fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err))) 87 - return 88 - } 89 - err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 90 - if err != nil { 91 - fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err))) 92 - return 93 - } 94 - } 95 - 96 - w.WriteHeader(http.StatusOK) 97 - }
-3
spindle/xrpc/xrpc.go
··· 10 10 11 11 "tangled.org/core/api/tangled" 12 12 "tangled.org/core/idresolver" 13 - "tangled.org/core/notifier" 14 13 "tangled.org/core/rbac" 15 14 "tangled.org/core/spindle/config" 16 15 "tangled.org/core/spindle/db" ··· 30 29 Config *config.Config 31 30 Resolver *idresolver.Resolver 32 31 Vault secrets.Manager 33 - Notifier *notifier.Notifier 34 32 ServiceAuth *serviceauth.ServiceAuth 35 33 } 36 34 ··· 43 41 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 44 42 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 45 43 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 46 - r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline) 47 44 }) 48 45 49 46 // service query endpoints (no auth required)