Monorepo for Tangled
at local-dev 443 lines 11 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 13 "github.com/go-chi/chi/v5" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/eventconsumer/cursor" 17 "tangled.org/core/idresolver" 18 "tangled.org/core/jetstream" 19 "tangled.org/core/log" 20 "tangled.org/core/notifier" 21 "tangled.org/core/rbac" 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/engine" 25 "tangled.org/core/spindle/engines/nixery" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/queue" 28 "tangled.org/core/spindle/secrets" 29 "tangled.org/core/spindle/xrpc" 30 "tangled.org/core/xrpc/serviceauth" 31) 32 33//go:embed motd 34var defaultMotd []byte 35 36const ( 37 rbacDomain = "thisserver" 38 39 // devPDSURL is the default PDS endpoint for local development. 40 // In dev mode, PLC/PDS/Jetstream all run on localhost inside the VM. 41 devPDSURL = "http://localhost:2583" 42) 43 44type Spindle struct { 45 jc *jetstream.JetstreamClient 46 db *db.DB 47 e *rbac.Enforcer 48 l *slog.Logger 49 n *notifier.Notifier 50 engs map[string]models.Engine 51 jq *queue.Queue 52 cfg *config.Config 53 ks *eventconsumer.Consumer 54 res *idresolver.Resolver 55 vault secrets.Manager 56 motd []byte 57 motdMu sync.RWMutex 58} 59 60// New creates a new Spindle server with the provided configuration and engines. 61func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 62 logger := log.FromContext(ctx) 63 64 d, err := db.Make(cfg.Server.DBPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to setup db: %w", err) 67 } 68 69 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 70 if err != nil { 71 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 72 } 73 e.E.EnableAutoSave(true) 74 75 n := notifier.New() 76 77 var vault secrets.Manager 78 switch cfg.Server.Secrets.Provider { 79 case "openbao": 80 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 81 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 82 } 83 vault, err = secrets.NewOpenBaoManager( 84 cfg.Server.Secrets.OpenBao.ProxyAddr, 85 logger, 86 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 87 ) 88 if err != nil { 89 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 90 } 91 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 92 case "sqlite", "": 93 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 94 if err != nil { 95 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 96 } 97 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 98 default: 99 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 100 } 101 102 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 103 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 104 105 collections := []string{ 106 tangled.SpindleMemberNSID, 107 tangled.RepoNSID, 108 tangled.RepoCollaboratorNSID, 109 } 110 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 111 if err != nil { 112 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 113 } 114 jc.AddDid(cfg.Server.Owner) 115 116 // Check if the spindle knows about any Dids; 117 dids, err := d.GetAllDids() 118 if err != nil { 119 return nil, fmt.Errorf("failed to get all dids: %w", err) 120 } 121 for _, d := range dids { 122 jc.AddDid(d) 123 } 124 125 var resolver *idresolver.Resolver 126 if cfg.Server.Dev { 127 resolver = idresolver.DefaultDevResolver(cfg.Server.PlcUrl, devPDSURL) 128 } else { 129 resolver = idresolver.DefaultResolver(cfg.Server.PlcUrl) 130 } 131 132 spindle := &Spindle{ 133 jc: jc, 134 e: e, 135 db: d, 136 l: logger, 137 n: &n, 138 engs: engines, 139 jq: jq, 140 cfg: cfg, 141 res: resolver, 142 vault: vault, 143 motd: defaultMotd, 144 } 145 146 err = e.AddSpindle(rbacDomain) 147 if err != nil { 148 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 149 } 150 err = spindle.configureOwner() 151 if err != nil { 152 return nil, err 153 } 154 logger.Info("owner set", "did", cfg.Server.Owner) 155 156 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 157 if err != nil { 158 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 159 } 160 161 err = jc.StartJetstream(ctx, spindle.ingest()) 162 if err != nil { 163 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 164 } 165 166 // for each incoming sh.tangled.pipeline, we execute 167 // spindle.processPipeline, which in turn enqueues the pipeline 168 // job in the above registered queue. 169 ccfg := eventconsumer.NewConsumerConfig() 170 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 171 ccfg.Dev = cfg.Server.Dev 172 ccfg.ProcessFunc = spindle.processPipeline 173 ccfg.CursorStore = cursorStore 174 knownKnots, err := d.Knots() 175 if err != nil { 176 return nil, err 177 } 178 for _, knot := range knownKnots { 179 logger.Info("adding source start", "knot", knot) 180 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 181 } 182 spindle.ks = eventconsumer.NewConsumer(*ccfg) 183 184 return spindle, nil 185} 186 187// DB returns the database instance. 188func (s *Spindle) DB() *db.DB { 189 return s.db 190} 191 192// Queue returns the job queue instance. 193func (s *Spindle) Queue() *queue.Queue { 194 return s.jq 195} 196 197// Engines returns the map of available engines. 198func (s *Spindle) Engines() map[string]models.Engine { 199 return s.engs 200} 201 202// Vault returns the secrets manager instance. 203func (s *Spindle) Vault() secrets.Manager { 204 return s.vault 205} 206 207// Notifier returns the notifier instance. 208func (s *Spindle) Notifier() *notifier.Notifier { 209 return s.n 210} 211 212// Enforcer returns the RBAC enforcer instance. 213func (s *Spindle) Enforcer() *rbac.Enforcer { 214 return s.e 215} 216 217// SetMotdContent sets custom MOTD content, replacing the embedded default. 218func (s *Spindle) SetMotdContent(content []byte) { 219 s.motdMu.Lock() 220 defer s.motdMu.Unlock() 221 s.motd = content 222} 223 224// GetMotdContent returns the current MOTD content. 225func (s *Spindle) GetMotdContent() []byte { 226 s.motdMu.RLock() 227 defer s.motdMu.RUnlock() 228 return s.motd 229} 230 231// Start starts the Spindle server (blocking). 232func (s *Spindle) Start(ctx context.Context) error { 233 // starts a job queue runner in the background 234 s.jq.Start() 235 defer s.jq.Stop() 236 237 // Stop vault token renewal if it implements Stopper 238 if stopper, ok := s.vault.(secrets.Stopper); ok { 239 defer stopper.Stop() 240 } 241 242 go func() { 243 s.l.Info("starting knot event consumer") 244 s.ks.Start(ctx) 245 }() 246 247 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 248 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 249} 250 251func Run(ctx context.Context) error { 252 cfg, err := config.Load(ctx) 253 if err != nil { 254 return fmt.Errorf("failed to load config: %w", err) 255 } 256 257 nixeryEng, err := nixery.New(ctx, cfg) 258 if err != nil { 259 return err 260 } 261 262 s, err := New(ctx, cfg, map[string]models.Engine{ 263 "nixery": nixeryEng, 264 }) 265 if err != nil { 266 return err 267 } 268 269 return s.Start(ctx) 270} 271 272func (s *Spindle) Router() http.Handler { 273 mux := chi.NewRouter() 274 275 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 276 w.Write(s.GetMotdContent()) 277 }) 278 mux.HandleFunc("/events", s.Events) 279 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 280 281 mux.Mount("/xrpc", s.XrpcRouter()) 282 return mux 283} 284 285func (s *Spindle) XrpcRouter() http.Handler { 286 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 287 288 l := log.SubLogger(s.l, "xrpc") 289 290 x := xrpc.Xrpc{ 291 Logger: l, 292 Db: s.db, 293 Enforcer: s.e, 294 Engines: s.engs, 295 Config: s.cfg, 296 Resolver: s.res, 297 Vault: s.vault, 298 Notifier: s.Notifier(), 299 ServiceAuth: serviceAuth, 300 } 301 302 return x.Router() 303} 304 305func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 306 if msg.Nsid == tangled.PipelineNSID { 307 tpl := tangled.Pipeline{} 308 err := json.Unmarshal(msg.EventJson, &tpl) 309 if err != nil { 310 fmt.Println("error unmarshalling", err) 311 return err 312 } 313 314 if tpl.TriggerMetadata == nil { 315 return fmt.Errorf("no trigger metadata found") 316 } 317 318 if tpl.TriggerMetadata.Repo == nil { 319 return fmt.Errorf("no repo data found") 320 } 321 322 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 323 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 324 } 325 326 // filter by repos 327 _, err = s.db.GetRepo( 328 tpl.TriggerMetadata.Repo.Knot, 329 tpl.TriggerMetadata.Repo.Did, 330 tpl.TriggerMetadata.Repo.Repo, 331 ) 332 if err != nil { 333 return fmt.Errorf("failed to get repo: %w", err) 334 } 335 336 pipelineId := models.PipelineId{ 337 Knot: src.Key(), 338 Rkey: msg.Rkey, 339 } 340 341 workflows := make(map[models.Engine][]models.Workflow) 342 343 // Build pipeline environment variables once for all workflows 344 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 345 346 for _, w := range tpl.Workflows { 347 if w != nil { 348 if _, ok := s.engs[w.Engine]; !ok { 349 err = s.db.StatusFailed(models.WorkflowId{ 350 PipelineId: pipelineId, 351 Name: w.Name, 352 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 353 if err != nil { 354 return fmt.Errorf("db.StatusFailed: %w", err) 355 } 356 357 continue 358 } 359 360 eng := s.engs[w.Engine] 361 362 if _, ok := workflows[eng]; !ok { 363 workflows[eng] = []models.Workflow{} 364 } 365 366 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 367 if err != nil { 368 return fmt.Errorf("init workflow: %w", err) 369 } 370 371 // inject TANGLED_* env vars after InitWorkflow 372 // This prevents user-defined env vars from overriding them 373 if ewf.Environment == nil { 374 ewf.Environment = make(map[string]string) 375 } 376 maps.Copy(ewf.Environment, pipelineEnv) 377 378 workflows[eng] = append(workflows[eng], *ewf) 379 380 err = s.db.StatusPending(models.WorkflowId{ 381 PipelineId: pipelineId, 382 Name: w.Name, 383 }, s.n) 384 if err != nil { 385 return fmt.Errorf("db.StatusPending: %w", err) 386 } 387 } 388 } 389 390 ok := s.jq.Enqueue(queue.Job{ 391 Run: func() error { 392 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 393 RepoOwner: tpl.TriggerMetadata.Repo.Did, 394 RepoName: tpl.TriggerMetadata.Repo.Repo, 395 Workflows: workflows, 396 }, pipelineId) 397 return nil 398 }, 399 OnFail: func(jobError error) { 400 s.l.Error("pipeline run failed", "error", jobError) 401 }, 402 }) 403 if ok { 404 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 405 } else { 406 s.l.Error("failed to enqueue pipeline: queue is full") 407 } 408 } 409 410 return nil 411} 412 413func (s *Spindle) configureOwner() error { 414 cfgOwner := s.cfg.Server.Owner 415 416 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 417 if err != nil { 418 return err 419 } 420 421 switch len(existing) { 422 case 0: 423 // no owner configured, continue 424 case 1: 425 // find existing owner 426 existingOwner := existing[0] 427 428 // no ownership change, this is okay 429 if existingOwner == s.cfg.Server.Owner { 430 break 431 } 432 433 // remove existing owner 434 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 435 if err != nil { 436 return nil 437 } 438 default: 439 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 440 } 441 442 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 443}