1package spindle
2
import (
	"bytes"
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"os"
	"os/exec"
	"path"
	"strings"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/xrpc/serviceauth"
)
38
// motd holds the contents of the file "motd", embedded at build time. Router
// writes it verbatim as the response body for "/".
//
//go:embed motd
var motd []byte
41
// Spindle is the spindle server. It consumes knot events and the tap stream,
// queues pipeline jobs for the configured engines, and serves the HTTP/XRPC
// API. Build it with New and run it with Start.
type Spindle struct {
	tap   *tap.Client              // tap stream client for cfg.Server.TapUrl
	db    *db.DB                   // spindle database at cfg.Server.DBPath
	e     *rbac2.Enforcer          // RBAC enforcer, also backed by cfg.Server.DBPath
	l     *slog.Logger             // base logger, taken from the context given to New
	n     *notifier.Notifier       // handed to DB status updates and the XRPC handlers
	engs  map[string]models.Engine // workflow engines keyed by engine name
	jq    *queue.Queue             // queue that pipeline jobs are enqueued on
	cfg   *config.Config           // server configuration
	ks    *eventconsumer.Consumer  // knot event stream consumer, started by Start
	res   *idresolver.Resolver     // identity resolver for cfg.Server.PlcUrl
	vault secrets.Manager          // secrets backend chosen by cfg.Server.Secrets.Provider
}
55
56// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
58 logger := log.FromContext(ctx)
59
60 if err := ensureGitVersion(); err != nil {
61 return nil, fmt.Errorf("ensuring git version: %w", err)
62 }
63
64 d, err := db.Make(ctx, cfg.Server.DBPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to setup db: %w", err)
67 }
68
69 e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
70 if err != nil {
71 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
72 }
73
74 n := notifier.New()
75
76 var vault secrets.Manager
77 switch cfg.Server.Secrets.Provider {
78 case "openbao":
79 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
80 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
81 }
82 vault, err = secrets.NewOpenBaoManager(
83 cfg.Server.Secrets.OpenBao.ProxyAddr,
84 logger,
85 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
86 )
87 if err != nil {
88 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
89 }
90 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
91 case "sqlite", "":
92 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
93 if err != nil {
94 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
95 }
96 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
97 default:
98 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
99 }
100
101 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
102 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
103
104 tap := tap.NewClient(cfg.Server.TapUrl, "")
105
106 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
107
108 spindle := &Spindle{
109 tap: &tap,
110 e: e,
111 db: d,
112 l: logger,
113 n: &n,
114 engs: engines,
115 jq: jq,
116 cfg: cfg,
117 res: resolver,
118 vault: vault,
119 }
120
121 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
122 if err != nil {
123 return nil, err
124 }
125 logger.Info("owner set", "did", cfg.Server.Owner)
126
127 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
128 if err != nil {
129 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
130 }
131
132 // for each incoming sh.tangled.pipeline, we execute
133 // spindle.processPipeline, which in turn enqueues the pipeline
134 // job in the above registered queue.
135 ccfg := eventconsumer.NewConsumerConfig()
136 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
137 ccfg.Dev = cfg.Server.Dev
138 ccfg.ProcessFunc = spindle.processPipeline
139 ccfg.CursorStore = cursorStore
140 knownKnots, err := d.Knots()
141 if err != nil {
142 return nil, err
143 }
144 for _, knot := range knownKnots {
145 logger.Info("adding source start", "knot", knot)
146 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
147 }
148 spindle.ks = eventconsumer.NewConsumer(*ccfg)
149
150 return spindle, nil
151}
152
// DB returns the spindle's database handle, opened from cfg.Server.DBPath in
// New.
func (s *Spindle) DB() *db.DB {
	return s.db
}
157
// Queue returns the job queue that pipeline jobs are enqueued on. New sizes it
// from cfg.Server.QueueSize and cfg.Server.MaxJobCount.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}
162
// Engines returns the map of available engines, keyed by the engine name that
// a workflow's Engine field refers to. It is the same map that was passed to
// New. It is not copied, so callers must not modify it.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}
167
// Vault returns the secrets manager that New selected from
// cfg.Server.Secrets.Provider.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}
172
// Notifier returns the notifier instance. The spindle passes it to database
// workflow-status updates and to the XRPC handlers.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}
177
// Enforcer returns the RBAC enforcer instance. New records cfg.Server.Owner on
// it as the spindle owner.
func (s *Spindle) Enforcer() *rbac2.Enforcer {
	return s.e
}
182
183// Start starts the Spindle server (blocking).
184func (s *Spindle) Start(ctx context.Context) error {
185 // starts a job queue runner in the background
186 s.jq.Start()
187 defer s.jq.Stop()
188
189 // Stop vault token renewal if it implements Stopper
190 if stopper, ok := s.vault.(secrets.Stopper); ok {
191 defer stopper.Stop()
192 }
193
194 go func() {
195 s.l.Info("starting knot event consumer")
196 s.ks.Start(ctx)
197 }()
198
199 go func() {
200 s.l.Info("starting tap stream consumer")
201 s.tap.Connect(ctx, &tap.SimpleIndexer{
202 EventHandler: s.processEvent,
203 })
204 }()
205
206 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
207 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
208}
209
210func Run(ctx context.Context) error {
211 cfg, err := config.Load(ctx)
212 if err != nil {
213 return fmt.Errorf("failed to load config: %w", err)
214 }
215
216 nixeryEng, err := nixery.New(ctx, cfg)
217 if err != nil {
218 return err
219 }
220
221 s, err := New(ctx, cfg, map[string]models.Engine{
222 "nixery": nixeryEng,
223 })
224 if err != nil {
225 return err
226 }
227
228 return s.Start(ctx)
229}
230
231func (s *Spindle) Router() http.Handler {
232 mux := chi.NewRouter()
233
234 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
235 w.Write(motd)
236 })
237 mux.HandleFunc("/events", s.Events)
238 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
239
240 mux.Mount("/xrpc", s.XrpcRouter())
241 return mux
242}
243
244func (s *Spindle) XrpcRouter() http.Handler {
245 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
246
247 l := log.SubLogger(s.l, "xrpc")
248
249 x := xrpc.Xrpc{
250 Logger: l,
251 Db: s.db,
252 Enforcer: s.e,
253 Engines: s.engs,
254 Config: s.cfg,
255 Resolver: s.res,
256 Vault: s.vault,
257 Notifier: s.Notifier(),
258 ServiceAuth: serviceAuth,
259 }
260
261 return x.Router()
262}
263
264func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
265 l := log.FromContext(ctx).With("handler", "processKnotStream")
266 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
267 if msg.Nsid == tangled.PipelineNSID {
268 return nil
269 tpl := tangled.Pipeline{}
270 err := json.Unmarshal(msg.EventJson, &tpl)
271 if err != nil {
272 fmt.Println("error unmarshalling", err)
273 return err
274 }
275
276 if tpl.TriggerMetadata == nil {
277 return fmt.Errorf("no trigger metadata found")
278 }
279
280 if tpl.TriggerMetadata.Repo == nil {
281 return fmt.Errorf("no repo data found")
282 }
283
284 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
285 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
286 }
287
288 // filter by repos
289 _, err = s.db.GetRepoWithName(
290 syntax.DID(tpl.TriggerMetadata.Repo.Did),
291 tpl.TriggerMetadata.Repo.Repo,
292 )
293 if err != nil {
294 return fmt.Errorf("failed to get repo: %w", err)
295 }
296
297 pipelineId := models.PipelineId{
298 Knot: src.Key(),
299 Rkey: msg.Rkey,
300 }
301
302 workflows := make(map[models.Engine][]models.Workflow)
303
304 // Build pipeline environment variables once for all workflows
305 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
306
307 for _, w := range tpl.Workflows {
308 if w != nil {
309 if _, ok := s.engs[w.Engine]; !ok {
310 err = s.db.StatusFailed(models.WorkflowId{
311 PipelineId: pipelineId,
312 Name: w.Name,
313 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
314 if err != nil {
315 return fmt.Errorf("db.StatusFailed: %w", err)
316 }
317
318 continue
319 }
320
321 eng := s.engs[w.Engine]
322
323 if _, ok := workflows[eng]; !ok {
324 workflows[eng] = []models.Workflow{}
325 }
326
327 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
328 if err != nil {
329 return fmt.Errorf("init workflow: %w", err)
330 }
331
332 // inject TANGLED_* env vars after InitWorkflow
333 // This prevents user-defined env vars from overriding them
334 if ewf.Environment == nil {
335 ewf.Environment = make(map[string]string)
336 }
337 maps.Copy(ewf.Environment, pipelineEnv)
338
339 workflows[eng] = append(workflows[eng], *ewf)
340
341 err = s.db.StatusPending(models.WorkflowId{
342 PipelineId: pipelineId,
343 Name: w.Name,
344 }, s.n)
345 if err != nil {
346 return fmt.Errorf("db.StatusPending: %w", err)
347 }
348 }
349 }
350
351 ok := s.jq.Enqueue(queue.Job{
352 Run: func() error {
353 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
354 RepoOwner: tpl.TriggerMetadata.Repo.Did,
355 RepoName: tpl.TriggerMetadata.Repo.Repo,
356 Workflows: workflows,
357 }, pipelineId)
358 return nil
359 },
360 OnFail: func(jobError error) {
361 s.l.Error("pipeline run failed", "error", jobError)
362 },
363 })
364 if ok {
365 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
366 } else {
367 s.l.Error("failed to enqueue pipeline: queue is full")
368 }
369 } else if msg.Nsid == tangled.GitRefUpdateNSID {
370 event := tangled.GitRefUpdate{}
371 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
372 l.Error("error unmarshalling", "err", err)
373 return err
374 }
375 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
376
377 // use event.RepoAt
378 // sync git repos in {data}/repos/{did}/sh.tangled.repo/{rkey}
379 // if it's nil, don't run pipeline. knot needs upgrade
380 // we will leave sh.tangled.pipeline.trigger for backward compatibility
381
382 // NOTE: we are blindly trusting the knot that it will return only repos it own
383 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
384 repoPath := s.newRepoPath(event.RepoDid, event.RepoName)
385 err := sparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha)
386 if err != nil {
387 l.Error("failed to sync git repo", "err", err)
388 return fmt.Errorf("sync git repo: %w", err)
389 }
390 l.Info("synced git repo")
391
392 // TODO: plan the pipeline
393 }
394
395 return nil
396}
397
398func (s *Spindle) newRepoPath(did, name string) string {
399 return path.Join(s.cfg.Server.RepoDir(), did, name)
400}
401
402func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
403 scheme := "https://"
404 if s.cfg.Server.Dev {
405 scheme = "http://"
406 }
407 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
408}
409
// RequiredVersion is the minimum git version Spindle accepts. ensureGitVersion
// rejects older installations when New runs.
// NOTE(review): presumably pinned to 2.49.0 because sparseSyncGitRepo relies on
// `git clone --revision`; confirm.
const RequiredVersion = "2.49.0"
411
412func ensureGitVersion() error {
413 v, err := gitVersion()
414 if err != nil {
415 return fmt.Errorf("fetching git version: %w", err)
416 }
417 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
418 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
419 }
420 return nil
421}
422
423// TODO: move to "git" module shared between knot, appview & spindle
424func gitVersion() (*version.Version, error) {
425 var buf bytes.Buffer
426 cmd := exec.Command("git", "version")
427 cmd.Stdout = &buf
428 cmd.Stderr = os.Stderr
429 err := cmd.Run()
430 if err != nil {
431 return nil, err
432 }
433 fields := strings.Fields(buf.String())
434 if len(fields) < 3 {
435 return nil, fmt.Errorf("invalid git version: %s", buf)
436 }
437
438 // version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1"
439 versionString := fields[2]
440 if pos := strings.Index(versionString, "windows"); pos >= 1 {
441 versionString = versionString[:pos-1]
442 }
443 return version.NewVersion(versionString)
444}
445
446func sparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error {
447 exist, err := isDir(path)
448 if err != nil {
449 return err
450 }
451 if !exist {
452 if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil {
453 return fmt.Errorf("git clone: %w", err)
454 }
455 if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", `'/.tangled/workflows'`).Run(); err != nil {
456 return fmt.Errorf("git sparse-checkout set: %w", err)
457 }
458 if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil {
459 return fmt.Errorf("git checkout: %w", err)
460 }
461 } else {
462 if err := exec.Command("git", "-C", path, "pull", "origin", rev).Run(); err != nil {
463 return fmt.Errorf("git pull: %w", err)
464 }
465 }
466 return nil
467}
468
// isDir reports whether path names an existing directory.
//
// It returns (false, nil) when nothing exists at path, and also when path
// exists but is not a directory. Any other Stat error is returned as-is.
func isDir(path string) (bool, error) {
	info, err := os.Stat(path)
	switch {
	case err == nil:
		return info.IsDir(), nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}