Monorepo for Tangled tangled.org
// Package spindle wires together the Spindle server: its database, RBAC
// enforcer, secrets manager, job queue, jetstream client, knot event consumer
// and HTTP routes.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// motd holds the contents of the embedded "motd" file; Router writes it as the
// response body for "/".
//
//go:embed motd
var motd []byte

const (
	// rbacDomain is the domain name passed to the RBAC enforcer for every
	// spindle-level call made in this file (AddSpindle, owner lookup/removal/
	// addition).
	rbacDomain = "thisserver"
)

// Spindle bundles the long-lived dependencies of a running Spindle server.
// Instances are built by New.
type Spindle struct {
	// jc is the jetstream client; New registers the owner DID and all DIDs
	// known to the database with it before starting it.
	jc *jetstream.JetstreamClient
	// db is the server database handle.
	db *db.DB
	// e is the RBAC enforcer.
	e *rbac.Enforcer
	// l is the base logger; sub-loggers are derived from it.
	l *slog.Logger
	// n is the notifier handed to status updates and the xrpc layer.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow's Engine field)
	// to its implementation.
	engs map[string]models.Engine
	// jq is the job queue that pipeline runs are enqueued on.
	jq *queue.Queue
	// cfg is the loaded server configuration.
	cfg *config.Config
	// ks is the knot event consumer; it is started in the background by Start.
	ks *eventconsumer.Consumer
	// res is the identity resolver used for service auth and xrpc.
	res *idresolver.Resolver
	// vault is the secrets manager (OpenBao or SQLite backed, chosen in New).
	vault secrets.Manager
}

