// Monorepo for Tangled (tangled.org).

package spindle

import (
	"context"
	_ "embed" // required for the //go:embed directive on motd
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/xrpc/serviceauth"
)

// motd holds the contents of the embedded file named "motd"; the router
// writes it verbatim as the response to "/".
//
//go:embed motd
var motd []byte

// Spindle holds everything a spindle server needs at runtime: storage, the
// RBAC enforcer, the job queue and workflow engines, the secrets backend,
// and the clients/consumers for the knot and tap event streams.
type Spindle struct {
	tap   *tap.Client              // tap client: tracks repos and streams tap events (see Start)
	db    *db.DB                   // database opened at cfg.Server.DBPath()
	e     *rbac2.Enforcer          // RBAC enforcer; New records the spindle owner in it
	l     *slog.Logger             // server logger
	n     *notifier.Notifier       // notifier passed to XRPC and to workflow status updates
	engs  map[string]models.Engine // workflow engines keyed by engine name (e.g. "nixery")
	jq    *queue.Queue             // job queue that runs enqueued pipeline jobs
	cfg   *config.Config           // server configuration
	ks    *eventconsumer.Consumer  // consumer for the knot event streams
	res   *idresolver.Resolver     // identity resolver used by XRPC and service auth
	vault secrets.Manager          // secrets backend (openbao or sqlite, chosen in New)
}

