Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11
12 "github.com/go-chi/chi/v5"
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/eventconsumer"
15 "tangled.org/core/eventconsumer/cursor"
16 "tangled.org/core/idresolver"
17 "tangled.org/core/jetstream"
18 "tangled.org/core/log"
19 "tangled.org/core/notifier"
20 "tangled.org/core/rbac"
21 "tangled.org/core/spindle/config"
22 "tangled.org/core/spindle/db"
23 "tangled.org/core/spindle/engine"
24 "tangled.org/core/spindle/engines/nixery"
25 "tangled.org/core/spindle/models"
26 "tangled.org/core/spindle/queue"
27 "tangled.org/core/spindle/secrets"
28 "tangled.org/core/spindle/xrpc"
29 "tangled.org/core/xrpc/serviceauth"
30)
31
32//go:embed motd
33var motd []byte
34
35const (
36 rbacDomain = "thisserver"
37)
38
39type Spindle struct {
40 jc *jetstream.JetstreamClient
41 db *db.DB
42 e *rbac.Enforcer
43 l *slog.Logger
44 n *notifier.Notifier
45 engs map[string]models.Engine
46 jq *queue.Queue
47 cfg *config.Config
48 ks *eventconsumer.Consumer
49 res *idresolver.Resolver
50 vault secrets.Manager
51}
52
53// New creates a new Spindle server with the provided configuration and engines.
54func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
55 logger := log.FromContext(ctx)
56
57 d, err := db.Make(cfg.Server.DBPath)
58 if err != nil {
59 return nil, fmt.Errorf("failed to setup db: %w", err)
60 }
61
62 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
63 if err != nil {
64 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
65 }
66 e.E.EnableAutoSave(true)
67
68 n := notifier.New()
69
70 var vault secrets.Manager
71 switch cfg.Server.Secrets.Provider {
72 case "openbao":
73 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
74 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
75 }
76 vault, err = secrets.NewOpenBaoManager(
77 cfg.Server.Secrets.OpenBao.ProxyAddr,
78 logger,
79 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
80 )
81 if err != nil {
82 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
83 }
84 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
85 case "sqlite", "":
86 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
87 if err != nil {
88 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
89 }
90 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
91 default:
92 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
93 }
94
95 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
96 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
97
98 collections := []string{
99 tangled.SpindleMemberNSID,
100 tangled.RepoNSID,
101 tangled.RepoCollaboratorNSID,
102 }
103 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
104 if err != nil {
105 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
106 }
107 jc.AddDid(cfg.Server.Owner)
108
109 // Check if the spindle knows about any Dids;
110 dids, err := d.GetAllDids()
111 if err != nil {
112 return nil, fmt.Errorf("failed to get all dids: %w", err)
113 }
114 for _, d := range dids {
115 jc.AddDid(d)
116 }
117
118 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
119
120 spindle := &Spindle{
121 jc: jc,
122 e: e,
123 db: d,
124 l: logger,
125 n: &n,
126 engs: engines,
127 jq: jq,
128 cfg: cfg,
129 res: resolver,
130 vault: vault,
131 }
132
133 err = e.AddSpindle(rbacDomain)
134 if err != nil {
135 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
136 }
137 err = spindle.configureOwner()
138 if err != nil {
139 return nil, err
140 }
141 logger.Info("owner set", "did", cfg.Server.Owner)
142
143 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
144 if err != nil {
145 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
146 }
147
148 err = jc.StartJetstream(ctx, spindle.ingest())
149 if err != nil {
150 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
151 }
152
153 // for each incoming sh.tangled.pipeline, we execute
154 // spindle.processPipeline, which in turn enqueues the pipeline
155 // job in the above registered queue.
156 ccfg := eventconsumer.NewConsumerConfig()
157 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
158 ccfg.Dev = cfg.Server.Dev
159 ccfg.ProcessFunc = spindle.processPipeline
160 ccfg.CursorStore = cursorStore
161 knownKnots, err := d.Knots()
162 if err != nil {
163 return nil, err
164 }
165 for _, knot := range knownKnots {
166 logger.Info("adding source start", "knot", knot)
167 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
168 }
169 spindle.ks = eventconsumer.NewConsumer(*ccfg)
170
171 return spindle, nil
172}
173
174// DB returns the database instance.
175func (s *Spindle) DB() *db.DB {
176 return s.db
177}
178
179// Queue returns the job queue instance.
180func (s *Spindle) Queue() *queue.Queue {
181 return s.jq
182}
183
184// Engines returns the map of available engines.
185func (s *Spindle) Engines() map[string]models.Engine {
186 return s.engs
187}
188
189// Vault returns the secrets manager instance.
190func (s *Spindle) Vault() secrets.Manager {
191 return s.vault
192}
193
194// Notifier returns the notifier instance.
195func (s *Spindle) Notifier() *notifier.Notifier {
196 return s.n
197}
198
199// Enforcer returns the RBAC enforcer instance.
200func (s *Spindle) Enforcer() *rbac.Enforcer {
201 return s.e
202}
203
204// Start starts the Spindle server (blocking).
205func (s *Spindle) Start(ctx context.Context) error {
206 // starts a job queue runner in the background
207 s.jq.Start()
208 defer s.jq.Stop()
209
210 // Stop vault token renewal if it implements Stopper
211 if stopper, ok := s.vault.(secrets.Stopper); ok {
212 defer stopper.Stop()
213 }
214
215 go func() {
216 s.l.Info("starting knot event consumer")
217 s.ks.Start(ctx)
218 }()
219
220 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
221 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
222}
223
224func Run(ctx context.Context) error {
225 cfg, err := config.Load(ctx)
226 if err != nil {
227 return fmt.Errorf("failed to load config: %w", err)
228 }
229
230 nixeryEng, err := nixery.New(ctx, cfg)
231 if err != nil {
232 return err
233 }
234
235 s, err := New(ctx, cfg, map[string]models.Engine{
236 "nixery": nixeryEng,
237 })
238 if err != nil {
239 return err
240 }
241
242 return s.Start(ctx)
243}
244
245func (s *Spindle) Router() http.Handler {
246 mux := chi.NewRouter()
247
248 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
249 w.Write(motd)
250 })
251 mux.HandleFunc("/events", s.Events)
252 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
253
254 mux.Mount("/xrpc", s.XrpcRouter())
255 return mux
256}
257
258func (s *Spindle) XrpcRouter() http.Handler {
259 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
260
261 l := log.SubLogger(s.l, "xrpc")
262
263 x := xrpc.Xrpc{
264 Logger: l,
265 Db: s.db,
266 Enforcer: s.e,
267 Engines: s.engs,
268 Config: s.cfg,
269 Resolver: s.res,
270 Vault: s.vault,
271 Notifier: s.Notifier(),
272 ServiceAuth: serviceAuth,
273 }
274
275 return x.Router()
276}
277
278func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
279 if msg.Nsid == tangled.PipelineNSID {
280 tpl := tangled.Pipeline{}
281 err := json.Unmarshal(msg.EventJson, &tpl)
282 if err != nil {
283 fmt.Println("error unmarshalling", err)
284 return err
285 }
286
287 if tpl.TriggerMetadata == nil {
288 return fmt.Errorf("no trigger metadata found")
289 }
290
291 if tpl.TriggerMetadata.Repo == nil {
292 return fmt.Errorf("no repo data found")
293 }
294
295 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
296 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
297 }
298
299 // filter by repos
300 _, err = s.db.GetRepo(
301 tpl.TriggerMetadata.Repo.Knot,
302 tpl.TriggerMetadata.Repo.Did,
303 tpl.TriggerMetadata.Repo.Repo,
304 )
305 if err != nil {
306 return fmt.Errorf("failed to get repo: %w", err)
307 }
308
309 pipelineId := models.PipelineId{
310 Knot: src.Key(),
311 Rkey: msg.Rkey,
312 }
313
314 workflows := make(map[models.Engine][]models.Workflow)
315
316 // Build pipeline environment variables once for all workflows
317 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
318
319 for _, w := range tpl.Workflows {
320 if w != nil {
321 if _, ok := s.engs[w.Engine]; !ok {
322 err = s.db.StatusFailed(models.WorkflowId{
323 PipelineId: pipelineId,
324 Name: w.Name,
325 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
326 if err != nil {
327 return fmt.Errorf("db.StatusFailed: %w", err)
328 }
329
330 continue
331 }
332
333 eng := s.engs[w.Engine]
334
335 if _, ok := workflows[eng]; !ok {
336 workflows[eng] = []models.Workflow{}
337 }
338
339 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
340 if err != nil {
341 return fmt.Errorf("init workflow: %w", err)
342 }
343
344 // inject TANGLED_* env vars after InitWorkflow
345 // This prevents user-defined env vars from overriding them
346 if ewf.Environment == nil {
347 ewf.Environment = make(map[string]string)
348 }
349 maps.Copy(ewf.Environment, pipelineEnv)
350
351 workflows[eng] = append(workflows[eng], *ewf)
352
353 err = s.db.StatusPending(models.WorkflowId{
354 PipelineId: pipelineId,
355 Name: w.Name,
356 }, s.n)
357 if err != nil {
358 return fmt.Errorf("db.StatusPending: %w", err)
359 }
360 }
361 }
362
363 ok := s.jq.Enqueue(queue.Job{
364 Run: func() error {
365 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
366 RepoOwner: tpl.TriggerMetadata.Repo.Did,
367 RepoName: tpl.TriggerMetadata.Repo.Repo,
368 Workflows: workflows,
369 }, pipelineId)
370 return nil
371 },
372 OnFail: func(jobError error) {
373 s.l.Error("pipeline run failed", "error", jobError)
374 },
375 })
376 if ok {
377 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
378 } else {
379 s.l.Error("failed to enqueue pipeline: queue is full")
380 }
381 }
382
383 return nil
384}
385
386func (s *Spindle) configureOwner() error {
387 cfgOwner := s.cfg.Server.Owner
388
389 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
390 if err != nil {
391 return err
392 }
393
394 switch len(existing) {
395 case 0:
396 // no owner configured, continue
397 case 1:
398 // find existing owner
399 existingOwner := existing[0]
400
401 // no ownership change, this is okay
402 if existingOwner == s.cfg.Server.Owner {
403 break
404 }
405
406 // remove existing owner
407 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
408 if err != nil {
409 return nil
410 }
411 default:
412 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
413 }
414
415 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
416}