// New creates a new Spindle server with the provided configuration and engines.
54func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 55 logger := log.FromContext(ctx) 56 57 d, err := db.Make(cfg.Server.DBPath) 58 if err != nil { 59 return nil, fmt.Errorf("failed to setup db: %w", err) 60 } 61 62 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 63 if err != nil { 64 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 65 } 66 e.E.EnableAutoSave(true) 67 68 n := notifier.New() 69 70 var vault secrets.Manager 71 switch cfg.Server.Secrets.Provider { 72 case "openbao": 73 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 74 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 75 } 76 vault, err = secrets.NewOpenBaoManager( 77 cfg.Server.Secrets.OpenBao.ProxyAddr, 78 logger, 79 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 80 ) 81 if err != nil { 82 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 83 } 84 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 85 case "sqlite", "": 86 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 87 if err != nil { 88 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 89 } 90 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 91 default: 92 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 93 } 94 95 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 96 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 97 98 collections := []string{ 99 tangled.SpindleMemberNSID, 100 tangled.RepoNSID, 101 tangled.RepoCollaboratorNSID, 102 } 103 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 104 if err != nil { 105 
return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 106 } 107 jc.AddDid(cfg.Server.Owner) 108 109 // Check if the spindle knows about any Dids; 110 dids, err := d.GetAllDids() 111 if err != nil { 112 return nil, fmt.Errorf("failed to get all dids: %w", err) 113 } 114 for _, d := range dids { 115 jc.AddDid(d) 116 } 117 118 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 119 120 spindle := &Spindle{ 121 jc: jc, 122 e: e, 123 db: d, 124 l: logger, 125 n: &n, 126 engs: engines, 127 jq: jq, 128 cfg: cfg, 129 res: resolver, 130 vault: vault, 131 } 132 133 err = e.AddSpindle(rbacDomain) 134 if err != nil { 135 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 136 } 137 err = spindle.configureOwner() 138 if err != nil { 139 return nil, err 140 } 141 logger.Info("owner set", "did", cfg.Server.Owner) 142 143 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 144 if err != nil { 145 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 146 } 147 148 err = jc.StartJetstream(ctx, spindle.ingest()) 149 if err != nil { 150 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 151 } 152 153 // for each incoming sh.tangled.pipeline, we execute 154 // spindle.processPipeline, which in turn enqueues the pipeline 155 // job in the above registered queue. 156 ccfg := eventconsumer.NewConsumerConfig() 157 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 158 ccfg.Dev = cfg.Server.Dev 159 ccfg.ProcessFunc = spindle.processPipeline 160 ccfg.CursorStore = cursorStore 161 knownKnots, err := d.Knots() 162 if err != nil { 163 return nil, err 164 } 165 for _, knot := range knownKnots { 166 logger.Info("adding source start", "knot", knot) 167 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 168 } 169 spindle.ks = eventconsumer.NewConsumer(*ccfg) 170 171 return spindle, nil 172} 173 174// DB returns the database instance. 
175func (s *Spindle) DB() *db.DB { 176 return s.db 177} 178 179// Queue returns the job queue instance. 180func (s *Spindle) Queue() *queue.Queue { 181 return s.jq 182} 183 184// Engines returns the map of available engines. 185func (s *Spindle) Engines() map[string]models.Engine { 186 return s.engs 187} 188 189// Vault returns the secrets manager instance. 190func (s *Spindle) Vault() secrets.Manager { 191 return s.vault 192} 193 194// Notifier returns the notifier instance. 195func (s *Spindle) Notifier() *notifier.Notifier { 196 return s.n 197} 198 199// Enforcer returns the RBAC enforcer instance. 200func (s *Spindle) Enforcer() *rbac.Enforcer { 201 return s.e 202} 203 204// Start starts the Spindle server (blocking). 205func (s *Spindle) Start(ctx context.Context) error { 206 // starts a job queue runner in the background 207 s.jq.Start() 208 defer s.jq.Stop() 209 210 // Stop vault token renewal if it implements Stopper 211 if stopper, ok := s.vault.(secrets.Stopper); ok { 212 defer stopper.Stop() 213 } 214 215 go func() { 216 s.l.Info("starting knot event consumer") 217 s.ks.Start(ctx) 218 }() 219 220 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 221 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 222} 223 224func Run(ctx context.Context) error { 225 cfg, err := config.Load(ctx) 226 if err != nil { 227 return fmt.Errorf("failed to load config: %w", err) 228 } 229 230 nixeryEng, err := nixery.New(ctx, cfg) 231 if err != nil { 232 return err 233 } 234 235 s, err := New(ctx, cfg, map[string]models.Engine{ 236 "nixery": nixeryEng, 237 }) 238 if err != nil { 239 return err 240 } 241 242 return s.Start(ctx) 243} 244 245func (s *Spindle) Router() http.Handler { 246 mux := chi.NewRouter() 247 248 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 249 w.Write(motd) 250 }) 251 mux.HandleFunc("/events", s.Events) 252 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 253 254 mux.Mount("/xrpc", s.XrpcRouter()) 
255 return mux 256} 257 258func (s *Spindle) XrpcRouter() http.Handler { 259 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 260 261 l := log.SubLogger(s.l, "xrpc") 262 263 x := xrpc.Xrpc{ 264 Logger: l, 265 Db: s.db, 266 Enforcer: s.e, 267 Engines: s.engs, 268 Config: s.cfg, 269 Resolver: s.res, 270 Vault: s.vault, 271 Notifier: s.Notifier(), 272 ServiceAuth: serviceAuth, 273 } 274 275 return x.Router() 276} 277 278func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 279 if msg.Nsid == tangled.PipelineNSID { 280 tpl := tangled.Pipeline{} 281 err := json.Unmarshal(msg.EventJson, &tpl) 282 if err != nil { 283 fmt.Println("error unmarshalling", err) 284 return err 285 } 286 287 if tpl.TriggerMetadata == nil { 288 return fmt.Errorf("no trigger metadata found") 289 } 290 291 if tpl.TriggerMetadata.Repo == nil { 292 return fmt.Errorf("no repo data found") 293 } 294 295 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 296 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 297 } 298 299 // filter by repos 300 _, err = s.db.GetRepo( 301 tpl.TriggerMetadata.Repo.Knot, 302 tpl.TriggerMetadata.Repo.Did, 303 tpl.TriggerMetadata.Repo.Repo, 304 ) 305 if err != nil { 306 return fmt.Errorf("failed to get repo: %w", err) 307 } 308 309 pipelineId := models.PipelineId{ 310 Knot: src.Key(), 311 Rkey: msg.Rkey, 312 } 313 314 workflows := make(map[models.Engine][]models.Workflow) 315 316 // Build pipeline environment variables once for all workflows 317 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 318 319 for _, w := range tpl.Workflows { 320 if w != nil { 321 if _, ok := s.engs[w.Engine]; !ok { 322 err = s.db.StatusFailed(models.WorkflowId{ 323 PipelineId: pipelineId, 324 Name: w.Name, 325 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 326 if err != nil { 327 return 
fmt.Errorf("db.StatusFailed: %w", err) 328 } 329 330 continue 331 } 332 333 eng := s.engs[w.Engine] 334 335 if _, ok := workflows[eng]; !ok { 336 workflows[eng] = []models.Workflow{} 337 } 338 339 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 340 if err != nil { 341 return fmt.Errorf("init workflow: %w", err) 342 } 343 344 // inject TANGLED_* env vars after InitWorkflow 345 // This prevents user-defined env vars from overriding them 346 if ewf.Environment == nil { 347 ewf.Environment = make(map[string]string) 348 } 349 maps.Copy(ewf.Environment, pipelineEnv) 350 351 workflows[eng] = append(workflows[eng], *ewf) 352 353 err = s.db.StatusPending(models.WorkflowId{ 354 PipelineId: pipelineId, 355 Name: w.Name, 356 }, s.n) 357 if err != nil { 358 return fmt.Errorf("db.StatusPending: %w", err) 359 } 360 } 361 } 362 363 ok := s.jq.Enqueue(queue.Job{ 364 Run: func() error { 365 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 366 RepoOwner: tpl.TriggerMetadata.Repo.Did, 367 RepoName: tpl.TriggerMetadata.Repo.Repo, 368 Workflows: workflows, 369 }, pipelineId) 370 return nil 371 }, 372 OnFail: func(jobError error) { 373 s.l.Error("pipeline run failed", "error", jobError) 374 }, 375 }) 376 if ok { 377 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 378 } else { 379 s.l.Error("failed to enqueue pipeline: queue is full") 380 } 381 } 382 383 return nil 384} 385 386func (s *Spindle) configureOwner() error { 387 cfgOwner := s.cfg.Server.Owner 388 389 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 390 if err != nil { 391 return err 392 } 393 394 switch len(existing) { 395 case 0: 396 // no owner configured, continue 397 case 1: 398 // find existing owner 399 existingOwner := existing[0] 400 401 // no ownership change, this is okay 402 if existingOwner == s.cfg.Server.Owner { 403 break 404 } 405 406 // remove existing owner 407 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 408 if 
err != nil { 409 return nil 410 } 411 default: 412 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 413 } 414 415 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 416}