forked from
tangled.org/core
fork
Configure Feed
Select the types of activity you want to include in your feed.
this repo has no description
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package spindle
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net/http"
9
10 "github.com/go-chi/chi/v5"
11 "tangled.sh/tangled.sh/core/api/tangled"
12 "tangled.sh/tangled.sh/core/eventconsumer"
13 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
14 "tangled.sh/tangled.sh/core/jetstream"
15 "tangled.sh/tangled.sh/core/log"
16 "tangled.sh/tangled.sh/core/notifier"
17 "tangled.sh/tangled.sh/core/rbac"
18 "tangled.sh/tangled.sh/core/spindle/config"
19 "tangled.sh/tangled.sh/core/spindle/db"
20 "tangled.sh/tangled.sh/core/spindle/engine"
21 "tangled.sh/tangled.sh/core/spindle/models"
22 "tangled.sh/tangled.sh/core/spindle/queue"
23)
24
25const (
26 rbacDomain = "thisserver"
27)
28
29type Spindle struct {
30 jc *jetstream.JetstreamClient
31 db *db.DB
32 e *rbac.Enforcer
33 l *slog.Logger
34 n *notifier.Notifier
35 eng *engine.Engine
36 jq *queue.Queue
37 cfg *config.Config
38 ks *eventconsumer.Consumer
39}
40
41func Run(ctx context.Context) error {
42 logger := log.FromContext(ctx)
43
44 cfg, err := config.Load(ctx)
45 if err != nil {
46 return fmt.Errorf("failed to load config: %w", err)
47 }
48
49 d, err := db.Make(cfg.Server.DBPath)
50 if err != nil {
51 return fmt.Errorf("failed to setup db: %w", err)
52 }
53
54 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
55 if err != nil {
56 return fmt.Errorf("failed to setup rbac enforcer: %w", err)
57 }
58 e.E.EnableAutoSave(true)
59
60 n := notifier.New()
61
62 eng, err := engine.New(ctx, cfg, d, &n)
63 if err != nil {
64 return err
65 }
66
67 jq := queue.NewQueue(100, 2)
68
69 collections := []string{
70 tangled.SpindleMemberNSID,
71 tangled.RepoNSID,
72 }
73 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
74 if err != nil {
75 return fmt.Errorf("failed to setup jetstream client: %w", err)
76 }
77 jc.AddDid(cfg.Server.Owner)
78
79 spindle := Spindle{
80 jc: jc,
81 e: e,
82 db: d,
83 l: logger,
84 n: &n,
85 eng: eng,
86 jq: jq,
87 cfg: cfg,
88 }
89
90 err = e.AddSpindle(rbacDomain)
91 if err != nil {
92 return fmt.Errorf("failed to set rbac domain: %w", err)
93 }
94 err = spindle.configureOwner()
95 if err != nil {
96 return err
97 }
98 logger.Info("owner set", "did", cfg.Server.Owner)
99
100 // starts a job queue runner in the background
101 jq.Start()
102 defer jq.Stop()
103
104 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
105 if err != nil {
106 return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
107 }
108
109 err = jc.StartJetstream(ctx, spindle.ingest())
110 if err != nil {
111 return fmt.Errorf("failed to start jetstream consumer: %w", err)
112 }
113
114 // for each incoming sh.tangled.pipeline, we execute
115 // spindle.processPipeline, which in turn enqueues the pipeline
116 // job in the above registered queue.
117 ccfg := eventconsumer.NewConsumerConfig()
118 ccfg.Logger = logger
119 ccfg.Dev = cfg.Server.Dev
120 ccfg.ProcessFunc = spindle.processPipeline
121 ccfg.CursorStore = cursorStore
122 knownKnots, err := d.Knots()
123 if err != nil {
124 return err
125 }
126 for _, knot := range knownKnots {
127 logger.Info("adding source start", "knot", knot)
128 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
129 }
130 spindle.ks = eventconsumer.NewConsumer(*ccfg)
131
132 go func() {
133 logger.Info("starting knot event consumer")
134 spindle.ks.Start(ctx)
135 }()
136
137 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
138 logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
139
140 return nil
141}
142
143func (s *Spindle) Router() http.Handler {
144 mux := chi.NewRouter()
145
146 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
147 w.Write([]byte(
148 ` ****
149 *** ***
150 *** ** ****** **
151 ** * *****
152 * ** **
153 * * * ***************
154 ** ** *# **
155 * ** ** *** **
156 * * ** ** * ******
157 * ** ** * ** * *
158 ** ** *** ** ** *
159 ** ** * ** * *
160 ** **** ** * *
161 ** *** ** ** **
162 *** ** *****
163 ********************
164 **
165 *
166 #**************
167 **
168 ********
169
170This is a spindle server. More info at https://tangled.sh/@tangled.sh/core/tree/master/docs/spindle`))
171 })
172 mux.HandleFunc("/events", s.Events)
173 mux.HandleFunc("/owner", func(w http.ResponseWriter, r *http.Request) {
174 w.Write([]byte(s.cfg.Server.Owner))
175 })
176 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
177 return mux
178}
179
180func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
181 if msg.Nsid == tangled.PipelineNSID {
182 tpl := tangled.Pipeline{}
183 err := json.Unmarshal(msg.EventJson, &tpl)
184 if err != nil {
185 fmt.Println("error unmarshalling", err)
186 return err
187 }
188
189 if tpl.TriggerMetadata == nil {
190 return fmt.Errorf("no trigger metadata found")
191 }
192
193 if tpl.TriggerMetadata.Repo == nil {
194 return fmt.Errorf("no repo data found")
195 }
196
197 // filter by repos
198 _, err = s.db.GetRepo(
199 tpl.TriggerMetadata.Repo.Knot,
200 tpl.TriggerMetadata.Repo.Did,
201 tpl.TriggerMetadata.Repo.Repo,
202 )
203 if err != nil {
204 return err
205 }
206
207 pipelineId := models.PipelineId{
208 Knot: src.Key(),
209 Rkey: msg.Rkey,
210 }
211
212 for _, w := range tpl.Workflows {
213 if w != nil {
214 err := s.db.StatusPending(models.WorkflowId{
215 PipelineId: pipelineId,
216 Name: w.Name,
217 }, s.n)
218 if err != nil {
219 return err
220 }
221 }
222 }
223
224 spl := models.ToPipeline(tpl, *s.cfg)
225
226 ok := s.jq.Enqueue(queue.Job{
227 Run: func() error {
228 s.eng.StartWorkflows(ctx, spl, pipelineId)
229 return nil
230 },
231 OnFail: func(jobError error) {
232 s.l.Error("pipeline run failed", "error", jobError)
233 },
234 })
235 if ok {
236 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
237 } else {
238 s.l.Error("failed to enqueue pipeline: queue is full")
239 }
240 }
241
242 return nil
243}
244
245func (s *Spindle) configureOwner() error {
246 cfgOwner := s.cfg.Server.Owner
247
248 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
249 if err != nil {
250 return err
251 }
252
253 switch len(existing) {
254 case 0:
255 // no owner configured, continue
256 case 1:
257 // find existing owner
258 existingOwner := existing[0]
259
260 // no ownership change, this is okay
261 if existingOwner == s.cfg.Server.Owner {
262 break
263 }
264
265 // remove existing owner
266 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
267 if err != nil {
268 return nil
269 }
270 default:
271 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
272 }
273
274 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
275}