forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
Monorepo for Tangled
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/eventconsumer"
14 "tangled.org/core/eventconsumer/cursor"
15 "tangled.org/core/idresolver"
16 "tangled.org/core/jetstream"
17 "tangled.org/core/log"
18 "tangled.org/core/notifier"
19 "tangled.org/core/rbac"
20 "tangled.org/core/spindle/config"
21 "tangled.org/core/spindle/db"
22 "tangled.org/core/spindle/engine"
23 "tangled.org/core/spindle/engines/nixery"
24 "tangled.org/core/spindle/models"
25 "tangled.org/core/spindle/queue"
26 "tangled.org/core/spindle/secrets"
27 "tangled.org/core/spindle/xrpc"
28 "tangled.org/core/xrpc/serviceauth"
29)
30
// motd is the message-of-the-day banner served verbatim on the root
// HTTP endpoint.
//
//go:embed motd
var motd []byte

const (
	// rbacDomain is the single RBAC domain under which all roles for
	// this spindle instance are scoped.
	rbacDomain = "thisserver"
)
37
// Spindle bundles the runtime dependencies of a spindle server:
// storage, access control, event ingestion, the job queue and the
// workflow engines.
type Spindle struct {
	jc    *jetstream.JetstreamClient // jetstream firehose client feeding ingest
	db    *db.DB                     // spindle database (repos, dids, statuses)
	e     *rbac.Enforcer             // RBAC enforcer scoped to rbacDomain
	l     *slog.Logger               // structured logger
	n     *notifier.Notifier         // status-change notifier passed to db updates
	engs  map[string]models.Engine   // workflow engines keyed by name (e.g. "nixery")
	jq    *queue.Queue               // background job queue for pipeline runs
	cfg   *config.Config             // loaded server configuration
	ks    *eventconsumer.Consumer    // knot event consumer driving processPipeline
	res   *idresolver.Resolver       // identity resolver used by xrpc/service auth
	vault secrets.Manager            // secrets backend (sqlite or openbao)
}
51
52func Run(ctx context.Context) error {
53 logger := log.FromContext(ctx)
54
55 cfg, err := config.Load(ctx)
56 if err != nil {
57 return fmt.Errorf("failed to load config: %w", err)
58 }
59
60 d, err := db.Make(cfg.Server.DBPath)
61 if err != nil {
62 return fmt.Errorf("failed to setup db: %w", err)
63 }
64
65 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
66 if err != nil {
67 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
68 }
69 e.E.EnableAutoSave(true)
70
71 n := notifier.New()
72
73 var vault secrets.Manager
74 switch cfg.Server.Secrets.Provider {
75 case "openbao":
76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
77 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
78 }
79 vault, err = secrets.NewOpenBaoManager(
80 cfg.Server.Secrets.OpenBao.ProxyAddr,
81 logger,
82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
83 )
84 if err != nil {
85 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
86 }
87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
88 case "sqlite", "":
89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
90 if err != nil {
91 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
92 }
93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
94 default:
95 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
96 }
97
98 nixeryEng, err := nixery.New(ctx, cfg)
99 if err != nil {
100 return err
101 }
102
103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
105
106 collections := []string{
107 tangled.SpindleMemberNSID,
108 tangled.RepoNSID,
109 tangled.RepoCollaboratorNSID,
110 }
111 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
112 if err != nil {
113 return fmt.Errorf("failed to setup jetstream client: %w", err)
114 }
115 jc.AddDid(cfg.Server.Owner)
116
117 // Check if the spindle knows about any Dids;
118 dids, err := d.GetAllDids()
119 if err != nil {
120 return fmt.Errorf("failed to get all dids: %w", err)
121 }
122 for _, d := range dids {
123 jc.AddDid(d)
124 }
125
126 resolver := idresolver.DefaultResolver()
127
128 spindle := Spindle{
129 jc: jc,
130 e: e,
131 db: d,
132 l: logger,
133 n: &n,
134 engs: map[string]models.Engine{"nixery": nixeryEng},
135 jq: jq,
136 cfg: cfg,
137 res: resolver,
138 vault: vault,
139 }
140
141 err = e.AddSpindle(rbacDomain)
142 if err != nil {
143 return fmt.Errorf("failed to set rbac domain: %w", err)
144 }
145 err = spindle.configureOwner()
146 if err != nil {
147 return err
148 }
149 logger.Info("owner set", "did", cfg.Server.Owner)
150
151 // starts a job queue runner in the background
152 jq.Start()
153 defer jq.Stop()
154
155 // Stop vault token renewal if it implements Stopper
156 if stopper, ok := vault.(secrets.Stopper); ok {
157 defer stopper.Stop()
158 }
159
160 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
161 if err != nil {
162 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
163 }
164
165 err = jc.StartJetstream(ctx, spindle.ingest())
166 if err != nil {
167 return fmt.Errorf("failed to start jetstream consumer: %w", err)
168 }
169
170 // for each incoming sh.tangled.pipeline, we execute
171 // spindle.processPipeline, which in turn enqueues the pipeline
172 // job in the above registered queue.
173 ccfg := eventconsumer.NewConsumerConfig()
174 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
175 ccfg.Dev = cfg.Server.Dev
176 ccfg.ProcessFunc = spindle.processPipeline
177 ccfg.CursorStore = cursorStore
178 knownKnots, err := d.Knots()
179 if err != nil {
180 return err
181 }
182 for _, knot := range knownKnots {
183 logger.Info("adding source start", "knot", knot)
184 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
185 }
186 spindle.ks = eventconsumer.NewConsumer(*ccfg)
187
188 go func() {
189 logger.Info("starting knot event consumer")
190 spindle.ks.Start(ctx)
191 }()
192
193 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
194 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
195
196 return nil
197}
198
199func (s *Spindle) Router() http.Handler {
200 mux := chi.NewRouter()
201
202 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
203 w.Write(motd)
204 })
205 mux.HandleFunc("/events", s.Events)
206 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
207
208 mux.Mount("/xrpc", s.XrpcRouter())
209 return mux
210}
211
212func (s *Spindle) XrpcRouter() http.Handler {
213 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
214
215 l := log.SubLogger(s.l, "xrpc")
216
217 x := xrpc.Xrpc{
218 Logger: l,
219 Db: s.db,
220 Enforcer: s.e,
221 Engines: s.engs,
222 Config: s.cfg,
223 Resolver: s.res,
224 Vault: s.vault,
225 ServiceAuth: serviceAuth,
226 }
227
228 return x.Router()
229}
230
231func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
232 if msg.Nsid == tangled.PipelineNSID {
233 tpl := tangled.Pipeline{}
234 err := json.Unmarshal(msg.EventJson, &tpl)
235 if err != nil {
236 fmt.Println("error unmarshalling", err)
237 return err
238 }
239
240 if tpl.TriggerMetadata == nil {
241 return fmt.Errorf("no trigger metadata found")
242 }
243
244 if tpl.TriggerMetadata.Repo == nil {
245 return fmt.Errorf("no repo data found")
246 }
247
248 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
249 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
250 }
251
252 // filter by repos
253 _, err = s.db.GetRepo(
254 tpl.TriggerMetadata.Repo.Knot,
255 tpl.TriggerMetadata.Repo.Did,
256 tpl.TriggerMetadata.Repo.Repo,
257 )
258 if err != nil {
259 return err
260 }
261
262 pipelineId := models.PipelineId{
263 Knot: src.Key(),
264 Rkey: msg.Rkey,
265 }
266
267 workflows := make(map[models.Engine][]models.Workflow)
268
269 for _, w := range tpl.Workflows {
270 if w != nil {
271 if _, ok := s.engs[w.Engine]; !ok {
272 err = s.db.StatusFailed(models.WorkflowId{
273 PipelineId: pipelineId,
274 Name: w.Name,
275 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
276 if err != nil {
277 return err
278 }
279
280 continue
281 }
282
283 eng := s.engs[w.Engine]
284
285 if _, ok := workflows[eng]; !ok {
286 workflows[eng] = []models.Workflow{}
287 }
288
289 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
290 if err != nil {
291 return err
292 }
293
294 workflows[eng] = append(workflows[eng], *ewf)
295
296 err = s.db.StatusPending(models.WorkflowId{
297 PipelineId: pipelineId,
298 Name: w.Name,
299 }, s.n)
300 if err != nil {
301 return err
302 }
303 }
304 }
305
306 ok := s.jq.Enqueue(queue.Job{
307 Run: func() error {
308 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
309 RepoOwner: tpl.TriggerMetadata.Repo.Did,
310 RepoName: tpl.TriggerMetadata.Repo.Repo,
311 Workflows: workflows,
312 }, pipelineId)
313 return nil
314 },
315 OnFail: func(jobError error) {
316 s.l.Error("pipeline run failed", "error", jobError)
317 },
318 })
319 if ok {
320 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
321 } else {
322 s.l.Error("failed to enqueue pipeline: queue is full")
323 }
324 }
325
326 return nil
327}
328
329func (s *Spindle) configureOwner() error {
330 cfgOwner := s.cfg.Server.Owner
331
332 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
333 if err != nil {
334 return err
335 }
336
337 switch len(existing) {
338 case 0:
339 // no owner configured, continue
340 case 1:
341 // find existing owner
342 existingOwner := existing[0]
343
344 // no ownership change, this is okay
345 if existingOwner == s.cfg.Server.Owner {
346 break
347 }
348
349 // remove existing owner
350 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
351 if err != nil {
352 return nil
353 }
354 default:
355 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
356 }
357
358 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
359}