// Package spindle wires together the pieces of a spindle server: it ingests
// pipeline events from knots, schedules their workflows on the configured
// engines through a job queue, and exposes HTTP/XRPC endpoints.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/go-co-op/gocron/v2"

	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// motd holds the embedded contents of the "motd" file; Router serves it
// verbatim at "/".
//
//go:embed motd
var motd []byte

const (
	// rbacDomain is the domain under which this spindle and its owner are
	// registered with the RBAC enforcer.
	rbacDomain = "thisserver"
)

// Spindle bundles every long-lived dependency of a running spindle server.
// Instances are built by New and driven by Start.
type Spindle struct {
	// jc is the jetstream client subscribed to the member, repo and
	// collaborator collections.
	jc *jetstream.JetstreamClient

	// db is the spindle's database handle.
	db *db.DB

	// e is the RBAC enforcer used for ownership and permission checks.
	e *rbac.Enforcer

	// l is the base logger; sub-loggers are derived from it.
	l *slog.Logger

	// n is the notifier handed to status updates and workflow execution.
	n *notifier.Notifier

	// engs maps an engine name (as referenced by a workflow) to its
	// implementation.
	engs map[string]models.Engine

	// jq is the job queue onto which pipeline runs are enqueued.
	jq *queue.Queue

	// cfg is the loaded server configuration.
	cfg *config.Config

	// ks is the event consumer that reads pipeline events from known knots.
	ks *eventconsumer.Consumer

	// res is the identity resolver passed to XRPC service auth.
	res *idresolver.Resolver

	// vault is the secrets manager selected by the configured provider.
	vault secrets.Manager

	// crons is the cron scheduler started alongside the server.
	crons gocron.Scheduler
}

// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 58 logger := log.FromContext(ctx) 59 60 d, err := db.Make(cfg.Server.DBPath) 61 if err != nil { 62 return nil, fmt.Errorf("failed to setup db: %w", err) 63 } 64 65 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 66 if err != nil { 67 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 68 } 69 e.E.EnableAutoSave(true) 70 71 n := notifier.New() 72 73 var vault secrets.Manager 74 switch cfg.Server.Secrets.Provider { 75 case "openbao": 76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 77 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 78 } 79 vault, err = secrets.NewOpenBaoManager( 80 cfg.Server.Secrets.OpenBao.ProxyAddr, 81 logger, 82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 83 ) 84 if err != nil { 85 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 86 } 87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 88 case "sqlite", "": 89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 90 if err != nil { 91 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 92 } 93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 94 default: 95 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 96 } 97 98 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 99 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 100 101 collections := []string{ 102 tangled.SpindleMemberNSID, 103 tangled.RepoNSID, 104 tangled.RepoCollaboratorNSID, 105 } 106 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 107 if err != nil { 
108 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 109 } 110 jc.AddDid(cfg.Server.Owner) 111 112 // Check if the spindle knows about any Dids; 113 dids, err := d.GetAllDids() 114 if err != nil { 115 return nil, fmt.Errorf("failed to get all dids: %w", err) 116 } 117 for _, d := range dids { 118 jc.AddDid(d) 119 } 120 121 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 122 123 scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.UTC)) 124 if err != nil { 125 return nil, fmt.Errorf("failed to create cron scheduler: %w", err) 126 } 127 defer func() { _ = scheduler.Shutdown() }() 128 129 spindle := &Spindle{ 130 jc: jc, 131 e: e, 132 db: d, 133 l: logger, 134 n: &n, 135 engs: engines, 136 jq: jq, 137 cfg: cfg, 138 res: resolver, 139 vault: vault, 140 crons: scheduler, 141 } 142 143 err = e.AddSpindle(rbacDomain) 144 if err != nil { 145 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 146 } 147 err = spindle.configureOwner() 148 if err != nil { 149 return nil, err 150 } 151 logger.Info("owner set", "did", cfg.Server.Owner) 152 153 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 154 if err != nil { 155 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 156 } 157 158 err = jc.StartJetstream(ctx, spindle.ingest()) 159 if err != nil { 160 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 161 } 162 163 // for each incoming sh.tangled.pipeline, we execute 164 // spindle.processPipeline, which in turn enqueues the pipeline 165 // job in the above registered queue. 
166 ccfg := eventconsumer.NewConsumerConfig() 167 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 168 ccfg.Dev = cfg.Server.Dev 169 ccfg.ProcessFunc = spindle.processPipeline 170 ccfg.CursorStore = cursorStore 171 knownKnots, err := d.Knots() 172 if err != nil { 173 return nil, err 174 } 175 for _, knot := range knownKnots { 176 logger.Info("adding source start", "knot", knot) 177 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 178 } 179 spindle.ks = eventconsumer.NewConsumer(*ccfg) 180 181 return spindle, nil 182} 183 184// DB returns the database instance. 185func (s *Spindle) DB() *db.DB { 186 return s.db 187} 188 189// Queue returns the job queue instance. 190func (s *Spindle) Queue() *queue.Queue { 191 return s.jq 192} 193 194// Engines returns the map of available engines. 195func (s *Spindle) Engines() map[string]models.Engine { 196 return s.engs 197} 198 199// Vault returns the secrets manager instance. 200func (s *Spindle) Vault() secrets.Manager { 201 return s.vault 202} 203 204// Notifier returns the notifier instance. 205func (s *Spindle) Notifier() *notifier.Notifier { 206 return s.n 207} 208 209// Enforcer returns the RBAC enforcer instance. 210func (s *Spindle) Enforcer() *rbac.Enforcer { 211 return s.e 212} 213 214// Scheduler returns the cron scheduler instance. 215func (s *Spindle) Scheduler() *gocron.Scheduler { 216 return &s.crons 217} 218 219// Start starts the Spindle server (blocking). 
220func (s *Spindle) Start(ctx context.Context) error { 221 // starts a job queue runner in the background 222 s.jq.Start() 223 defer s.jq.Stop() 224 225 // Stop vault token renewal if it implements Stopper 226 if stopper, ok := s.vault.(secrets.Stopper); ok { 227 defer stopper.Stop() 228 } 229 230 s.crons.Start() 231 232 go func() { 233 s.l.Info("starting knot event consumer") 234 s.ks.Start(ctx) 235 }() 236 237 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 238 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 239} 240 241func Run(ctx context.Context) error { 242 cfg, err := config.Load(ctx) 243 if err != nil { 244 return fmt.Errorf("failed to load config: %w", err) 245 } 246 247 nixeryEng, err := nixery.New(ctx, cfg) 248 if err != nil { 249 return err 250 } 251 252 s, err := New(ctx, cfg, map[string]models.Engine{ 253 "nixery": nixeryEng, 254 }) 255 if err != nil { 256 return err 257 } 258 259 return s.Start(ctx) 260} 261 262func (s *Spindle) Router() http.Handler { 263 mux := chi.NewRouter() 264 265 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 266 w.Write(motd) 267 }) 268 mux.HandleFunc("/events", s.Events) 269 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 270 271 mux.Mount("/xrpc", s.XrpcRouter()) 272 return mux 273} 274 275func (s *Spindle) XrpcRouter() http.Handler { 276 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 277 278 l := log.SubLogger(s.l, "xrpc") 279 280 x := xrpc.Xrpc{ 281 Logger: l, 282 Db: s.db, 283 Enforcer: s.e, 284 Engines: s.engs, 285 Config: s.cfg, 286 Resolver: s.res, 287 Vault: s.vault, 288 ServiceAuth: serviceAuth, 289 } 290 291 return x.Router() 292} 293 294func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 295 if msg.Nsid == tangled.PipelineNSID { 296 tpl := tangled.Pipeline{} 297 err := json.Unmarshal(msg.EventJson, &tpl) 298 if err != nil { 299 fmt.Println("error 
unmarshalling", err) 300 return err 301 } 302 303 if tpl.TriggerMetadata == nil { 304 return fmt.Errorf("no trigger metadata found") 305 } 306 307 if tpl.TriggerMetadata.Repo == nil { 308 return fmt.Errorf("no repo data found") 309 } 310 311 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 312 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 313 } 314 315 // filter by repos 316 _, err = s.db.GetRepo( 317 tpl.TriggerMetadata.Repo.Knot, 318 tpl.TriggerMetadata.Repo.Did, 319 tpl.TriggerMetadata.Repo.Repo, 320 ) 321 if err != nil { 322 return err 323 } 324 325 pipelineId := models.PipelineId{ 326 Knot: src.Key(), 327 Rkey: msg.Rkey, 328 } 329 330 workflows := make(map[models.Engine][]models.Workflow) 331 332 for _, w := range tpl.Workflows { 333 if w != nil { 334 if _, ok := s.engs[w.Engine]; !ok { 335 err = s.db.StatusFailed(models.WorkflowId{ 336 PipelineId: pipelineId, 337 Name: w.Name, 338 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 339 if err != nil { 340 return err 341 } 342 343 continue 344 } 345 346 eng := s.engs[w.Engine] 347 348 if _, ok := workflows[eng]; !ok { 349 workflows[eng] = []models.Workflow{} 350 } 351 352 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 353 if err != nil { 354 return err 355 } 356 357 workflows[eng] = append(workflows[eng], *ewf) 358 359 err = s.db.StatusPending(models.WorkflowId{ 360 PipelineId: pipelineId, 361 Name: w.Name, 362 }, s.n) 363 if err != nil { 364 return err 365 } 366 } 367 } 368 369 ok := s.jq.Enqueue(queue.Job{ 370 Run: func() error { 371 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 372 RepoOwner: tpl.TriggerMetadata.Repo.Did, 373 RepoName: tpl.TriggerMetadata.Repo.Repo, 374 Workflows: workflows, 375 }, pipelineId) 376 return nil 377 }, 378 OnFail: func(jobError error) { 379 s.l.Error("pipeline run failed", "error", jobError) 380 }, 381 }) 382 if ok { 383 s.l.Info("pipeline enqueued 
successfully", "id", msg.Rkey) 384 } else { 385 s.l.Error("failed to enqueue pipeline: queue is full") 386 } 387 } 388 389 return nil 390} 391 392func (s *Spindle) configureOwner() error { 393 cfgOwner := s.cfg.Server.Owner 394 395 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 396 if err != nil { 397 return err 398 } 399 400 switch len(existing) { 401 case 0: 402 // no owner configured, continue 403 case 1: 404 // find existing owner 405 existingOwner := existing[0] 406 407 // no ownership change, this is okay 408 if existingOwner == s.cfg.Server.Owner { 409 break 410 } 411 412 // remove existing owner 413 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 414 if err != nil { 415 return nil 416 } 417 default: 418 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 419 } 420 421 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 422}