forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/eventconsumer"
14 "tangled.org/core/eventconsumer/cursor"
15 "tangled.org/core/idresolver"
16 "tangled.org/core/jetstream"
17 "tangled.org/core/log"
18 "tangled.org/core/notifier"
19 "tangled.org/core/rbac"
20 "tangled.org/core/spindle/config"
21 "tangled.org/core/spindle/db"
22 "tangled.org/core/spindle/engine"
23 "tangled.org/core/spindle/engines/nixery"
24 "tangled.org/core/spindle/models"
25 "tangled.org/core/spindle/queue"
26 "tangled.org/core/spindle/secrets"
27 "tangled.org/core/spindle/xrpc"
28 "tangled.org/core/xrpc/serviceauth"
29)
30
31//go:embed motd
32var motd []byte
33
34const (
35 rbacDomain = "thisserver"
36)
37
38type Spindle struct {
39 jc *jetstream.JetstreamClient
40 db *db.DB
41 e *rbac.Enforcer
42 l *slog.Logger
43 n *notifier.Notifier
44 engs map[string]models.Engine
45 jq *queue.Queue
46 cfg *config.Config
47 ks *eventconsumer.Consumer
48 res *idresolver.Resolver
49 vault secrets.Manager
50}
51
52// New creates a new Spindle server with the provided configuration and engines.
53func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
54 logger := log.FromContext(ctx)
55
56 d, err := db.Make(cfg.Server.DBPath)
57 if err != nil {
58 return nil, fmt.Errorf("failed to setup db: %w", err)
59 }
60
61 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
64 }
65 e.E.EnableAutoSave(true)
66
67 n := notifier.New()
68
69 var vault secrets.Manager
70 switch cfg.Server.Secrets.Provider {
71 case "openbao":
72 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
73 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
74 }
75 vault, err = secrets.NewOpenBaoManager(
76 cfg.Server.Secrets.OpenBao.ProxyAddr,
77 logger,
78 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
79 )
80 if err != nil {
81 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
82 }
83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
84 case "sqlite", "":
85 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
86 if err != nil {
87 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
88 }
89 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
90 default:
91 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
92 }
93
94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
96
97 collections := []string{
98 tangled.SpindleMemberNSID,
99 tangled.RepoNSID,
100 tangled.RepoCollaboratorNSID,
101 }
102 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
103 if err != nil {
104 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
105 }
106 jc.AddDid(cfg.Server.Owner)
107
108 // Check if the spindle knows about any Dids;
109 dids, err := d.GetAllDids()
110 if err != nil {
111 return nil, fmt.Errorf("failed to get all dids: %w", err)
112 }
113 for _, d := range dids {
114 jc.AddDid(d)
115 }
116
117 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
118
119 spindle := &Spindle{
120 jc: jc,
121 e: e,
122 db: d,
123 l: logger,
124 n: &n,
125 engs: engines,
126 jq: jq,
127 cfg: cfg,
128 res: resolver,
129 vault: vault,
130 }
131
132 err = e.AddSpindle(rbacDomain)
133 if err != nil {
134 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
135 }
136 err = spindle.configureOwner()
137 if err != nil {
138 return nil, err
139 }
140 logger.Info("owner set", "did", cfg.Server.Owner)
141
142 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
143 if err != nil {
144 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
145 }
146
147 err = jc.StartJetstream(ctx, spindle.ingest())
148 if err != nil {
149 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
150 }
151
152 // for each incoming sh.tangled.pipeline, we execute
153 // spindle.processPipeline, which in turn enqueues the pipeline
154 // job in the above registered queue.
155 ccfg := eventconsumer.NewConsumerConfig()
156 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
157 ccfg.Dev = cfg.Server.Dev
158 ccfg.ProcessFunc = spindle.processPipeline
159 ccfg.CursorStore = cursorStore
160 knownKnots, err := d.Knots()
161 if err != nil {
162 return nil, err
163 }
164 for _, knot := range knownKnots {
165 logger.Info("adding source start", "knot", knot)
166 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
167 }
168 spindle.ks = eventconsumer.NewConsumer(*ccfg)
169
170 return spindle, nil
171}
172
173// DB returns the database instance.
174func (s *Spindle) DB() *db.DB {
175 return s.db
176}
177
178// Queue returns the job queue instance.
179func (s *Spindle) Queue() *queue.Queue {
180 return s.jq
181}
182
183// Engines returns the map of available engines.
184func (s *Spindle) Engines() map[string]models.Engine {
185 return s.engs
186}
187
188// Vault returns the secrets manager instance.
189func (s *Spindle) Vault() secrets.Manager {
190 return s.vault
191}
192
193// Notifier returns the notifier instance.
194func (s *Spindle) Notifier() *notifier.Notifier {
195 return s.n
196}
197
198// Enforcer returns the RBAC enforcer instance.
199func (s *Spindle) Enforcer() *rbac.Enforcer {
200 return s.e
201}
202
203// Start starts the Spindle server (blocking).
204func (s *Spindle) Start(ctx context.Context) error {
205 // starts a job queue runner in the background
206 s.jq.Start()
207 defer s.jq.Stop()
208
209 // Stop vault token renewal if it implements Stopper
210 if stopper, ok := s.vault.(secrets.Stopper); ok {
211 defer stopper.Stop()
212 }
213
214 go func() {
215 s.l.Info("starting knot event consumer")
216 s.ks.Start(ctx)
217 }()
218
219 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
220 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
221}
222
223func Run(ctx context.Context) error {
224 cfg, err := config.Load(ctx)
225 if err != nil {
226 return fmt.Errorf("failed to load config: %w", err)
227 }
228
229 nixeryEng, err := nixery.New(ctx, cfg)
230 if err != nil {
231 return err
232 }
233
234 s, err := New(ctx, cfg, map[string]models.Engine{
235 "nixery": nixeryEng,
236 })
237 if err != nil {
238 return err
239 }
240
241 return s.Start(ctx)
242}
243
244func (s *Spindle) Router() http.Handler {
245 mux := chi.NewRouter()
246
247 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
248 w.Write(motd)
249 })
250 mux.HandleFunc("/events", s.Events)
251 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
252
253 mux.Mount("/xrpc", s.XrpcRouter())
254 return mux
255}
256
257func (s *Spindle) XrpcRouter() http.Handler {
258 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
259
260 l := log.SubLogger(s.l, "xrpc")
261
262 x := xrpc.Xrpc{
263 Logger: l,
264 Db: s.db,
265 Enforcer: s.e,
266 Engines: s.engs,
267 Config: s.cfg,
268 Resolver: s.res,
269 Vault: s.vault,
270 ServiceAuth: serviceAuth,
271 }
272
273 return x.Router()
274}
275
276func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
277 if msg.Nsid == tangled.PipelineNSID {
278 tpl := tangled.Pipeline{}
279 err := json.Unmarshal(msg.EventJson, &tpl)
280 if err != nil {
281 fmt.Println("error unmarshalling", err)
282 return err
283 }
284
285 if tpl.TriggerMetadata == nil {
286 return fmt.Errorf("no trigger metadata found")
287 }
288
289 if tpl.TriggerMetadata.Repo == nil {
290 return fmt.Errorf("no repo data found")
291 }
292
293 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
294 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
295 }
296
297 // filter by repos
298 _, err = s.db.GetRepo(
299 tpl.TriggerMetadata.Repo.Knot,
300 tpl.TriggerMetadata.Repo.Did,
301 tpl.TriggerMetadata.Repo.Repo,
302 )
303 if err != nil {
304 return err
305 }
306
307 pipelineId := models.PipelineId{
308 Knot: src.Key(),
309 Rkey: msg.Rkey,
310 }
311
312 workflows := make(map[models.Engine][]models.Workflow)
313
314 for _, w := range tpl.Workflows {
315 if w != nil {
316 if _, ok := s.engs[w.Engine]; !ok {
317 err = s.db.StatusFailed(models.WorkflowId{
318 PipelineId: pipelineId,
319 Name: w.Name,
320 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
321 if err != nil {
322 return err
323 }
324
325 continue
326 }
327
328 eng := s.engs[w.Engine]
329
330 if _, ok := workflows[eng]; !ok {
331 workflows[eng] = []models.Workflow{}
332 }
333
334 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
335 if err != nil {
336 return err
337 }
338
339 workflows[eng] = append(workflows[eng], *ewf)
340
341 err = s.db.StatusPending(models.WorkflowId{
342 PipelineId: pipelineId,
343 Name: w.Name,
344 }, s.n)
345 if err != nil {
346 return err
347 }
348 }
349 }
350
351 ok := s.jq.Enqueue(queue.Job{
352 Run: func() error {
353 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
354 RepoOwner: tpl.TriggerMetadata.Repo.Did,
355 RepoName: tpl.TriggerMetadata.Repo.Repo,
356 Workflows: workflows,
357 }, pipelineId)
358 return nil
359 },
360 OnFail: func(jobError error) {
361 s.l.Error("pipeline run failed", "error", jobError)
362 },
363 })
364 if ok {
365 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
366 } else {
367 s.l.Error("failed to enqueue pipeline: queue is full")
368 }
369 }
370
371 return nil
372}
373
374func (s *Spindle) configureOwner() error {
375 cfgOwner := s.cfg.Server.Owner
376
377 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
378 if err != nil {
379 return err
380 }
381
382 switch len(existing) {
383 case 0:
384 // no owner configured, continue
385 case 1:
386 // find existing owner
387 existingOwner := existing[0]
388
389 // no ownership change, this is okay
390 if existingOwner == s.cfg.Server.Owner {
391 break
392 }
393
394 // remove existing owner
395 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
396 if err != nil {
397 return nil
398 }
399 default:
400 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
401 }
402
403 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
404}