forked from
tangled.org/core
Monorepo for Tangled
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "sync"
12
13 "github.com/go-chi/chi/v5"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/eventconsumer/cursor"
17 "tangled.org/core/idresolver"
18 "tangled.org/core/jetstream"
19 "tangled.org/core/log"
20 "tangled.org/core/notifier"
21 "tangled.org/core/rbac"
22 "tangled.org/core/spindle/config"
23 "tangled.org/core/spindle/db"
24 "tangled.org/core/spindle/engine"
25 "tangled.org/core/spindle/engines/nixery"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/queue"
28 "tangled.org/core/spindle/secrets"
29 "tangled.org/core/spindle/xrpc"
30 "tangled.org/core/xrpc/serviceauth"
31)
32
// defaultMotd is the embedded default "message of the day", served at the
// HTTP root until SetMotdContent replaces it at runtime.
//
//go:embed motd
var defaultMotd []byte
35
const (
	// rbacDomain is the single RBAC domain under which all roles
	// (spindle, owner, members) for this server instance are stored.
	rbacDomain = "thisserver"

	// devPDSURL is the default PDS endpoint for local development.
	// In dev mode, PLC/PDS/Jetstream all run on localhost inside the VM.
	devPDSURL = "http://localhost:2583"
)
43
// Spindle bundles all long-lived server state: storage, authorization,
// CI engines, the job queue, and the event consumers that feed it.
type Spindle struct {
	jc    *jetstream.JetstreamClient // jetstream firehose client
	db    *db.DB                     // persistent storage (sqlite, per config DBPath)
	e     *rbac.Enforcer             // role-based access control
	l     *slog.Logger               // structured logger
	n     *notifier.Notifier         // status-change notifier passed to db status updates
	engs  map[string]models.Engine   // available CI engines, keyed by engine name (e.g. "nixery")
	jq    *queue.Queue               // bounded pipeline job queue
	cfg   *config.Config             // server configuration
	ks    *eventconsumer.Consumer    // knot event consumer driving processPipeline
	res   *idresolver.Resolver       // DID/handle identity resolver
	vault secrets.Manager            // secrets backend (openbao or sqlite)

	// motd is guarded by motdMu; read by GetMotdContent, replaced whole
	// by SetMotdContent.
	motd   []byte
	motdMu sync.RWMutex
}
59
60// New creates a new Spindle server with the provided configuration and engines.
61func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
62 logger := log.FromContext(ctx)
63
64 d, err := db.Make(cfg.Server.DBPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to setup db: %w", err)
67 }
68
69 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
70 if err != nil {
71 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
72 }
73 e.E.EnableAutoSave(true)
74
75 n := notifier.New()
76
77 var vault secrets.Manager
78 switch cfg.Server.Secrets.Provider {
79 case "openbao":
80 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
81 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
82 }
83 vault, err = secrets.NewOpenBaoManager(
84 cfg.Server.Secrets.OpenBao.ProxyAddr,
85 logger,
86 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
87 )
88 if err != nil {
89 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
90 }
91 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
92 case "sqlite", "":
93 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
94 if err != nil {
95 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
96 }
97 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
98 default:
99 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
100 }
101
102 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
103 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
104
105 collections := []string{
106 tangled.SpindleMemberNSID,
107 tangled.RepoNSID,
108 tangled.RepoCollaboratorNSID,
109 }
110 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
111 if err != nil {
112 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
113 }
114 jc.AddDid(cfg.Server.Owner)
115
116 // Check if the spindle knows about any Dids;
117 dids, err := d.GetAllDids()
118 if err != nil {
119 return nil, fmt.Errorf("failed to get all dids: %w", err)
120 }
121 for _, d := range dids {
122 jc.AddDid(d)
123 }
124
125 var resolver *idresolver.Resolver
126 if cfg.Server.Dev {
127 resolver = idresolver.DefaultDevResolver(cfg.Server.PlcUrl, devPDSURL)
128 } else {
129 resolver = idresolver.DefaultResolver(cfg.Server.PlcUrl)
130 }
131
132 spindle := &Spindle{
133 jc: jc,
134 e: e,
135 db: d,
136 l: logger,
137 n: &n,
138 engs: engines,
139 jq: jq,
140 cfg: cfg,
141 res: resolver,
142 vault: vault,
143 motd: defaultMotd,
144 }
145
146 err = e.AddSpindle(rbacDomain)
147 if err != nil {
148 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
149 }
150 err = spindle.configureOwner()
151 if err != nil {
152 return nil, err
153 }
154 logger.Info("owner set", "did", cfg.Server.Owner)
155
156 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
157 if err != nil {
158 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
159 }
160
161 err = jc.StartJetstream(ctx, spindle.ingest())
162 if err != nil {
163 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
164 }
165
166 // for each incoming sh.tangled.pipeline, we execute
167 // spindle.processPipeline, which in turn enqueues the pipeline
168 // job in the above registered queue.
169 ccfg := eventconsumer.NewConsumerConfig()
170 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
171 ccfg.Dev = cfg.Server.Dev
172 ccfg.ProcessFunc = spindle.processPipeline
173 ccfg.CursorStore = cursorStore
174 knownKnots, err := d.Knots()
175 if err != nil {
176 return nil, err
177 }
178 for _, knot := range knownKnots {
179 logger.Info("adding source start", "knot", knot)
180 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
181 }
182 spindle.ks = eventconsumer.NewConsumer(*ccfg)
183
184 return spindle, nil
185}
186
// DB returns the database instance backing this spindle.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the pipeline job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available engines, keyed by engine name.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance used for workflow status updates.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
func (s *Spindle) Enforcer() *rbac.Enforcer {
	return s.e
}
216
217// SetMotdContent sets custom MOTD content, replacing the embedded default.
218func (s *Spindle) SetMotdContent(content []byte) {
219 s.motdMu.Lock()
220 defer s.motdMu.Unlock()
221 s.motd = content
222}
223
224// GetMotdContent returns the current MOTD content.
225func (s *Spindle) GetMotdContent() []byte {
226 s.motdMu.RLock()
227 defer s.motdMu.RUnlock()
228 return s.motd
229}
230
// Start starts the Spindle server (blocking).
//
// It runs the job-queue workers, launches the knot event consumer in a
// background goroutine, then serves HTTP on cfg.Server.ListenAddr until
// the listener fails.
//
// NOTE(review): http.ListenAndServe sets no server timeouts and does not
// observe ctx for graceful shutdown; adding Read/Write timeouts could break
// the streaming /events endpoint — confirm intent before changing.
func (s *Spindle) Start(ctx context.Context) error {
	// starts a job queue runner in the background
	s.jq.Start()
	defer s.jq.Stop()

	// Stop vault token renewal if it implements Stopper
	if stopper, ok := s.vault.(secrets.Stopper); ok {
		defer stopper.Stop()
	}

	// The consumer goroutine is bounded by ctx; it stops when ctx is
	// cancelled.
	go func() {
		s.l.Info("starting knot event consumer")
		s.ks.Start(ctx)
	}()

	s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
	return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
}
250
251func Run(ctx context.Context) error {
252 cfg, err := config.Load(ctx)
253 if err != nil {
254 return fmt.Errorf("failed to load config: %w", err)
255 }
256
257 nixeryEng, err := nixery.New(ctx, cfg)
258 if err != nil {
259 return err
260 }
261
262 s, err := New(ctx, cfg, map[string]models.Engine{
263 "nixery": nixeryEng,
264 })
265 if err != nil {
266 return err
267 }
268
269 return s.Start(ctx)
270}
271
272func (s *Spindle) Router() http.Handler {
273 mux := chi.NewRouter()
274
275 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
276 w.Write(s.GetMotdContent())
277 })
278 mux.HandleFunc("/events", s.Events)
279 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
280
281 mux.Mount("/xrpc", s.XrpcRouter())
282 return mux
283}
284
285func (s *Spindle) XrpcRouter() http.Handler {
286 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
287
288 l := log.SubLogger(s.l, "xrpc")
289
290 x := xrpc.Xrpc{
291 Logger: l,
292 Db: s.db,
293 Enforcer: s.e,
294 Engines: s.engs,
295 Config: s.cfg,
296 Resolver: s.res,
297 Vault: s.vault,
298 Notifier: s.Notifier(),
299 ServiceAuth: serviceAuth,
300 }
301
302 return x.Router()
303}
304
305func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
306 if msg.Nsid == tangled.PipelineNSID {
307 tpl := tangled.Pipeline{}
308 err := json.Unmarshal(msg.EventJson, &tpl)
309 if err != nil {
310 fmt.Println("error unmarshalling", err)
311 return err
312 }
313
314 if tpl.TriggerMetadata == nil {
315 return fmt.Errorf("no trigger metadata found")
316 }
317
318 if tpl.TriggerMetadata.Repo == nil {
319 return fmt.Errorf("no repo data found")
320 }
321
322 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
323 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
324 }
325
326 // filter by repos
327 _, err = s.db.GetRepo(
328 tpl.TriggerMetadata.Repo.Knot,
329 tpl.TriggerMetadata.Repo.Did,
330 tpl.TriggerMetadata.Repo.Repo,
331 )
332 if err != nil {
333 return fmt.Errorf("failed to get repo: %w", err)
334 }
335
336 pipelineId := models.PipelineId{
337 Knot: src.Key(),
338 Rkey: msg.Rkey,
339 }
340
341 workflows := make(map[models.Engine][]models.Workflow)
342
343 // Build pipeline environment variables once for all workflows
344 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
345
346 for _, w := range tpl.Workflows {
347 if w != nil {
348 if _, ok := s.engs[w.Engine]; !ok {
349 err = s.db.StatusFailed(models.WorkflowId{
350 PipelineId: pipelineId,
351 Name: w.Name,
352 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
353 if err != nil {
354 return fmt.Errorf("db.StatusFailed: %w", err)
355 }
356
357 continue
358 }
359
360 eng := s.engs[w.Engine]
361
362 if _, ok := workflows[eng]; !ok {
363 workflows[eng] = []models.Workflow{}
364 }
365
366 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
367 if err != nil {
368 return fmt.Errorf("init workflow: %w", err)
369 }
370
371 // inject TANGLED_* env vars after InitWorkflow
372 // This prevents user-defined env vars from overriding them
373 if ewf.Environment == nil {
374 ewf.Environment = make(map[string]string)
375 }
376 maps.Copy(ewf.Environment, pipelineEnv)
377
378 workflows[eng] = append(workflows[eng], *ewf)
379
380 err = s.db.StatusPending(models.WorkflowId{
381 PipelineId: pipelineId,
382 Name: w.Name,
383 }, s.n)
384 if err != nil {
385 return fmt.Errorf("db.StatusPending: %w", err)
386 }
387 }
388 }
389
390 ok := s.jq.Enqueue(queue.Job{
391 Run: func() error {
392 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
393 RepoOwner: tpl.TriggerMetadata.Repo.Did,
394 RepoName: tpl.TriggerMetadata.Repo.Repo,
395 Workflows: workflows,
396 }, pipelineId)
397 return nil
398 },
399 OnFail: func(jobError error) {
400 s.l.Error("pipeline run failed", "error", jobError)
401 },
402 })
403 if ok {
404 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
405 } else {
406 s.l.Error("failed to enqueue pipeline: queue is full")
407 }
408 }
409
410 return nil
411}
412
413func (s *Spindle) configureOwner() error {
414 cfgOwner := s.cfg.Server.Owner
415
416 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
417 if err != nil {
418 return err
419 }
420
421 switch len(existing) {
422 case 0:
423 // no owner configured, continue
424 case 1:
425 // find existing owner
426 existingOwner := existing[0]
427
428 // no ownership change, this is okay
429 if existingOwner == s.cfg.Server.Owner {
430 break
431 }
432
433 // remove existing owner
434 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
435 if err != nil {
436 return nil
437 }
438 default:
439 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
440 }
441
442 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
443}