// New creates a new Spindle server with the provided configuration and engines.
54func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 55 logger := log.FromContext(ctx) 56 57 if err := ensureGitVersion(); err != nil { 58 return nil, fmt.Errorf("ensuring git version: %w", err) 59 } 60 61 d, err := db.Make(ctx, cfg.Server.DBPath()) 62 if err != nil { 63 return nil, fmt.Errorf("failed to setup db: %w", err) 64 } 65 66 e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 69 } 70 71 n := notifier.New() 72 73 var vault secrets.Manager 74 switch cfg.Server.Secrets.Provider { 75 case "openbao": 76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 77 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 78 } 79 vault, err = secrets.NewOpenBaoManager( 80 cfg.Server.Secrets.OpenBao.ProxyAddr, 81 logger, 82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 83 ) 84 if err != nil { 85 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 86 } 87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 88 case "sqlite", "": 89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 90 if err != nil { 91 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 92 } 93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 94 default: 95 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 96 } 97 98 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 99 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 100 101 tap := tap.NewClient(cfg.Server.TapUrl, "") 102 103 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 104 105 spindle := &Spindle{ 106 tap: &tap, 107 e: e, 108 db: d, 109 l: logger, 110 
n: &n, 111 engs: engines, 112 jq: jq, 113 cfg: cfg, 114 res: resolver, 115 vault: vault, 116 } 117 118 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 119 if err != nil { 120 return nil, err 121 } 122 logger.Info("owner set", "did", cfg.Server.Owner) 123 124 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 125 if err != nil { 126 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 127 } 128 129 // for each incoming sh.tangled.pipeline, we execute 130 // spindle.processPipeline, which in turn enqueues the pipeline 131 // job in the above registered queue. 132 ccfg := eventconsumer.NewConsumerConfig() 133 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 134 ccfg.Dev = cfg.Server.Dev 135 ccfg.ProcessFunc = spindle.processPipeline 136 ccfg.CursorStore = cursorStore 137 knownKnots, err := d.Knots() 138 if err != nil { 139 return nil, err 140 } 141 for _, knot := range knownKnots { 142 logger.Info("adding source start", "knot", knot) 143 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 144 } 145 spindle.ks = eventconsumer.NewConsumer(*ccfg) 146 147 return spindle, nil 148} 149 150// DB returns the database instance. 151func (s *Spindle) DB() *db.DB { 152 return s.db 153} 154 155// Queue returns the job queue instance. 156func (s *Spindle) Queue() *queue.Queue { 157 return s.jq 158} 159 160// Engines returns the map of available engines. 161func (s *Spindle) Engines() map[string]models.Engine { 162 return s.engs 163} 164 165// Vault returns the secrets manager instance. 166func (s *Spindle) Vault() secrets.Manager { 167 return s.vault 168} 169 170// Notifier returns the notifier instance. 171func (s *Spindle) Notifier() *notifier.Notifier { 172 return s.n 173} 174 175// Enforcer returns the RBAC enforcer instance. 176func (s *Spindle) Enforcer() *rbac2.Enforcer { 177 return s.e 178} 179 180// Start starts the Spindle server (blocking). 
181func (s *Spindle) Start(ctx context.Context) error { 182 // starts a job queue runner in the background 183 s.jq.Start() 184 defer s.jq.Stop() 185 186 // Stop vault token renewal if it implements Stopper 187 if stopper, ok := s.vault.(secrets.Stopper); ok { 188 defer stopper.Stop() 189 } 190 191 go func() { 192 s.l.Info("starting knot event consumer") 193 s.ks.Start(ctx) 194 }() 195 196 // ensure server owner is tracked 197 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 198 return err 199 } 200 201 go func() { 202 s.l.Info("starting tap stream consumer") 203 s.tap.Connect(ctx, &tap.SimpleIndexer{ 204 EventHandler: s.processEvent, 205 }) 206 }() 207 208 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 209 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 210} 211 212func Run(ctx context.Context) error { 213 cfg, err := config.Load(ctx) 214 if err != nil { 215 return fmt.Errorf("failed to load config: %w", err) 216 } 217 218 nixeryEng, err := nixery.New(ctx, cfg) 219 if err != nil { 220 return err 221 } 222 223 s, err := New(ctx, cfg, map[string]models.Engine{ 224 "nixery": nixeryEng, 225 }) 226 if err != nil { 227 return err 228 } 229 230 return s.Start(ctx) 231} 232 233func (s *Spindle) Router() http.Handler { 234 mux := chi.NewRouter() 235 236 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 237 w.Write(motd) 238 }) 239 mux.HandleFunc("/events", s.Events) 240 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 241 242 mux.Mount("/xrpc", s.XrpcRouter()) 243 return mux 244} 245 246func (s *Spindle) XrpcRouter() http.Handler { 247 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 248 249 l := log.SubLogger(s.l, "xrpc") 250 251 x := xrpc.Xrpc{ 252 Logger: l, 253 Db: s.db, 254 Enforcer: s.e, 255 Engines: s.engs, 256 Config: s.cfg, 257 Resolver: s.res, 258 Vault: s.vault, 259 Notifier: s.Notifier(), 260 ServiceAuth: serviceAuth, 261 } 262 263 return 
x.Router() 264} 265 266func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 267 l := log.FromContext(ctx).With("handler", "processKnotStream") 268 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 269 if msg.Nsid == tangled.PipelineNSID { 270 return nil 271 tpl := tangled.Pipeline{} 272 err := json.Unmarshal(msg.EventJson, &tpl) 273 if err != nil { 274 fmt.Println("error unmarshalling", err) 275 return err 276 } 277 278 if tpl.TriggerMetadata == nil { 279 return fmt.Errorf("no trigger metadata found") 280 } 281 282 if tpl.TriggerMetadata.Repo == nil { 283 return fmt.Errorf("no repo data found") 284 } 285 286 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 287 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 288 } 289 290 // filter by repos 291 _, err = s.db.GetRepoWithName( 292 syntax.DID(tpl.TriggerMetadata.Repo.Did), 293 tpl.TriggerMetadata.Repo.Repo, 294 ) 295 if err != nil { 296 return fmt.Errorf("failed to get repo: %w", err) 297 } 298 299 pipelineId := models.PipelineId{ 300 Knot: src.Key(), 301 Rkey: msg.Rkey, 302 } 303 304 workflows := make(map[models.Engine][]models.Workflow) 305 306 // Build pipeline environment variables once for all workflows 307 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 308 309 for _, w := range tpl.Workflows { 310 if w != nil { 311 if _, ok := s.engs[w.Engine]; !ok { 312 err = s.db.StatusFailed(models.WorkflowId{ 313 PipelineId: pipelineId, 314 Name: w.Name, 315 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 316 if err != nil { 317 return fmt.Errorf("db.StatusFailed: %w", err) 318 } 319 320 continue 321 } 322 323 eng := s.engs[w.Engine] 324 325 if _, ok := workflows[eng]; !ok { 326 workflows[eng] = []models.Workflow{} 327 } 328 329 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 330 if err != nil { 331 return fmt.Errorf("init 
workflow: %w", err) 332 } 333 334 // inject TANGLED_* env vars after InitWorkflow 335 // This prevents user-defined env vars from overriding them 336 if ewf.Environment == nil { 337 ewf.Environment = make(map[string]string) 338 } 339 maps.Copy(ewf.Environment, pipelineEnv) 340 341 workflows[eng] = append(workflows[eng], *ewf) 342 343 err = s.db.StatusPending(models.WorkflowId{ 344 PipelineId: pipelineId, 345 Name: w.Name, 346 }, s.n) 347 if err != nil { 348 return fmt.Errorf("db.StatusPending: %w", err) 349 } 350 } 351 } 352 353 ok := s.jq.Enqueue(queue.Job{ 354 Run: func() error { 355 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 356 RepoOwner: tpl.TriggerMetadata.Repo.Did, 357 RepoName: tpl.TriggerMetadata.Repo.Repo, 358 Workflows: workflows, 359 }, pipelineId) 360 return nil 361 }, 362 OnFail: func(jobError error) { 363 s.l.Error("pipeline run failed", "error", jobError) 364 }, 365 }) 366 if ok { 367 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 368 } else { 369 s.l.Error("failed to enqueue pipeline: queue is full") 370 } 371 } else if msg.Nsid == tangled.GitRefUpdateNSID { 372 event := tangled.GitRefUpdate{} 373 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 374 l.Error("error unmarshalling", "err", err) 375 return err 376 } 377 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 378 379 // resolve repo name to rkey 380 // TODO: git.refUpdate should respond with rkey instead of repo name 381 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 382 if err != nil { 383 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 384 } 385 386 // NOTE: we are blindly trusting the knot that it will return only repos it own 387 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 388 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 389 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, 
repoPath, event.NewSha); err != nil { 390 return fmt.Errorf("sync git repo: %w", err) 391 } 392 l.Info("synced git repo") 393 394 // TODO: plan the pipeline 395 } 396 397 return nil 398} 399 400// newRepoPath creates a path to store repository by its did and rkey. 401// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 402func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 403 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 404} 405 406func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 407 scheme := "https://" 408 if s.cfg.Server.Dev { 409 scheme = "http://" 410 } 411 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 412} 413 414const RequiredVersion = "2.49.0" 415 416func ensureGitVersion() error { 417 v, err := git.Version() 418 if err != nil { 419 return fmt.Errorf("fetching git version: %w", err) 420 } 421 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 422 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 423 } 424 return nil 425}