Monorepo for Tangled
tangled.org
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "path/filepath"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/go-chi/chi/v5"
15 "github.com/hashicorp/go-version"
16 "tangled.org/core/api/tangled"
17 "tangled.org/core/eventconsumer"
18 "tangled.org/core/eventconsumer/cursor"
19 "tangled.org/core/idresolver"
20 "tangled.org/core/log"
21 "tangled.org/core/notifier"
22 "tangled.org/core/rbac2"
23 "tangled.org/core/spindle/config"
24 "tangled.org/core/spindle/db"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/engines/nixery"
27 "tangled.org/core/spindle/git"
28 "tangled.org/core/spindle/models"
29 "tangled.org/core/spindle/queue"
30 "tangled.org/core/spindle/secrets"
31 "tangled.org/core/spindle/xrpc"
32 "tangled.org/core/tap"
33 "tangled.org/core/xrpc/serviceauth"
34)
35
36//go:embed motd
37var motd []byte
38
39type Spindle struct {
40 tap *tap.Client
41 db *db.DB
42 e *rbac2.Enforcer
43 l *slog.Logger
44 n *notifier.Notifier
45 engs map[string]models.Engine
46 jq *queue.Queue
47 cfg *config.Config
48 ks *eventconsumer.Consumer
49 res *idresolver.Resolver
50 vault secrets.Manager
51}
52
53// New creates a new Spindle server with the provided configuration and engines.
54func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
55 logger := log.FromContext(ctx)
56
57 if err := ensureGitVersion(); err != nil {
58 return nil, fmt.Errorf("ensuring git version: %w", err)
59 }
60
61 d, err := db.Make(ctx, cfg.Server.DBPath())
62 if err != nil {
63 return nil, fmt.Errorf("failed to setup db: %w", err)
64 }
65
66 e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
67 if err != nil {
68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
69 }
70
71 n := notifier.New()
72
73 var vault secrets.Manager
74 switch cfg.Server.Secrets.Provider {
75 case "openbao":
76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
77 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
78 }
79 vault, err = secrets.NewOpenBaoManager(
80 cfg.Server.Secrets.OpenBao.ProxyAddr,
81 logger,
82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
83 )
84 if err != nil {
85 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
86 }
87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
88 case "sqlite", "":
89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
90 if err != nil {
91 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
92 }
93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
94 default:
95 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
96 }
97
98 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
99 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
100
101 tap := tap.NewClient(cfg.Server.TapUrl, "")
102
103 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
104
105 spindle := &Spindle{
106 tap: &tap,
107 e: e,
108 db: d,
109 l: logger,
110 n: &n,
111 engs: engines,
112 jq: jq,
113 cfg: cfg,
114 res: resolver,
115 vault: vault,
116 }
117
118 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
119 if err != nil {
120 return nil, err
121 }
122 logger.Info("owner set", "did", cfg.Server.Owner)
123
124 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
125 if err != nil {
126 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
127 }
128
129 // for each incoming sh.tangled.pipeline, we execute
130 // spindle.processPipeline, which in turn enqueues the pipeline
131 // job in the above registered queue.
132 ccfg := eventconsumer.NewConsumerConfig()
133 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
134 ccfg.Dev = cfg.Server.Dev
135 ccfg.ProcessFunc = spindle.processPipeline
136 ccfg.CursorStore = cursorStore
137 knownKnots, err := d.Knots()
138 if err != nil {
139 return nil, err
140 }
141 for _, knot := range knownKnots {
142 logger.Info("adding source start", "knot", knot)
143 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
144 }
145 spindle.ks = eventconsumer.NewConsumer(*ccfg)
146
147 return spindle, nil
148}
149
150// DB returns the database instance.
151func (s *Spindle) DB() *db.DB {
152 return s.db
153}
154
155// Queue returns the job queue instance.
156func (s *Spindle) Queue() *queue.Queue {
157 return s.jq
158}
159
160// Engines returns the map of available engines.
161func (s *Spindle) Engines() map[string]models.Engine {
162 return s.engs
163}
164
165// Vault returns the secrets manager instance.
166func (s *Spindle) Vault() secrets.Manager {
167 return s.vault
168}
169
170// Notifier returns the notifier instance.
171func (s *Spindle) Notifier() *notifier.Notifier {
172 return s.n
173}
174
175// Enforcer returns the RBAC enforcer instance.
176func (s *Spindle) Enforcer() *rbac2.Enforcer {
177 return s.e
178}
179
180// Start starts the Spindle server (blocking).
181func (s *Spindle) Start(ctx context.Context) error {
182 // starts a job queue runner in the background
183 s.jq.Start()
184 defer s.jq.Stop()
185
186 // Stop vault token renewal if it implements Stopper
187 if stopper, ok := s.vault.(secrets.Stopper); ok {
188 defer stopper.Stop()
189 }
190
191 go func() {
192 s.l.Info("starting knot event consumer")
193 s.ks.Start(ctx)
194 }()
195
196 // ensure server owner is tracked
197 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
198 return err
199 }
200
201 go func() {
202 s.l.Info("starting tap stream consumer")
203 s.tap.Connect(ctx, &tap.SimpleIndexer{
204 EventHandler: s.processEvent,
205 })
206 }()
207
208 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
209 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
210}
211
212func Run(ctx context.Context) error {
213 cfg, err := config.Load(ctx)
214 if err != nil {
215 return fmt.Errorf("failed to load config: %w", err)
216 }
217
218 nixeryEng, err := nixery.New(ctx, cfg)
219 if err != nil {
220 return err
221 }
222
223 s, err := New(ctx, cfg, map[string]models.Engine{
224 "nixery": nixeryEng,
225 })
226 if err != nil {
227 return err
228 }
229
230 return s.Start(ctx)
231}
232
233func (s *Spindle) Router() http.Handler {
234 mux := chi.NewRouter()
235
236 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
237 w.Write(motd)
238 })
239 mux.HandleFunc("/events", s.Events)
240 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
241
242 mux.Mount("/xrpc", s.XrpcRouter())
243 return mux
244}
245
246func (s *Spindle) XrpcRouter() http.Handler {
247 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
248
249 l := log.SubLogger(s.l, "xrpc")
250
251 x := xrpc.Xrpc{
252 Logger: l,
253 Db: s.db,
254 Enforcer: s.e,
255 Engines: s.engs,
256 Config: s.cfg,
257 Resolver: s.res,
258 Vault: s.vault,
259 Notifier: s.Notifier(),
260 ServiceAuth: serviceAuth,
261 }
262
263 return x.Router()
264}
265
266func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
267 l := log.FromContext(ctx).With("handler", "processKnotStream")
268 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
269 if msg.Nsid == tangled.PipelineNSID {
270 return nil
271 tpl := tangled.Pipeline{}
272 err := json.Unmarshal(msg.EventJson, &tpl)
273 if err != nil {
274 fmt.Println("error unmarshalling", err)
275 return err
276 }
277
278 if tpl.TriggerMetadata == nil {
279 return fmt.Errorf("no trigger metadata found")
280 }
281
282 if tpl.TriggerMetadata.Repo == nil {
283 return fmt.Errorf("no repo data found")
284 }
285
286 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
287 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
288 }
289
290 // filter by repos
291 _, err = s.db.GetRepoWithName(
292 syntax.DID(tpl.TriggerMetadata.Repo.Did),
293 tpl.TriggerMetadata.Repo.Repo,
294 )
295 if err != nil {
296 return fmt.Errorf("failed to get repo: %w", err)
297 }
298
299 pipelineId := models.PipelineId{
300 Knot: src.Key(),
301 Rkey: msg.Rkey,
302 }
303
304 workflows := make(map[models.Engine][]models.Workflow)
305
306 // Build pipeline environment variables once for all workflows
307 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
308
309 for _, w := range tpl.Workflows {
310 if w != nil {
311 if _, ok := s.engs[w.Engine]; !ok {
312 err = s.db.StatusFailed(models.WorkflowId{
313 PipelineId: pipelineId,
314 Name: w.Name,
315 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
316 if err != nil {
317 return fmt.Errorf("db.StatusFailed: %w", err)
318 }
319
320 continue
321 }
322
323 eng := s.engs[w.Engine]
324
325 if _, ok := workflows[eng]; !ok {
326 workflows[eng] = []models.Workflow{}
327 }
328
329 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
330 if err != nil {
331 return fmt.Errorf("init workflow: %w", err)
332 }
333
334 // inject TANGLED_* env vars after InitWorkflow
335 // This prevents user-defined env vars from overriding them
336 if ewf.Environment == nil {
337 ewf.Environment = make(map[string]string)
338 }
339 maps.Copy(ewf.Environment, pipelineEnv)
340
341 workflows[eng] = append(workflows[eng], *ewf)
342
343 err = s.db.StatusPending(models.WorkflowId{
344 PipelineId: pipelineId,
345 Name: w.Name,
346 }, s.n)
347 if err != nil {
348 return fmt.Errorf("db.StatusPending: %w", err)
349 }
350 }
351 }
352
353 ok := s.jq.Enqueue(queue.Job{
354 Run: func() error {
355 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
356 RepoOwner: tpl.TriggerMetadata.Repo.Did,
357 RepoName: tpl.TriggerMetadata.Repo.Repo,
358 Workflows: workflows,
359 }, pipelineId)
360 return nil
361 },
362 OnFail: func(jobError error) {
363 s.l.Error("pipeline run failed", "error", jobError)
364 },
365 })
366 if ok {
367 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
368 } else {
369 s.l.Error("failed to enqueue pipeline: queue is full")
370 }
371 } else if msg.Nsid == tangled.GitRefUpdateNSID {
372 event := tangled.GitRefUpdate{}
373 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
374 l.Error("error unmarshalling", "err", err)
375 return err
376 }
377 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
378
379 // resolve repo name to rkey
380 // TODO: git.refUpdate should respond with rkey instead of repo name
381 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
382 if err != nil {
383 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
384 }
385
386 // NOTE: we are blindly trusting the knot that it will return only repos it own
387 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
388 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
389 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
390 return fmt.Errorf("sync git repo: %w", err)
391 }
392 l.Info("synced git repo")
393
394 // TODO: plan the pipeline
395 }
396
397 return nil
398}
399
400// newRepoPath creates a path to store repository by its did and rkey.
401// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
402func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
403 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
404}
405
406func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
407 scheme := "https://"
408 if s.cfg.Server.Dev {
409 scheme = "http://"
410 }
411 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
412}
413
// RequiredVersion is the minimum git version Spindle accepts; ensureGitVersion
// rejects any installed git older than this.
const RequiredVersion = "2.49.0"
415
416func ensureGitVersion() error {
417 v, err := git.Version()
418 if err != nil {
419 return fmt.Errorf("fetching git version: %w", err)
420 }
421 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
422 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
423 }
424 return nil
425}