1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "time"
11
12 "github.com/go-chi/chi/v5"
13 "github.com/go-co-op/gocron/v2"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/jetstream"
20 "tangled.org/core/log"
21 "tangled.org/core/notifier"
22 "tangled.org/core/rbac"
23 "tangled.org/core/spindle/config"
24 "tangled.org/core/spindle/db"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/engines/nixery"
27 "tangled.org/core/spindle/models"
28 "tangled.org/core/spindle/queue"
29 "tangled.org/core/spindle/secrets"
30 "tangled.org/core/spindle/xrpc"
31 "tangled.org/core/xrpc/serviceauth"
32)
33
34//go:embed motd
35var motd []byte
36
37const (
38 rbacDomain = "thisserver"
39)
40
41type Spindle struct {
42 jc *jetstream.JetstreamClient
43 db *db.DB
44 e *rbac.Enforcer
45 l *slog.Logger
46 n *notifier.Notifier
47 engs map[string]models.Engine
48 jq *queue.Queue
49 cfg *config.Config
50 ks *eventconsumer.Consumer
51 res *idresolver.Resolver
52 vault secrets.Manager
53 crons gocron.Scheduler
54}
55
56// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
58 logger := log.FromContext(ctx)
59
60 d, err := db.Make(cfg.Server.DBPath)
61 if err != nil {
62 return nil, fmt.Errorf("failed to setup db: %w", err)
63 }
64
65 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
66 if err != nil {
67 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
68 }
69 e.E.EnableAutoSave(true)
70
71 n := notifier.New()
72
73 var vault secrets.Manager
74 switch cfg.Server.Secrets.Provider {
75 case "openbao":
76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
77 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
78 }
79 vault, err = secrets.NewOpenBaoManager(
80 cfg.Server.Secrets.OpenBao.ProxyAddr,
81 logger,
82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
83 )
84 if err != nil {
85 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
86 }
87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
88 case "sqlite", "":
89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
90 if err != nil {
91 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
92 }
93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
94 default:
95 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
96 }
97
98 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
99 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
100
101 collections := []string{
102 tangled.SpindleMemberNSID,
103 tangled.RepoNSID,
104 tangled.RepoCollaboratorNSID,
105 }
106 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
107 if err != nil {
108 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
109 }
110 jc.AddDid(cfg.Server.Owner)
111
112 // Check if the spindle knows about any Dids;
113 dids, err := d.GetAllDids()
114 if err != nil {
115 return nil, fmt.Errorf("failed to get all dids: %w", err)
116 }
117 for _, d := range dids {
118 jc.AddDid(d)
119 }
120
121 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
122
123 scheduler, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
124 if err != nil {
125 return nil, fmt.Errorf("failed to create cron scheduler: %w", err)
126 }
127 defer func() { _ = scheduler.Shutdown() }()
128
129 spindle := &Spindle{
130 jc: jc,
131 e: e,
132 db: d,
133 l: logger,
134 n: &n,
135 engs: engines,
136 jq: jq,
137 cfg: cfg,
138 res: resolver,
139 vault: vault,
140 crons: scheduler,
141 }
142
143 err = e.AddSpindle(rbacDomain)
144 if err != nil {
145 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
146 }
147 err = spindle.configureOwner()
148 if err != nil {
149 return nil, err
150 }
151 logger.Info("owner set", "did", cfg.Server.Owner)
152
153 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
154 if err != nil {
155 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
156 }
157
158 err = jc.StartJetstream(ctx, spindle.ingest())
159 if err != nil {
160 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
161 }
162
163 // for each incoming sh.tangled.pipeline, we execute
164 // spindle.processPipeline, which in turn enqueues the pipeline
165 // job in the above registered queue.
166 ccfg := eventconsumer.NewConsumerConfig()
167 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
168 ccfg.Dev = cfg.Server.Dev
169 ccfg.ProcessFunc = spindle.processPipeline
170 ccfg.CursorStore = cursorStore
171 knownKnots, err := d.Knots()
172 if err != nil {
173 return nil, err
174 }
175 for _, knot := range knownKnots {
176 logger.Info("adding source start", "knot", knot)
177 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
178 }
179 spindle.ks = eventconsumer.NewConsumer(*ccfg)
180
181 return spindle, nil
182}
183
184// DB returns the database instance.
185func (s *Spindle) DB() *db.DB {
186 return s.db
187}
188
189// Queue returns the job queue instance.
190func (s *Spindle) Queue() *queue.Queue {
191 return s.jq
192}
193
194// Engines returns the map of available engines.
195func (s *Spindle) Engines() map[string]models.Engine {
196 return s.engs
197}
198
199// Vault returns the secrets manager instance.
200func (s *Spindle) Vault() secrets.Manager {
201 return s.vault
202}
203
204// Notifier returns the notifier instance.
205func (s *Spindle) Notifier() *notifier.Notifier {
206 return s.n
207}
208
209// Enforcer returns the RBAC enforcer instance.
210func (s *Spindle) Enforcer() *rbac.Enforcer {
211 return s.e
212}
213
214// Scheduler returns the cron scheduler instance.
215func (s *Spindle) Scheduler() *gocron.Scheduler {
216 return &s.crons
217}
218
219// Start starts the Spindle server (blocking).
220func (s *Spindle) Start(ctx context.Context) error {
221 // starts a job queue runner in the background
222 s.jq.Start()
223 defer s.jq.Stop()
224
225 // Stop vault token renewal if it implements Stopper
226 if stopper, ok := s.vault.(secrets.Stopper); ok {
227 defer stopper.Stop()
228 }
229
230 s.crons.Start()
231
232 go func() {
233 s.l.Info("starting knot event consumer")
234 s.ks.Start(ctx)
235 }()
236
237 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
238 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
239}
240
241func Run(ctx context.Context) error {
242 cfg, err := config.Load(ctx)
243 if err != nil {
244 return fmt.Errorf("failed to load config: %w", err)
245 }
246
247 nixeryEng, err := nixery.New(ctx, cfg)
248 if err != nil {
249 return err
250 }
251
252 s, err := New(ctx, cfg, map[string]models.Engine{
253 "nixery": nixeryEng,
254 })
255 if err != nil {
256 return err
257 }
258
259 return s.Start(ctx)
260}
261
262func (s *Spindle) Router() http.Handler {
263 mux := chi.NewRouter()
264
265 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
266 w.Write(motd)
267 })
268 mux.HandleFunc("/events", s.Events)
269 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
270
271 mux.Mount("/xrpc", s.XrpcRouter())
272 return mux
273}
274
275func (s *Spindle) XrpcRouter() http.Handler {
276 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
277
278 l := log.SubLogger(s.l, "xrpc")
279
280 x := xrpc.Xrpc{
281 Logger: l,
282 Db: s.db,
283 Enforcer: s.e,
284 Engines: s.engs,
285 Config: s.cfg,
286 Resolver: s.res,
287 Vault: s.vault,
288 ServiceAuth: serviceAuth,
289 }
290
291 return x.Router()
292}
293
294func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
295 if msg.Nsid == tangled.PipelineNSID {
296 tpl := tangled.Pipeline{}
297 err := json.Unmarshal(msg.EventJson, &tpl)
298 if err != nil {
299 fmt.Println("error unmarshalling", err)
300 return err
301 }
302
303 if tpl.TriggerMetadata == nil {
304 return fmt.Errorf("no trigger metadata found")
305 }
306
307 if tpl.TriggerMetadata.Repo == nil {
308 return fmt.Errorf("no repo data found")
309 }
310
311 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
312 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
313 }
314
315 // filter by repos
316 _, err = s.db.GetRepo(
317 tpl.TriggerMetadata.Repo.Knot,
318 tpl.TriggerMetadata.Repo.Did,
319 tpl.TriggerMetadata.Repo.Repo,
320 )
321 if err != nil {
322 return err
323 }
324
325 pipelineId := models.PipelineId{
326 Knot: src.Key(),
327 Rkey: msg.Rkey,
328 }
329
330 workflows := make(map[models.Engine][]models.Workflow)
331
332 for _, w := range tpl.Workflows {
333 if w != nil {
334 if _, ok := s.engs[w.Engine]; !ok {
335 err = s.db.StatusFailed(models.WorkflowId{
336 PipelineId: pipelineId,
337 Name: w.Name,
338 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
339 if err != nil {
340 return err
341 }
342
343 continue
344 }
345
346 eng := s.engs[w.Engine]
347
348 if _, ok := workflows[eng]; !ok {
349 workflows[eng] = []models.Workflow{}
350 }
351
352 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
353 if err != nil {
354 return err
355 }
356
357 workflows[eng] = append(workflows[eng], *ewf)
358
359 err = s.db.StatusPending(models.WorkflowId{
360 PipelineId: pipelineId,
361 Name: w.Name,
362 }, s.n)
363 if err != nil {
364 return err
365 }
366 }
367 }
368
369 ok := s.jq.Enqueue(queue.Job{
370 Run: func() error {
371 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
372 RepoOwner: tpl.TriggerMetadata.Repo.Did,
373 RepoName: tpl.TriggerMetadata.Repo.Repo,
374 Workflows: workflows,
375 }, pipelineId)
376 return nil
377 },
378 OnFail: func(jobError error) {
379 s.l.Error("pipeline run failed", "error", jobError)
380 },
381 })
382 if ok {
383 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
384 } else {
385 s.l.Error("failed to enqueue pipeline: queue is full")
386 }
387 }
388
389 return nil
390}
391
392func (s *Spindle) configureOwner() error {
393 cfgOwner := s.cfg.Server.Owner
394
395 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
396 if err != nil {
397 return err
398 }
399
400 switch len(existing) {
401 case 0:
402 // no owner configured, continue
403 case 1:
404 // find existing owner
405 existingOwner := existing[0]
406
407 // no ownership change, this is okay
408 if existingOwner == s.cfg.Server.Owner {
409 break
410 }
411
412 // remove existing owner
413 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
414 if err != nil {
415 return nil
416 }
417 default:
418 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
419 }
420
421 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
422}