Monorepo for Tangled tangled.org
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "log/slog" 10 "maps" 11 "net/http" 12 "path/filepath" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/go-chi/chi/v5" 16 "github.com/go-git/go-git/v5/plumbing/object" 17 "github.com/hashicorp/go-version" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/eventconsumer" 20 "tangled.org/core/eventconsumer/cursor" 21 "tangled.org/core/idresolver" 22 kgit "tangled.org/core/knotserver/git" 23 "tangled.org/core/log" 24 "tangled.org/core/notifier" 25 "tangled.org/core/rbac2" 26 "tangled.org/core/spindle/config" 27 "tangled.org/core/spindle/db" 28 "tangled.org/core/spindle/engine" 29 "tangled.org/core/spindle/engines/nixery" 30 "tangled.org/core/spindle/git" 31 "tangled.org/core/spindle/models" 32 "tangled.org/core/spindle/queue" 33 "tangled.org/core/spindle/secrets" 34 "tangled.org/core/spindle/xrpc" 35 "tangled.org/core/tap" 36 "tangled.org/core/tid" 37 "tangled.org/core/workflow" 38 "tangled.org/core/xrpc/serviceauth" 39) 40 41//go:embed motd 42var motd []byte 43 44type Spindle struct { 45 tap *tap.Client 46 db *db.DB 47 e *rbac2.Enforcer 48 l *slog.Logger 49 n *notifier.Notifier 50 engs map[string]models.Engine 51 jq *queue.Queue 52 cfg *config.Config 53 ks *eventconsumer.Consumer 54 res *idresolver.Resolver 55 vault secrets.Manager 56} 57 58// New creates a new Spindle server with the provided configuration and engines. 59func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 60 logger := log.FromContext(ctx) 61 62 if err := ensureGitVersion(); err != nil { 63 return nil, fmt.Errorf("ensuring git version: %w", err) 64 } 65 66 d, err := db.Make(ctx, cfg.Server.DBPath()) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup db: %w", err) 69 } 70 71 e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 72 if err != nil { 73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 74 } 75 76 n := notifier.New() 77 78 var vault secrets.Manager 79 switch cfg.Server.Secrets.Provider { 80 case "openbao": 81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 83 } 84 vault, err = secrets.NewOpenBaoManager( 85 cfg.Server.Secrets.OpenBao.ProxyAddr, 86 logger, 87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 88 ) 89 if err != nil { 90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 91 } 92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 93 case "sqlite", "": 94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 95 if err != nil { 96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 97 } 98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 99 default: 100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 101 } 102 103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 105 106 tap := tap.NewClient(cfg.Server.TapUrl, "") 107 108 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 109 110 spindle := &Spindle{ 111 tap: &tap, 112 e: e, 113 db: d, 114 l: logger, 115 n: &n, 116 engs: engines, 117 jq: jq, 118 cfg: cfg, 119 res: resolver, 120 vault: vault, 121 } 122 123 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 124 if err != nil { 125 return nil, err 126 } 127 logger.Info("owner set", "did", cfg.Server.Owner) 128 129 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 130 if err != nil { 131 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 132 } 133 134 // spindle listen to knot stream for sh.tangled.git.refUpdate 135 // which will sync the local workflow files in spindle and enqueues the 136 // pipeline job for on-push workflows 137 ccfg := eventconsumer.NewConsumerConfig() 138 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 139 ccfg.Dev = cfg.Server.Dev 140 ccfg.ProcessFunc = spindle.processKnotStream 141 ccfg.CursorStore = cursorStore 142 knownKnots, err := d.Knots() 143 if err != nil { 144 return nil, err 145 } 146 for _, knot := range knownKnots { 147 logger.Info("adding source start", "knot", knot) 148 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 149 } 150 spindle.ks = eventconsumer.NewConsumer(*ccfg) 151 152 return spindle, nil 153} 154 155// DB returns the database instance. 156func (s *Spindle) DB() *db.DB { 157 return s.db 158} 159 160// Queue returns the job queue instance. 161func (s *Spindle) Queue() *queue.Queue { 162 return s.jq 163} 164 165// Engines returns the map of available engines. 166func (s *Spindle) Engines() map[string]models.Engine { 167 return s.engs 168} 169 170// Vault returns the secrets manager instance. 171func (s *Spindle) Vault() secrets.Manager { 172 return s.vault 173} 174 175// Notifier returns the notifier instance. 176func (s *Spindle) Notifier() *notifier.Notifier { 177 return s.n 178} 179 180// Enforcer returns the RBAC enforcer instance. 181func (s *Spindle) Enforcer() *rbac2.Enforcer { 182 return s.e 183} 184 185// Start starts the Spindle server (blocking). 186func (s *Spindle) Start(ctx context.Context) error { 187 // starts a job queue runner in the background 188 s.jq.Start() 189 defer s.jq.Stop() 190 191 // Stop vault token renewal if it implements Stopper 192 if stopper, ok := s.vault.(secrets.Stopper); ok { 193 defer stopper.Stop() 194 } 195 196 go func() { 197 s.l.Info("starting knot event consumer") 198 s.ks.Start(ctx) 199 }() 200 201 // ensure server owner is tracked 202 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 203 return err 204 } 205 206 go func() { 207 s.l.Info("starting tap stream consumer") 208 s.tap.Connect(ctx, &tap.SimpleIndexer{ 209 EventHandler: s.processEvent, 210 }) 211 }() 212 213 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 214 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 215} 216 217func Run(ctx context.Context) error { 218 cfg, err := config.Load(ctx) 219 if err != nil { 220 return fmt.Errorf("failed to load config: %w", err) 221 } 222 223 nixeryEng, err := nixery.New(ctx, cfg) 224 if err != nil { 225 return err 226 } 227 228 s, err := New(ctx, cfg, map[string]models.Engine{ 229 "nixery": nixeryEng, 230 }) 231 if err != nil { 232 return err 233 } 234 235 return s.Start(ctx) 236} 237 238func (s *Spindle) Router() http.Handler { 239 mux := chi.NewRouter() 240 241 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 242 w.Write(motd) 243 }) 244 mux.HandleFunc("/events", s.Events) 245 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 246 247 mux.Mount("/xrpc", s.XrpcRouter()) 248 return mux 249} 250 251func (s *Spindle) XrpcRouter() http.Handler { 252 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 253 254 l := log.SubLogger(s.l, "xrpc") 255 256 x := xrpc.Xrpc{ 257 Logger: l, 258 Db: s.db, 259 Enforcer: s.e, 260 Engines: s.engs, 261 Config: s.cfg, 262 Resolver: s.res, 263 Vault: s.vault, 264 Notifier: s.Notifier(), 265 ServiceAuth: serviceAuth, 266 } 267 268 return x.Router() 269} 270 271func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 272 l := log.FromContext(ctx).With("handler", "processKnotStream") 273 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 274 if msg.Nsid == tangled.GitRefUpdateNSID { 275 event := tangled.GitRefUpdate{} 276 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 277 l.Error("error unmarshalling", "err", err) 278 return err 279 } 280 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 281 282 // resolve repo name to rkey 283 // TODO: git.refUpdate should respond with rkey instead of repo name 284 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 285 if err != nil { 286 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 287 } 288 289 // NOTE: we are blindly trusting the knot that it will return only repos it own 290 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 291 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 292 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 293 return fmt.Errorf("sync git repo: %w", err) 294 } 295 l.Info("synced git repo") 296 297 compiler := workflow.Compiler{ 298 Trigger: tangled.Pipeline_TriggerMetadata{ 299 Kind: string(workflow.TriggerKindPush), 300 Push: &tangled.Pipeline_PushTriggerData{ 301 Ref: event.Ref, 302 OldSha: event.OldSha, 303 NewSha: event.NewSha, 304 }, 305 Repo: &tangled.Pipeline_TriggerRepo{ 306 Did: repo.Did.String(), 307 Knot: repo.Knot, 308 Repo: repo.Name, 309 }, 310 }, 311 } 312 313 // load workflow definitions from rev (without spindle context) 314 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 315 if err != nil { 316 return fmt.Errorf("loading pipeline: %w", err) 317 } 318 if len(rawPipeline) == 0 { 319 l.Info("no workflow definition find for the repo. skipping the event") 320 return nil 321 } 322 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 323 // TODO: pass compile error to workflow log 324 for _, w := range compiler.Diagnostics.Errors { 325 l.Error(w.String()) 326 } 327 for _, w := range compiler.Diagnostics.Warnings { 328 l.Warn(w.String()) 329 } 330 331 pipelineId := models.PipelineId{ 332 Knot: tpl.TriggerMetadata.Repo.Knot, 333 Rkey: tid.TID(), 334 } 335 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 336 l.Error("failed to create pipeline event", "err", err) 337 return nil 338 } 339 err = s.processPipeline(ctx, tpl, pipelineId) 340 if err != nil { 341 return err 342 } 343 } 344 345 return nil 346} 347 348func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 349 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 350 return nil, fmt.Errorf("syncing git repo: %w", err) 351 } 352 gr, err := kgit.Open(repoPath, rev) 353 if err != nil { 354 return nil, fmt.Errorf("opening git repo: %w", err) 355 } 356 357 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 358 if errors.Is(err, object.ErrDirectoryNotFound) { 359 // return empty RawPipeline when directory doesn't exist 360 return nil, nil 361 } else if err != nil { 362 return nil, fmt.Errorf("loading file tree: %w", err) 363 } 364 365 var rawPipeline workflow.RawPipeline 366 for _, e := range workflowDir { 367 if !e.IsFile() { 368 continue 369 } 370 371 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 372 contents, err := gr.RawContent(fpath) 373 if err != nil { 374 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 375 } 376 377 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 378 Name: e.Name, 379 Contents: contents, 380 }) 381 } 382 383 return rawPipeline, nil 384} 385 386func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 387 // Build pipeline environment variables once for all workflows 388 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 389 390 // filter & init workflows 391 workflows := make(map[models.Engine][]models.Workflow) 392 for _, w := range tpl.Workflows { 393 if w == nil { 394 continue 395 } 396 if _, ok := s.engs[w.Engine]; !ok { 397 err := s.db.StatusFailed(models.WorkflowId{ 398 PipelineId: pipelineId, 399 Name: w.Name, 400 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 401 if err != nil { 402 return fmt.Errorf("db.StatusFailed: %w", err) 403 } 404 405 continue 406 } 407 408 eng := s.engs[w.Engine] 409 410 if _, ok := workflows[eng]; !ok { 411 workflows[eng] = []models.Workflow{} 412 } 413 414 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 415 if err != nil { 416 return fmt.Errorf("init workflow: %w", err) 417 } 418 419 // inject TANGLED_* env vars after InitWorkflow 420 // This prevents user-defined env vars from overriding them 421 if ewf.Environment == nil { 422 ewf.Environment = make(map[string]string) 423 } 424 maps.Copy(ewf.Environment, pipelineEnv) 425 426 workflows[eng] = append(workflows[eng], *ewf) 427 } 428 429 // enqueue pipeline 430 ok := s.jq.Enqueue(queue.Job{ 431 Run: func() error { 432 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 433 RepoOwner: tpl.TriggerMetadata.Repo.Did, 434 RepoName: tpl.TriggerMetadata.Repo.Repo, 435 Workflows: workflows, 436 }, pipelineId) 437 return nil 438 }, 439 OnFail: func(jobError error) { 440 s.l.Error("pipeline run failed", "error", jobError) 441 }, 442 }) 443 if !ok { 444 return fmt.Errorf("failed to enqueue pipeline: queue is full") 445 } 446 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 447 448 // emit StatusPending for all workflows here (after successful enqueue) 449 for _, ewfs := range workflows { 450 for _, ewf := range ewfs { 451 err := s.db.StatusPending(models.WorkflowId{ 452 PipelineId: pipelineId, 453 Name: ewf.Name, 454 }, s.n) 455 if err != nil { 456 return fmt.Errorf("db.StatusPending: %w", err) 457 } 458 } 459 } 460 return nil 461} 462 463// newRepoPath creates a path to store repository by its did and rkey. 464// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 465func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 466 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 467} 468 469func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 470 scheme := "https://" 471 if s.cfg.Server.Dev { 472 scheme = "http://" 473 } 474 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 475} 476 477const RequiredVersion = "2.49.0" 478 479func ensureGitVersion() error { 480 v, err := git.Version() 481 if err != nil { 482 return fmt.Errorf("fetching git version: %w", err) 483 } 484 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 485 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 486 } 487 return nil 488}