Monorepo for Tangled tangled.org
at sl/rbac2test 506 lines 14 kB view raw
// Package spindle implements the Spindle CI server: it consumes knot and
// tap event streams, compiles repository workflow definitions into
// pipelines, and runs them through pluggable execution engines.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the embedded message-of-the-day served at "/" until
// SetMotdContent replaces it at runtime.
//
//go:embed motd
var defaultMotd []byte

// Spindle is the top-level server. It owns the database, RBAC enforcer,
// job queue, event consumers, and the set of configured execution engines.
type Spindle struct {
	tap    *tap.Client              // tap stream client (DID-indexed event firehose)
	db     *db.DB                   // spindle database
	e      *rbac2.Enforcer          // RBAC policy enforcer
	l      *slog.Logger             // structured logger
	n      *notifier.Notifier       // status-change notifier
	engs   map[string]models.Engine // execution engines keyed by engine name (e.g. "nixery")
	jq     *queue.Queue             // bounded job queue for pipeline runs
	cfg    *config.Config           // server configuration
	ks     *eventconsumer.Consumer  // knot event stream consumer
	res    *idresolver.Resolver     // DID/handle resolver
	vault  secrets.Manager          // secrets backend (sqlite or openbao)
	motd   []byte                   // current MOTD content; guarded by motdMu
	motdMu sync.RWMutex             // protects motd
}

