forked from tangled.org/core
Monorepo for Tangled

Compare changes

Choose any two refs to compare.

-34
api/tangled/pipelinecancelPipeline.go
··· 1 - // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 - 3 - package tangled 4 - 5 - // schema: sh.tangled.pipeline.cancelPipeline 6 - 7 - import ( 8 - "context" 9 - 10 - "github.com/bluesky-social/indigo/lex/util" 11 - ) 12 - 13 - const ( 14 - PipelineCancelPipelineNSID = "sh.tangled.pipeline.cancelPipeline" 15 - ) 16 - 17 - // PipelineCancelPipeline_Input is the input argument to a sh.tangled.pipeline.cancelPipeline call. 18 - type PipelineCancelPipeline_Input struct { 19 - // pipeline: pipeline at-uri 20 - Pipeline string `json:"pipeline" cborgen:"pipeline"` 21 - // repo: repo at-uri, spindle can't resolve repo from pipeline at-uri yet 22 - Repo string `json:"repo" cborgen:"repo"` 23 - // workflow: workflow name 24 - Workflow string `json:"workflow" cborgen:"workflow"` 25 - } 26 - 27 - // PipelineCancelPipeline calls the XRPC method "sh.tangled.pipeline.cancelPipeline". 28 - func PipelineCancelPipeline(ctx context.Context, c util.LexClient, input *PipelineCancelPipeline_Input) error { 29 - if err := c.LexDo(ctx, util.Procedure, "application/json", "sh.tangled.pipeline.cancelPipeline", nil, input, nil); err != nil { 30 - return err 31 - } 32 - 33 - return nil 34 - }
···
+6 -6
appview/db/pipeline.go
··· 6 "strings" 7 "time" 8 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/orm" 12 ) ··· 217 } 218 defer rows.Close() 219 220 - pipelines := make(map[syntax.ATURI]models.Pipeline) 221 for rows.Next() { 222 var p models.Pipeline 223 var t models.Trigger ··· 254 p.Trigger = &t 255 p.Statuses = make(map[string]models.WorkflowStatus) 256 257 - pipelines[p.AtUri()] = p 258 } 259 260 // get all statuses ··· 314 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 315 } 316 317 - pipelineAt := ps.PipelineAt() 318 319 // extract 320 - pipeline, ok := pipelines[pipelineAt] 321 if !ok { 322 continue 323 } ··· 331 332 // reassign 333 pipeline.Statuses[ps.Workflow] = statuses 334 - pipelines[pipelineAt] = pipeline 335 } 336 337 var all []models.Pipeline
··· 6 "strings" 7 "time" 8 9 "tangled.org/core/appview/models" 10 "tangled.org/core/orm" 11 ) ··· 216 } 217 defer rows.Close() 218 219 + pipelines := make(map[string]models.Pipeline) 220 for rows.Next() { 221 var p models.Pipeline 222 var t models.Trigger ··· 253 p.Trigger = &t 254 p.Statuses = make(map[string]models.WorkflowStatus) 255 256 + k := fmt.Sprintf("%s/%s", p.Knot, p.Rkey) 257 + pipelines[k] = p 258 } 259 260 // get all statuses ··· 314 return nil, fmt.Errorf("invalid status created timestamp %q: %w", created, err) 315 } 316 317 + key := fmt.Sprintf("%s/%s", ps.PipelineKnot, ps.PipelineRkey) 318 319 // extract 320 + pipeline, ok := pipelines[key] 321 if !ok { 322 continue 323 } ··· 331 332 // reassign 333 pipeline.Statuses[ps.Workflow] = statuses 334 + pipelines[key] = pipeline 335 } 336 337 var all []models.Pipeline
-10
appview/models/pipeline.go
··· 1 package models 2 3 import ( 4 - "fmt" 5 "slices" 6 "time" 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "github.com/go-git/go-git/v5/plumbing" 10 - "tangled.org/core/api/tangled" 11 spindle "tangled.org/core/spindle/models" 12 "tangled.org/core/workflow" 13 ) ··· 25 // populate when querying for reverse mappings 26 Trigger *Trigger 27 Statuses map[string]WorkflowStatus 28 - } 29 - 30 - func (p *Pipeline) AtUri() syntax.ATURI { 31 - return syntax.ATURI(fmt.Sprintf("at://did:web:%s/%s/%s", p.Knot, tangled.PipelineNSID, p.Rkey)) 32 } 33 34 type WorkflowStatus struct { ··· 134 Error *string 135 ExitCode int 136 } 137 - 138 - func (ps *PipelineStatus) PipelineAt() syntax.ATURI { 139 - return syntax.ATURI(fmt.Sprintf("at://did:web:%s/%s/%s", ps.PipelineKnot, tangled.PipelineNSID, ps.PipelineRkey)) 140 - }
··· 1 package models 2 3 import ( 4 "slices" 5 "time" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "github.com/go-git/go-git/v5/plumbing" 9 spindle "tangled.org/core/spindle/models" 10 "tangled.org/core/workflow" 11 ) ··· 23 // populate when querying for reverse mappings 24 Trigger *Trigger 25 Statuses map[string]WorkflowStatus 26 } 27 28 type WorkflowStatus struct { ··· 128 Error *string 129 ExitCode int 130 }
+63 -22
appview/pages/templates/layouts/fragments/topbar.html
··· 47 {{ end }} 48 49 {{ define "profileDropdown" }} 50 - <details class="relative inline-block text-left nav-dropdown"> 51 - <summary class="cursor-pointer list-none flex items-center gap-1"> 52 - {{ $user := .Did }} 53 - <img 54 - src="{{ tinyAvatar $user }}" 55 - alt="" 56 - class="rounded-full h-6 w-6 border border-gray-300 dark:border-gray-700" 57 - /> 58 - <span class="hidden md:inline">{{ $user | resolve | truncateAt30 }}</span> 59 - </summary> 60 - <div class="absolute flex flex-col right-0 mt-4 p-4 rounded w-48 bg-white dark:bg-gray-800 dark:text-white border border-gray-200 dark:border-gray-700"> 61 - <a href="/{{ $user }}">profile</a> 62 - <a href="/{{ $user }}?tab=repos">repositories</a> 63 - <a href="/{{ $user }}?tab=strings">strings</a> 64 - <a href="/settings">settings</a> 65 <a href="#" 66 - hx-post="/logout" 67 - hx-swap="none" 68 - class="text-red-400 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300"> 69 - logout 70 - </a> 71 - </div> 72 - </details> 73 74 <script> 75 document.addEventListener('click', function(event) { ··· 80 } 81 }); 82 }); 83 </script> 84 {{ end }}
··· 47 {{ end }} 48 49 {{ define "profileDropdown" }} 50 + {{ $user := .Did }} 51 + <button type="button" popovertarget="navigation-popover" class="site-navigation-dropdown-trigger" aria-label="Open site navigation dropdown"> 52 + <img 53 + src="{{ tinyAvatar $user }}" 54 + alt="" 55 + class="rounded-full h-6 w-6 border border-gray-300 dark:border-gray-700" 56 + /> 57 + </button> 58 + <div popover="auto" id="navigation-popover" class="site-navigation-popover shadow-md border border-gray-200 rounded p-2 bg-white dark:bg-gray-800 dark:text-white dark:border-gray-700"> 59 + <div class="flex gap-2 py-2"> 60 + <img 61 + src="{{ tinyAvatar $user }}" 62 + alt="" 63 + class="rounded-full h-6 w-6 border border-gray-300 dark:border-gray-700" 64 + /> 65 + <p>{{ $user | resolve | truncateAt30 }}</p> 66 + </div> 67 + <hr class="h-1 w-full mb-1 mt-2 dark:border-gray-700" /> 68 + <ul id="navigation-menu-popover"> 69 + <li> 70 + <a href="/{{ $user }}"> 71 + {{ i "user" "w-4 h-4" }} 72 + <span>profile</span> 73 + </a> 74 + </li> 75 + <li> 76 + <a href="/{{ $user }}?tab=repos"> 77 + {{ i "book-marked" "w-4 h-4" }} 78 + <span>repositories</span> 79 + </a> 80 + </li> 81 + <li> 82 + <a href="/{{ $user }}?tab=strings"> 83 + {{ i "spool" "w-4 h-4" }} 84 + <span>strings</span> 85 + </a> 86 + </li> 87 + <li> 88 + <a href="/settings"> 89 + {{ i "settings" "w-4 h-4" }} 90 + <span>settings</span> 91 + </a> 92 + </li> 93 + <hr class="h-1 w-full mb-1 mt-2 dark:border-gray-700" /> 94 + <li> 95 <a href="#" 96 + hx-post="/logout" 97 + hx-swap="none" 98 + class="text-red-400 flex gap-2 items-center hover:bg-red-50 hover:text-red-700 px-2 py-2 rounded-sm dark:hover:bg-red-700 dark:hover:text-red-50"> 99 + {{ i "arrow-right-from-line" "w-4 h-4" }} 100 + <span>logout</span> 101 + </a> 102 + </li> 103 + </ul> 104 + </div> 105 106 <script> 107 document.addEventListener('click', function(event) { ··· 112 } 113 }); 114 }); 115 + 116 + const navigationPopoverLinks = document.querySelectorAll("#navigation-menu-popover li a"); 117 + const currentPageURL = window.location.href 118 + navigationPopoverLinks.forEach(link => { 119 + const navigationPopoverLinkURL = link.href 120 + if (navigationPopoverLinkURL === currentPageURL) { 121 + link.ariaCurrent = "page" 122 + } 123 + }) 124 </script> 125 {{ end }}
-10
appview/pages/templates/repo/pipelines/workflow.html
··· 12 {{ block "sidebar" . }} {{ end }} 13 </div> 14 <div class="col-span-1 md:col-span-3"> 15 - <div class="flex justify-end mb-2"> 16 - <button 17 - class="btn" 18 - hx-post="/{{ $.RepoInfo.FullName }}/pipelines/{{ .Pipeline.Id }}/workflow/{{ .Workflow }}/cancel" 19 - hx-swap="none" 20 - {{ if (index .Pipeline.Statuses .Workflow).Latest.Status.IsFinish -}} 21 - disabled 22 - {{- end }} 23 - >Cancel</button> 24 - </div> 25 {{ block "logs" . }} {{ end }} 26 </div> 27 </section>
··· 12 {{ block "sidebar" . }} {{ end }} 13 </div> 14 <div class="col-span-1 md:col-span-3"> 15 {{ block "logs" . }} {{ end }} 16 </div> 17 </section>
-82
appview/pipelines/pipelines.go
··· 4 "bytes" 5 "context" 6 "encoding/json" 7 - "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 - "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 - "tangled.org/core/appview/models" 17 "tangled.org/core/appview/oauth" 18 "tangled.org/core/appview/pages" 19 "tangled.org/core/appview/reporesolver" ··· 44 r.Get("/", p.Index) 45 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 46 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 47 - r.Post("/{pipeline}/workflow/{workflow}/cancel", p.Cancel) 48 49 return r 50 } ··· 318 } 319 } 320 } 321 - } 322 - 323 - func (p *Pipelines) Cancel(w http.ResponseWriter, r *http.Request) { 324 - l := p.logger.With("handler", "Cancel") 325 - 326 - var ( 327 - pipelineId = chi.URLParam(r, "pipeline") 328 - workflow = chi.URLParam(r, "workflow") 329 - ) 330 - if pipelineId == "" || workflow == "" { 331 - http.Error(w, "missing pipeline ID or workflow", http.StatusBadRequest) 332 - return 333 - } 334 - 335 - f, err := p.repoResolver.Resolve(r) 336 - if err != nil { 337 - l.Error("failed to get repo and knot", "err", err) 338 - http.Error(w, "bad repo/knot", http.StatusBadRequest) 339 - return 340 - } 341 - 342 - pipeline, err := func() (models.Pipeline, error) { 343 - ps, err := db.GetPipelineStatuses( 344 - p.db, 345 - 1, 346 - orm.FilterEq("repo_owner", f.Did), 347 - orm.FilterEq("repo_name", f.Name), 348 - orm.FilterEq("knot", f.Knot), 349 - orm.FilterEq("id", pipelineId), 350 - ) 351 - if err != nil { 352 - return models.Pipeline{}, err 353 - } 354 - if len(ps) != 1 { 355 - return models.Pipeline{}, fmt.Errorf("wrong pipeline count %d", len(ps)) 356 - } 357 - return ps[0], nil 358 - }() 359 - if err != nil { 360 - l.Error("pipeline query failed", "err", err) 361 - http.Error(w, "pipeline not found", http.StatusNotFound) 362 - } 363 - var ( 364 - spindle = f.Spindle 365 - knot = f.Knot 366 - rkey = pipeline.Rkey 367 - ) 368 - 369 - if spindle == "" || knot == "" || rkey == "" { 370 - http.Error(w, "invalid repo info", http.StatusBadRequest) 371 - return 372 - } 373 - 374 - spindleClient, err := p.oauth.ServiceClient( 375 - r, 376 - oauth.WithService(f.Spindle), 377 - oauth.WithLxm(tangled.PipelineCancelPipelineNSID), 378 - oauth.WithExp(60), 379 - oauth.WithDev(p.config.Core.Dev), 380 - oauth.WithTimeout(time.Second*30), // workflow cleanup usually takes time 381 - ) 382 - 383 - err = tangled.PipelineCancelPipeline( 384 - r.Context(), 385 - spindleClient, 386 - &tangled.PipelineCancelPipeline_Input{ 387 - Repo: string(f.RepoAt()), 388 - Pipeline: pipeline.AtUri().String(), 389 - Workflow: workflow, 390 - }, 391 - ) 392 - errorId := "pipeline-action" 393 - if err != nil { 394 - l.Error("failed to cancel pipeline", "err", err) 395 - p.pages.Notice(w, errorId, "Failed to add secret.") 396 - return 397 - } 398 - l.Debug("canceled pipeline", "uri", pipeline.AtUri()) 399 } 400 401 // either a message or an error
··· 4 "bytes" 5 "context" 6 "encoding/json" 7 "log/slog" 8 "net/http" 9 "strings" 10 "time" 11 12 "tangled.org/core/appview/config" 13 "tangled.org/core/appview/db" 14 "tangled.org/core/appview/oauth" 15 "tangled.org/core/appview/pages" 16 "tangled.org/core/appview/reporesolver" ··· 41 r.Get("/", p.Index) 42 r.Get("/{pipeline}/workflow/{workflow}", p.Workflow) 43 r.Get("/{pipeline}/workflow/{workflow}/logs", p.Logs) 44 45 return r 46 } ··· 314 } 315 } 316 } 317 } 318 319 // either a message or an error
+2 -31
flake.nix
··· 91 spindle = self.callPackage ./nix/pkgs/spindle.nix {}; 92 knot-unwrapped = self.callPackage ./nix/pkgs/knot-unwrapped.nix {}; 93 knot = self.callPackage ./nix/pkgs/knot.nix {}; 94 - did-method-plc = self.callPackage ./nix/pkgs/did-method-plc.nix {}; 95 - bluesky-jetstream = self.callPackage ./nix/pkgs/bluesky-jetstream.nix {}; 96 - bluesky-relay = self.callPackage ./nix/pkgs/bluesky-relay.nix {}; 97 - tap = self.callPackage ./nix/pkgs/tap.nix {}; 98 }); 99 in { 100 overlays.default = final: prev: { 101 - inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview did-method-plc bluesky-jetstream bluesky-relay tap; 102 }; 103 104 packages = forAllSystems (system: let ··· 107 staticPackages = mkPackageSet pkgs.pkgsStatic; 108 crossPackages = mkPackageSet pkgs.pkgsCross.gnu64.pkgsStatic; 109 in { 110 - inherit (packages) appview appview-static-files lexgen goat spindle knot knot-unwrapped sqlite-lib did-method-plc bluesky-jetstream bluesky-relay tap; 111 112 pkgsStatic-appview = staticPackages.appview; 113 pkgsStatic-knot = staticPackages.knot; ··· 306 imports = [./nix/modules/spindle.nix]; 307 308 services.tangled.spindle.package = lib.mkDefault self.packages.${pkgs.stdenv.hostPlatform.system}.spindle; 309 - services.tangled.spindle.tap-package = lib.mkDefault self.packages.${pkgs.system}.tap; 310 - }; 311 - nixosModules.did-method-plc = { 312 - lib, 313 - pkgs, 314 - ... 315 - }: { 316 - imports = [./nix/modules/did-method-plc.nix]; 317 - services.did-method-plc.package = lib.mkDefault self.packages.${pkgs.system}.did-method-plc; 318 - }; 319 - nixosModules.bluesky-relay = { 320 - lib, 321 - pkgs, 322 - ... 323 - }: { 324 - imports = [./nix/modules/bluesky-relay.nix]; 325 - services.bluesky-relay.package = lib.mkDefault self.packages.${pkgs.system}.bluesky-relay; 326 - }; 327 - nixosModules.bluesky-jetstream = { 328 - lib, 329 - pkgs, 330 - ... 331 - }: { 332 - imports = [./nix/modules/bluesky-jetstream.nix]; 333 - services.bluesky-jetstream.package = lib.mkDefault self.packages.${pkgs.system}.bluesky-jetstream; 334 }; 335 }; 336 }
··· 91 spindle = self.callPackage ./nix/pkgs/spindle.nix {}; 92 knot-unwrapped = self.callPackage ./nix/pkgs/knot-unwrapped.nix {}; 93 knot = self.callPackage ./nix/pkgs/knot.nix {}; 94 }); 95 in { 96 overlays.default = final: prev: { 97 + inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview; 98 }; 99 100 packages = forAllSystems (system: let ··· 103 staticPackages = mkPackageSet pkgs.pkgsStatic; 104 crossPackages = mkPackageSet pkgs.pkgsCross.gnu64.pkgsStatic; 105 in { 106 + inherit (packages) appview appview-static-files lexgen goat spindle knot knot-unwrapped sqlite-lib; 107 108 pkgsStatic-appview = staticPackages.appview; 109 pkgsStatic-knot = staticPackages.knot; ··· 302 imports = [./nix/modules/spindle.nix]; 303 304 services.tangled.spindle.package = lib.mkDefault self.packages.${pkgs.stdenv.hostPlatform.system}.spindle; 305 }; 306 }; 307 }
-1
go.mod
··· 131 github.com/hashicorp/go-secure-stdlib/parseutil v0.2.0 // indirect 132 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect 133 github.com/hashicorp/go-sockaddr v1.0.7 // indirect 134 - github.com/hashicorp/go-version v1.8.0 // indirect 135 github.com/hashicorp/golang-lru v1.0.2 // indirect 136 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 137 github.com/hashicorp/hcl v1.0.1-vault-7 // indirect
··· 131 github.com/hashicorp/go-secure-stdlib/parseutil v0.2.0 // indirect 132 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect 133 github.com/hashicorp/go-sockaddr v1.0.7 // indirect 134 github.com/hashicorp/golang-lru v1.0.2 // indirect 135 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 136 github.com/hashicorp/hcl v1.0.1-vault-7 // indirect
-2
go.sum
··· 264 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= 265 github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= 266 github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= 267 - github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= 268 - github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= 269 github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= 270 github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 271 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
··· 264 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= 265 github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= 266 github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= 267 github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= 268 github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 269 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+19
input.css
··· 89 @apply no-underline text-black hover:underline hover:text-gray-800 dark:text-white dark:hover:text-gray-300; 90 } 91 92 label { 93 @apply block text-gray-900 text-sm font-bold py-2 uppercase dark:text-gray-100; 94 } ··· 962 color: #f9fafb; 963 } 964 }
··· 89 @apply no-underline text-black hover:underline hover:text-gray-800 dark:text-white dark:hover:text-gray-300; 90 } 91 92 + #navigation-menu-popover li:not(:last-of-type) a { 93 + @apply flex gap-2 items-center px-2 pb-2 pt-1.5 rounded-sm hover:bg-green-50 hover:text-green-700 no-underline dark:hover:text-green-50 dark:hover:bg-green-700; 94 + } 95 + 96 + a[hx-post="/logout"] { 97 + @apply no-underline; 98 + } 99 + 100 label { 101 @apply block text-gray-900 text-sm font-bold py-2 uppercase dark:text-gray-100; 102 } ··· 970 color: #f9fafb; 971 } 972 } 973 + 974 + .site-navigation-dropdown-trigger { 975 + anchor-name: --dropdown-trigger; 976 + } 977 + 978 + .site-navigation-popover { 979 + margin: 0; 980 + inset: auto; 981 + position-anchor: --dropdown-trigger; 982 + position-area: bottom left; 983 + }
-33
lexicons/pipeline/cancelPipeline.json
··· 1 - { 2 - "lexicon": 1, 3 - "id": "sh.tangled.pipeline.cancelPipeline", 4 - "defs": { 5 - "main": { 6 - "type": "procedure", 7 - "description": "Cancel a running pipeline", 8 - "input": { 9 - "encoding": "application/json", 10 - "schema": { 11 - "type": "object", 12 - "required": ["repo", "pipeline", "workflow"], 13 - "properties": { 14 - "repo": { 15 - "type": "string", 16 - "format": "at-uri", 17 - "description": "repo at-uri, spindle can't resolve repo from pipeline at-uri yet" 18 - }, 19 - "pipeline": { 20 - "type": "string", 21 - "format": "at-uri", 22 - "description": "pipeline at-uri" 23 - }, 24 - "workflow": { 25 - "type": "string", 26 - "description": "workflow name" 27 - } 28 - } 29 - } 30 - } 31 - } 32 - } 33 - }
···
-3
nix/gomod2nix.toml
··· 304 [mod."github.com/hashicorp/go-sockaddr"] 305 version = "v1.0.7" 306 hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs=" 307 - [mod."github.com/hashicorp/go-version"] 308 - version = "v1.8.0" 309 - hash = "sha256-KXtqERmYrWdpqPCViWcHbe6jnuH7k16bvBIcuJuevj8=" 310 [mod."github.com/hashicorp/golang-lru"] 311 version = "v1.0.2" 312 hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
··· 304 [mod."github.com/hashicorp/go-sockaddr"] 305 version = "v1.0.7" 306 hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs=" 307 [mod."github.com/hashicorp/golang-lru"] 308 version = "v1.0.2" 309 hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
-64
nix/modules/bluesky-jetstream.nix
··· 1 - { 2 - config, 3 - pkgs, 4 - lib, 5 - ... 6 - }: let 7 - cfg = config.services.bluesky-jetstream; 8 - in 9 - with lib; { 10 - options.services.bluesky-jetstream = { 11 - enable = mkEnableOption "jetstream server"; 12 - package = mkPackageOption pkgs "bluesky-jetstream" {}; 13 - 14 - # dataDir = mkOption { 15 - # type = types.str; 16 - # default = "/var/lib/jetstream"; 17 - # description = "directory to store data (pebbleDB)"; 18 - # }; 19 - livenessTtl = mkOption { 20 - type = types.int; 21 - default = 15; 22 - description = "time to restart when no event detected (seconds)"; 23 - }; 24 - websocketUrl = mkOption { 25 - type = types.str; 26 - default = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"; 27 - description = "full websocket path to the ATProto SubscribeRepos XRPC endpoint"; 28 - }; 29 - }; 30 - config = mkIf cfg.enable { 31 - systemd.services.bluesky-jetstream = { 32 - description = "bluesky jetstream"; 33 - after = ["network.target" "pds.service"]; 34 - wantedBy = ["multi-user.target"]; 35 - 36 - serviceConfig = { 37 - User = "jetstream"; 38 - Group = "jetstream"; 39 - StateDirectory = "jetstream"; 40 - StateDirectoryMode = "0755"; 41 - # preStart = '' 42 - # mkdir -p "${cfg.dataDir}" 43 - # chown -R jetstream:jetstream "${cfg.dataDir}" 44 - # ''; 45 - # WorkingDirectory = cfg.dataDir; 46 - Environment = [ 47 - "JETSTREAM_DATA_DIR=/var/lib/jetstream/data" 48 - "JETSTREAM_LIVENESS_TTL=${toString cfg.livenessTtl}s" 49 - "JETSTREAM_WS_URL=${cfg.websocketUrl}" 50 - ]; 51 - ExecStart = getExe cfg.package; 52 - Restart = "always"; 53 - RestartSec = 5; 54 - }; 55 - }; 56 - users = { 57 - users.jetstream = { 58 - group = "jetstream"; 59 - isSystemUser = true; 60 - }; 61 - groups.jetstream = {}; 62 - }; 63 - }; 64 - }
···
-48
nix/modules/bluesky-relay.nix
··· 1 - { 2 - config, 3 - pkgs, 4 - lib, 5 - ... 6 - }: let 7 - cfg = config.services.bluesky-relay; 8 - in 9 - with lib; { 10 - options.services.bluesky-relay = { 11 - enable = mkEnableOption "relay server"; 12 - package = mkPackageOption pkgs "bluesky-relay" {}; 13 - }; 14 - config = mkIf cfg.enable { 15 - systemd.services.bluesky-relay = { 16 - description = "bluesky relay"; 17 - after = ["network.target" "pds.service"]; 18 - wantedBy = ["multi-user.target"]; 19 - 20 - serviceConfig = { 21 - User = "relay"; 22 - Group = "relay"; 23 - StateDirectory = "relay"; 24 - StateDirectoryMode = "0755"; 25 - Environment = [ 26 - "RELAY_ADMIN_PASSWORD=password" 27 - "RELAY_PLC_HOST=https://plc.tngl.boltless.dev" 28 - "DATABASE_URL=sqlite:///var/lib/relay/relay.sqlite" 29 - "RELAY_IP_BIND=:2470" 30 - "RELAY_PERSIST_DIR=/var/lib/relay" 31 - "RELAY_DISABLE_REQUEST_CRAWL=0" 32 - "RELAY_INITIAL_SEQ_NUMBER=1" 33 - "RELAY_ALLOW_INSECURE_HOSTS=1" 34 - ]; 35 - ExecStart = "${getExe cfg.package} serve"; 36 - Restart = "always"; 37 - RestartSec = 5; 38 - }; 39 - }; 40 - users = { 41 - users.relay = { 42 - group = "relay"; 43 - isSystemUser = true; 44 - }; 45 - groups.relay = {}; 46 - }; 47 - }; 48 - }
···
-76
nix/modules/did-method-plc.nix
··· 1 - { 2 - config, 3 - pkgs, 4 - lib, 5 - ... 6 - }: let 7 - cfg = config.services.did-method-plc; 8 - in 9 - with lib; { 10 - options.services.did-method-plc = { 11 - enable = mkEnableOption "did-method-plc server"; 12 - package = mkPackageOption pkgs "did-method-plc" {}; 13 - }; 14 - config = mkIf cfg.enable { 15 - services.postgresql = { 16 - enable = true; 17 - package = pkgs.postgresql_14; 18 - ensureDatabases = ["plc"]; 19 - ensureUsers = [ 20 - { 21 - name = "pg"; 22 - # ensurePermissions."DATABASE plc" = "ALL PRIVILEGES"; 23 - } 24 - ]; 25 - authentication = '' 26 - local all all trust 27 - host all all 127.0.0.1/32 trust 28 - ''; 29 - }; 30 - systemd.services.did-method-plc = { 31 - description = "did-method-plc"; 32 - 33 - after = ["postgresql.service"]; 34 - wants = ["postgresql.service"]; 35 - wantedBy = ["multi-user.target"]; 36 - 37 - environment = let 38 - db_creds_json = builtins.toJSON { 39 - username = "pg"; 40 - password = ""; 41 - host = "127.0.0.1"; 42 - port = 5432; 43 - }; 44 - in { 45 - # TODO: inherit from config 46 - DEBUG_MODE = "1"; 47 - LOG_ENABLED = "true"; 48 - LOG_LEVEL = "debug"; 49 - LOG_DESTINATION = "1"; 50 - ENABLE_MIGRATIONS = "true"; 51 - DB_CREDS_JSON = db_creds_json; 52 - DB_MIGRATE_CREDS_JSON = db_creds_json; 53 - PLC_VERSION = "0.0.1"; 54 - PORT = "8080"; 55 - }; 56 - 57 - serviceConfig = { 58 - ExecStart = getExe cfg.package; 59 - User = "plc"; 60 - Group = "plc"; 61 - StateDirectory = "plc"; 62 - StateDirectoryMode = "0755"; 63 - Restart = "always"; 64 - 65 - # Hardening 66 - }; 67 - }; 68 - users = { 69 - users.plc = { 70 - group = "plc"; 71 - isSystemUser = true; 72 - }; 73 - groups.plc = {}; 74 - }; 75 - }; 76 - }
···
-35
nix/modules/spindle.nix
··· 1 { 2 config, 3 - pkgs, 4 lib, 5 ... 6 }: let ··· 17 package = mkOption { 18 type = types.package; 19 description = "Package to use for the spindle"; 20 - }; 21 - tap-package = mkOption { 22 - type = types.package; 23 - description = "Package to use for the spindle"; 24 - }; 25 - 26 - atpRelayUrl = mkOption { 27 - type = types.str; 28 - default = "https://relay1.us-east.bsky.network"; 29 - description = "atproto relay"; 30 }; 31 32 server = { ··· 125 config = mkIf cfg.enable { 126 virtualisation.docker.enable = true; 127 128 - systemd.services.spindle-tap = { 129 - description = "spindle tap service"; 130 - after = ["network.target" "docker.service"]; 131 - wantedBy = ["multi-user.target"]; 132 - serviceConfig = { 133 - LogsDirectory = "spindle-tap"; 134 - StateDirectory = "spindle-tap"; 135 - Environment = [ 136 - "TAP_BIND=:2480" 137 - "TAP_PLC_URL=${cfg.server.plcUrl}" 138 - "TAP_RELAY_URL=${cfg.atpRelayUrl}" 139 - "TAP_COLLECTION_FILTERS=${concatStringsSep "," [ 140 - "sh.tangled.repo" 141 - "sh.tangled.repo.collaborator" 142 - "sh.tangled.spindle.member" 143 - ]}" 144 - ]; 145 - ExecStart = "${getExe cfg.tap-package} run"; 146 - }; 147 - }; 148 - 149 systemd.services.spindle = { 150 description = "spindle service"; 151 after = ["network.target" "docker.service"]; 152 wantedBy = ["multi-user.target"]; 153 - path = [ 154 - pkgs.git 155 - ]; 156 serviceConfig = { 157 LogsDirectory = "spindle"; 158 StateDirectory = "spindle";
··· 1 { 2 config, 3 lib, 4 ... 5 }: let ··· 16 package = mkOption { 17 type = types.package; 18 description = "Package to use for the spindle"; 19 }; 20 21 server = { ··· 114 config = mkIf cfg.enable { 115 virtualisation.docker.enable = true; 116 117 systemd.services.spindle = { 118 description = "spindle service"; 119 after = ["network.target" "docker.service"]; 120 wantedBy = ["multi-user.target"]; 121 serviceConfig = { 122 LogsDirectory = "spindle"; 123 StateDirectory = "spindle";
-20
nix/pkgs/bluesky-jetstream.nix
··· 1 - { 2 - buildGoModule, 3 - fetchFromGitHub, 4 - }: 5 - buildGoModule { 6 - pname = "bluesky-jetstream"; 7 - version = "0.1.0"; 8 - src = fetchFromGitHub { 9 - owner = "bluesky-social"; 10 - repo = "jetstream"; 11 - rev = "7d7efa58d7f14101a80ccc4f1085953948b7d5de"; 12 - sha256 = "sha256-1e9SL/8gaDPMA4YZed51ffzgpkptbMd0VTbTTDbPTFw="; 13 - }; 14 - subPackages = ["cmd/jetstream"]; 15 - vendorHash = "sha256-/21XJQH6fo9uPzlABUAbdBwt1O90odmppH6gXu2wkiQ="; 16 - doCheck = false; 17 - meta = { 18 - mainProgram = "jetstream"; 19 - }; 20 - }
···
-20
nix/pkgs/bluesky-relay.nix
··· 1 - { 2 - buildGoModule, 3 - fetchFromGitHub, 4 - }: 5 - buildGoModule { 6 - pname = "bluesky-relay"; 7 - version = "0.1.0"; 8 - src = fetchFromGitHub { 9 - owner = "boltlessengineer"; 10 - repo = "indigo"; 11 - rev = "b769ea60b7dde5e2bd0b8ee3ce8462a0c0e596fe"; 12 - sha256 = "sha256-jHRY825TBYaH1WkKFUoNbo4UlMSyuHvCGjYPiBnKo44="; 13 - }; 14 - subPackages = ["cmd/relay"]; 15 - vendorHash = "sha256-UOedwNYnM8Jx6B7Y9tFcZX8IeUBESAFAPTRYk7n0yo8="; 16 - doCheck = false; 17 - meta = { 18 - mainProgram = "relay"; 19 - }; 20 - }
···
-65
nix/pkgs/did-method-plc.nix
··· 1 - # inspired by https://github.com/NixOS/nixpkgs/blob/333bfb7c258fab089a834555ea1c435674c459b4/pkgs/by-name/ga/gatsby-cli/package.nix 2 - { 3 - lib, 4 - stdenv, 5 - fetchFromGitHub, 6 - fetchYarnDeps, 7 - yarnConfigHook, 8 - yarnBuildHook, 9 - nodejs, 10 - makeBinaryWrapper, 11 - }: 12 - stdenv.mkDerivation (finalAttrs: { 13 - pname = "did-method-plc"; 14 - version = "0.0.1"; 15 - 16 - src = fetchFromGitHub { 17 - owner = "did-method-plc"; 18 - repo = "did-method-plc"; 19 - rev = "158ba5535ac3da4fd4309954bde41deab0b45972"; 20 - sha256 = "sha256-O5smubbrnTDMCvL6iRyMXkddr5G7YHxkQRVMRULHanQ="; 21 - }; 22 - postPatch = '' 23 - # remove dd-trace dependency 24 - sed -i '3d' packages/server/service/index.js 25 - ''; 26 - 27 - yarnOfflineCache = fetchYarnDeps { 28 - yarnLock = finalAttrs.src + "/yarn.lock"; 29 - hash = "sha256-g8GzaAbWSnWwbQjJMV2DL5/ZlWCCX0sRkjjvX3tqU4Y="; 30 - }; 31 - 32 - nativeBuildInputs = [ 33 - yarnConfigHook 34 - yarnBuildHook 35 - nodejs 36 - makeBinaryWrapper 37 - ]; 38 - yarnBuildScript = "lerna"; 39 - yarnBuildFlags = [ 40 - "run" 41 - "build" 42 - "--scope" 43 - "@did-plc/server" 44 - "--include-dependencies" 45 - ]; 46 - 47 - installPhase = '' 48 - runHook preInstall 49 - 50 - mkdir -p $out/lib/node_modules/ 51 - mv packages/ $out/lib/packages/ 52 - mv node_modules/* $out/lib/node_modules/ 53 - 54 - makeWrapper ${lib.getExe nodejs} $out/bin/plc \ 55 - --add-flags $out/lib/packages/server/service/index.js \ 56 - --add-flags --enable-source-maps \ 57 - --set NODE_PATH $out/lib/node_modules 58 - 59 - runHook postInstall 60 - ''; 61 - 62 - meta = { 63 - mainProgram = "plc"; 64 - }; 65 - })
···
-20
nix/pkgs/tap.nix
··· 1 - { 2 - buildGoModule, 3 - fetchFromGitHub, 4 - }: 5 - buildGoModule { 6 - pname = "tap"; 7 - version = "0.1.0"; 8 - src = fetchFromGitHub { 9 - owner = "bluesky-social"; 10 - repo = "indigo"; 11 - rev = "f92cb29224fcc60f666b20ee3514e431a58ff811"; 12 - sha256 = "sha256-35ltXnq0SJeo3j33D7Nndbcnw5XWBJLRrmZ+nCmZVQw="; 13 - }; 14 - subPackages = ["cmd/tap"]; 15 - vendorHash = "sha256-UOedwNYnM8Jx6B7Y9tFcZX8IeUBESAFAPTRYk7n0yo8="; 16 - doCheck = false; 17 - meta = { 18 - mainProgram = "tap"; 19 - }; 20 - }
···
-2
nix/vm.nix
··· 19 20 plcUrl = envVarOr "TANGLED_VM_PLC_URL" "https://plc.directory"; 21 jetstream = envVarOr "TANGLED_VM_JETSTREAM_ENDPOINT" "wss://jetstream1.us-west.bsky.network/subscribe"; 22 - relayUrl = envVarOr "TANGLED_VM_RELAY_URL" "https://relay1.us-east.bsky.network"; 23 in 24 nixpkgs.lib.nixosSystem { 25 inherit system; ··· 96 }; 97 services.tangled.spindle = { 98 enable = true; 99 - atpRelayUrl = relayUrl; 100 server = { 101 owner = envVar "TANGLED_VM_SPINDLE_OWNER"; 102 hostname = envVarOr "TANGLED_VM_SPINDLE_HOST" "localhost:6555";
··· 19 20 plcUrl = envVarOr "TANGLED_VM_PLC_URL" "https://plc.directory"; 21 jetstream = envVarOr "TANGLED_VM_JETSTREAM_ENDPOINT" "wss://jetstream1.us-west.bsky.network/subscribe"; 22 in 23 nixpkgs.lib.nixosSystem { 24 inherit system; ··· 95 }; 96 services.tangled.spindle = { 97 enable = true; 98 server = { 99 owner = envVar "TANGLED_VM_SPINDLE_OWNER"; 100 hostname = envVarOr "TANGLED_VM_SPINDLE_HOST" "localhost:6555";
-10
orm/orm.go
··· 20 } 21 defer tx.Rollback() 22 23 - _, err = tx.Exec(` 24 - create table if not exists migrations ( 25 - id integer primary key autoincrement, 26 - name text unique 27 - ); 28 - `) 29 - if err != nil { 30 - return fmt.Errorf("creating migrations table: %w", err) 31 - } 32 - 33 var exists bool 34 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 35 if err != nil {
··· 20 } 21 defer tx.Rollback() 22 23 var exists bool 24 err = tx.QueryRow("select exists (select 1 from migrations where name = ?)", name).Scan(&exists) 25 if err != nil {
-144
rbac2/rbac2.go
··· 1 - package rbac2 2 - 3 - import ( 4 - "database/sql" 5 - "fmt" 6 - 7 - adapter "github.com/Blank-Xu/sql-adapter" 8 - "github.com/bluesky-social/indigo/atproto/syntax" 9 - "github.com/casbin/casbin/v2" 10 - "github.com/casbin/casbin/v2/model" 11 - "github.com/casbin/casbin/v2/util" 12 - "tangled.org/core/api/tangled" 13 - ) 14 - 15 - const ( 16 - Model = ` 17 - [request_definition] 18 - r = sub, dom, obj, act 19 - 20 - [policy_definition] 21 - p = sub, dom, obj, act 22 - 23 - [role_definition] 24 - g = _, _, _ 25 - 26 - [policy_effect] 27 - e = some(where (p.eft == allow)) 28 - 29 - [matchers] 30 - m = g(r.sub, p.sub, r.dom) && keyMatch4(r.dom, p.dom) && r.obj == p.obj && r.act == p.act 31 - ` 32 - ) 33 - 34 - type Enforcer struct { 35 - e *casbin.Enforcer 36 - } 37 - 38 - func NewEnforcer(path string) (*Enforcer, error) { 39 - m, err := model.NewModelFromString(Model) 40 - if err != nil { 41 - return nil, err 42 - } 43 - 44 - db, err := sql.Open("sqlite3", path+"?_foreign_keys=1") 45 - if err != nil { 46 - return nil, err 47 - } 48 - 49 - a, err := adapter.NewAdapter(db, "sqlite3", "acl") 50 - if err != nil { 51 - return nil, err 52 - } 53 - 54 - e, err := casbin.NewEnforcer(m, a) 55 - if err != nil { 56 - return nil, err 57 - } 58 - 59 - if err := seedTangledPolicies(e); err != nil { 60 - return nil, err 61 - } 62 - 63 - return &Enforcer{e}, nil 64 - } 65 - 66 - func seedTangledPolicies(e *casbin.Enforcer) error { 67 - // policies 68 - aturi := func(nsid string) string { 69 - return fmt.Sprintf("at://{did}/%s/{rkey}", nsid) 70 - } 71 - 72 - _, err := e.AddPoliciesEx([][]string{ 73 - // sub | dom | obj | act 74 - {"repo:owner", aturi(tangled.RepoNSID), "/", "write"}, 75 - {"repo:owner", aturi(tangled.RepoNSID), "/collaborator", "write"}, // invite 76 - {"repo:collaborator", aturi(tangled.RepoNSID), "/settings", "write"}, 77 - {"repo:collaborator", aturi(tangled.RepoNSID), "/git", "write"}, // git push 78 - 79 - {"server:owner", "/knot/{did}", "/member", "write"}, // invite 80 - {"server:member", "/knot/{did}", "/git", "write"}, 81 - 82 - {"server:owner", "/spindle/{did}", "/member", "write"}, // invite 83 - }) 84 - if err != nil { 85 - return err 86 - } 87 - 88 - // grouping policies 89 - // TODO(boltless): define our own matcher to replace keyMatch4 90 - e.AddNamedDomainMatchingFunc("g", "keyMatch4", util.KeyMatch4) 91 - _, err = e.AddGroupingPoliciesEx([][]string{ 92 - // sub | role | dom 93 - {"repo:owner", "repo:collaborator", aturi(tangled.RepoNSID)}, 94 - 95 - // using '/knot/' prefix here because knot/spindle identifiers don't 96 - // include the collection type 97 - {"server:owner", "server:member", "/knot/{did}"}, 98 - {"server:owner", "server:member", "/spindle/{did}"}, 99 - }) 100 - return err 101 - } 102 - 103 - func (e *Enforcer) hasImplicitRoleForUser(name string, role string, domain ...string) (bool, error) { 104 - roles, err := e.e.GetImplicitRolesForUser(name, domain...) 105 - if err != nil { 106 - return false, err 107 - } 108 - for _, r := range roles { 109 - if r == role { 110 - return true, nil 111 - } 112 - } 113 - return false, nil 114 - } 115 - 116 - // setRoleForUser sets single user role for specified domain. 117 - // All existing users with that role will be removed. 118 - func (e *Enforcer) setRoleForUser(name string, role string, domain ...string) error { 119 - currentUsers, err := e.e.GetUsersForRole(role, domain...) 120 - if err != nil { 121 - return err 122 - } 123 - 124 - for _, oldUser := range currentUsers { 125 - _, err = e.e.DeleteRoleForUser(oldUser, role, domain...) 126 - if err != nil { 127 - return err 128 - } 129 - } 130 - 131 - _, err = e.e.AddRoleForUser(name, role, domain...) 132 - return err 133 - } 134 - 135 - // validateAtUri enforeces AT-URI to have valid did as authority and match collection NSID. 136 - func validateAtUri(uri syntax.ATURI, expected string) error { 137 - if !uri.Authority().IsDID() { 138 - return fmt.Errorf("expected at-uri with did") 139 - } 140 - if expected != "" && uri.Collection().String() != expected { 141 - return fmt.Errorf("incorrect repo at-uri collection nsid '%s' (expected '%s')", uri.Collection(), expected) 142 - } 143 - return nil 144 - }
···
-115
rbac2/rbac2_test.go
··· 1 - package rbac2_test 2 - 3 - import ( 4 - "testing" 5 - 6 - "github.com/bluesky-social/indigo/atproto/syntax" 7 - _ "github.com/mattn/go-sqlite3" 8 - "github.com/stretchr/testify/assert" 9 - "tangled.org/core/rbac2" 10 - ) 11 - 12 - func setup(t *testing.T) *rbac2.Enforcer { 13 - enforcer, err := rbac2.NewEnforcer(":memory:") 14 - assert.NoError(t, err) 15 - 16 - return enforcer 17 - } 18 - 19 - func TestRepoOwnerPermissions(t *testing.T) { 20 - var ( 21 - e = setup(t) 22 - ok bool 23 - err error 24 - fooRepo = syntax.ATURI("at://did:plc:foo/sh.tangled.repo/reporkey") 25 - fooUser = syntax.DID("did:plc:foo") 26 - ) 27 - 28 - assert.NoError(t, e.AddRepo(fooRepo)) 29 - 30 - ok, err = e.IsRepoOwner(fooUser, fooRepo) 31 - assert.NoError(t, err) 32 - assert.True(t, ok, "repo author should be repo owner") 33 - 34 - ok, err = e.IsRepoWriteAllowed(fooUser, fooRepo) 35 - assert.NoError(t, err) 36 - assert.True(t, ok, "repo owner should be able to modify the repo itself") 37 - 38 - ok, err = e.IsRepoCollaborator(fooUser, fooRepo) 39 - assert.NoError(t, err) 40 - assert.True(t, ok, "repo owner should inherit role role:collaborator") 41 - 42 - ok, err = e.IsRepoSettingsWriteAllowed(fooUser, fooRepo) 43 - assert.NoError(t, err) 44 - assert.True(t, ok, "repo owner should inherit collaborator permissions") 45 - } 46 - 47 - func TestRepoCollaboratorPermissions(t *testing.T) { 48 - var ( 49 - e = setup(t) 50 - ok bool 51 - err error 52 - fooRepo = syntax.ATURI("at://did:plc:foo/sh.tangled.repo/reporkey") 53 - barUser = syntax.DID("did:plc:bar") 54 - ) 55 - 56 - assert.NoError(t, e.AddRepo(fooRepo)) 57 - assert.NoError(t, e.AddRepoCollaborator(barUser, fooRepo)) 58 - 59 - ok, err = e.IsRepoCollaborator(barUser, fooRepo) 60 - assert.NoError(t, err) 61 - assert.True(t, ok, "should set repo collaborator") 62 - 63 - ok, err = e.IsRepoSettingsWriteAllowed(barUser, fooRepo) 64 - assert.NoError(t, err) 65 - assert.True(t, ok, "repo collaborator should be able to edit repo settings") 66 - 67 - ok, err = e.IsRepoWriteAllowed(barUser, fooRepo) 68 - assert.NoError(t, err) 69 - assert.False(t, ok, "repo collaborator shouldn't be able to modify the repo itself") 70 - } 71 - 72 - func TestGetByRole(t *testing.T) { 73 - var ( 74 - e = setup(t) 75 - err error 76 - fooRepo = syntax.ATURI("at://did:plc:foo/sh.tangled.repo/reporkey") 77 - owner = syntax.DID("did:plc:foo") 78 - collaborator1 = syntax.DID("did:plc:bar") 79 - collaborator2 = syntax.DID("did:plc:baz") 80 - ) 81 - 82 - assert.NoError(t, e.AddRepo(fooRepo)) 83 - assert.NoError(t, e.AddRepoCollaborator(collaborator1, fooRepo)) 84 - assert.NoError(t, e.AddRepoCollaborator(collaborator2, fooRepo)) 85 - 86 - collaborators, err := e.GetRepoCollaborators(fooRepo) 87 - assert.NoError(t, err) 88 - assert.ElementsMatch(t, []syntax.DID{ 89 - owner, 90 - collaborator1, 91 - collaborator2, 92 - }, collaborators) 93 - } 94 - 95 - func TestSpindleOwnerPermissions(t *testing.T) { 96 - var ( 97 - e = setup(t) 98 - ok bool 99 - err error 100 - spindle = syntax.DID("did:web:spindle.example.com") 101 - owner = syntax.DID("did:plc:foo") 102 - member = syntax.DID("did:plc:bar") 103 - ) 104 - 105 - assert.NoError(t, e.SetSpindleOwner(owner, spindle)) 106 - assert.NoError(t, e.AddSpindleMember(member, spindle)) 107 - 108 - ok, err = e.IsSpindleMemberInviteAllowed(owner, spindle) 109 - assert.NoError(t, err) 110 - assert.True(t, ok, "spindle owner can invite members") 111 - 112 - ok, err = e.IsSpindleMemberInviteAllowed(member, spindle) 113 - assert.NoError(t, err) 114 - assert.False(t, ok, "spindle member cannot invite members") 115 - }
···
-91
rbac2/repo.go
··· 1 - package rbac2 2 - 3 - import ( 4 - "slices" 5 - "strings" 6 - 7 - "github.com/bluesky-social/indigo/atproto/syntax" 8 - "tangled.org/core/api/tangled" 9 - ) 10 - 11 - // AddRepo adds new repo with its owner to rbac enforcer 12 - func (e *Enforcer) AddRepo(repo syntax.ATURI) error { 13 - if err := validateAtUri(repo, tangled.RepoNSID); err != nil { 14 - return err 15 - } 16 - user := repo.Authority() 17 - 18 - return e.setRoleForUser(user.String(), "repo:owner", repo.String()) 19 - } 20 - 21 - // DeleteRepo deletes all policies related to the repo 22 - func (e *Enforcer) DeleteRepo(repo syntax.ATURI) error { 23 - if err := validateAtUri(repo, tangled.RepoNSID); err != nil { 24 - return err 25 - } 26 - 27 - _, err := e.e.DeleteDomains(repo.String()) 28 - return err 29 - } 30 - 31 - // AddRepoCollaborator adds new collaborator to the repo 32 - func (e *Enforcer) AddRepoCollaborator(user syntax.DID, repo syntax.ATURI) error { 33 - if err := validateAtUri(repo, tangled.RepoNSID); err != nil { 34 - return err 35 - } 36 - 37 - _, err := e.e.AddRoleForUser(user.String(), "repo:collaborator", repo.String()) 38 - return err 39 - } 40 - 41 - // RemoveRepoCollaborator removes the collaborator from the repo. 42 - // This won't remove inherited roles like repository owner. 43 - func (e *Enforcer) RemoveRepoCollaborator(user syntax.DID, repo syntax.ATURI) error { 44 - if err := validateAtUri(repo, tangled.RepoNSID); err != nil { 45 - return err 46 - } 47 - 48 - _, err := e.e.DeleteRoleForUser(user.String(), "repo:collaborator", repo.String()) 49 - return err 50 - } 51 - 52 - func (e *Enforcer) GetRepoCollaborators(repo syntax.ATURI) ([]syntax.DID, error) { 53 - var collaborators []syntax.DID 54 - members, err := e.e.GetImplicitUsersForRole("repo:collaborator", repo.String()) 55 - if err != nil { 56 - return nil, err 57 - } 58 - for _, m := range members { 59 - if !strings.HasPrefix(m, "did:") { // skip non-user subjects like 'repo:owner' 60 - continue 61 - } 62 - collaborators = append(collaborators, syntax.DID(m)) 63 - } 64 - 65 - slices.Sort(collaborators) 66 - return slices.Compact(collaborators), nil 67 - } 68 - 69 - func (e *Enforcer) IsRepoOwner(user syntax.DID, repo syntax.ATURI) (bool, error) { 70 - return e.e.HasRoleForUser(user.String(), "repo:owner", repo.String()) 71 - } 72 - 73 - func (e *Enforcer) IsRepoCollaborator(user syntax.DID, repo syntax.ATURI) (bool, error) { 74 - return e.hasImplicitRoleForUser(user.String(), "repo:collaborator", repo.String()) 75 - } 76 - 77 - func (e *Enforcer) IsRepoWriteAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) { 78 - return e.e.Enforce(user.String(), repo.String(), "#/", "write") 79 - } 80 - 81 - func (e *Enforcer) IsRepoSettingsWriteAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) { 82 - return e.e.Enforce(user.String(), repo.String(), "#/settings", "write") 83 - } 84 - 85 - func (e *Enforcer) IsRepoCollaboratorInviteAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) { 86 - return e.e.Enforce(user.String(), repo.String(), "#/collaborator", "write") 87 - } 88 - 89 - func (e *Enforcer) IsRepoGitPushAllowed(user syntax.DID, repo syntax.ATURI) (bool, error) { 90 - return e.e.Enforce(user.String(), repo.String(), "#/git", "write") 91 - }
···
-29
rbac2/spindle.go
··· 1 - package rbac2 2 - 3 - import "github.com/bluesky-social/indigo/atproto/syntax" 4 - 5 - func (e *Enforcer) SetSpindleOwner(user syntax.DID, spindle syntax.DID) error { 6 - return e.setRoleForUser(user.String(), "server:owner", intoSpindle(spindle)) 7 - } 8 - 9 - func (e *Enforcer) IsSpindleMember(user syntax.DID, spindle syntax.DID) (bool, error) { 10 - return e.e.HasRoleForUser(user.String(), "server:member", spindle.String()) 11 - } 12 - 13 - func (e *Enforcer) AddSpindleMember(user syntax.DID, spindle syntax.DID) error { 14 - _, err := e.e.AddRoleForUser(user.String(), "server:member", intoSpindle(spindle)) 15 - return err 16 - } 17 - 18 - func (e *Enforcer) RemoveSpindleMember(user syntax.DID, spindle syntax.DID) error { 19 - _, err := e.e.DeleteRoleForUser(user.String(), "server:member", intoSpindle(spindle)) 20 - return err 21 - } 22 - 23 - func (e *Enforcer) IsSpindleMemberInviteAllowed(user syntax.DID, spindle syntax.DID) (bool, error) { 24 - return e.e.Enforce(user.String(), intoSpindle(spindle), "#/member", "write") 25 - } 26 - 27 - func intoSpindle(did syntax.DID) string { 28 - return "/spindle/" + did.String() 29 - }
···
+11 -18
spindle/config/config.go
··· 3 import ( 4 "context" 5 "fmt" 6 - "path" 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 9 "github.com/sethvargo/go-envconfig" 10 ) 11 12 type Server struct { 13 - ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"` 14 - DBPath string `env:"DB_PATH, default=spindle.db"` 15 - Hostname string `env:"HOSTNAME, required"` 16 - JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 17 - TapUrl string `env:"TAP_URL, required"` 18 - PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 19 - Dev bool `env:"DEV, default=false"` 20 - Owner syntax.DID `env:"OWNER, required"` 21 - Secrets Secrets `env:",prefix=SECRETS_"` 22 - LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 23 - DataDir string `env:"DATA_DIR, default=/var/lib/spindle"` 24 - QueueSize int `env:"QUEUE_SIZE, default=100"` 25 - MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 26 } 27 28 func (s Server) Did() syntax.DID { 29 return syntax.DID(fmt.Sprintf("did:web:%s", s.Hostname)) 30 - } 31 - 32 - func (s Server) RepoDir() string { 33 - return path.Join(s.DataDir, "repos") 34 } 35 36 type Secrets struct {
··· 3 import ( 4 "context" 5 "fmt" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "github.com/sethvargo/go-envconfig" 9 ) 10 11 type Server struct { 12 + ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"` 13 + DBPath string `env:"DB_PATH, default=spindle.db"` 14 + Hostname string `env:"HOSTNAME, required"` 15 + JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"` 16 + PlcUrl string `env:"PLC_URL, default=https://plc.directory"` 17 + Dev bool `env:"DEV, default=false"` 18 + Owner string `env:"OWNER, required"` 19 + Secrets Secrets `env:",prefix=SECRETS_"` 20 + LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 21 + QueueSize int `env:"QUEUE_SIZE, default=100"` 22 + MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 23 } 24 25 func (s Server) Did() syntax.DID { 26 return syntax.DID(fmt.Sprintf("did:web:%s", s.Hostname)) 27 } 28 29 type Secrets struct {
+18 -59
spindle/db/db.go
··· 1 package db 2 3 import ( 4 - "context" 5 "database/sql" 6 "strings" 7 8 - "github.com/bluesky-social/indigo/atproto/syntax" 9 _ "github.com/mattn/go-sqlite3" 10 - "tangled.org/core/log" 11 - "tangled.org/core/orm" 12 ) 13 14 type DB struct { 15 *sql.DB 16 } 17 18 - func Make(ctx context.Context, dbPath string) (*DB, error) { 19 // https://github.com/mattn/go-sqlite3#connection-string 20 opts := []string{ 21 "_foreign_keys=1", ··· 23 "_synchronous=NORMAL", 24 "_auto_vacuum=incremental", 25 } 26 - 27 - logger := log.FromContext(ctx) 28 - logger = log.SubLogger(logger, "db") 29 30 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 31 if err != nil { 32 return nil, err 33 } 34 35 - conn, err := db.Conn(ctx) 36 - if err != nil { 37 - return nil, err 38 - } 39 - defer conn.Close() 40 41 _, err = db.Exec(` 42 create table if not exists _jetstream ( ··· 85 return nil, err 86 } 87 88 - // run migrations 89 90 - // NOTE: this won't migrate existing records 91 - // they will be fetched again with tap instead 92 - orm.RunMigration(conn, logger, "add-rkey-to-repos", func(tx *sql.Tx) error { 93 - // archive legacy repos (just in case) 94 - _, err = tx.Exec(`alter table repos rename to repos_old`) 95 - if err != nil { 96 - return err 97 - } 98 - 99 - _, err := tx.Exec(` 100 - create table repos_new ( 101 - -- identifiers 102 - id integer primary key autoincrement, 103 - did text not null, 104 - rkey text not null, 105 - at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored, 106 - 107 - name text not null, 108 - knot text not null, 109 - 110 - addedAt text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 111 - unique(did, rkey) 112 - ); 113 - `) 114 - if err != nil { 115 - return err 116 - } 117 - 118 - return nil 119 - }) 120 - 121 - return &DB{db}, nil 122 } 123 124 - func (d *DB) IsKnownDid(did syntax.DID) (bool, error) { 125 - // is spindle member / repo collaborator 126 - var exists bool 127 - err := d.QueryRow( 128 - `select exists ( 129 - select 1 from repo_collaborators where did = ? 130 - union all 131 - select 1 from spindle_members where did = ? 132 - )`, 133 - did, 134 - did, 135 - ).Scan(&exists) 136 - return exists, err 137 }
··· 1 package db 2 3 import ( 4 "database/sql" 5 "strings" 6 7 _ "github.com/mattn/go-sqlite3" 8 ) 9 10 type DB struct { 11 *sql.DB 12 } 13 14 + func Make(dbPath string) (*DB, error) { 15 // https://github.com/mattn/go-sqlite3#connection-string 16 opts := []string{ 17 "_foreign_keys=1", ··· 19 "_synchronous=NORMAL", 20 "_auto_vacuum=incremental", 21 } 22 23 db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 24 if err != nil { 25 return nil, err 26 } 27 28 + // NOTE: If any other migration is added here, you MUST 29 + // copy the pattern in appview: use a single sql.Conn 30 + // for every migration. 31 32 _, err = db.Exec(` 33 create table if not exists _jetstream ( ··· 76 return nil, err 77 } 78 79 + return &DB{db}, nil 80 + } 81 82 + func (d *DB) SaveLastTimeUs(lastTimeUs int64) error { 83 + _, err := d.Exec(` 84 + insert into _jetstream (id, last_time_us) 85 + values (1, ?) 86 + on conflict(id) do update set last_time_us = excluded.last_time_us 87 + `, lastTimeUs) 88 + return err 89 } 90 91 + func (d *DB) GetLastTimeUs() (int64, error) { 92 + var lastTimeUs int64 93 + row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`) 94 + err := row.Scan(&lastTimeUs) 95 + return lastTimeUs, err 96 }
+18 -6
spindle/db/events.go
··· 18 EventJson string `json:"event"` 19 } 20 21 - func (d *DB) insertEvent(event Event, notifier *notifier.Notifier) error { 22 _, err := d.Exec( 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 event.Rkey, ··· 70 return evts, nil 71 } 72 73 func (d *DB) createStatusEvent( 74 workflowId models.WorkflowId, 75 statusKind models.StatusKind, ··· 100 EventJson: string(eventJson), 101 } 102 103 - return d.insertEvent(event, n) 104 105 } 106 ··· 148 149 func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 150 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 151 - } 152 - 153 - func (d *DB) StatusCancelled(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 154 - return d.createStatusEvent(workflowId, models.StatusKindCancelled, &workflowError, &exitCode, n) 155 } 156 157 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
··· 18 EventJson string `json:"event"` 19 } 20 21 + func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 22 _, err := d.Exec( 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 event.Rkey, ··· 70 return evts, nil 71 } 72 73 + func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error { 74 + eventJson, err := json.Marshal(s) 75 + if err != nil { 76 + return err 77 + } 78 + 79 + event := Event{ 80 + Rkey: rkey, 81 + Nsid: tangled.PipelineStatusNSID, 82 + Created: time.Now().UnixNano(), 83 + EventJson: string(eventJson), 84 + } 85 + 86 + return d.InsertEvent(event, n) 87 + } 88 + 89 func (d *DB) createStatusEvent( 90 workflowId models.WorkflowId, 91 statusKind models.StatusKind, ··· 116 EventJson: string(eventJson), 117 } 118 119 + return d.InsertEvent(event, n) 120 121 } 122 ··· 164 165 func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 166 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 167 } 168 169 func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error {
+44
spindle/db/known_dids.go
···
··· 1 + package db 2 + 3 + func (d *DB) AddDid(did string) error { 4 + _, err := d.Exec(`insert or ignore into known_dids (did) values (?)`, did) 5 + return err 6 + } 7 + 8 + func (d *DB) RemoveDid(did string) error { 9 + _, err := d.Exec(`delete from known_dids where did = ?`, did) 10 + return err 11 + } 12 + 13 + func (d *DB) GetAllDids() ([]string, error) { 14 + var dids []string 15 + 16 + rows, err := d.Query(`select did from known_dids`) 17 + if err != nil { 18 + return nil, err 19 + } 20 + defer rows.Close() 21 + 22 + for rows.Next() { 23 + var did string 24 + if err := rows.Scan(&did); err != nil { 25 + return nil, err 26 + } 27 + dids = append(dids, did) 28 + } 29 + 30 + if err := rows.Err(); err != nil { 31 + return nil, err 32 + } 33 + 34 + return dids, nil 35 + } 36 + 37 + func (d *DB) HasKnownDids() bool { 38 + var count int 39 + err := d.QueryRow(`select count(*) from known_dids`).Scan(&count) 40 + if err != nil { 41 + return false 42 + } 43 + return count > 0 44 + }
+11 -120
spindle/db/repos.go
··· 1 package db 2 3 - import "github.com/bluesky-social/indigo/atproto/syntax" 4 - 5 type Repo struct { 6 - Did syntax.DID 7 - Rkey syntax.RecordKey 8 - Name string 9 - Knot string 10 } 11 12 - type RepoCollaborator struct { 13 - Did syntax.DID 14 - Rkey syntax.RecordKey 15 - Repo syntax.ATURI 16 - Subject syntax.DID 17 - } 18 - 19 - func (d *DB) PutRepo(repo *Repo) error { 20 - _, err := d.Exec( 21 - `insert or ignore into repos (did, rkey, name, knot) 22 - values (?, ?, ?, ?) 23 - on conflict(did, rkey) do update set 24 - name = excluded.name 25 - knot = excluded.knot`, 26 - repo.Did, 27 - repo.Rkey, 28 - repo.Name, 29 - repo.Knot, 30 - ) 31 - return err 32 - } 33 - 34 - func (d *DB) DeleteRepo(did syntax.DID, rkey syntax.RecordKey) error { 35 - _, err := d.Exec( 36 - `delete from repos where did = ? and rkey = ?`, 37 - did, 38 - rkey, 39 - ) 40 return err 41 } 42 ··· 63 return knots, nil 64 } 65 66 - func (d *DB) GetRepo(did syntax.DID, rkey syntax.RecordKey) (*Repo, error) { 67 var repo Repo 68 - err := d.DB.QueryRow( 69 - `select 70 - did, 71 - rkey, 72 - name, 73 - knot 74 - from repos where did = ? and rkey = ?`, 75 - did, 76 - rkey, 77 - ).Scan( 78 - &repo.Did, 79 - &repo.Rkey, 80 - &repo.Name, 81 - &repo.Knot, 82 - ) 83 if err != nil { 84 return nil, err 85 } 86 - return &repo, nil 87 - } 88 89 - func (d *DB) GetRepoWithName(did syntax.DID, name string) (*Repo, error) { 90 - var repo Repo 91 - err := d.DB.QueryRow( 92 - `select 93 - did, 94 - rkey, 95 - name, 96 - knot 97 - from repos where did = ? and name = ?`, 98 - did, 99 - name, 100 - ).Scan( 101 - &repo.Did, 102 - &repo.Rkey, 103 - &repo.Name, 104 - &repo.Knot, 105 - ) 106 - if err != nil { 107 - return nil, err 108 - } 109 return &repo, nil 110 } 111 - 112 - func (d *DB) PutRepoCollaborator(collaborator *RepoCollaborator) error { 113 - _, err := d.Exec( 114 - `insert into repo_collaborators (did, rkey, repo, subject) 115 - values (?, ?, ?, ?) 116 - on conflict(did, rkey) do update set 117 - repo = excluded.repo 118 - subject = excluded.subject`, 119 - collaborator.Did, 120 - collaborator.Rkey, 121 - collaborator.Repo, 122 - collaborator.Subject, 123 - ) 124 - return err 125 - } 126 - 127 - func (d *DB) RemoveRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) error { 128 - _, err := d.Exec( 129 - `delete from repo_collaborators where did = ? and rkey = ?`, 130 - did, 131 - rkey, 132 - ) 133 - return err 134 - } 135 - 136 - func (d *DB) GetRepoCollaborator(did syntax.DID, rkey syntax.RecordKey) (*RepoCollaborator, error) { 137 - var collaborator RepoCollaborator 138 - err := d.DB.QueryRow( 139 - `select 140 - did, 141 - rkey, 142 - repo, 143 - subject 144 - from repo_collaborators 145 - where did = ? and rkey = ?`, 146 - did, 147 - rkey, 148 - ).Scan( 149 - &collaborator.Did, 150 - &collaborator.Rkey, 151 - &collaborator.Repo, 152 - &collaborator.Subject, 153 - ) 154 - if err != nil { 155 - return nil, err 156 - } 157 - return &collaborator, nil 158 - }
··· 1 package db 2 3 type Repo struct { 4 + Knot string 5 + Owner string 6 + Name string 7 } 8 9 + func (d *DB) AddRepo(knot, owner, name string) error { 10 + _, err := d.Exec(`insert or ignore into repos (knot, owner, name) values (?, ?, ?)`, knot, owner, name) 11 return err 12 } 13 ··· 34 return knots, nil 35 } 36 37 + func (d *DB) GetRepo(knot, owner, name string) (*Repo, error) { 38 var repo Repo 39 + 40 + query := "select knot, owner, name from repos where knot = ? and owner = ? and name = ?" 41 + err := d.DB.QueryRow(query, knot, owner, name). 42 + Scan(&repo.Knot, &repo.Owner, &repo.Name) 43 + 44 if err != nil { 45 return nil, err 46 } 47 48 return &repo, nil 49 }
+10 -24
spindle/engines/nixery/engine.go
··· 179 return err 180 } 181 e.registerCleanup(wid, func(ctx context.Context) error { 182 - err := e.docker.NetworkRemove(ctx, networkName(wid)) 183 - if err != nil { 184 - return fmt.Errorf("removing network: %w", err) 185 - } 186 - return nil 187 }) 188 189 addl := wf.Data.(addlFields) ··· 233 return fmt.Errorf("creating container: %w", err) 234 } 235 e.registerCleanup(wid, func(ctx context.Context) error { 236 - err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 237 if err != nil { 238 - return fmt.Errorf("stopping container: %w", err) 239 } 240 241 - err = e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 242 RemoveVolumes: true, 243 RemoveLinks: false, 244 Force: false, 245 }) 246 - if err != nil { 247 - return fmt.Errorf("removing container: %w", err) 248 - } 249 - return nil 250 }) 251 252 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) ··· 402 } 403 404 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 405 - fns := e.drainCleanups(wid) 406 407 for _, fn := range fns { 408 if err := fn(ctx); err != nil { ··· 418 419 key := wid.String() 420 e.cleanup[key] = append(e.cleanup[key], fn) 421 - } 422 - 423 - func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc { 424 - e.cleanupMu.Lock() 425 - key := wid.String() 426 - 427 - fns := e.cleanup[key] 428 - delete(e.cleanup, key) 429 - e.cleanupMu.Unlock() 430 - 431 - return fns 432 } 433 434 func networkName(wid models.WorkflowId) string {
··· 179 return err 180 } 181 e.registerCleanup(wid, func(ctx context.Context) error { 182 + return e.docker.NetworkRemove(ctx, networkName(wid)) 183 }) 184 185 addl := wf.Data.(addlFields) ··· 229 return fmt.Errorf("creating container: %w", err) 230 } 231 e.registerCleanup(wid, func(ctx context.Context) error { 232 + err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 233 if err != nil { 234 + return err 235 } 236 237 + return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 238 RemoveVolumes: true, 239 RemoveLinks: false, 240 Force: false, 241 }) 242 }) 243 244 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) ··· 394 } 395 396 func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 397 + e.cleanupMu.Lock() 398 + key := wid.String() 399 + 400 + fns := e.cleanup[key] 401 + delete(e.cleanup, key) 402 + e.cleanupMu.Unlock() 403 404 for _, fn := range fns { 405 if err := fn(ctx); err != nil { ··· 415 416 key := wid.String() 417 e.cleanup[key] = append(e.cleanup[key], fn) 418 } 419 420 func networkName(wid models.WorkflowId) string {
+300
spindle/ingester.go
···
··· 1 + package spindle 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "fmt" 8 + "time" 9 + 10 + "tangled.org/core/api/tangled" 11 + "tangled.org/core/eventconsumer" 12 + "tangled.org/core/rbac" 13 + "tangled.org/core/spindle/db" 14 + 15 + comatproto "github.com/bluesky-social/indigo/api/atproto" 16 + "github.com/bluesky-social/indigo/atproto/identity" 17 + "github.com/bluesky-social/indigo/atproto/syntax" 18 + "github.com/bluesky-social/indigo/xrpc" 19 + "github.com/bluesky-social/jetstream/pkg/models" 20 + securejoin "github.com/cyphar/filepath-securejoin" 21 + ) 22 + 23 + type Ingester func(ctx context.Context, e *models.Event) error 24 + 25 + func (s *Spindle) ingest() Ingester { 26 + return func(ctx context.Context, e *models.Event) error { 27 + var err error 28 + defer func() { 29 + eventTime := e.TimeUS 30 + lastTimeUs := eventTime + 1 31 + if err := s.db.SaveLastTimeUs(lastTimeUs); err != nil { 32 + err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 33 + } 34 + }() 35 + 36 + if e.Kind != models.EventKindCommit { 37 + return nil 38 + } 39 + 40 + switch e.Commit.Collection { 41 + case tangled.SpindleMemberNSID: 42 + err = s.ingestMember(ctx, e) 43 + case tangled.RepoNSID: 44 + err = s.ingestRepo(ctx, e) 45 + case tangled.RepoCollaboratorNSID: 46 + err = s.ingestCollaborator(ctx, e) 47 + } 48 + 49 + if err != nil { 50 + s.l.Debug("failed to process message", "nsid", e.Commit.Collection, "err", err) 51 + } 52 + 53 + return nil 54 + } 55 + } 56 + 57 + func (s *Spindle) ingestMember(_ context.Context, e *models.Event) error { 58 + var err error 59 + did := e.Did 60 + rkey := e.Commit.RKey 61 + 62 + l := s.l.With("component", "ingester", "record", tangled.SpindleMemberNSID) 63 + 64 + switch e.Commit.Operation { 65 + case models.CommitOperationCreate, models.CommitOperationUpdate: 66 + raw := e.Commit.Record 67 + record := tangled.SpindleMember{} 68 + err = json.Unmarshal(raw, &record) 69 + if err != nil { 70 + l.Error("invalid record", "error", err) 71 + return err 72 + } 73 + 74 + domain := s.cfg.Server.Hostname 75 + recordInstance := record.Instance 76 + 77 + if recordInstance != domain { 78 + l.Error("domain mismatch", "domain", recordInstance, "expected", domain) 79 + return fmt.Errorf("domain mismatch: %s != %s", record.Instance, domain) 80 + } 81 + 82 + ok, err := s.e.IsSpindleInviteAllowed(did, rbacDomain) 83 + if err != nil || !ok { 84 + l.Error("failed to add member", "did", did, "error", err) 85 + return fmt.Errorf("failed to enforce permissions: %w", err) 86 + } 87 + 88 + if err := db.AddSpindleMember(s.db, db.SpindleMember{ 89 + Did: syntax.DID(did), 90 + Rkey: rkey, 91 + Instance: recordInstance, 92 + Subject: syntax.DID(record.Subject), 93 + Created: time.Now(), 94 + }); err != nil { 95 + l.Error("failed to add member", "error", err) 96 + return fmt.Errorf("failed to add member: %w", err) 97 + } 98 + 99 + if err := s.e.AddSpindleMember(rbacDomain, record.Subject); err != nil { 100 + l.Error("failed to add member", "error", err) 101 + return fmt.Errorf("failed to add member: %w", err) 102 + } 103 + l.Info("added member from firehose", "member", record.Subject) 104 + 105 + if err := s.db.AddDid(record.Subject); err != nil { 106 + l.Error("failed to add did", "error", err) 107 + return fmt.Errorf("failed to add did: %w", err) 108 + } 109 + s.jc.AddDid(record.Subject) 110 + 111 + return nil 112 + 113 + case models.CommitOperationDelete: 114 + record, err := db.GetSpindleMember(s.db, did, rkey) 115 + if err != nil { 116 + l.Error("failed to find member", "error", err) 117 + return fmt.Errorf("failed to find member: %w", err) 118 + } 119 + 120 + if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 121 + l.Error("failed to remove member", "error", err) 122 + return fmt.Errorf("failed to remove member: %w", err) 123 + } 124 + 125 + if err := s.e.RemoveSpindleMember(rbacDomain, record.Subject.String()); err != nil { 126 + l.Error("failed to add member", "error", err) 127 + return fmt.Errorf("failed to add member: %w", err) 128 + } 129 + l.Info("added member from firehose", "member", record.Subject) 130 + 131 + if err := s.db.RemoveDid(record.Subject.String()); err != nil { 132 + l.Error("failed to add did", "error", err) 133 + return fmt.Errorf("failed to add did: %w", err) 134 + } 135 + s.jc.RemoveDid(record.Subject.String()) 136 + 137 + } 138 + return nil 139 + } 140 + 141 + func (s *Spindle) ingestRepo(ctx context.Context, e *models.Event) error { 142 + var err error 143 + did := e.Did 144 + 145 + l := s.l.With("component", "ingester", "record", tangled.RepoNSID) 146 + 147 + l.Info("ingesting repo record", "did", did) 148 + 149 + switch e.Commit.Operation { 150 + case models.CommitOperationCreate, models.CommitOperationUpdate: 151 + raw := e.Commit.Record 152 + record := tangled.Repo{} 153 + err = json.Unmarshal(raw, &record) 154 + if err != nil { 155 + l.Error("invalid record", "error", err) 156 + return err 157 + } 158 + 159 + domain := s.cfg.Server.Hostname 160 + 161 + // no spindle configured for this repo 162 + if record.Spindle == nil { 163 + l.Info("no spindle configured", "name", record.Name) 164 + return nil 165 + } 166 + 167 + // this repo did not want this spindle 168 + if *record.Spindle != domain { 169 + l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 170 + return nil 171 + } 172 + 173 + // add this repo to the watch list 174 + if err := s.db.AddRepo(record.Knot, did, record.Name); err != nil { 175 + l.Error("failed to add repo", "error", err) 176 + return fmt.Errorf("failed to add repo: %w", err) 177 + } 178 + 179 + didSlashRepo, err := securejoin.SecureJoin(did, record.Name) 180 + if err != nil { 181 + return err 182 + } 183 + 184 + // add repo to rbac 185 + if err := s.e.AddRepo(did, rbac.ThisServer, didSlashRepo); err != nil { 186 + l.Error("failed to add repo to enforcer", "error", err) 187 + return fmt.Errorf("failed to add repo: %w", err) 188 + } 189 + 190 + // add collaborators to rbac 191 + owner, err := s.res.ResolveIdent(ctx, did) 192 + if err != nil || owner.Handle.IsInvalidHandle() { 193 + return err 194 + } 195 + if err := s.fetchAndAddCollaborators(ctx, owner, didSlashRepo); err != nil { 196 + return err 197 + } 198 + 199 + // add this knot to the event consumer 200 + src := eventconsumer.NewKnotSource(record.Knot) 201 + s.ks.AddSource(context.Background(), src) 202 + 203 + return nil 204 + 205 + } 206 + return nil 207 + } 208 + 209 + func (s *Spindle) ingestCollaborator(ctx context.Context, e *models.Event) error { 210 + var err error 211 + 212 + l := s.l.With("component", "ingester", "record", tangled.RepoCollaboratorNSID, "did", e.Did) 213 + 214 + l.Info("ingesting collaborator record") 215 + 216 + switch e.Commit.Operation { 217 + case models.CommitOperationCreate, models.CommitOperationUpdate: 218 + raw := e.Commit.Record 219 + record := tangled.RepoCollaborator{} 220 + err = json.Unmarshal(raw, &record) 221 + if err != nil { 222 + l.Error("invalid record", "error", err) 223 + return err 224 + } 225 + 226 + subjectId, err := s.res.ResolveIdent(ctx, record.Subject) 227 + if err != nil || subjectId.Handle.IsInvalidHandle() { 228 + return err 229 + } 230 + 231 + repoAt, err := syntax.ParseATURI(record.Repo) 232 + if err != nil { 233 + l.Info("rejecting record, invalid repoAt", "repoAt", record.Repo) 234 + return nil 235 + } 236 + 237 + // TODO: get rid of this entirely 238 + // resolve this aturi to extract the repo record 239 + owner, err := s.res.ResolveIdent(ctx, repoAt.Authority().String()) 240 + if err != nil || owner.Handle.IsInvalidHandle() { 241 + return fmt.Errorf("failed to resolve handle: %w", err) 242 + } 243 + 244 + xrpcc := xrpc.Client{ 245 + Host: owner.PDSEndpoint(), 246 + } 247 + 248 + resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 249 + if err != nil { 250 + return err 251 + } 252 + 253 + repo := resp.Value.Val.(*tangled.Repo) 254 + didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 255 + 256 + // check perms for this user 257 + if ok, err := s.e.IsCollaboratorInviteAllowed(owner.DID.String(), rbac.ThisServer, didSlashRepo); !ok || err != nil { 258 + return fmt.Errorf("insufficient permissions: %w", err) 259 + } 260 + 261 + // add collaborator to rbac 262 + if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 263 + l.Error("failed to add repo to enforcer", "error", err) 264 + return fmt.Errorf("failed to add repo: %w", err) 265 + } 266 + 267 + return nil 268 + } 269 + return nil 270 + } 271 + 272 + func (s *Spindle) fetchAndAddCollaborators(ctx context.Context, owner *identity.Identity, didSlashRepo string) error { 273 + l := s.l.With("component", "ingester", "handler", "fetchAndAddCollaborators") 274 + 275 + l.Info("fetching and adding existing collaborators") 276 + 277 + xrpcc := xrpc.Client{ 278 + Host: owner.PDSEndpoint(), 279 + } 280 + 281 + resp, err := comatproto.RepoListRecords(ctx, &xrpcc, tangled.RepoCollaboratorNSID, "", 50, owner.DID.String(), false) 282 + if err != nil { 283 + return err 284 + } 285 + 286 + var errs error 287 + for _, r := range resp.Records { 288 + if r == nil { 289 + continue 290 + } 291 + record := r.Value.Val.(*tangled.RepoCollaborator) 292 + 293 + if err := s.e.AddCollaborator(record.Subject, rbac.ThisServer, didSlashRepo); err != nil { 294 + l.Error("failed to add repo to enforcer", "error", err) 295 + errors.Join(errs, fmt.Errorf("failed to add repo: %w", err)) 296 + } 297 + } 298 + 299 + return errs 300 + }
+1 -1
spindle/models/pipeline_env.go
··· 20 // Standard CI environment variable 21 env["CI"] = "true" 22 23 - env["TANGLED_PIPELINE_ID"] = pipelineId.AtUri().String() 24 25 // Repo info 26 if tr.Repo != nil {
··· 20 // Standard CI environment variable 21 env["CI"] = "true" 22 23 + env["TANGLED_PIPELINE_ID"] = pipelineId.Rkey 24 25 // Repo info 26 if tr.Repo != nil {
+70 -133
spindle/server.go
··· 1 package spindle 2 3 import ( 4 - "bytes" 5 "context" 6 _ "embed" 7 "encoding/json" ··· 9 "log/slog" 10 "maps" 11 "net/http" 12 - "os" 13 - "os/exec" 14 - "path" 15 - "strings" 16 17 - "github.com/bluesky-social/indigo/atproto/syntax" 18 "github.com/go-chi/chi/v5" 19 - "github.com/hashicorp/go-version" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/eventconsumer" 22 "tangled.org/core/eventconsumer/cursor" 23 "tangled.org/core/idresolver" 24 "tangled.org/core/log" 25 "tangled.org/core/notifier" 26 - "tangled.org/core/rbac2" 27 "tangled.org/core/spindle/config" 28 "tangled.org/core/spindle/db" 29 "tangled.org/core/spindle/engine" ··· 32 "tangled.org/core/spindle/queue" 33 "tangled.org/core/spindle/secrets" 34 "tangled.org/core/spindle/xrpc" 35 - "tangled.org/core/tap" 36 "tangled.org/core/xrpc/serviceauth" 37 ) 38 39 //go:embed motd 40 var motd []byte 41 42 type Spindle struct { 43 - tap *tap.Client 44 db *db.DB 45 - e *rbac2.Enforcer 46 l *slog.Logger 47 n *notifier.Notifier 48 engs map[string]models.Engine ··· 57 func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 58 logger := log.FromContext(ctx) 59 60 - if err := ensureGitVersion(); err != nil { 61 - return nil, fmt.Errorf("ensuring git version: %w", err) 62 - } 63 - 64 - d, err := db.Make(ctx, cfg.Server.DBPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to setup db: %w", err) 67 } 68 69 - e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 70 if err != nil { 71 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 72 } 73 74 n := notifier.New() 75 ··· 101 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 102 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 103 104 - tap := tap.NewClient(cfg.Server.TapUrl, "") 105 106 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 107 108 spindle := &Spindle{ 109 - tap: &tap, 110 e: e, 111 db: d, 112 l: logger, ··· 118 vault: vault, 119 } 120 121 - err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 122 if err != nil { 123 return nil, err 124 } ··· 127 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 128 if err != nil { 129 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 130 } 131 132 // for each incoming sh.tangled.pipeline, we execute ··· 176 } 177 178 // Enforcer returns the RBAC enforcer instance. 179 - func (s *Spindle) Enforcer() *rbac2.Enforcer { 180 return s.e 181 } 182 ··· 196 s.ks.Start(ctx) 197 }() 198 199 - go func() { 200 - s.l.Info("starting tap stream consumer") 201 - s.tap.Connect(ctx, &tap.SimpleIndexer{ 202 - EventHandler: s.processEvent, 203 - }) 204 - }() 205 - 206 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 207 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 208 } ··· 254 Config: s.cfg, 255 Resolver: s.res, 256 Vault: s.vault, 257 - Notifier: s.Notifier(), 258 ServiceAuth: serviceAuth, 259 } 260 ··· 262 } 263 264 func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 265 - l := log.FromContext(ctx).With("handler", "processKnotStream") 266 - l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 267 if msg.Nsid == tangled.PipelineNSID { 268 - return nil 269 tpl := tangled.Pipeline{} 270 err := json.Unmarshal(msg.EventJson, &tpl) 271 if err != nil { ··· 286 } 287 288 // filter by repos 289 - _, err = s.db.GetRepoWithName( 290 - syntax.DID(tpl.TriggerMetadata.Repo.Did), 291 tpl.TriggerMetadata.Repo.Repo, 292 ) 293 if err != nil { 294 - return fmt.Errorf("failed to get repo: %w", err) 295 } 296 297 pipelineId := models.PipelineId{ ··· 312 Name: w.Name, 313 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 314 if err != nil { 315 - return fmt.Errorf("db.StatusFailed: %w", err) 316 } 317 318 continue ··· 326 327 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 328 if err != nil { 329 - return fmt.Errorf("init workflow: %w", err) 330 } 331 332 // inject TANGLED_* env vars after InitWorkflow ··· 343 Name: w.Name, 344 }, s.n) 345 if err != nil { 346 - return fmt.Errorf("db.StatusPending: %w", err) 347 } 348 } 349 } ··· 366 } else { 367 s.l.Error("failed to enqueue pipeline: queue is full") 368 } 369 - } else if msg.Nsid == tangled.GitRefUpdateNSID { 370 - event := tangled.GitRefUpdate{} 371 - if err := json.Unmarshal(msg.EventJson, &event); err != nil { 372 - l.Error("error unmarshalling", "err", err) 373 - return err 374 - } 375 - l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 376 - 377 - // use event.RepoAt 378 - // sync git repos in {data}/repos/{did}/sh.tangled.repo/{rkey} 379 - // if it's nil, don't run pipeline. knot needs upgrade 380 - // we will leave sh.tangled.pipeline.trigger for backward compatibility 381 - 382 - // NOTE: we are blindly trusting the knot that it will return only repos it own 383 - repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 384 - repoPath := s.newRepoPath(event.RepoDid, event.RepoName) 385 - err := sparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha) 386 - if err != nil { 387 - l.Error("failed to sync git repo", "err", err) 388 - return fmt.Errorf("sync git repo: %w", err) 389 - } 390 - l.Info("synced git repo") 391 - 392 - // TODO: plan the pipeline 393 } 394 395 return nil 396 } 397 398 - func (s *Spindle) newRepoPath(did, name string) string { 399 - return path.Join(s.cfg.Server.RepoDir(), did, name) 400 - } 401 - 402 - func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 403 - scheme := "https://" 404 - if s.cfg.Server.Dev { 405 - scheme = "http://" 406 - } 407 - return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 408 - } 409 410 - const RequiredVersion = "2.49.0" 411 - 412 - func ensureGitVersion() error { 413 - v, err := gitVersion() 414 if err != nil { 415 - return fmt.Errorf("fetching git version: %w", err) 416 } 417 - if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 418 - return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 419 - } 420 - return nil 421 - } 422 423 - // TODO: move to "git" module shared between knot, appview & spindle 424 - func gitVersion() (*version.Version, error) { 425 - var buf bytes.Buffer 426 - cmd := exec.Command("git", "version") 427 - cmd.Stdout = &buf 428 - cmd.Stderr = os.Stderr 429 - err := cmd.Run() 430 - if err != nil { 431 - return nil, err 432 - } 433 - fields := strings.Fields(buf.String()) 434 - if len(fields) < 3 { 435 - return nil, fmt.Errorf("invalid git version: %s", buf) 436 - } 437 438 - // version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1" 439 - versionString := fields[2] 440 - if pos := strings.Index(versionString, "windows"); pos >= 1 { 441 - versionString = versionString[:pos-1] 442 - } 443 - return version.NewVersion(versionString) 444 - } 445 - 446 - func sparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error { 447 - exist, err := isDir(path) 448 - if err != nil { 449 - return err 450 - } 451 - if !exist { 452 - if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil { 453 - return fmt.Errorf("git clone: %w", err) 454 - } 455 - if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", `'/.tangled/workflows'`).Run(); err != nil { 456 - return fmt.Errorf("git sparse-checkout set: %w", err) 457 - } 458 - if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil { 459 - return fmt.Errorf("git checkout: %w", err) 460 } 461 - } else { 462 - if err := exec.Command("git", "-C", path, "pull", "origin", rev).Run(); err != nil { 463 - return fmt.Errorf("git pull: %w", err) 464 } 465 } 466 - return nil 467 - } 468 469 - func isDir(path string) (bool, error) { 470 - info, err := os.Stat(path) 471 - if err == nil && info.IsDir() { 472 - return true, nil 473 - } 474 - if os.IsNotExist(err) { 475 - return false, nil 476 - } 477 - return false, err 478 }
··· 1 package spindle 2 3 import ( 4 "context" 5 _ "embed" 6 "encoding/json" ··· 8 "log/slog" 9 "maps" 10 "net/http" 11 12 "github.com/go-chi/chi/v5" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/eventconsumer" 15 "tangled.org/core/eventconsumer/cursor" 16 "tangled.org/core/idresolver" 17 + "tangled.org/core/jetstream" 18 "tangled.org/core/log" 19 "tangled.org/core/notifier" 20 + "tangled.org/core/rbac" 21 "tangled.org/core/spindle/config" 22 "tangled.org/core/spindle/db" 23 "tangled.org/core/spindle/engine" ··· 26 "tangled.org/core/spindle/queue" 27 "tangled.org/core/spindle/secrets" 28 "tangled.org/core/spindle/xrpc" 29 "tangled.org/core/xrpc/serviceauth" 30 ) 31 32 //go:embed motd 33 var motd []byte 34 35 + const ( 36 + rbacDomain = "thisserver" 37 + ) 38 + 39 type Spindle struct { 40 + jc *jetstream.JetstreamClient 41 db *db.DB 42 + e *rbac.Enforcer 43 l *slog.Logger 44 n *notifier.Notifier 45 engs map[string]models.Engine ··· 54 func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 55 logger := log.FromContext(ctx) 56 57 + d, err := db.Make(cfg.Server.DBPath) 58 if err != nil { 59 return nil, fmt.Errorf("failed to setup db: %w", err) 60 } 61 62 + e, err := rbac.NewEnforcer(cfg.Server.DBPath) 63 if err != nil { 64 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 65 } 66 + e.E.EnableAutoSave(true) 67 68 n := notifier.New() 69 ··· 95 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 96 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 97 98 + collections := []string{ 99 + tangled.SpindleMemberNSID, 100 + tangled.RepoNSID, 101 + tangled.RepoCollaboratorNSID, 102 + } 103 + jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 104 + if err != nil { 105 + return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 106 + } 107 + jc.AddDid(cfg.Server.Owner) 108 + 109 + // Check if the spindle knows about any Dids; 110 + dids, err := d.GetAllDids() 111 + if err != nil { 112 + return nil, fmt.Errorf("failed to get all dids: %w", err) 113 + } 114 + for _, d := range dids { 115 + jc.AddDid(d) 116 + } 117 118 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 119 120 spindle := &Spindle{ 121 + jc: jc, 122 e: e, 123 db: d, 124 l: logger, ··· 130 vault: vault, 131 } 132 133 + err = e.AddSpindle(rbacDomain) 134 + if err != nil { 135 + return nil, fmt.Errorf("failed to set rbac domain: %w", err) 136 + } 137 + err = spindle.configureOwner() 138 if err != nil { 139 return nil, err 140 } ··· 143 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 144 if err != nil { 145 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 146 + } 147 + 148 + err = jc.StartJetstream(ctx, spindle.ingest()) 149 + if err != nil { 150 + return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 151 } 152 153 // for each incoming sh.tangled.pipeline, we execute ··· 197 } 198 199 // Enforcer returns the RBAC enforcer instance. 200 + func (s *Spindle) Enforcer() *rbac.Enforcer { 201 return s.e 202 } 203 ··· 217 s.ks.Start(ctx) 218 }() 219 220 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 221 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 222 } ··· 268 Config: s.cfg, 269 Resolver: s.res, 270 Vault: s.vault, 271 ServiceAuth: serviceAuth, 272 } 273 ··· 275 } 276 277 func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 278 if msg.Nsid == tangled.PipelineNSID { 279 tpl := tangled.Pipeline{} 280 err := json.Unmarshal(msg.EventJson, &tpl) 281 if err != nil { ··· 296 } 297 298 // filter by repos 299 + _, err = s.db.GetRepo( 300 + tpl.TriggerMetadata.Repo.Knot, 301 + tpl.TriggerMetadata.Repo.Did, 302 tpl.TriggerMetadata.Repo.Repo, 303 ) 304 if err != nil { 305 + return err 306 } 307 308 pipelineId := models.PipelineId{ ··· 323 Name: w.Name, 324 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 325 if err != nil { 326 + return err 327 } 328 329 continue ··· 337 338 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 339 if err != nil { 340 + return err 341 } 342 343 // inject TANGLED_* env vars after InitWorkflow ··· 354 Name: w.Name, 355 }, s.n) 356 if err != nil { 357 + return err 358 } 359 } 360 } ··· 377 } else { 378 s.l.Error("failed to enqueue pipeline: queue is full") 379 } 380 } 381 382 return nil 383 } 384 385 + func (s *Spindle) configureOwner() error { 386 + cfgOwner := s.cfg.Server.Owner 387 388 + existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 389 if err != nil { 390 + return err 391 } 392 393 + switch len(existing) { 394 + case 0: 395 + // no owner configured, continue 396 + case 1: 397 + // find existing owner 398 + existingOwner := existing[0] 399 400 + // no ownership change, this is okay 401 + if existingOwner == s.cfg.Server.Owner { 402 + break 403 } 404 + 405 + // remove existing owner 406 + err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 407 + if err != nil { 408 + return nil 409 } 410 + default: 411 + return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 412 } 413 414 + return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 415 }
-281
spindle/tap.go
··· 1 - package spindle 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "fmt" 7 - "time" 8 - 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 - "tangled.org/core/api/tangled" 11 - "tangled.org/core/eventconsumer" 12 - "tangled.org/core/spindle/db" 13 - "tangled.org/core/tap" 14 - ) 15 - 16 - func (s *Spindle) processEvent(ctx context.Context, evt tap.Event) error { 17 - l := s.l.With("component", "tapIndexer") 18 - 19 - var err error 20 - switch evt.Type { 21 - case tap.EvtRecord: 22 - switch evt.Record.Collection.String() { 23 - case tangled.SpindleMemberNSID: 24 - err = s.processMember(ctx, evt) 25 - case tangled.RepoNSID: 26 - err = s.processRepo(ctx, evt) 27 - case tangled.RepoCollaboratorNSID: 28 - err = s.processCollaborator(ctx, evt) 29 - case tangled.RepoPullNSID: 30 - err = s.processPull(ctx, evt) 31 - } 32 - case tap.EvtIdentity: 33 - // no-op 34 - } 35 - 36 - if err != nil { 37 - l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 38 - return err 39 - } 40 - return nil 41 - } 42 - 43 - // NOTE: make sure to return nil if we don't need to retry (e.g. forbidden, unrelated) 44 - 45 - func (s *Spindle) processMember(ctx context.Context, evt tap.Event) error { 46 - l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 47 - 48 - l.Info("processing spindle.member record") 49 - 50 - // check perms for this user 51 - if ok, err := s.e.IsSpindleMemberInviteAllowed(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 52 - l.Warn("forbidden request", "did", evt.Record.Did, "error", err) 53 - return nil 54 - } 55 - 56 - switch evt.Record.Action { 57 - case tap.RecordCreateAction, tap.RecordUpdateAction: 58 - record := tangled.SpindleMember{} 59 - if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 60 - return fmt.Errorf("parsing record: %w", err) 61 - } 62 - 63 - domain := s.cfg.Server.Hostname 64 - if record.Instance != domain { 65 - l.Info("domain mismatch", "domain", record.Instance, "expected", domain) 66 - return nil 67 - } 68 - 69 - created, err := time.Parse(record.CreatedAt, time.RFC3339) 70 - if err != nil { 71 - created = time.Now() 72 - } 73 - if err := db.AddSpindleMember(s.db, db.SpindleMember{ 74 - Did: evt.Record.Did, 75 - Rkey: evt.Record.Rkey.String(), 76 - Instance: record.Instance, 77 - Subject: syntax.DID(record.Subject), 78 - Created: created, 79 - }); err != nil { 80 - l.Error("failed to add member", "error", err) 81 - return fmt.Errorf("adding member to db: %w", err) 82 - } 83 - if err := s.e.AddSpindleMember(syntax.DID(record.Subject), s.cfg.Server.Did()); err != nil { 84 - return fmt.Errorf("adding member to rbac: %w", err) 85 - } 86 - if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 87 - return fmt.Errorf("adding did to tap", err) 88 - } 89 - 90 - l.Info("added member", "member", record.Subject) 91 - return nil 92 - 93 - case tap.RecordDeleteAction: 94 - var ( 95 - did = evt.Record.Did.String() 96 - rkey = evt.Record.Rkey.String() 97 - ) 98 - member, err := db.GetSpindleMember(s.db, did, rkey) 99 - if err != nil { 100 - return fmt.Errorf("finding member: %w", err) 101 - } 102 - 103 - if err := db.RemoveSpindleMember(s.db, did, rkey); err != nil { 104 - return fmt.Errorf("removing member from db: %w", err) 105 - } 106 - if err := s.e.RemoveSpindleMember(member.Subject, s.cfg.Server.Did()); err != nil { 107 - return fmt.Errorf("removing member from rbac: %w", err) 108 - } 109 - if err := s.tapSafeRemoveDid(ctx, member.Subject); err != nil { 110 - return fmt.Errorf("removing did from tap: %w", err) 111 - } 112 - 113 - l.Info("removed member", "member", member.Subject) 114 - return nil 115 - } 116 - return nil 117 - } 118 - 119 - func (s *Spindle) processCollaborator(ctx context.Context, evt tap.Event) error { 120 - l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 121 - 122 - l.Info("processing collaborator record") 123 - switch evt.Record.Action { 124 - case tap.RecordCreateAction, tap.RecordUpdateAction: 125 - record := tangled.RepoCollaborator{} 126 - if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 127 - l.Error("invalid record", "err", err) 128 - return fmt.Errorf("parsing record: %w", err) 129 - } 130 - 131 - // check perms for this user 132 - if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, syntax.ATURI(record.Repo)); !ok || err != nil { 133 - l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 134 - return nil 135 - } 136 - 137 - if err := s.db.PutRepoCollaborator(&db.RepoCollaborator{ 138 - Did: evt.Record.Did, 139 - Rkey: evt.Record.Rkey, 140 - Repo: syntax.ATURI(record.Repo), 141 - Subject: syntax.DID(record.Subject), 142 - }); err != nil { 143 - return fmt.Errorf("adding collaborator to db: %w", err) 144 - } 145 - if err := s.e.AddRepoCollaborator(syntax.DID(record.Subject), syntax.ATURI(record.Repo)); err != nil { 146 - return fmt.Errorf("adding collaborator to rbac: %w", err) 147 - } 148 - if err := s.tap.AddRepos(ctx, []syntax.DID{syntax.DID(record.Subject)}); err != nil { 149 - return fmt.Errorf("adding did to tap: %w", err) 150 - } 151 - 152 - l.Info("add repo collaborator", "subejct", record.Subject, "repo", record.Repo) 153 - return nil 154 - 155 - case tap.RecordDeleteAction: 156 - // get existing collaborator 157 - collaborator, err := s.db.GetRepoCollaborator(evt.Record.Did, evt.Record.Rkey) 158 - if err != nil { 159 - return fmt.Errorf("failed to get existing collaborator info: %w", err) 160 - } 161 - 162 - // check perms for this user 163 - if ok, err := s.e.IsRepoCollaboratorInviteAllowed(evt.Record.Did, collaborator.Repo); !ok || err != nil { 164 - l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 165 - return nil 166 - } 167 - 168 - if err := s.db.RemoveRepoCollaborator(collaborator.Subject, collaborator.Rkey); err != nil { 169 - return fmt.Errorf("removing collaborator from db: %w", err) 170 - } 171 - if err := s.e.RemoveRepoCollaborator(collaborator.Subject, collaborator.Repo); err != nil { 172 - return fmt.Errorf("removing collaborator from rbac: %w", err) 173 - } 174 - if err := s.tapSafeRemoveDid(ctx, collaborator.Subject); err != nil { 175 - return fmt.Errorf("removing did from tap: %w", err) 176 - } 177 - 178 - l.Info("removed repo collaborator", "subejct", collaborator.Subject, "repo", collaborator.Repo) 179 - return nil 180 - } 181 - return nil 182 - } 183 - 184 - func (s *Spindle) processRepo(ctx context.Context, evt tap.Event) error { 185 - l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 186 - 187 - l.Info("processing repo record") 188 - 189 - // check perms for this user 190 - if ok, err := s.e.IsSpindleMember(evt.Record.Did, s.cfg.Server.Did()); !ok || err != nil { 191 - l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 192 - return nil 193 - } 194 - 195 - switch evt.Record.Action { 196 - case tap.RecordCreateAction, tap.RecordUpdateAction: 197 - record := tangled.Repo{} 198 - if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 199 - return fmt.Errorf("parsing record: %w", err) 200 - } 201 - 202 - domain := s.cfg.Server.Hostname 203 - if record.Spindle == nil || *record.Spindle != domain { 204 - if record.Spindle == nil { 205 - l.Info("spindle isn't configured", "name", record.Name) 206 - } else { 207 - l.Info("different spindle configured", "name", record.Name, "spindle", *record.Spindle, "domain", domain) 208 - } 209 - if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 210 - return fmt.Errorf("deleting repo from db: %w", err) 211 - } 212 - return nil 213 - } 214 - 215 - if err := s.db.PutRepo(&db.Repo{ 216 - Did: evt.Record.Did, 217 - Rkey: evt.Record.Rkey, 218 - Name: record.Name, 219 - Knot: record.Knot, 220 - }); err != nil { 221 - return fmt.Errorf("adding repo to db: %w", err) 222 - } 223 - 224 - if err := s.e.AddRepo(evt.Record.AtUri()); err != nil { 225 - return fmt.Errorf("adding repo to rbac") 226 - } 227 - 228 - // add this knot to the event consumer 229 - src := eventconsumer.NewKnotSource(record.Knot) 230 - s.ks.AddSource(context.Background(), src) 231 - 232 - l.Info("added repo", "repo", evt.Record.AtUri()) 233 - return nil 234 - 235 - case tap.RecordDeleteAction: 236 - // check perms for this user 237 - if ok, err := s.e.IsRepoOwner(evt.Record.Did, evt.Record.AtUri()); !ok || err != nil { 238 - l.Warn("forbidden request", "did", evt.Record.Did, "err", err) 239 - return nil 240 - } 241 - 242 - if err := s.db.DeleteRepo(evt.Record.Did, evt.Record.Rkey); err != nil { 243 - return fmt.Errorf("deleting repo from db: %w", err) 244 - } 245 - 246 - if err := s.e.DeleteRepo(evt.Record.AtUri()); err != nil { 247 - return fmt.Errorf("deleting repo from rbac: %w", err) 248 - } 249 - 250 - l.Info("deleted repo", "repo", evt.Record.AtUri()) 251 - return nil 252 - } 253 - return nil 254 - } 255 - 256 - func (s *Spindle) processPull(ctx context.Context, evt tap.Event) error { 257 - l := s.l.With("component", "tapIndexer", "record", evt.Record.AtUri()) 258 - 259 - l.Info("processing pull record") 260 - 261 - switch evt.Record.Action { 262 - case tap.RecordCreateAction, tap.RecordUpdateAction: 263 - // TODO 264 - case tap.RecordDeleteAction: 265 - // TODO 266 - } 267 - return nil 268 - } 269 - 270 - func (s *Spindle) tapSafeRemoveDid(ctx context.Context, did syntax.DID) error { 271 - known, err := s.db.IsKnownDid(syntax.DID(did)) 272 - if err != nil { 273 - return fmt.Errorf("ensuring did known state: %w", err) 274 - } 275 - if !known { 276 - if err := s.tap.RemoveRepos(ctx, []syntax.DID{did}); err != nil { 277 - return fmt.Errorf("removing did from tap: %w", err) 278 - } 279 - } 280 - return nil 281 - }
···
+2 -1
spindle/xrpc/add_secret.go
··· 11 "github.com/bluesky-social/indigo/xrpc" 12 securejoin "github.com/cyphar/filepath-securejoin" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/spindle/secrets" 15 xrpcerr "tangled.org/core/xrpc/errors" 16 ) ··· 67 return 68 } 69 70 - if ok, err := x.Enforcer.IsRepoSettingsWriteAllowed(actorDid, repoAt); !ok || err != nil { 71 l.Error("insufficent permissions", "did", actorDid.String()) 72 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 73 return
··· 11 "github.com/bluesky-social/indigo/xrpc" 12 securejoin "github.com/cyphar/filepath-securejoin" 13 "tangled.org/core/api/tangled" 14 + "tangled.org/core/rbac" 15 "tangled.org/core/spindle/secrets" 16 xrpcerr "tangled.org/core/xrpc/errors" 17 ) ··· 68 return 69 } 70 71 + if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil { 72 l.Error("insufficent permissions", "did", actorDid.String()) 73 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 74 return
+2 -1
spindle/xrpc/list_secrets.go
··· 11 "github.com/bluesky-social/indigo/xrpc" 12 securejoin "github.com/cyphar/filepath-securejoin" 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/spindle/secrets" 15 xrpcerr "tangled.org/core/xrpc/errors" 16 ) ··· 62 return 63 } 64 65 - if ok, err := x.Enforcer.IsRepoSettingsWriteAllowed(actorDid, repoAt); !ok || err != nil { 66 l.Error("insufficent permissions", "did", actorDid.String()) 67 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 68 return
··· 11 "github.com/bluesky-social/indigo/xrpc" 12 securejoin "github.com/cyphar/filepath-securejoin" 13 "tangled.org/core/api/tangled" 14 + "tangled.org/core/rbac" 15 "tangled.org/core/spindle/secrets" 16 xrpcerr "tangled.org/core/xrpc/errors" 17 ) ··· 63 return 64 } 65 66 + if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil { 67 l.Error("insufficent permissions", "did", actorDid.String()) 68 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 69 return
+1 -1
spindle/xrpc/owner.go
··· 9 ) 10 11 func (x *Xrpc) Owner(w http.ResponseWriter, r *http.Request) { 12 - owner := x.Config.Server.Owner.String() 13 if owner == "" { 14 writeError(w, xrpcerr.OwnerNotFoundError, http.StatusInternalServerError) 15 return
··· 9 ) 10 11 func (x *Xrpc) Owner(w http.ResponseWriter, r *http.Request) { 12 + owner := x.Config.Server.Owner 13 if owner == "" { 14 writeError(w, xrpcerr.OwnerNotFoundError, http.StatusInternalServerError) 15 return
-72
spindle/xrpc/pipeline_cancelPipeline.go
··· 1 - package xrpc 2 - 3 - import ( 4 - "encoding/json" 5 - "fmt" 6 - "net/http" 7 - "strings" 8 - 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 - "tangled.org/core/api/tangled" 11 - "tangled.org/core/spindle/models" 12 - xrpcerr "tangled.org/core/xrpc/errors" 13 - ) 14 - 15 - func (x *Xrpc) CancelPipeline(w http.ResponseWriter, r *http.Request) { 16 - l := x.Logger 17 - fail := func(e xrpcerr.XrpcError) { 18 - l.Error("failed", "kind", e.Tag, "error", e.Message) 19 - writeError(w, e, http.StatusBadRequest) 20 - } 21 - l.Debug("cancel pipeline") 22 - 23 - actorDid, ok := r.Context().Value(ActorDid).(syntax.DID) 24 - if !ok { 25 - fail(xrpcerr.MissingActorDidError) 26 - return 27 - } 28 - 29 - var input tangled.PipelineCancelPipeline_Input 30 - if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 31 - fail(xrpcerr.GenericError(err)) 32 - return 33 - } 34 - 35 - aturi := syntax.ATURI(input.Pipeline) 36 - wid := models.WorkflowId{ 37 - PipelineId: models.PipelineId{ 38 - Knot: strings.TrimPrefix(aturi.Authority().String(), "did:web:"), 39 - Rkey: aturi.RecordKey().String(), 40 - }, 41 - Name: input.Workflow, 42 - } 43 - l.Debug("cancel pipeline", "wid", wid) 44 - 45 - // unfortunately we have to resolve repo-at here 46 - repoAt, err := syntax.ParseATURI(input.Repo) 47 - if err != nil { 48 - fail(xrpcerr.InvalidRepoError(input.Repo)) 49 - return 50 - } 51 - 52 - isRepoOwner, err := x.Enforcer.IsRepoOwner(actorDid, repoAt) 53 - if err != nil || !isRepoOwner { 54 - fail(xrpcerr.AccessControlError(actorDid.String())) 55 - return 56 - } 57 - for _, engine := range x.Engines { 58 - l.Debug("destorying workflow", "wid", wid) 59 - err = engine.DestroyWorkflow(r.Context(), wid) 60 - if err != nil { 61 - fail(xrpcerr.GenericError(fmt.Errorf("dailed to destroy workflow: %w", err))) 62 - return 63 - } 64 - err = x.Db.StatusCancelled(wid, "User canceled the workflow", -1, x.Notifier) 65 - if err != nil { 66 - fail(xrpcerr.GenericError(fmt.Errorf("dailed to emit status failed: %w", err))) 67 - return 68 - } 69 - } 70 - 71 - w.WriteHeader(http.StatusOK) 72 - }
···
+2 -1
spindle/xrpc/remove_secret.go
··· 10 "github.com/bluesky-social/indigo/xrpc" 11 securejoin "github.com/cyphar/filepath-securejoin" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/spindle/secrets" 14 xrpcerr "tangled.org/core/xrpc/errors" 15 ) ··· 61 return 62 } 63 64 - if ok, err := x.Enforcer.IsRepoSettingsWriteAllowed(actorDid, repoAt); !ok || err != nil { 65 l.Error("insufficent permissions", "did", actorDid.String()) 66 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 67 return
··· 10 "github.com/bluesky-social/indigo/xrpc" 11 securejoin "github.com/cyphar/filepath-securejoin" 12 "tangled.org/core/api/tangled" 13 + "tangled.org/core/rbac" 14 "tangled.org/core/spindle/secrets" 15 xrpcerr "tangled.org/core/xrpc/errors" 16 ) ··· 62 return 63 } 64 65 + if ok, err := x.Enforcer.IsSettingsAllowed(actorDid.String(), rbac.ThisServer, didPath); !ok || err != nil { 66 l.Error("insufficent permissions", "did", actorDid.String()) 67 writeError(w, xrpcerr.AccessControlError(actorDid.String()), http.StatusUnauthorized) 68 return
+2 -5
spindle/xrpc/xrpc.go
··· 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/idresolver" 13 - "tangled.org/core/notifier" 14 - "tangled.org/core/rbac2" 15 "tangled.org/core/spindle/config" 16 "tangled.org/core/spindle/db" 17 "tangled.org/core/spindle/models" ··· 25 type Xrpc struct { 26 Logger *slog.Logger 27 Db *db.DB 28 - Enforcer *rbac2.Enforcer 29 Engines map[string]models.Engine 30 Config *config.Config 31 Resolver *idresolver.Resolver 32 Vault secrets.Manager 33 - Notifier *notifier.Notifier 34 ServiceAuth *serviceauth.ServiceAuth 35 } 36 ··· 43 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 44 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 45 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 46 - r.Post("/"+tangled.PipelineCancelPipelineNSID, x.CancelPipeline) 47 }) 48 49 // service query endpoints (no auth required)
··· 10 11 "tangled.org/core/api/tangled" 12 "tangled.org/core/idresolver" 13 + "tangled.org/core/rbac" 14 "tangled.org/core/spindle/config" 15 "tangled.org/core/spindle/db" 16 "tangled.org/core/spindle/models" ··· 24 type Xrpc struct { 25 Logger *slog.Logger 26 Db *db.DB 27 + Enforcer *rbac.Enforcer 28 Engines map[string]models.Engine 29 Config *config.Config 30 Resolver *idresolver.Resolver 31 Vault secrets.Manager 32 ServiceAuth *serviceauth.ServiceAuth 33 } 34 ··· 41 r.Post("/"+tangled.RepoAddSecretNSID, x.AddSecret) 42 r.Post("/"+tangled.RepoRemoveSecretNSID, x.RemoveSecret) 43 r.Get("/"+tangled.RepoListSecretsNSID, x.ListSecrets) 44 }) 45 46 // service query endpoints (no auth required)
-18
tap/simpleIndexer.go
··· 1 - package tap 2 - 3 - import "context" 4 - 5 - type SimpleIndexer struct { 6 - EventHandler func(ctx context.Context, evt Event) error 7 - ErrorHandler func(ctx context.Context, err error) 8 - } 9 - 10 - var _ Handler = (*SimpleIndexer)(nil) 11 - 12 - func (i *SimpleIndexer) OnEvent(ctx context.Context, evt Event) error { 13 - return i.EventHandler(ctx, evt) 14 - } 15 - 16 - func (i *SimpleIndexer) OnError(ctx context.Context, err error) { 17 - i.ErrorHandler(ctx, err) 18 - }
···
-169
tap/tap.go
··· 1 - /// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 - 3 - package tap 4 - 5 - import ( 6 - "bytes" 7 - "context" 8 - "encoding/json" 9 - "fmt" 10 - "net/http" 11 - "net/url" 12 - 13 - "github.com/bluesky-social/indigo/atproto/syntax" 14 - "github.com/gorilla/websocket" 15 - "tangled.org/core/log" 16 - ) 17 - 18 - // type WebsocketOptions struct { 19 - // maxReconnectSeconds int 20 - // heartbeatIntervalMs int 21 - // // onReconnectError 22 - // } 23 - 24 - type Handler interface { 25 - OnEvent(ctx context.Context, evt Event) error 26 - OnError(ctx context.Context, err error) 27 - } 28 - 29 - type Client struct { 30 - Url string 31 - AdminPassword string 32 - HTTPClient *http.Client 33 - } 34 - 35 - func NewClient(url, adminPassword string) Client { 36 - return Client{ 37 - Url: url, 38 - AdminPassword: adminPassword, 39 - HTTPClient: &http.Client{}, 40 - } 41 - } 42 - 43 - func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 44 - body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 45 - if err != nil { 46 - return err 47 - } 48 - req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 49 - if err != nil { 50 - return err 51 - } 52 - req.SetBasicAuth("admin", c.AdminPassword) 53 - req.Header.Set("Content-Type", "application/json") 54 - 55 - resp, err := c.HTTPClient.Do(req) 56 - if err != nil { 57 - return err 58 - } 59 - defer resp.Body.Close() 60 - if resp.StatusCode != http.StatusOK { 61 - return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 62 - } 63 - return nil 64 - } 65 - 66 - func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 67 - body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 68 - if err != nil { 69 - return err 70 - } 71 - req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 72 - if err != nil { 73 - return err 74 - } 75 - req.SetBasicAuth("admin", c.AdminPassword) 76 - req.Header.Set("Content-Type", "application/json") 77 - 78 - resp, err := c.HTTPClient.Do(req) 79 - if err != nil { 80 - return err 81 - } 82 - defer resp.Body.Close() 83 - if resp.StatusCode != http.StatusOK { 84 - return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 85 - } 86 - return nil 87 - } 88 - 89 - func (c *Client) Connect(ctx context.Context, handler Handler) error { 90 - l := log.FromContext(ctx) 91 - 92 - u, err := url.Parse(c.Url) 93 - if err != nil { 94 - return err 95 - } 96 - if u.Scheme == "https" { 97 - u.Scheme = "wss" 98 - } else { 99 - u.Scheme = "ws" 100 - } 101 - u.Path = "/channel" 102 - 103 - // TODO: set auth on dial 104 - 105 - url := u.String() 106 - 107 - // var backoff int 108 - // for { 109 - // select { 110 - // case <-ctx.Done(): 111 - // return ctx.Err() 112 - // default: 113 - // } 114 - // 115 - // header := http.Header{ 116 - // "Authorization": []string{""}, 117 - // } 118 - // conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 119 - // if err != nil { 120 - // l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 121 - // time.Sleep(time.Duration(5+backoff) * time.Second) 122 - // backoff++ 123 - // 124 - // continue 125 - // } else { 126 - // backoff = 0 127 - // } 128 - // 129 - // l.Info("event subscription response", "code", res.StatusCode) 130 - // } 131 - 132 - // TODO: keep websocket connection alive 133 - conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil) 134 - if err != nil { 135 - return err 136 - } 137 - defer conn.Close() 138 - 139 - for { 140 - select { 141 - case <-ctx.Done(): 142 - return ctx.Err() 143 - default: 144 - } 145 - _, message, err := conn.ReadMessage() 146 - if err != nil { 147 - return err 148 - } 149 - 150 - var ev Event 151 - if err := json.Unmarshal(message, &ev); err != nil { 152 - handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 153 - continue 154 - } 155 - if err := handler.OnEvent(ctx, ev); err != nil { 156 - handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 157 - continue 158 - } 159 - 160 - ack := map[string]any{ 161 - "type": "ack", 162 - "id": ev.ID, 163 - } 164 - if err := conn.WriteJSON(ack); err != nil { 165 - l.Warn("failed to send ack", "err", err) 166 - continue 167 - } 168 - } 169 - }
···
-62
tap/types.go
··· 1 - package tap 2 - 3 - import ( 4 - "encoding/json" 5 - "fmt" 6 - 7 - "github.com/bluesky-social/indigo/atproto/syntax" 8 - ) 9 - 10 - type EventType string 11 - 12 - const ( 13 - EvtRecord EventType = "record" 14 - EvtIdentity EventType = "identity" 15 - ) 16 - 17 - type Event struct { 18 - ID int64 `json:"id"` 19 - Type EventType `json:"type"` 20 - Record *RecordEventData `json:"record,omitempty"` 21 - Identity *IdentityEventData `json:"identity,omitempty"` 22 - } 23 - 24 - type RecordEventData struct { 25 - Live bool `json:"live"` 26 - Did syntax.DID `json:"did"` 27 - Rev string `json:"rev"` 28 - Collection syntax.NSID `json:"collection"` 29 - Rkey syntax.RecordKey `json:"rkey"` 30 - Action RecordAction `json:"action"` 31 - Record json.RawMessage `json:"record,omitempty"` 32 - CID *syntax.CID `json:"cid,omitempty"` 33 - } 34 - 35 - func (r *RecordEventData) AtUri() syntax.ATURI { 36 - return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, r.Collection, r.Rkey)) 37 - } 38 - 39 - type RecordAction string 40 - 41 - const ( 42 - RecordCreateAction RecordAction = "create" 43 - RecordUpdateAction RecordAction = "update" 44 - RecordDeleteAction RecordAction = "delete" 45 - ) 46 - 47 - type IdentityEventData struct { 48 - DID syntax.DID `json:"did"` 49 - Handle string `json:"handle"` 50 - IsActive bool `json:"is_active"` 51 - Status RepoStatus `json:"status"` 52 - } 53 - 54 - type RepoStatus string 55 - 56 - const ( 57 - RepoStatusActive RepoStatus = "active" 58 - RepoStatusTakendown RepoStatus = "takendown" 59 - RepoStatusSuspended RepoStatus = "suspended" 60 - RepoStatusDeactivated RepoStatus = "deactivated" 61 - RepoStatusDeleted RepoStatus = "deleted" 62 - )
···