forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/http"
10
11 "github.com/go-chi/chi/v5"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/eventconsumer"
14 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
15 "tangled.sh/tangled.sh/core/idresolver"
16 "tangled.sh/tangled.sh/core/jetstream"
17 "tangled.sh/tangled.sh/core/log"
18 "tangled.sh/tangled.sh/core/notifier"
19 "tangled.sh/tangled.sh/core/rbac"
20 "tangled.sh/tangled.sh/core/spindle/config"
21 "tangled.sh/tangled.sh/core/spindle/db"
22 "tangled.sh/tangled.sh/core/spindle/engine"
23 "tangled.sh/tangled.sh/core/spindle/models"
24 "tangled.sh/tangled.sh/core/spindle/queue"
25 "tangled.sh/tangled.sh/core/spindle/secrets"
26 "tangled.sh/tangled.sh/core/spindle/xrpc"
27)
28
29//go:embed motd
30var motd []byte
31
32const (
33 rbacDomain = "thisserver"
34)
35
36type Spindle struct {
37 jc *jetstream.JetstreamClient
38 db *db.DB
39 e *rbac.Enforcer
40 l *slog.Logger
41 n *notifier.Notifier
42 eng *engine.Engine
43 jq *queue.Queue
44 cfg *config.Config
45 ks *eventconsumer.Consumer
46 res *idresolver.Resolver
47 vault secrets.Manager
48}
49
50func Run(ctx context.Context) error {
51 logger := log.FromContext(ctx)
52
53 cfg, err := config.Load(ctx)
54 if err != nil {
55 return fmt.Errorf("failed to load config: %w", err)
56 }
57
58 d, err := db.Make(cfg.Server.DBPath)
59 if err != nil {
60 return fmt.Errorf("failed to setup db: %w", err)
61 }
62
63 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
64 if err != nil {
65 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
66 }
67 e.E.EnableAutoSave(true)
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
75 return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
76 }
77 vault, err = secrets.NewOpenBaoManager(
78 cfg.Server.Secrets.OpenBao.ProxyAddr,
79 logger,
80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
81 )
82 if err != nil {
83 return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
84 }
85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
86 case "sqlite", "":
87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
88 if err != nil {
89 return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
90 }
91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
92 default:
93 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
94 }
95
96 eng, err := engine.New(ctx, cfg, d, &n, vault)
97 if err != nil {
98 return err
99 }
100
101 jq := queue.NewQueue(100, 5)
102
103 collections := []string{
104 tangled.SpindleMemberNSID,
105 tangled.RepoNSID,
106 tangled.RepoCollaboratorNSID,
107 }
108 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
109 if err != nil {
110 return fmt.Errorf("failed to setup jetstream client: %w", err)
111 }
112 jc.AddDid(cfg.Server.Owner)
113
114 // Check if the spindle knows about any Dids;
115 dids, err := d.GetAllDids()
116 if err != nil {
117 return fmt.Errorf("failed to get all dids: %w", err)
118 }
119 for _, d := range dids {
120 jc.AddDid(d)
121 }
122
123 resolver := idresolver.DefaultResolver()
124
125 spindle := Spindle{
126 jc: jc,
127 e: e,
128 db: d,
129 l: logger,
130 n: &n,
131 eng: eng,
132 jq: jq,
133 cfg: cfg,
134 res: resolver,
135 vault: vault,
136 }
137
138 err = e.AddSpindle(rbacDomain)
139 if err != nil {
140 return fmt.Errorf("failed to set rbac domain: %w", err)
141 }
142 err = spindle.configureOwner()
143 if err != nil {
144 return err
145 }
146 logger.Info("owner set", "did", cfg.Server.Owner)
147
148 // starts a job queue runner in the background
149 jq.Start()
150 defer jq.Stop()
151
152 // Stop vault token renewal if it implements Stopper
153 if stopper, ok := vault.(secrets.Stopper); ok {
154 defer stopper.Stop()
155 }
156
157 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
158 if err != nil {
159 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
160 }
161
162 err = jc.StartJetstream(ctx, spindle.ingest())
163 if err != nil {
164 return fmt.Errorf("failed to start jetstream consumer: %w", err)
165 }
166
167 // for each incoming sh.tangled.pipeline, we execute
168 // spindle.processPipeline, which in turn enqueues the pipeline
169 // job in the above registered queue.
170 ccfg := eventconsumer.NewConsumerConfig()
171 ccfg.Logger = logger
172 ccfg.Dev = cfg.Server.Dev
173 ccfg.ProcessFunc = spindle.processPipeline
174 ccfg.CursorStore = cursorStore
175 knownKnots, err := d.Knots()
176 if err != nil {
177 return err
178 }
179 for _, knot := range knownKnots {
180 logger.Info("adding source start", "knot", knot)
181 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
182 }
183 spindle.ks = eventconsumer.NewConsumer(*ccfg)
184
185 go func() {
186 logger.Info("starting knot event consumer")
187 spindle.ks.Start(ctx)
188 }()
189
190 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
191 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
192
193 return nil
194}
195
196func (s *Spindle) Router() http.Handler {
197 mux := chi.NewRouter()
198
199 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
200 w.Write(motd)
201 })
202 mux.HandleFunc("/events", s.Events)
203 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
204 w.Write([]byte(s.cfg.Server.Owner))
205 })
206 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
207
208 mux.Mount("/xrpc", s.XrpcRouter())
209 return mux
210}
211
212func (s *Spindle) XrpcRouter() http.Handler {
213 logger := s.l.With("route", "xrpc")
214
215 x := xrpc.Xrpc{
216 Logger: logger,
217 Db: s.db,
218 Enforcer: s.e,
219 Engine: s.eng,
220 Config: s.cfg,
221 Resolver: s.res,
222 Vault: s.vault,
223 }
224
225 return x.Router()
226}
227
228func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
229 if msg.Nsid == tangled.PipelineNSID {
230 tpl := tangled.Pipeline{}
231 err := json.Unmarshal(msg.EventJson, &tpl)
232 if err != nil {
233 fmt.Println("error unmarshalling", err)
234 return err
235 }
236
237 if tpl.TriggerMetadata == nil {
238 return fmt.Errorf("no trigger metadata found")
239 }
240
241 if tpl.TriggerMetadata.Repo == nil {
242 return fmt.Errorf("no repo data found")
243 }
244
245 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
246 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
247 }
248
249 // filter by repos
250 _, err = s.db.GetRepo(
251 tpl.TriggerMetadata.Repo.Knot,
252 tpl.TriggerMetadata.Repo.Did,
253 tpl.TriggerMetadata.Repo.Repo,
254 )
255 if err != nil {
256 return err
257 }
258
259 pipelineId := models.PipelineId{
260 Knot: src.Key(),
261 Rkey: msg.Rkey,
262 }
263
264 for _, w := range tpl.Workflows {
265 if w != nil {
266 err := s.db.StatusPending(models.WorkflowId{
267 PipelineId: pipelineId,
268 Name: w.Name,
269 }, s.n)
270 if err != nil {
271 return err
272 }
273 }
274 }
275
276 spl := models.ToPipeline(tpl, *s.cfg)
277
278 ok := s.jq.Enqueue(queue.Job{
279 Run: func() error {
280 s.eng.StartWorkflows(ctx, spl, pipelineId)
281 return nil
282 },
283 OnFail: func(jobError error) {
284 s.l.Error("pipeline run failed", "error", jobError)
285 },
286 })
287 if ok {
288 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
289 } else {
290 s.l.Error("failed to enqueue pipeline: queue is full")
291 }
292 }
293
294 return nil
295}
296
297func (s *Spindle) configureOwner() error {
298 cfgOwner := s.cfg.Server.Owner
299
300 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
301 if err != nil {
302 return err
303 }
304
305 switch len(existing) {
306 case 0:
307 // no owner configured, continue
308 case 1:
309 // find existing owner
310 existingOwner := existing[0]
311
312 // no ownership change, this is okay
313 if existingOwner == s.cfg.Server.Owner {
314 break
315 }
316
317 // remove existing owner
318 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
319 if err != nil {
320 return nil
321 }
322 default:
323 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
324 }
325
326 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
327}