+85
-40
spindle/server.go
+85
-40
spindle/server.go
···
49
49
vault secrets.Manager
50
50
}
51
51
52
-
func Run(ctx context.Context) error {
52
+
// New creates a new Spindle server with the provided configuration and engines.
53
+
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
53
54
logger := log.FromContext(ctx)
54
55
55
-
cfg, err := config.Load(ctx)
56
-
if err != nil {
57
-
return fmt.Errorf("failed to load config: %w", err)
58
-
}
59
-
60
56
d, err := db.Make(cfg.Server.DBPath)
61
57
if err != nil {
62
-
return fmt.Errorf("failed to setup db: %w", err)
58
+
return nil, fmt.Errorf("failed to setup db: %w", err)
63
59
}
64
60
65
61
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
66
62
if err != nil {
67
-
return fmt.Errorf("failed to setup rbac enforcer: %w", err)
63
+
return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
68
64
}
69
65
e.E.EnableAutoSave(true)
70
66
···
74
70
switch cfg.Server.Secrets.Provider {
75
71
case "openbao":
76
72
if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
77
-
return fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
73
+
return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
78
74
}
79
75
vault, err = secrets.NewOpenBaoManager(
80
76
cfg.Server.Secrets.OpenBao.ProxyAddr,
···
82
78
secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
83
79
)
84
80
if err != nil {
85
-
return fmt.Errorf("failed to setup openbao secrets provider: %w", err)
81
+
return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
86
82
}
87
83
logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
88
84
case "sqlite", "":
89
85
vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
90
86
if err != nil {
91
-
return fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
87
+
return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
92
88
}
93
89
logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
94
90
default:
95
-
return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
96
-
}
97
-
98
-
nixeryEng, err := nixery.New(ctx, cfg)
99
-
if err != nil {
100
-
return err
91
+
return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
101
92
}
102
93
103
94
jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
···
110
101
}
111
102
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
112
103
if err != nil {
113
-
return fmt.Errorf("failed to setup jetstream client: %w", err)
104
+
return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
114
105
}
115
106
jc.AddDid(cfg.Server.Owner)
116
107
117
108
// Check if the spindle knows about any Dids;
118
109
dids, err := d.GetAllDids()
119
110
if err != nil {
120
-
return fmt.Errorf("failed to get all dids: %w", err)
111
+
return nil, fmt.Errorf("failed to get all dids: %w", err)
121
112
}
122
113
for _, d := range dids {
123
114
jc.AddDid(d)
···
125
116
126
117
resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
127
118
128
-
spindle := Spindle{
119
+
spindle := &Spindle{
129
120
jc: jc,
130
121
e: e,
131
122
db: d,
132
123
l: logger,
133
124
n: &n,
134
-
engs: map[string]models.Engine{"nixery": nixeryEng},
125
+
engs: engines,
135
126
jq: jq,
136
127
cfg: cfg,
137
128
res: resolver,
···
140
131
141
132
err = e.AddSpindle(rbacDomain)
142
133
if err != nil {
143
-
return fmt.Errorf("failed to set rbac domain: %w", err)
134
+
return nil, fmt.Errorf("failed to set rbac domain: %w", err)
144
135
}
145
136
err = spindle.configureOwner()
146
137
if err != nil {
147
-
return err
138
+
return nil, err
148
139
}
149
140
logger.Info("owner set", "did", cfg.Server.Owner)
150
141
151
-
// starts a job queue runner in the background
152
-
jq.Start()
153
-
defer jq.Stop()
154
-
155
-
// Stop vault token renewal if it implements Stopper
156
-
if stopper, ok := vault.(secrets.Stopper); ok {
157
-
defer stopper.Stop()
158
-
}
159
-
160
142
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
161
143
if err != nil {
162
-
return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
144
+
return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
163
145
}
164
146
165
147
err = jc.StartJetstream(ctx, spindle.ingest())
166
148
if err != nil {
167
-
return fmt.Errorf("failed to start jetstream consumer: %w", err)
149
+
return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
168
150
}
169
151
170
152
// for each incoming sh.tangled.pipeline, we execute
···
177
159
ccfg.CursorStore = cursorStore
178
160
knownKnots, err := d.Knots()
179
161
if err != nil {
180
-
return err
162
+
return nil, err
181
163
}
182
164
for _, knot := range knownKnots {
183
165
logger.Info("adding source start", "knot", knot)
···
185
167
}
186
168
spindle.ks = eventconsumer.NewConsumer(*ccfg)
187
169
170
+
return spindle, nil
171
+
}
172
+
173
+
// DB returns the database instance.
174
+
func (s *Spindle) DB() *db.DB {
175
+
return s.db
176
+
}
177
+
178
+
// Queue returns the job queue instance.
179
+
func (s *Spindle) Queue() *queue.Queue {
180
+
return s.jq
181
+
}
182
+
183
+
// Engines returns the map of available engines.
184
+
func (s *Spindle) Engines() map[string]models.Engine {
185
+
return s.engs
186
+
}
187
+
188
+
// Vault returns the secrets manager instance.
189
+
func (s *Spindle) Vault() secrets.Manager {
190
+
return s.vault
191
+
}
192
+
193
+
// Notifier returns the notifier instance.
194
+
func (s *Spindle) Notifier() *notifier.Notifier {
195
+
return s.n
196
+
}
197
+
198
+
// Enforcer returns the RBAC enforcer instance.
199
+
func (s *Spindle) Enforcer() *rbac.Enforcer {
200
+
return s.e
201
+
}
202
+
203
+
// Start starts the Spindle server (blocking).
204
+
func (s *Spindle) Start(ctx context.Context) error {
205
+
// starts a job queue runner in the background
206
+
s.jq.Start()
207
+
defer s.jq.Stop()
208
+
209
+
// Stop vault token renewal if it implements Stopper
210
+
if stopper, ok := s.vault.(secrets.Stopper); ok {
211
+
defer stopper.Stop()
212
+
}
213
+
188
214
go func() {
189
-
logger.Info("starting knot event consumer")
190
-
spindle.ks.Start(ctx)
215
+
s.l.Info("starting knot event consumer")
216
+
s.ks.Start(ctx)
191
217
}()
192
218
193
-
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
194
-
logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
219
+
s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
220
+
return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
221
+
}
195
222
196
-
return nil
223
+
func Run(ctx context.Context) error {
224
+
cfg, err := config.Load(ctx)
225
+
if err != nil {
226
+
return fmt.Errorf("failed to load config: %w", err)
227
+
}
228
+
229
+
nixeryEng, err := nixery.New(ctx, cfg)
230
+
if err != nil {
231
+
return err
232
+
}
233
+
234
+
s, err := New(ctx, cfg, map[string]models.Engine{
235
+
"nixery": nixeryEng,
236
+
})
237
+
if err != nil {
238
+
return err
239
+
}
240
+
241
+
return s.Start(ctx)
197
242
}
198
243
199
244
func (s *Spindle) Router() http.Handler {