Monorepo for Tangled tangled.org
// Package spindle implements the Spindle server: it consumes pipeline events
// from knots, queues their workflows for execution on the configured engines,
// and serves HTTP endpoints for events, logs and XRPC.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// motd holds the contents of the embedded "motd" file; it is written verbatim
// as the response body of the "/" route.
//
//go:embed motd
var motd []byte

// Spindle bundles every long-lived dependency of the server. All fields are
// populated by New.
type Spindle struct {
	jc    *jetstream.JetstreamClient // jetstream client subscribed to member/repo/collaborator records
	db    *db.DB                     // spindle database handle
	e     *rbac2.Enforcer            // RBAC enforcer
	l     *slog.Logger               // base logger; sub-loggers are derived from it
	n     *notifier.Notifier         // notifier handed to status updates and the XRPC layer
	engs  map[string]models.Engine   // available engines, keyed by the engine name used in workflows
	jq    *queue.Queue               // job queue that runs enqueued pipelines
	cfg   *config.Config             // loaded server configuration
	ks    *eventconsumer.Consumer    // event consumer fed by the known knot sources
	res   *idresolver.Resolver       // identity resolver
	vault secrets.Manager            // secrets backend (openbao or sqlite, chosen by config)
}

// New creates a new Spindle server with the provided configuration and engines.
50func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 51 logger := log.FromContext(ctx) 52 53 d, err := db.Make(cfg.Server.DBPath) 54 if err != nil { 55 return nil, fmt.Errorf("failed to setup db: %w", err) 56 } 57 58 e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 59 if err != nil { 60 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 61 } 62 63 n := notifier.New() 64 65 var vault secrets.Manager 66 switch cfg.Server.Secrets.Provider { 67 case "openbao": 68 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 69 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 70 } 71 vault, err = secrets.NewOpenBaoManager( 72 cfg.Server.Secrets.OpenBao.ProxyAddr, 73 logger, 74 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 75 ) 76 if err != nil { 77 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 78 } 79 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 80 case "sqlite", "": 81 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 82 if err != nil { 83 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 84 } 85 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 86 default: 87 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 88 } 89 90 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 91 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 92 93 collections := []string{ 94 tangled.SpindleMemberNSID, 95 tangled.RepoNSID, 96 tangled.RepoCollaboratorNSID, 97 } 98 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 99 if err != nil { 100 return nil, fmt.Errorf("failed to 
setup jetstream client: %w", err) 101 } 102 jc.AddDid(cfg.Server.Owner.String()) 103 104 // Check if the spindle knows about any Dids; 105 dids, err := d.GetAllDids() 106 if err != nil { 107 return nil, fmt.Errorf("failed to get all dids: %w", err) 108 } 109 for _, d := range dids { 110 jc.AddDid(d) 111 } 112 113 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 114 115 spindle := &Spindle{ 116 jc: jc, 117 e: e, 118 db: d, 119 l: logger, 120 n: &n, 121 engs: engines, 122 jq: jq, 123 cfg: cfg, 124 res: resolver, 125 vault: vault, 126 } 127 128 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 129 if err != nil { 130 return nil, err 131 } 132 logger.Info("owner set", "did", cfg.Server.Owner) 133 134 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 135 if err != nil { 136 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 137 } 138 139 err = jc.StartJetstream(ctx, spindle.ingest()) 140 if err != nil { 141 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 142 } 143 144 // for each incoming sh.tangled.pipeline, we execute 145 // spindle.processPipeline, which in turn enqueues the pipeline 146 // job in the above registered queue. 147 ccfg := eventconsumer.NewConsumerConfig() 148 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 149 ccfg.Dev = cfg.Server.Dev 150 ccfg.ProcessFunc = spindle.processPipeline 151 ccfg.CursorStore = cursorStore 152 knownKnots, err := d.Knots() 153 if err != nil { 154 return nil, err 155 } 156 for _, knot := range knownKnots { 157 logger.Info("adding source start", "knot", knot) 158 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 159 } 160 spindle.ks = eventconsumer.NewConsumer(*ccfg) 161 162 return spindle, nil 163} 164 165// DB returns the database instance. 166func (s *Spindle) DB() *db.DB { 167 return s.db 168} 169 170// Queue returns the job queue instance. 
171func (s *Spindle) Queue() *queue.Queue { 172 return s.jq 173} 174 175// Engines returns the map of available engines. 176func (s *Spindle) Engines() map[string]models.Engine { 177 return s.engs 178} 179 180// Vault returns the secrets manager instance. 181func (s *Spindle) Vault() secrets.Manager { 182 return s.vault 183} 184 185// Notifier returns the notifier instance. 186func (s *Spindle) Notifier() *notifier.Notifier { 187 return s.n 188} 189 190// Enforcer returns the RBAC enforcer instance. 191func (s *Spindle) Enforcer() *rbac2.Enforcer { 192 return s.e 193} 194 195// Start starts the Spindle server (blocking). 196func (s *Spindle) Start(ctx context.Context) error { 197 // starts a job queue runner in the background 198 s.jq.Start() 199 defer s.jq.Stop() 200 201 // Stop vault token renewal if it implements Stopper 202 if stopper, ok := s.vault.(secrets.Stopper); ok { 203 defer stopper.Stop() 204 } 205 206 go func() { 207 s.l.Info("starting knot event consumer") 208 s.ks.Start(ctx) 209 }() 210 211 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 212 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 213} 214 215func Run(ctx context.Context) error { 216 cfg, err := config.Load(ctx) 217 if err != nil { 218 return fmt.Errorf("failed to load config: %w", err) 219 } 220 221 nixeryEng, err := nixery.New(ctx, cfg) 222 if err != nil { 223 return err 224 } 225 226 s, err := New(ctx, cfg, map[string]models.Engine{ 227 "nixery": nixeryEng, 228 }) 229 if err != nil { 230 return err 231 } 232 233 return s.Start(ctx) 234} 235 236func (s *Spindle) Router() http.Handler { 237 mux := chi.NewRouter() 238 239 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 240 w.Write(motd) 241 }) 242 mux.HandleFunc("/events", s.Events) 243 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 244 245 mux.Mount("/xrpc", s.XrpcRouter()) 246 return mux 247} 248 249func (s *Spindle) XrpcRouter() http.Handler { 250 serviceAuth := 
serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 251 252 l := log.SubLogger(s.l, "xrpc") 253 254 x := xrpc.Xrpc{ 255 Logger: l, 256 Db: s.db, 257 Enforcer: s.e, 258 Engines: s.engs, 259 Config: s.cfg, 260 Resolver: s.res, 261 Vault: s.vault, 262 Notifier: s.Notifier(), 263 ServiceAuth: serviceAuth, 264 } 265 266 return x.Router() 267} 268 269func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 270 if msg.Nsid == tangled.PipelineNSID { 271 tpl := tangled.Pipeline{} 272 err := json.Unmarshal(msg.EventJson, &tpl) 273 if err != nil { 274 fmt.Println("error unmarshalling", err) 275 return err 276 } 277 278 if tpl.TriggerMetadata == nil { 279 return fmt.Errorf("no trigger metadata found") 280 } 281 282 if tpl.TriggerMetadata.Repo == nil { 283 return fmt.Errorf("no repo data found") 284 } 285 286 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 287 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 288 } 289 290 // filter by repos 291 _, err = s.db.GetRepo( 292 tpl.TriggerMetadata.Repo.Knot, 293 tpl.TriggerMetadata.Repo.Did, 294 tpl.TriggerMetadata.Repo.Repo, 295 ) 296 if err != nil { 297 return fmt.Errorf("failed to get repo: %w", err) 298 } 299 300 pipelineId := models.PipelineId{ 301 Knot: src.Key(), 302 Rkey: msg.Rkey, 303 } 304 305 workflows := make(map[models.Engine][]models.Workflow) 306 307 // Build pipeline environment variables once for all workflows 308 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 309 310 for _, w := range tpl.Workflows { 311 if w != nil { 312 if _, ok := s.engs[w.Engine]; !ok { 313 err = s.db.StatusFailed(models.WorkflowId{ 314 PipelineId: pipelineId, 315 Name: w.Name, 316 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 317 if err != nil { 318 return fmt.Errorf("db.StatusFailed: %w", err) 319 } 320 321 continue 322 } 323 324 eng := s.engs[w.Engine] 325 
326 if _, ok := workflows[eng]; !ok { 327 workflows[eng] = []models.Workflow{} 328 } 329 330 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 331 if err != nil { 332 return fmt.Errorf("init workflow: %w", err) 333 } 334 335 // inject TANGLED_* env vars after InitWorkflow 336 // This prevents user-defined env vars from overriding them 337 if ewf.Environment == nil { 338 ewf.Environment = make(map[string]string) 339 } 340 maps.Copy(ewf.Environment, pipelineEnv) 341 342 workflows[eng] = append(workflows[eng], *ewf) 343 344 err = s.db.StatusPending(models.WorkflowId{ 345 PipelineId: pipelineId, 346 Name: w.Name, 347 }, s.n) 348 if err != nil { 349 return fmt.Errorf("db.StatusPending: %w", err) 350 } 351 } 352 } 353 354 ok := s.jq.Enqueue(queue.Job{ 355 Run: func() error { 356 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 357 RepoOwner: tpl.TriggerMetadata.Repo.Did, 358 RepoName: tpl.TriggerMetadata.Repo.Repo, 359 Workflows: workflows, 360 }, pipelineId) 361 return nil 362 }, 363 OnFail: func(jobError error) { 364 s.l.Error("pipeline run failed", "error", jobError) 365 }, 366 }) 367 if ok { 368 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 369 } else { 370 s.l.Error("failed to enqueue pipeline: queue is full") 371 } 372 } 373 374 return nil 375}