Monorepo for Tangled
tangled.org
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
31
// motd holds the bytes of the embedded "motd" file. Router writes it
// verbatim as the response body for requests to "/".
//
//go:embed motd
var motd []byte
34
// Spindle bundles the long-lived dependencies of a spindle server. Values
// are assembled by New and served by Start.
type Spindle struct {
	// jc is the jetstream client; New subscribes it to the spindle member,
	// repo and repo collaborator collections and starts it.
	jc *jetstream.JetstreamClient
	// db is the database opened from cfg.Server.DBPath.
	db *db.DB
	// e is the RBAC enforcer; New registers the configured owner with it.
	e *rbac2.Enforcer
	// l is the logger New obtains from its context.
	l *slog.Logger
	// n is handed to the db status helpers and to engine.StartWorkflows.
	n *notifier.Notifier
	// engs maps an engine name (a workflow's Engine field) to its
	// implementation.
	engs map[string]models.Engine
	// jq is the job queue that pipeline runs are enqueued onto.
	jq *queue.Queue
	// cfg is the configuration the Spindle was built from.
	cfg *config.Config
	// ks consumes events from the known knots and hands each message to
	// processPipeline; it is created in New and started in Start.
	ks *eventconsumer.Consumer
	// res is the identity resolver built from cfg.Server.PlcUrl.
	res *idresolver.Resolver
	// vault is the secrets manager selected by cfg.Server.Secrets.Provider.
	vault secrets.Manager
}
48
49// New creates a new Spindle server with the provided configuration and engines.
50func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
51 logger := log.FromContext(ctx)
52
53 d, err := db.Make(cfg.Server.DBPath)
54 if err != nil {
55 return nil, fmt.Errorf("failed to setup db: %w", err)
56 }
57
58 e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
59 if err != nil {
60 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
61 }
62
63 n := notifier.New()
64
65 var vault secrets.Manager
66 switch cfg.Server.Secrets.Provider {
67 case "openbao":
68 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
69 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
70 }
71 vault, err = secrets.NewOpenBaoManager(
72 cfg.Server.Secrets.OpenBao.ProxyAddr,
73 logger,
74 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
75 )
76 if err != nil {
77 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
78 }
79 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
80 case "sqlite", "":
81 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
82 if err != nil {
83 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
84 }
85 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
86 default:
87 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
88 }
89
90 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
91 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
92
93 collections := []string{
94 tangled.SpindleMemberNSID,
95 tangled.RepoNSID,
96 tangled.RepoCollaboratorNSID,
97 }
98 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
99 if err != nil {
100 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
101 }
102 jc.AddDid(cfg.Server.Owner.String())
103
104 // Check if the spindle knows about any Dids;
105 dids, err := d.GetAllDids()
106 if err != nil {
107 return nil, fmt.Errorf("failed to get all dids: %w", err)
108 }
109 for _, d := range dids {
110 jc.AddDid(d)
111 }
112
113 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
114
115 spindle := &Spindle{
116 jc: jc,
117 e: e,
118 db: d,
119 l: logger,
120 n: &n,
121 engs: engines,
122 jq: jq,
123 cfg: cfg,
124 res: resolver,
125 vault: vault,
126 }
127
128 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
129 if err != nil {
130 return nil, err
131 }
132 logger.Info("owner set", "did", cfg.Server.Owner)
133
134 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
135 if err != nil {
136 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
137 }
138
139 err = jc.StartJetstream(ctx, spindle.ingest())
140 if err != nil {
141 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
142 }
143
144 // for each incoming sh.tangled.pipeline, we execute
145 // spindle.processPipeline, which in turn enqueues the pipeline
146 // job in the above registered queue.
147 ccfg := eventconsumer.NewConsumerConfig()
148 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
149 ccfg.Dev = cfg.Server.Dev
150 ccfg.ProcessFunc = spindle.processPipeline
151 ccfg.CursorStore = cursorStore
152 knownKnots, err := d.Knots()
153 if err != nil {
154 return nil, err
155 }
156 for _, knot := range knownKnots {
157 logger.Info("adding source start", "knot", knot)
158 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
159 }
160 spindle.ks = eventconsumer.NewConsumer(*ccfg)
161
162 return spindle, nil
163}
164
// DB returns the database handle held by the Spindle (the one New opened
// from cfg.Server.DBPath).
func (s *Spindle) DB() *db.DB {
	return s.db
}
169
// Queue returns the job queue that pipeline runs are enqueued onto.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}
174
// Engines returns the map of available engines, keyed by engine name.
// The Spindle's own map is returned, not a copy.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}
179
// Vault returns the secrets manager held by the Spindle.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}
184
// Notifier returns the notifier shared by the Spindle's status updates and
// its XRPC handlers.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}
189
// Enforcer returns the RBAC enforcer held by the Spindle.
func (s *Spindle) Enforcer() *rbac2.Enforcer {
	return s.e
}
194
195// Start starts the Spindle server (blocking).
196func (s *Spindle) Start(ctx context.Context) error {
197 // starts a job queue runner in the background
198 s.jq.Start()
199 defer s.jq.Stop()
200
201 // Stop vault token renewal if it implements Stopper
202 if stopper, ok := s.vault.(secrets.Stopper); ok {
203 defer stopper.Stop()
204 }
205
206 go func() {
207 s.l.Info("starting knot event consumer")
208 s.ks.Start(ctx)
209 }()
210
211 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
212 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
213}
214
215func Run(ctx context.Context) error {
216 cfg, err := config.Load(ctx)
217 if err != nil {
218 return fmt.Errorf("failed to load config: %w", err)
219 }
220
221 nixeryEng, err := nixery.New(ctx, cfg)
222 if err != nil {
223 return err
224 }
225
226 s, err := New(ctx, cfg, map[string]models.Engine{
227 "nixery": nixeryEng,
228 })
229 if err != nil {
230 return err
231 }
232
233 return s.Start(ctx)
234}
235
236func (s *Spindle) Router() http.Handler {
237 mux := chi.NewRouter()
238
239 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
240 w.Write(motd)
241 })
242 mux.HandleFunc("/events", s.Events)
243 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
244
245 mux.Mount("/xrpc", s.XrpcRouter())
246 return mux
247}
248
249func (s *Spindle) XrpcRouter() http.Handler {
250 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
251
252 l := log.SubLogger(s.l, "xrpc")
253
254 x := xrpc.Xrpc{
255 Logger: l,
256 Db: s.db,
257 Enforcer: s.e,
258 Engines: s.engs,
259 Config: s.cfg,
260 Resolver: s.res,
261 Vault: s.vault,
262 Notifier: s.Notifier(),
263 ServiceAuth: serviceAuth,
264 }
265
266 return x.Router()
267}
268
269func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
270 if msg.Nsid == tangled.PipelineNSID {
271 tpl := tangled.Pipeline{}
272 err := json.Unmarshal(msg.EventJson, &tpl)
273 if err != nil {
274 fmt.Println("error unmarshalling", err)
275 return err
276 }
277
278 if tpl.TriggerMetadata == nil {
279 return fmt.Errorf("no trigger metadata found")
280 }
281
282 if tpl.TriggerMetadata.Repo == nil {
283 return fmt.Errorf("no repo data found")
284 }
285
286 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
287 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
288 }
289
290 // filter by repos
291 _, err = s.db.GetRepo(
292 tpl.TriggerMetadata.Repo.Knot,
293 tpl.TriggerMetadata.Repo.Did,
294 tpl.TriggerMetadata.Repo.Repo,
295 )
296 if err != nil {
297 return fmt.Errorf("failed to get repo: %w", err)
298 }
299
300 pipelineId := models.PipelineId{
301 Knot: src.Key(),
302 Rkey: msg.Rkey,
303 }
304
305 workflows := make(map[models.Engine][]models.Workflow)
306
307 // Build pipeline environment variables once for all workflows
308 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
309
310 for _, w := range tpl.Workflows {
311 if w != nil {
312 if _, ok := s.engs[w.Engine]; !ok {
313 err = s.db.StatusFailed(models.WorkflowId{
314 PipelineId: pipelineId,
315 Name: w.Name,
316 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
317 if err != nil {
318 return fmt.Errorf("db.StatusFailed: %w", err)
319 }
320
321 continue
322 }
323
324 eng := s.engs[w.Engine]
325
326 if _, ok := workflows[eng]; !ok {
327 workflows[eng] = []models.Workflow{}
328 }
329
330 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
331 if err != nil {
332 return fmt.Errorf("init workflow: %w", err)
333 }
334
335 // inject TANGLED_* env vars after InitWorkflow
336 // This prevents user-defined env vars from overriding them
337 if ewf.Environment == nil {
338 ewf.Environment = make(map[string]string)
339 }
340 maps.Copy(ewf.Environment, pipelineEnv)
341
342 workflows[eng] = append(workflows[eng], *ewf)
343
344 err = s.db.StatusPending(models.WorkflowId{
345 PipelineId: pipelineId,
346 Name: w.Name,
347 }, s.n)
348 if err != nil {
349 return fmt.Errorf("db.StatusPending: %w", err)
350 }
351 }
352 }
353
354 ok := s.jq.Enqueue(queue.Job{
355 Run: func() error {
356 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
357 RepoOwner: tpl.TriggerMetadata.Repo.Did,
358 RepoName: tpl.TriggerMetadata.Repo.Repo,
359 Workflows: workflows,
360 }, pipelineId)
361 return nil
362 },
363 OnFail: func(jobError error) {
364 s.l.Error("pipeline run failed", "error", jobError)
365 },
366 })
367 if ok {
368 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
369 } else {
370 s.l.Error("failed to enqueue pipeline: queue is full")
371 }
372 }
373
374 return nil
375}