+85
-40
spindle/server.go
+85
-40
spindle/server.go
···
49
vault secrets.Manager
50
}
51
52
-
func Run(ctx context.Context) error {
53
logger := log.FromContext(ctx)
54
55
-
cfg, err := config.Load(ctx)
56
-
if err != nil {
57
-
return fmt.Errorf("failed to load config: %w", err)
58
-
}
59
-
60
d, err := db.Make(cfg.Server.DBPath)
61
if err != nil {
62
-
return fmt.Errorf("failed to setup db: %w", err)
63
}
64
65
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
66
if err != nil {
67
-
return fmt.Errorf("failed to setup rbac enforcer: %w", err)
68
}
69
e.E.EnableAutoSave(true)
70
···
74
switch cfg.Server.Secrets.Provider {
75
case "openbao":
76
if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
77
-
return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
78
}
79
vault, err = secrets.NewOpenBaoManager(
80
cfg.Server.Secrets.OpenBao.ProxyAddr,
···
82
secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
83
)
84
if err != nil {
85
-
return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
86
}
87
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
88
case "sqlite", "":
89
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
90
if err != nil {
91
-
return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
92
}
93
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
94
default:
95
-
return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
96
-
}
97
-
98
-
nixeryEng, err := nixery.New(ctx, cfg)
99
-
if err != nil {
100
-
return err
101
}
102
103
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
···
110
}
111
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
112
if err != nil {
113
-
return fmt.Errorf("failed to setup jetstream client: %w", err)
114
}
115
jc.AddDid(cfg.Server.Owner)
116
117
// Check if the spindle knows about any Dids;
118
dids, err := d.GetAllDids()
119
if err != nil {
120
-
return fmt.Errorf("failed to get all dids: %w", err)
121
}
122
for _, d := range dids {
123
jc.AddDid(d)
···
125
126
resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
127
128
-
spindle := Spindle{
129
jc: jc,
130
e: e,
131
db: d,
132
l: logger,
133
n: &n,
134
-
engs: map[string]models.Engine{"nixery": nixeryEng},
135
jq: jq,
136
cfg: cfg,
137
res: resolver,
···
140
141
err = e.AddSpindle(rbacDomain)
142
if err != nil {
143
-
return fmt.Errorf("failed to set rbac domain: %w", err)
144
}
145
err = spindle.configureOwner()
146
if err != nil {
147
-
return err
148
}
149
logger.Info("owner set", "did", cfg.Server.Owner)
150
151
-
// starts a job queue runner in the background
152
-
jq.Start()
153
-
defer jq.Stop()
154
-
155
-
// Stop vault token renewal if it implements Stopper
156
-
if stopper, ok := vault.(secrets.Stopper); ok {
157
-
defer stopper.Stop()
158
-
}
159
-
160
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
161
if err != nil {
162
-
return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
163
}
164
165
err = jc.StartJetstream(ctx, spindle.ingest())
166
if err != nil {
167
-
return fmt.Errorf("failed to start jetstream consumer: %w", err)
168
}
169
170
// for each incoming sh.tangled.pipeline, we execute
···
177
ccfg.CursorStore = cursorStore
178
knownKnots, err := d.Knots()
179
if err != nil {
180
-
return err
181
}
182
for _, knot := range knownKnots {
183
logger.Info("adding source start", "knot", knot)
···
185
}
186
spindle.ks = eventconsumer.NewConsumer(*ccfg)
187
188
go func() {
189
-
logger.Info("starting knot event consumer")
190
-
spindle.ks.Start(ctx)
191
}()
192
193
-
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
194
-
logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
195
196
-
return nil
197
}
198
199
func (s *Spindle) Router() http.Handler {
···
49
vault secrets.Manager
50
}
51
52
+
// New creates a new Spindle server with the provided configuration and engines.
53
+
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
54
logger := log.FromContext(ctx)
55
56
d, err := db.Make(cfg.Server.DBPath)
57
if err != nil {
58
+
return nil, fmt.Errorf("failed to setup db: %w", err)
59
}
60
61
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
62
if err != nil {
63
+
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
64
}
65
e.E.EnableAutoSave(true)
66
···
70
switch cfg.Server.Secrets.Provider {
71
case "openbao":
72
if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
73
+
return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
74
}
75
vault, err = secrets.NewOpenBaoManager(
76
cfg.Server.Secrets.OpenBao.ProxyAddr,
···
78
secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
79
)
80
if err != nil {
81
+
return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
82
}
83
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
84
case "sqlite", "":
85
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
86
if err != nil {
87
+
return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
88
}
89
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
90
default:
91
+
return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
92
}
93
94
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
···
101
}
102
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
103
if err != nil {
104
+
return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
105
}
106
jc.AddDid(cfg.Server.Owner)
107
108
// Check if the spindle knows about any Dids;
109
dids, err := d.GetAllDids()
110
if err != nil {
111
+
return nil, fmt.Errorf("failed to get all dids: %w", err)
112
}
113
for _, d := range dids {
114
jc.AddDid(d)
···
116
117
resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
118
119
+
spindle := &Spindle{
120
jc: jc,
121
e: e,
122
db: d,
123
l: logger,
124
n: &n,
125
+
engs: engines,
126
jq: jq,
127
cfg: cfg,
128
res: resolver,
···
131
132
err = e.AddSpindle(rbacDomain)
133
if err != nil {
134
+
return nil, fmt.Errorf("failed to set rbac domain: %w", err)
135
}
136
err = spindle.configureOwner()
137
if err != nil {
138
+
return nil, err
139
}
140
logger.Info("owner set", "did", cfg.Server.Owner)
141
142
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
143
if err != nil {
144
+
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
145
}
146
147
err = jc.StartJetstream(ctx, spindle.ingest())
148
if err != nil {
149
+
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
150
}
151
152
// for each incoming sh.tangled.pipeline, we execute
···
159
ccfg.CursorStore = cursorStore
160
knownKnots, err := d.Knots()
161
if err != nil {
162
+
return nil, err
163
}
164
for _, knot := range knownKnots {
165
logger.Info("adding source start", "knot", knot)
···
167
}
168
spindle.ks = eventconsumer.NewConsumer(*ccfg)
169
170
+
return spindle, nil
171
+
}
172
+
173
+
// DB returns the database instance.
174
+
func (s *Spindle) DB() *db.DB {
175
+
return s.db
176
+
}
177
+
178
+
// Queue returns the job queue instance.
179
+
func (s *Spindle) Queue() *queue.Queue {
180
+
return s.jq
181
+
}
182
+
183
+
// Engines returns the map of available engines.
184
+
func (s *Spindle) Engines() map[string]models.Engine {
185
+
return s.engs
186
+
}
187
+
188
+
// Vault returns the secrets manager instance.
189
+
func (s *Spindle) Vault() secrets.Manager {
190
+
return s.vault
191
+
}
192
+
193
+
// Notifier returns the notifier instance.
194
+
func (s *Spindle) Notifier() *notifier.Notifier {
195
+
return s.n
196
+
}
197
+
198
+
// Enforcer returns the RBAC enforcer instance.
199
+
func (s *Spindle) Enforcer() *rbac.Enforcer {
200
+
return s.e
201
+
}
202
+
203
+
// Start starts the Spindle server (blocking).
204
+
func (s *Spindle) Start(ctx context.Context) error {
205
+
// starts a job queue runner in the background
206
+
s.jq.Start()
207
+
defer s.jq.Stop()
208
+
209
+
// Stop vault token renewal if it implements Stopper
210
+
if stopper, ok := s.vault.(secrets.Stopper); ok {
211
+
defer stopper.Stop()
212
+
}
213
+
214
go func() {
215
+
s.l.Info("starting knot event consumer")
216
+
s.ks.Start(ctx)
217
}()
218
219
+
s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
220
+
return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
221
+
}
222
223
+
func Run(ctx context.Context) error {
224
+
cfg, err := config.Load(ctx)
225
+
if err != nil {
226
+
return fmt.Errorf("failed to load config: %w", err)
227
+
}
228
+
229
+
nixeryEng, err := nixery.New(ctx, cfg)
230
+
if err != nil {
231
+
return err
232
+
}
233
+
234
+
s, err := New(ctx, cfg, map[string]models.Engine{
235
+
"nixery": nixeryEng,
236
+
})
237
+
if err != nil {
238
+
return err
239
+
}
240
+
241
+
return s.Start(ctx)
242
}
243
244
func (s *Spindle) Router() http.Handler {