Monorepo for Tangled
tangled.org
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
32
// defaultMotd holds the contents of the "motd" file embedded at build time.
// New installs it as the initial MOTD of every Spindle.
//
//go:embed motd
var defaultMotd []byte
35
const (
	// rbacDomain is the domain name this server passes to the RBAC enforcer
	// when registering itself and when querying or changing its owner.
	rbacDomain = "thisserver"
)
39
40type Spindle struct {
41 jc *jetstream.JetstreamClient
42 db *db.DB
43 e *rbac.Enforcer
44 l *slog.Logger
45 n *notifier.Notifier
46 engs map[string]models.Engine
47 jq *queue.Queue
48 cfg *config.Config
49 ks *eventconsumer.Consumer
50 res *idresolver.Resolver
51 vault secrets.Manager
52 motd []byte
53 motdMu sync.RWMutex
54}
55
56// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
58 logger := log.FromContext(ctx)
59
60 d, err := db.Make(cfg.Server.DBPath)
61 if err != nil {
62 return nil, fmt.Errorf("failed to setup db: %w", err)
63 }
64
65 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
66 if err != nil {
67 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
68 }
69 e.E.EnableAutoSave(true)
70
71 n := notifier.New()
72
73 var vault secrets.Manager
74 switch cfg.Server.Secrets.Provider {
75 case "openbao":
76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
77 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
78 }
79 vault, err = secrets.NewOpenBaoManager(
80 cfg.Server.Secrets.OpenBao.ProxyAddr,
81 logger,
82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
83 )
84 if err != nil {
85 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
86 }
87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
88 case "sqlite", "":
89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
90 if err != nil {
91 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
92 }
93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
94 default:
95 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
96 }
97
98 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
99 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
100
101 collections := []string{
102 tangled.SpindleMemberNSID,
103 tangled.RepoNSID,
104 tangled.RepoCollaboratorNSID,
105 }
106 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
107 if err != nil {
108 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
109 }
110 jc.AddDid(cfg.Server.Owner)
111
112 // Check if the spindle knows about any Dids;
113 dids, err := d.GetAllDids()
114 if err != nil {
115 return nil, fmt.Errorf("failed to get all dids: %w", err)
116 }
117 for _, d := range dids {
118 jc.AddDid(d)
119 }
120
121 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
122
123 spindle := &Spindle{
124 jc: jc,
125 e: e,
126 db: d,
127 l: logger,
128 n: &n,
129 engs: engines,
130 jq: jq,
131 cfg: cfg,
132 res: resolver,
133 vault: vault,
134 motd: defaultMotd,
135 }
136
137 err = e.AddSpindle(rbacDomain)
138 if err != nil {
139 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
140 }
141 err = spindle.configureOwner()
142 if err != nil {
143 return nil, err
144 }
145 logger.Info("owner set", "did", cfg.Server.Owner)
146
147 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
148 if err != nil {
149 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
150 }
151
152 err = jc.StartJetstream(ctx, spindle.ingest())
153 if err != nil {
154 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
155 }
156
157 // for each incoming sh.tangled.pipeline, we execute
158 // spindle.processPipeline, which in turn enqueues the pipeline
159 // job in the above registered queue.
160 ccfg := eventconsumer.NewConsumerConfig()
161 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
162 ccfg.Dev = cfg.Server.Dev
163 ccfg.ProcessFunc = spindle.processPipeline
164 ccfg.CursorStore = cursorStore
165 knownKnots, err := d.Knots()
166 if err != nil {
167 return nil, err
168 }
169 for _, knot := range knownKnots {
170 logger.Info("adding source start", "knot", knot)
171 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
172 }
173 spindle.ks = eventconsumer.NewConsumer(*ccfg)
174
175 return spindle, nil
176}
177
178// DB returns the database instance.
179func (s *Spindle) DB() *db.DB {
180 return s.db
181}
182
183// Queue returns the job queue instance.
184func (s *Spindle) Queue() *queue.Queue {
185 return s.jq
186}
187
188// Engines returns the map of available engines.
189func (s *Spindle) Engines() map[string]models.Engine {
190 return s.engs
191}
192
193// Vault returns the secrets manager instance.
194func (s *Spindle) Vault() secrets.Manager {
195 return s.vault
196}
197
198// Notifier returns the notifier instance.
199func (s *Spindle) Notifier() *notifier.Notifier {
200 return s.n
201}
202
203// Enforcer returns the RBAC enforcer instance.
204func (s *Spindle) Enforcer() *rbac.Enforcer {
205 return s.e
206}
207
208// SetMotdContent sets custom MOTD content, replacing the embedded default.
209func (s *Spindle) SetMotdContent(content []byte) {
210 s.motdMu.Lock()
211 defer s.motdMu.Unlock()
212 s.motd = content
213}
214
215// GetMotdContent returns the current MOTD content.
216func (s *Spindle) GetMotdContent() []byte {
217 s.motdMu.RLock()
218 defer s.motdMu.RUnlock()
219 return s.motd
220}
221
222// Start starts the Spindle server (blocking).
223func (s *Spindle) Start(ctx context.Context) error {
224 // starts a job queue runner in the background
225 s.jq.Start()
226 defer s.jq.Stop()
227
228 // Stop vault token renewal if it implements Stopper
229 if stopper, ok := s.vault.(secrets.Stopper); ok {
230 defer stopper.Stop()
231 }
232
233 go func() {
234 s.l.Info("starting knot event consumer")
235 s.ks.Start(ctx)
236 }()
237
238 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
239 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
240}
241
242func Run(ctx context.Context) error {
243 cfg, err := config.Load(ctx)
244 if err != nil {
245 return fmt.Errorf("failed to load config: %w", err)
246 }
247
248 nixeryEng, err := nixery.New(ctx, cfg)
249 if err != nil {
250 return err
251 }
252
253 s, err := New(ctx, cfg, map[string]models.Engine{
254 "nixery": nixeryEng,
255 })
256 if err != nil {
257 return err
258 }
259
260 return s.Start(ctx)
261}
262
263func (s *Spindle) Router() http.Handler {
264 mux := chi.NewRouter()
265
266 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
267 w.Write(s.GetMotdContent())
268 })
269 mux.HandleFunc("/events", s.Events)
270 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
271
272 mux.Mount("/xrpc", s.XrpcRouter())
273 return mux
274}
275
276func (s *Spindle) XrpcRouter() http.Handler {
277 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
278
279 l := log.SubLogger(s.l, "xrpc")
280
281 x := xrpc.Xrpc{
282 Logger: l,
283 Db: s.db,
284 Enforcer: s.e,
285 Engines: s.engs,
286 Config: s.cfg,
287 Resolver: s.res,
288 Vault: s.vault,
289 ServiceAuth: serviceAuth,
290 }
291
292 return x.Router()
293}
294
295func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
296 if msg.Nsid == tangled.PipelineNSID {
297 tpl := tangled.Pipeline{}
298 err := json.Unmarshal(msg.EventJson, &tpl)
299 if err != nil {
300 fmt.Println("error unmarshalling", err)
301 return err
302 }
303
304 if tpl.TriggerMetadata == nil {
305 return fmt.Errorf("no trigger metadata found")
306 }
307
308 if tpl.TriggerMetadata.Repo == nil {
309 return fmt.Errorf("no repo data found")
310 }
311
312 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
313 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
314 }
315
316 // filter by repos
317 _, err = s.db.GetRepo(
318 tpl.TriggerMetadata.Repo.Knot,
319 tpl.TriggerMetadata.Repo.Did,
320 tpl.TriggerMetadata.Repo.Repo,
321 )
322 if err != nil {
323 return err
324 }
325
326 pipelineId := models.PipelineId{
327 Knot: src.Key(),
328 Rkey: msg.Rkey,
329 }
330
331 workflows := make(map[models.Engine][]models.Workflow)
332
333 // Build pipeline environment variables once for all workflows
334 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
335
336 for _, w := range tpl.Workflows {
337 if w != nil {
338 if _, ok := s.engs[w.Engine]; !ok {
339 err = s.db.StatusFailed(models.WorkflowId{
340 PipelineId: pipelineId,
341 Name: w.Name,
342 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
343 if err != nil {
344 return err
345 }
346
347 continue
348 }
349
350 eng := s.engs[w.Engine]
351
352 if _, ok := workflows[eng]; !ok {
353 workflows[eng] = []models.Workflow{}
354 }
355
356 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
357 if err != nil {
358 return err
359 }
360
361 // inject TANGLED_* env vars after InitWorkflow
362 // This prevents user-defined env vars from overriding them
363 if ewf.Environment == nil {
364 ewf.Environment = make(map[string]string)
365 }
366 maps.Copy(ewf.Environment, pipelineEnv)
367
368 workflows[eng] = append(workflows[eng], *ewf)
369
370 err = s.db.StatusPending(models.WorkflowId{
371 PipelineId: pipelineId,
372 Name: w.Name,
373 }, s.n)
374 if err != nil {
375 return err
376 }
377 }
378 }
379
380 ok := s.jq.Enqueue(queue.Job{
381 Run: func() error {
382 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
383 RepoOwner: tpl.TriggerMetadata.Repo.Did,
384 RepoName: tpl.TriggerMetadata.Repo.Repo,
385 Workflows: workflows,
386 }, pipelineId)
387 return nil
388 },
389 OnFail: func(jobError error) {
390 s.l.Error("pipeline run failed", "error", jobError)
391 },
392 })
393 if ok {
394 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
395 } else {
396 s.l.Error("failed to enqueue pipeline: queue is full")
397 }
398 }
399
400 return nil
401}
402
403func (s *Spindle) configureOwner() error {
404 cfgOwner := s.cfg.Server.Owner
405
406 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
407 if err != nil {
408 return err
409 }
410
411 switch len(existing) {
412 case 0:
413 // no owner configured, continue
414 case 1:
415 // find existing owner
416 existingOwner := existing[0]
417
418 // no ownership change, this is okay
419 if existingOwner == s.cfg.Server.Owner {
420 break
421 }
422
423 // remove existing owner
424 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
425 if err != nil {
426 return nil
427 }
428 default:
429 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
430 }
431
432 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
433}