forked from tangled.org/core
Monorepo for Tangled
at master 11 kB view raw
// Package spindle implements the Spindle server: it consumes pipeline events
// from knots, initialises the requested workflows on the configured engines,
// enqueues them on a job queue and exposes HTTP/XRPC endpoints.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the content of the embedded "motd" file. New uses it as the
// initial message served at "/" until SetMotdContent replaces it.
//
//go:embed motd
var defaultMotd []byte

const (
	// rbacDomain is the domain passed to the RBAC enforcer for every
	// spindle-level operation in this file (AddSpindle, owner lookup,
	// owner add/remove).
	rbacDomain = "thisserver"
)

// Spindle bundles the long-lived dependencies of a running spindle server.
// Instances are created with New and run with Start.
type Spindle struct {
	// jc is the jetstream client; New registers the owner and all known
	// DIDs on it and starts it with the handler returned by s.ingest().
	jc *jetstream.JetstreamClient
	// db is the spindle database opened from cfg.Server.DBPath.
	db *db.DB
	// e is the RBAC enforcer used for owner/role management.
	e *rbac.Enforcer
	// l is the base logger; sub-loggers are derived from it.
	l *slog.Logger
	// n is the notifier handed to the db status updates and to the
	// workflow engine.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow's Engine
	// field) to its implementation.
	engs map[string]models.Engine
	// jq is the job queue pipelines are enqueued on.
	jq *queue.Queue
	// cfg is the loaded server configuration.
	cfg *config.Config
	// ks is the knot event consumer; it is built in New and started in
	// Start.
	ks *eventconsumer.Consumer
	// res is the identity resolver passed to service auth and XRPC.
	res *idresolver.Resolver
	// vault is the secrets manager selected by cfg.Server.Secrets.Provider.
	vault secrets.Manager
	// motd is the message served at "/"; guarded by motdMu.
	motd []byte
	// motdMu guards reads and writes of motd.
	motdMu sync.RWMutex
}

// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 58 logger := log.FromContext(ctx) 59 60 d, err := db.Make(cfg.Server.DBPath) 61 if err != nil { 62 return nil, fmt.Errorf("failed to setup db: %w", err) 63 } 64 65 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 66 if err != nil { 67 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 68 } 69 e.E.EnableAutoSave(true) 70 71 n := notifier.New() 72 73 var vault secrets.Manager 74 switch cfg.Server.Secrets.Provider { 75 case "openbao": 76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 77 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 78 } 79 vault, err = secrets.NewOpenBaoManager( 80 cfg.Server.Secrets.OpenBao.ProxyAddr, 81 logger, 82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 83 ) 84 if err != nil { 85 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 86 } 87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 88 case "sqlite", "": 89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 90 if err != nil { 91 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 92 } 93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 94 default: 95 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 96 } 97 98 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 99 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 100 101 collections := []string{ 102 tangled.SpindleMemberNSID, 103 tangled.RepoNSID, 104 tangled.RepoCollaboratorNSID, 105 } 106 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 107 if err != nil { 
108 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 109 } 110 jc.AddDid(cfg.Server.Owner) 111 112 // Check if the spindle knows about any Dids; 113 dids, err := d.GetAllDids() 114 if err != nil { 115 return nil, fmt.Errorf("failed to get all dids: %w", err) 116 } 117 for _, d := range dids { 118 jc.AddDid(d) 119 } 120 121 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 122 123 spindle := &Spindle{ 124 jc: jc, 125 e: e, 126 db: d, 127 l: logger, 128 n: &n, 129 engs: engines, 130 jq: jq, 131 cfg: cfg, 132 res: resolver, 133 vault: vault, 134 motd: defaultMotd, 135 } 136 137 err = e.AddSpindle(rbacDomain) 138 if err != nil { 139 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 140 } 141 err = spindle.configureOwner() 142 if err != nil { 143 return nil, err 144 } 145 logger.Info("owner set", "did", cfg.Server.Owner) 146 147 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 148 if err != nil { 149 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 150 } 151 152 err = jc.StartJetstream(ctx, spindle.ingest()) 153 if err != nil { 154 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 155 } 156 157 // for each incoming sh.tangled.pipeline, we execute 158 // spindle.processPipeline, which in turn enqueues the pipeline 159 // job in the above registered queue. 160 ccfg := eventconsumer.NewConsumerConfig() 161 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 162 ccfg.Dev = cfg.Server.Dev 163 ccfg.ProcessFunc = spindle.processPipeline 164 ccfg.CursorStore = cursorStore 165 knownKnots, err := d.Knots() 166 if err != nil { 167 return nil, err 168 } 169 for _, knot := range knownKnots { 170 logger.Info("adding source start", "knot", knot) 171 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 172 } 173 spindle.ks = eventconsumer.NewConsumer(*ccfg) 174 175 return spindle, nil 176} 177 178// DB returns the database instance. 
179func (s *Spindle) DB() *db.DB { 180 return s.db 181} 182 183// Queue returns the job queue instance. 184func (s *Spindle) Queue() *queue.Queue { 185 return s.jq 186} 187 188// Engines returns the map of available engines. 189func (s *Spindle) Engines() map[string]models.Engine { 190 return s.engs 191} 192 193// Vault returns the secrets manager instance. 194func (s *Spindle) Vault() secrets.Manager { 195 return s.vault 196} 197 198// Notifier returns the notifier instance. 199func (s *Spindle) Notifier() *notifier.Notifier { 200 return s.n 201} 202 203// Enforcer returns the RBAC enforcer instance. 204func (s *Spindle) Enforcer() *rbac.Enforcer { 205 return s.e 206} 207 208// SetMotdContent sets custom MOTD content, replacing the embedded default. 209func (s *Spindle) SetMotdContent(content []byte) { 210 s.motdMu.Lock() 211 defer s.motdMu.Unlock() 212 s.motd = content 213} 214 215// GetMotdContent returns the current MOTD content. 216func (s *Spindle) GetMotdContent() []byte { 217 s.motdMu.RLock() 218 defer s.motdMu.RUnlock() 219 return s.motd 220} 221 222// Start starts the Spindle server (blocking). 
223func (s *Spindle) Start(ctx context.Context) error { 224 // starts a job queue runner in the background 225 s.jq.Start() 226 defer s.jq.Stop() 227 228 // Stop vault token renewal if it implements Stopper 229 if stopper, ok := s.vault.(secrets.Stopper); ok { 230 defer stopper.Stop() 231 } 232 233 go func() { 234 s.l.Info("starting knot event consumer") 235 s.ks.Start(ctx) 236 }() 237 238 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 239 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 240} 241 242func Run(ctx context.Context) error { 243 cfg, err := config.Load(ctx) 244 if err != nil { 245 return fmt.Errorf("failed to load config: %w", err) 246 } 247 248 nixeryEng, err := nixery.New(ctx, cfg) 249 if err != nil { 250 return err 251 } 252 253 s, err := New(ctx, cfg, map[string]models.Engine{ 254 "nixery": nixeryEng, 255 }) 256 if err != nil { 257 return err 258 } 259 260 return s.Start(ctx) 261} 262 263func (s *Spindle) Router() http.Handler { 264 mux := chi.NewRouter() 265 266 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 267 w.Write(s.GetMotdContent()) 268 }) 269 mux.HandleFunc("/events", s.Events) 270 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 271 272 mux.Mount("/xrpc", s.XrpcRouter()) 273 return mux 274} 275 276func (s *Spindle) XrpcRouter() http.Handler { 277 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 278 279 l := log.SubLogger(s.l, "xrpc") 280 281 x := xrpc.Xrpc{ 282 Logger: l, 283 Db: s.db, 284 Enforcer: s.e, 285 Engines: s.engs, 286 Config: s.cfg, 287 Resolver: s.res, 288 Vault: s.vault, 289 Notifier: s.Notifier(), 290 ServiceAuth: serviceAuth, 291 } 292 293 return x.Router() 294} 295 296func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 297 if msg.Nsid == tangled.PipelineNSID { 298 tpl := tangled.Pipeline{} 299 err := json.Unmarshal(msg.EventJson, &tpl) 300 if err != nil { 301 
fmt.Println("error unmarshalling", err) 302 return err 303 } 304 305 if tpl.TriggerMetadata == nil { 306 return fmt.Errorf("no trigger metadata found") 307 } 308 309 if tpl.TriggerMetadata.Repo == nil { 310 return fmt.Errorf("no repo data found") 311 } 312 313 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 314 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 315 } 316 317 // filter by repos 318 _, err = s.db.GetRepo( 319 tpl.TriggerMetadata.Repo.Knot, 320 tpl.TriggerMetadata.Repo.Did, 321 tpl.TriggerMetadata.Repo.Repo, 322 ) 323 if err != nil { 324 return fmt.Errorf("failed to get repo: %w", err) 325 } 326 327 pipelineId := models.PipelineId{ 328 Knot: src.Key(), 329 Rkey: msg.Rkey, 330 } 331 332 workflows := make(map[models.Engine][]models.Workflow) 333 334 // Build pipeline environment variables once for all workflows 335 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 336 337 for _, w := range tpl.Workflows { 338 if w != nil { 339 if _, ok := s.engs[w.Engine]; !ok { 340 err = s.db.StatusFailed(models.WorkflowId{ 341 PipelineId: pipelineId, 342 Name: w.Name, 343 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 344 if err != nil { 345 return fmt.Errorf("db.StatusFailed: %w", err) 346 } 347 348 continue 349 } 350 351 eng := s.engs[w.Engine] 352 353 if _, ok := workflows[eng]; !ok { 354 workflows[eng] = []models.Workflow{} 355 } 356 357 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 358 if err != nil { 359 return fmt.Errorf("init workflow: %w", err) 360 } 361 362 // inject TANGLED_* env vars after InitWorkflow 363 // This prevents user-defined env vars from overriding them 364 if ewf.Environment == nil { 365 ewf.Environment = make(map[string]string) 366 } 367 maps.Copy(ewf.Environment, pipelineEnv) 368 369 workflows[eng] = append(workflows[eng], *ewf) 370 371 err = s.db.StatusPending(models.WorkflowId{ 372 PipelineId: pipelineId, 373 Name: w.Name, 374 
}, s.n) 375 if err != nil { 376 return fmt.Errorf("db.StatusPending: %w", err) 377 } 378 } 379 } 380 381 ok := s.jq.Enqueue(queue.Job{ 382 Run: func() error { 383 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 384 RepoOwner: tpl.TriggerMetadata.Repo.Did, 385 RepoName: tpl.TriggerMetadata.Repo.Repo, 386 Workflows: workflows, 387 }, pipelineId) 388 return nil 389 }, 390 OnFail: func(jobError error) { 391 s.l.Error("pipeline run failed", "error", jobError) 392 }, 393 }) 394 if ok { 395 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 396 } else { 397 s.l.Error("failed to enqueue pipeline: queue is full") 398 } 399 } 400 401 return nil 402} 403 404func (s *Spindle) configureOwner() error { 405 cfgOwner := s.cfg.Server.Owner 406 407 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 408 if err != nil { 409 return err 410 } 411 412 switch len(existing) { 413 case 0: 414 // no owner configured, continue 415 case 1: 416 // find existing owner 417 existingOwner := existing[0] 418 419 // no ownership change, this is okay 420 if existingOwner == s.cfg.Server.Owner { 421 break 422 } 423 424 // remove existing owner 425 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 426 if err != nil { 427 return nil 428 } 429 default: 430 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 431 } 432 433 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 434}