// New creates a new Spindle server with the provided configuration and engines.
62func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 63 logger := log.FromContext(ctx) 64 65 if err := ensureGitVersion(); err != nil { 66 return nil, fmt.Errorf("ensuring git version: %w", err) 67 } 68 69 d, err := db.Make(ctx, cfg.Server.DBPath()) 70 if err != nil { 71 return nil, fmt.Errorf("failed to setup db: %w", err) 72 } 73 74 e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 75 if err != nil { 76 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 77 } 78 79 n := notifier.New() 80 81 var vault secrets.Manager 82 switch cfg.Server.Secrets.Provider { 83 case "openbao": 84 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 85 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 86 } 87 vault, err = secrets.NewOpenBaoManager( 88 cfg.Server.Secrets.OpenBao.ProxyAddr, 89 logger, 90 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 91 ) 92 if err != nil { 93 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 94 } 95 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 96 case "sqlite", "": 97 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 98 if err != nil { 99 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 100 } 101 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 102 default: 103 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 104 } 105 106 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 107 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 108 109 tap := tap.NewClient(cfg.Server.TapUrl, "") 110 111 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 112 113 spindle := &Spindle{ 114 tap: &tap, 115 e: e, 116 db: d, 117 l: 
logger, 118 n: &n, 119 engs: engines, 120 jq: jq, 121 cfg: cfg, 122 res: resolver, 123 vault: vault, 124 motd: defaultMotd, 125 } 126 127 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 128 if err != nil { 129 return nil, err 130 } 131 logger.Info("owner set", "did", cfg.Server.Owner) 132 133 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 134 if err != nil { 135 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 136 } 137 138 // spindle listen to knot stream for sh.tangled.git.refUpdate 139 // which will sync the local workflow files in spindle and enqueues the 140 // pipeline job for on-push workflows 141 ccfg := eventconsumer.NewConsumerConfig() 142 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 143 ccfg.Dev = cfg.Server.Dev 144 ccfg.ProcessFunc = spindle.processKnotStream 145 ccfg.CursorStore = cursorStore 146 knownKnots, err := d.Knots() 147 if err != nil { 148 return nil, err 149 } 150 for _, knot := range knownKnots { 151 logger.Info("adding source start", "knot", knot) 152 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 153 } 154 spindle.ks = eventconsumer.NewConsumer(*ccfg) 155 156 return spindle, nil 157} 158 159// DB returns the database instance. 160func (s *Spindle) DB() *db.DB { 161 return s.db 162} 163 164// Queue returns the job queue instance. 165func (s *Spindle) Queue() *queue.Queue { 166 return s.jq 167} 168 169// Engines returns the map of available engines. 170func (s *Spindle) Engines() map[string]models.Engine { 171 return s.engs 172} 173 174// Vault returns the secrets manager instance. 175func (s *Spindle) Vault() secrets.Manager { 176 return s.vault 177} 178 179// Notifier returns the notifier instance. 180func (s *Spindle) Notifier() *notifier.Notifier { 181 return s.n 182} 183 184// Enforcer returns the RBAC enforcer instance. 
185func (s *Spindle) Enforcer() *rbac2.Enforcer { 186 return s.e 187} 188 189// SetMotdContent sets custom MOTD content, replacing the embedded default. 190func (s *Spindle) SetMotdContent(content []byte) { 191 s.motdMu.Lock() 192 defer s.motdMu.Unlock() 193 s.motd = content 194} 195 196// GetMotdContent returns the current MOTD content. 197func (s *Spindle) GetMotdContent() []byte { 198 s.motdMu.RLock() 199 defer s.motdMu.RUnlock() 200 return s.motd 201} 202 203// Start starts the Spindle server (blocking). 204func (s *Spindle) Start(ctx context.Context) error { 205 // starts a job queue runner in the background 206 s.jq.Start() 207 defer s.jq.Stop() 208 209 // Stop vault token renewal if it implements Stopper 210 if stopper, ok := s.vault.(secrets.Stopper); ok { 211 defer stopper.Stop() 212 } 213 214 go func() { 215 s.l.Info("starting knot event consumer") 216 s.ks.Start(ctx) 217 }() 218 219 // ensure server owner is tracked 220 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 221 return err 222 } 223 224 go func() { 225 s.l.Info("starting tap stream consumer") 226 s.tap.Connect(ctx, &tap.SimpleIndexer{ 227 EventHandler: s.processEvent, 228 }) 229 }() 230 231 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 232 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 233} 234 235func Run(ctx context.Context) error { 236 cfg, err := config.Load(ctx) 237 if err != nil { 238 return fmt.Errorf("failed to load config: %w", err) 239 } 240 241 nixeryEng, err := nixery.New(ctx, cfg) 242 if err != nil { 243 return err 244 } 245 246 s, err := New(ctx, cfg, map[string]models.Engine{ 247 "nixery": nixeryEng, 248 }) 249 if err != nil { 250 return err 251 } 252 253 return s.Start(ctx) 254} 255 256func (s *Spindle) Router() http.Handler { 257 mux := chi.NewRouter() 258 259 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 260 w.Write(s.GetMotdContent()) 261 }) 262 mux.HandleFunc("/events", s.Events) 263 
mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 264 265 mux.Mount("/xrpc", s.XrpcRouter()) 266 return mux 267} 268 269func (s *Spindle) XrpcRouter() http.Handler { 270 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 271 272 l := log.SubLogger(s.l, "xrpc") 273 274 x := xrpc.Xrpc{ 275 Logger: l, 276 Db: s.db, 277 Enforcer: s.e, 278 Engines: s.engs, 279 Config: s.cfg, 280 Resolver: s.res, 281 Vault: s.vault, 282 Notifier: s.Notifier(), 283 ServiceAuth: serviceAuth, 284 } 285 286 return x.Router() 287} 288 289func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 290 l := log.FromContext(ctx).With("handler", "processKnotStream") 291 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 292 if msg.Nsid == tangled.GitRefUpdateNSID { 293 event := tangled.GitRefUpdate{} 294 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 295 l.Error("error unmarshalling", "err", err) 296 return err 297 } 298 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 299 300 // resolve repo name to rkey 301 // TODO: git.refUpdate should respond with rkey instead of repo name 302 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 303 if err != nil { 304 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 305 } 306 307 // NOTE: we are blindly trusting the knot that it will return only repos it own 308 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 309 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 310 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 311 return fmt.Errorf("sync git repo: %w", err) 312 } 313 l.Info("synced git repo") 314 315 compiler := workflow.Compiler{ 316 Trigger: tangled.Pipeline_TriggerMetadata{ 317 Kind: string(workflow.TriggerKindPush), 318 Push: &tangled.Pipeline_PushTriggerData{ 319 Ref: 
event.Ref, 320 OldSha: event.OldSha, 321 NewSha: event.NewSha, 322 }, 323 Repo: &tangled.Pipeline_TriggerRepo{ 324 Did: repo.Did.String(), 325 Knot: repo.Knot, 326 Repo: repo.Name, 327 }, 328 }, 329 } 330 331 // load workflow definitions from rev (without spindle context) 332 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 333 if err != nil { 334 return fmt.Errorf("loading pipeline: %w", err) 335 } 336 if len(rawPipeline) == 0 { 337 l.Info("no workflow definition find for the repo. skipping the event") 338 return nil 339 } 340 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 341 // TODO: pass compile error to workflow log 342 for _, w := range compiler.Diagnostics.Errors { 343 l.Error(w.String()) 344 } 345 for _, w := range compiler.Diagnostics.Warnings { 346 l.Warn(w.String()) 347 } 348 349 pipelineId := models.PipelineId{ 350 Knot: tpl.TriggerMetadata.Repo.Knot, 351 Rkey: tid.TID(), 352 } 353 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 354 l.Error("failed to create pipeline event", "err", err) 355 return nil 356 } 357 err = s.processPipeline(ctx, tpl, pipelineId) 358 if err != nil { 359 return err 360 } 361 } 362 363 return nil 364} 365 366func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 367 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 368 return nil, fmt.Errorf("syncing git repo: %w", err) 369 } 370 gr, err := kgit.Open(repoPath, rev) 371 if err != nil { 372 return nil, fmt.Errorf("opening git repo: %w", err) 373 } 374 375 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 376 if errors.Is(err, object.ErrDirectoryNotFound) { 377 // return empty RawPipeline when directory doesn't exist 378 return nil, nil 379 } else if err != nil { 380 return nil, fmt.Errorf("loading file tree: %w", err) 381 } 382 383 var rawPipeline workflow.RawPipeline 384 for _, e := range workflowDir { 385 if !e.IsFile() { 
386 continue 387 } 388 389 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 390 contents, err := gr.RawContent(fpath) 391 if err != nil { 392 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 393 } 394 395 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 396 Name: e.Name, 397 Contents: contents, 398 }) 399 } 400 401 return rawPipeline, nil 402} 403 404func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 405 // Build pipeline environment variables once for all workflows 406 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 407 408 // filter & init workflows 409 workflows := make(map[models.Engine][]models.Workflow) 410 for _, w := range tpl.Workflows { 411 if w == nil { 412 continue 413 } 414 if _, ok := s.engs[w.Engine]; !ok { 415 err := s.db.StatusFailed(models.WorkflowId{ 416 PipelineId: pipelineId, 417 Name: w.Name, 418 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 419 if err != nil { 420 return fmt.Errorf("db.StatusFailed: %w", err) 421 } 422 423 continue 424 } 425 426 eng := s.engs[w.Engine] 427 428 if _, ok := workflows[eng]; !ok { 429 workflows[eng] = []models.Workflow{} 430 } 431 432 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 433 if err != nil { 434 return fmt.Errorf("init workflow: %w", err) 435 } 436 437 // inject TANGLED_* env vars after InitWorkflow 438 // This prevents user-defined env vars from overriding them 439 if ewf.Environment == nil { 440 ewf.Environment = make(map[string]string) 441 } 442 maps.Copy(ewf.Environment, pipelineEnv) 443 444 workflows[eng] = append(workflows[eng], *ewf) 445 } 446 447 // enqueue pipeline 448 ok := s.jq.Enqueue(queue.Job{ 449 Run: func() error { 450 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 451 RepoOwner: tpl.TriggerMetadata.Repo.Did, 452 RepoName: tpl.TriggerMetadata.Repo.Repo, 453 Workflows: workflows, 454 }, 
pipelineId) 455 return nil 456 }, 457 OnFail: func(jobError error) { 458 s.l.Error("pipeline run failed", "error", jobError) 459 }, 460 }) 461 if !ok { 462 return fmt.Errorf("failed to enqueue pipeline: queue is full") 463 } 464 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 465 466 // emit StatusPending for all workflows here (after successful enqueue) 467 for _, ewfs := range workflows { 468 for _, ewf := range ewfs { 469 err := s.db.StatusPending(models.WorkflowId{ 470 PipelineId: pipelineId, 471 Name: ewf.Name, 472 }, s.n) 473 if err != nil { 474 return fmt.Errorf("db.StatusPending: %w", err) 475 } 476 } 477 } 478 return nil 479} 480 481// newRepoPath creates a path to store repository by its did and rkey. 482// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 483func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 484 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 485} 486 487func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 488 scheme := "https://" 489 if s.cfg.Server.Dev { 490 scheme = "http://" 491 } 492 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 493} 494 495const RequiredVersion = "2.49.0" 496 497func ensureGitVersion() error { 498 v, err := git.Version() 499 if err != nil { 500 return fmt.Errorf("fetching git version: %w", err) 501 } 502 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 503 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 504 } 505 return nil 506}