forked from tangled.org/core
Monorepo for Tangled

spindle: sync workflow files on `sh.tangled.git.refUpdate`

Spindle will listen to `sh.tangled.git.refUpdate` event from knot
stream and sync its local git repo instead. Spindle's git repo will
sparse-checkout only `/.tangled/workflows` directory.

Spindle now requires git version >=2.49 for `--revision` flag in `git
clone` command.

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me cfb9d1fe 4fd20b2b

verified
Changed files
+135
nix
spindle
+1
go.mod
··· 131 131 github.com/hashicorp/go-secure-stdlib/parseutil v0.2.0 // indirect 132 132 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect 133 133 github.com/hashicorp/go-sockaddr v1.0.7 // indirect 134 + github.com/hashicorp/go-version v1.8.0 // indirect 134 135 github.com/hashicorp/golang-lru v1.0.2 // indirect 135 136 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 136 137 github.com/hashicorp/hcl v1.0.1-vault-7 // indirect
+2
go.sum
··· 264 264 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= 265 265 github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= 266 266 github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= 267 + github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= 268 + github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= 267 269 github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= 268 270 github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 269 271 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+3
nix/gomod2nix.toml
··· 304 304 [mod."github.com/hashicorp/go-sockaddr"] 305 305 version = "v1.0.7" 306 306 hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs=" 307 + [mod."github.com/hashicorp/go-version"] 308 + version = "v1.8.0" 309 + hash = "sha256-KXtqERmYrWdpqPCViWcHbe6jnuH7k16bvBIcuJuevj8=" 307 310 [mod."github.com/hashicorp/golang-lru"] 308 311 version = "v1.0.2" 309 312 hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
+4
nix/modules/spindle.nix
··· 1 1 { 2 2 config, 3 + pkgs, 3 4 lib, 4 5 ... 5 6 }: let ··· 149 150 description = "spindle service"; 150 151 after = ["network.target" "docker.service"]; 151 152 wantedBy = ["multi-user.target"]; 153 + path = [ 154 + pkgs.git 155 + ]; 152 156 serviceConfig = { 153 157 LogsDirectory = "spindle"; 154 158 StateDirectory = "spindle";
+6
spindle/config/config.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "path" 6 7 7 8 "github.com/bluesky-social/indigo/atproto/syntax" 8 9 "github.com/sethvargo/go-envconfig" ··· 19 20 Owner syntax.DID `env:"OWNER, required"` 20 21 Secrets Secrets `env:",prefix=SECRETS_"` 21 22 LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 23 + DataDir string `env:"DATA_DIR, default=/var/lib/spindle"` 22 24 QueueSize int `env:"QUEUE_SIZE, default=100"` 23 25 MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 24 26 } 25 27 26 28 func (s Server) Did() syntax.DID { 27 29 return syntax.DID(fmt.Sprintf("did:web:%s", s.Hostname)) 30 + } 31 + 32 + func (s Server) RepoDir() string { 33 + return path.Join(s.DataDir, "repos") 28 34 } 29 35 30 36 type Secrets struct {
+119
spindle/server.go
··· 1 1 package spindle 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 _ "embed" 6 7 "encoding/json" ··· 8 9 "log/slog" 9 10 "maps" 10 11 "net/http" 12 + "os" 13 + "os/exec" 14 + "path" 15 + "strings" 11 16 12 17 "github.com/bluesky-social/indigo/atproto/syntax" 13 18 "github.com/go-chi/chi/v5" 19 + "github.com/hashicorp/go-version" 14 20 "tangled.org/core/api/tangled" 15 21 "tangled.org/core/eventconsumer" 16 22 "tangled.org/core/eventconsumer/cursor" ··· 50 56 // New creates a new Spindle server with the provided configuration and engines. 51 57 func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 52 58 logger := log.FromContext(ctx) 59 + 60 + if err := ensureGitVersion(); err != nil { 61 + return nil, fmt.Errorf("ensuring git version: %w", err) 62 + } 53 63 54 64 d, err := db.Make(ctx, cfg.Server.DBPath) 55 65 if err != nil { ··· 252 262 } 253 263 254 264 func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 265 + l := log.FromContext(ctx).With("handler", "processKnotStream") 266 + l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 255 267 if msg.Nsid == tangled.PipelineNSID { 268 + return nil 256 269 tpl := tangled.Pipeline{} 257 270 err := json.Unmarshal(msg.EventJson, &tpl) 258 271 if err != nil { ··· 353 366 } else { 354 367 s.l.Error("failed to enqueue pipeline: queue is full") 355 368 } 369 + } else if msg.Nsid == tangled.GitRefUpdateNSID { 370 + event := tangled.GitRefUpdate{} 371 + if err := json.Unmarshal(msg.EventJson, &event); err != nil { 372 + l.Error("error unmarshalling", "err", err) 373 + return err 374 + } 375 + l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 376 + 377 + // use event.RepoAt 378 + // sync git repos in {data}/repos/{did}/sh.tangled.repo/{rkey} 379 + // if it's nil, don't run pipeline. knot needs upgrade 380 + // we will leave sh.tangled.pipeline.trigger for backward compatibility 381 + 382 + // NOTE: we are blindly trusting the knot that it will return only repos it own 383 + repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 384 + repoPath := s.newRepoPath(event.RepoDid, event.RepoName) 385 + err := sparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha) 386 + if err != nil { 387 + l.Error("failed to sync git repo", "err", err) 388 + return fmt.Errorf("sync git repo: %w", err) 389 + } 390 + l.Info("synced git repo") 391 + 392 + // TODO: plan the pipeline 356 393 } 357 394 358 395 return nil 359 396 } 397 + 398 + func (s *Spindle) newRepoPath(did, name string) string { 399 + return path.Join(s.cfg.Server.RepoDir(), did, name) 400 + } 401 + 402 + func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 403 + scheme := "https://" 404 + if s.cfg.Server.Dev { 405 + scheme = "http://" 406 + } 407 + return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 408 + } 409 + 410 + const RequiredVersion = "2.49.0" 411 + 412 + func ensureGitVersion() error { 413 + v, err := gitVersion() 414 + if err != nil { 415 + return fmt.Errorf("fetching git version: %w", err) 416 + } 417 + if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 418 + return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 419 + } 420 + return nil 421 + } 422 + 423 + // TODO: move to "git" module shared between knot, appview & spindle 424 + func gitVersion() (*version.Version, error) { 425 + var buf bytes.Buffer 426 + cmd := exec.Command("git", "version") 427 + cmd.Stdout = &buf 428 + cmd.Stderr = os.Stderr 429 + err := cmd.Run() 430 + if err != nil { 431 + return nil, err 432 + } 433 + fields := strings.Fields(buf.String()) 434 + if len(fields) < 3 { 435 + return nil, fmt.Errorf("invalid git version: %s", buf) 436 + } 437 + 438 + // version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1" 439 + versionString := fields[2] 440 + if pos := strings.Index(versionString, "windows"); pos >= 1 { 441 + versionString = versionString[:pos-1] 442 + } 443 + return version.NewVersion(versionString) 444 + } 445 + 446 + func sparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error { 447 + exist, err := isDir(path) 448 + if err != nil { 449 + return err 450 + } 451 + if !exist { 452 + if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil { 453 + return fmt.Errorf("git clone: %w", err) 454 + } 455 + if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", `'/.tangled/workflows'`).Run(); err != nil { 456 + return fmt.Errorf("git sparse-checkout set: %w", err) 457 + } 458 + if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil { 459 + return fmt.Errorf("git checkout: %w", err) 460 + } 461 + } else { 462 + if err := exec.Command("git", "-C", path, "pull", "origin", rev).Run(); err != nil { 463 + return fmt.Errorf("git pull: %w", err) 464 + } 465 + } 466 + return nil 467 + } 468 + 469 + func isDir(path string) (bool, error) { 470 + info, err := os.Stat(path) 471 + if err == nil && info.IsDir() { 472 + return true, nil 473 + } 474 + if os.IsNotExist(err) { 475 + return false, nil 476 + } 477 + return false, err 478 + }