forked from tangled.org/core
Monorepo for Tangled
// Package spindle wires together the Spindle server: its database, RBAC
// enforcer, secrets manager, job queue, event consumers and HTTP routes.
package spindle

import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"os"
	"os/exec"
	"path"
	"strings"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/xrpc/serviceauth"
)

// motd holds the contents of the embedded "motd" file; Router writes it as
// the response body for requests to "/".
//
//go:embed motd
var motd []byte

// Spindle bundles every long-lived dependency of the server. It is built by
// New and driven by Start.
type Spindle struct {
	tap   *tap.Client             // tap stream client; connected in Start
	db    *db.DB                  // database handle opened from cfg.Server.DBPath
	e     *rbac2.Enforcer         // RBAC enforcer
	l     *slog.Logger            // base logger; sub-loggers are derived from it
	n     *notifier.Notifier      // notifier handed to db status updates and the xrpc layer
	engs  map[string]models.Engine // workflow engines keyed by engine name
	jq    *queue.Queue            // job queue that pipeline runs are enqueued on
	cfg   *config.Config          // loaded server configuration
	ks    *eventconsumer.Consumer // consumer of events from known knots
	res   *idresolver.Resolver    // identity resolver built from cfg.Server.PlcUrl
	vault secrets.Manager         // secrets provider (openbao or sqlite, chosen in New)
}

// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 58 logger := log.FromContext(ctx) 59 60 if err := ensureGitVersion(); err != nil { 61 return nil, fmt.Errorf("ensuring git version: %w", err) 62 } 63 64 d, err := db.Make(ctx, cfg.Server.DBPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to setup db: %w", err) 67 } 68 69 e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 70 if err != nil { 71 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 72 } 73 74 n := notifier.New() 75 76 var vault secrets.Manager 77 switch cfg.Server.Secrets.Provider { 78 case "openbao": 79 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 80 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 81 } 82 vault, err = secrets.NewOpenBaoManager( 83 cfg.Server.Secrets.OpenBao.ProxyAddr, 84 logger, 85 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 86 ) 87 if err != nil { 88 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 89 } 90 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 91 case "sqlite", "": 92 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 93 if err != nil { 94 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 95 } 96 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 97 default: 98 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 99 } 100 101 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 102 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 103 104 tap := tap.NewClient(cfg.Server.TapUrl, "") 105 106 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 107 108 spindle := &Spindle{ 109 tap: &tap, 110 e: e, 111 db: d, 112 l: logger, 113 n: &n, 
114 engs: engines, 115 jq: jq, 116 cfg: cfg, 117 res: resolver, 118 vault: vault, 119 } 120 121 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 122 if err != nil { 123 return nil, err 124 } 125 logger.Info("owner set", "did", cfg.Server.Owner) 126 127 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 128 if err != nil { 129 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 130 } 131 132 // for each incoming sh.tangled.pipeline, we execute 133 // spindle.processPipeline, which in turn enqueues the pipeline 134 // job in the above registered queue. 135 ccfg := eventconsumer.NewConsumerConfig() 136 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 137 ccfg.Dev = cfg.Server.Dev 138 ccfg.ProcessFunc = spindle.processPipeline 139 ccfg.CursorStore = cursorStore 140 knownKnots, err := d.Knots() 141 if err != nil { 142 return nil, err 143 } 144 for _, knot := range knownKnots { 145 logger.Info("adding source start", "knot", knot) 146 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 147 } 148 spindle.ks = eventconsumer.NewConsumer(*ccfg) 149 150 return spindle, nil 151} 152 153// DB returns the database instance. 154func (s *Spindle) DB() *db.DB { 155 return s.db 156} 157 158// Queue returns the job queue instance. 159func (s *Spindle) Queue() *queue.Queue { 160 return s.jq 161} 162 163// Engines returns the map of available engines. 164func (s *Spindle) Engines() map[string]models.Engine { 165 return s.engs 166} 167 168// Vault returns the secrets manager instance. 169func (s *Spindle) Vault() secrets.Manager { 170 return s.vault 171} 172 173// Notifier returns the notifier instance. 174func (s *Spindle) Notifier() *notifier.Notifier { 175 return s.n 176} 177 178// Enforcer returns the RBAC enforcer instance. 179func (s *Spindle) Enforcer() *rbac2.Enforcer { 180 return s.e 181} 182 183// Start starts the Spindle server (blocking). 
184func (s *Spindle) Start(ctx context.Context) error { 185 // starts a job queue runner in the background 186 s.jq.Start() 187 defer s.jq.Stop() 188 189 // Stop vault token renewal if it implements Stopper 190 if stopper, ok := s.vault.(secrets.Stopper); ok { 191 defer stopper.Stop() 192 } 193 194 go func() { 195 s.l.Info("starting knot event consumer") 196 s.ks.Start(ctx) 197 }() 198 199 go func() { 200 s.l.Info("starting tap stream consumer") 201 s.tap.Connect(ctx, &tap.SimpleIndexer{ 202 EventHandler: s.processEvent, 203 }) 204 }() 205 206 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 207 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 208} 209 210func Run(ctx context.Context) error { 211 cfg, err := config.Load(ctx) 212 if err != nil { 213 return fmt.Errorf("failed to load config: %w", err) 214 } 215 216 nixeryEng, err := nixery.New(ctx, cfg) 217 if err != nil { 218 return err 219 } 220 221 s, err := New(ctx, cfg, map[string]models.Engine{ 222 "nixery": nixeryEng, 223 }) 224 if err != nil { 225 return err 226 } 227 228 return s.Start(ctx) 229} 230 231func (s *Spindle) Router() http.Handler { 232 mux := chi.NewRouter() 233 234 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 235 w.Write(motd) 236 }) 237 mux.HandleFunc("/events", s.Events) 238 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 239 240 mux.Mount("/xrpc", s.XrpcRouter()) 241 return mux 242} 243 244func (s *Spindle) XrpcRouter() http.Handler { 245 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 246 247 l := log.SubLogger(s.l, "xrpc") 248 249 x := xrpc.Xrpc{ 250 Logger: l, 251 Db: s.db, 252 Enforcer: s.e, 253 Engines: s.engs, 254 Config: s.cfg, 255 Resolver: s.res, 256 Vault: s.vault, 257 Notifier: s.Notifier(), 258 ServiceAuth: serviceAuth, 259 } 260 261 return x.Router() 262} 263 264func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 265 l 
:= log.FromContext(ctx).With("handler", "processKnotStream") 266 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 267 if msg.Nsid == tangled.PipelineNSID { 268 return nil 269 tpl := tangled.Pipeline{} 270 err := json.Unmarshal(msg.EventJson, &tpl) 271 if err != nil { 272 fmt.Println("error unmarshalling", err) 273 return err 274 } 275 276 if tpl.TriggerMetadata == nil { 277 return fmt.Errorf("no trigger metadata found") 278 } 279 280 if tpl.TriggerMetadata.Repo == nil { 281 return fmt.Errorf("no repo data found") 282 } 283 284 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 285 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 286 } 287 288 // filter by repos 289 _, err = s.db.GetRepoWithName( 290 syntax.DID(tpl.TriggerMetadata.Repo.Did), 291 tpl.TriggerMetadata.Repo.Repo, 292 ) 293 if err != nil { 294 return fmt.Errorf("failed to get repo: %w", err) 295 } 296 297 pipelineId := models.PipelineId{ 298 Knot: src.Key(), 299 Rkey: msg.Rkey, 300 } 301 302 workflows := make(map[models.Engine][]models.Workflow) 303 304 // Build pipeline environment variables once for all workflows 305 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 306 307 for _, w := range tpl.Workflows { 308 if w != nil { 309 if _, ok := s.engs[w.Engine]; !ok { 310 err = s.db.StatusFailed(models.WorkflowId{ 311 PipelineId: pipelineId, 312 Name: w.Name, 313 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 314 if err != nil { 315 return fmt.Errorf("db.StatusFailed: %w", err) 316 } 317 318 continue 319 } 320 321 eng := s.engs[w.Engine] 322 323 if _, ok := workflows[eng]; !ok { 324 workflows[eng] = []models.Workflow{} 325 } 326 327 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 328 if err != nil { 329 return fmt.Errorf("init workflow: %w", err) 330 } 331 332 // inject TANGLED_* env vars after InitWorkflow 333 // This prevents user-defined env vars from overriding them 334 
if ewf.Environment == nil { 335 ewf.Environment = make(map[string]string) 336 } 337 maps.Copy(ewf.Environment, pipelineEnv) 338 339 workflows[eng] = append(workflows[eng], *ewf) 340 341 err = s.db.StatusPending(models.WorkflowId{ 342 PipelineId: pipelineId, 343 Name: w.Name, 344 }, s.n) 345 if err != nil { 346 return fmt.Errorf("db.StatusPending: %w", err) 347 } 348 } 349 } 350 351 ok := s.jq.Enqueue(queue.Job{ 352 Run: func() error { 353 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 354 RepoOwner: tpl.TriggerMetadata.Repo.Did, 355 RepoName: tpl.TriggerMetadata.Repo.Repo, 356 Workflows: workflows, 357 }, pipelineId) 358 return nil 359 }, 360 OnFail: func(jobError error) { 361 s.l.Error("pipeline run failed", "error", jobError) 362 }, 363 }) 364 if ok { 365 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 366 } else { 367 s.l.Error("failed to enqueue pipeline: queue is full") 368 } 369 } else if msg.Nsid == tangled.GitRefUpdateNSID { 370 event := tangled.GitRefUpdate{} 371 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 372 l.Error("error unmarshalling", "err", err) 373 return err 374 } 375 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 376 377 // use event.RepoAt 378 // sync git repos in {data}/repos/{did}/sh.tangled.repo/{rkey} 379 // if it's nil, don't run pipeline. 
knot needs upgrade 380 // we will leave sh.tangled.pipeline.trigger for backward compatibility 381 382 // NOTE: we are blindly trusting the knot that it will return only repos it own 383 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 384 repoPath := s.newRepoPath(event.RepoDid, event.RepoName) 385 err := sparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha) 386 if err != nil { 387 l.Error("failed to sync git repo", "err", err) 388 return fmt.Errorf("sync git repo: %w", err) 389 } 390 l.Info("synced git repo") 391 392 // TODO: plan the pipeline 393 } 394 395 return nil 396} 397 398func (s *Spindle) newRepoPath(did, name string) string { 399 return path.Join(s.cfg.Server.RepoDir(), did, name) 400} 401 402func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 403 scheme := "https://" 404 if s.cfg.Server.Dev { 405 scheme = "http://" 406 } 407 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 408} 409 410const RequiredVersion = "2.49.0" 411 412func ensureGitVersion() error { 413 v, err := gitVersion() 414 if err != nil { 415 return fmt.Errorf("fetching git version: %w", err) 416 } 417 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 418 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 419 } 420 return nil 421} 422 423// TODO: move to "git" module shared between knot, appview & spindle 424func gitVersion() (*version.Version, error) { 425 var buf bytes.Buffer 426 cmd := exec.Command("git", "version") 427 cmd.Stdout = &buf 428 cmd.Stderr = os.Stderr 429 err := cmd.Run() 430 if err != nil { 431 return nil, err 432 } 433 fields := strings.Fields(buf.String()) 434 if len(fields) < 3 { 435 return nil, fmt.Errorf("invalid git version: %s", buf) 436 } 437 438 // version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1" 439 versionString := fields[2] 440 if pos := strings.Index(versionString, "windows"); pos >= 1 { 
441 versionString = versionString[:pos-1] 442 } 443 return version.NewVersion(versionString) 444} 445 446func sparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error { 447 exist, err := isDir(path) 448 if err != nil { 449 return err 450 } 451 if !exist { 452 if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil { 453 return fmt.Errorf("git clone: %w", err) 454 } 455 if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", `'/.tangled/workflows'`).Run(); err != nil { 456 return fmt.Errorf("git sparse-checkout set: %w", err) 457 } 458 if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil { 459 return fmt.Errorf("git checkout: %w", err) 460 } 461 } else { 462 if err := exec.Command("git", "-C", path, "pull", "origin", rev).Run(); err != nil { 463 return fmt.Errorf("git pull: %w", err) 464 } 465 } 466 return nil 467} 468 469func isDir(path string) (bool, error) { 470 info, err := os.Stat(path) 471 if err == nil && info.IsDir() { 472 return true, nil 473 } 474 if os.IsNotExist(err) { 475 return false, nil 476 } 477 return false, err 478}