spindle/server: refactor spindle code to allow calling functions separate from Run() #779

merged
opened by evan.jarrett.net targeting master from evan.jarrett.net/core: spindle
Changed files
+85 -40
spindle
+85 -40
spindle/server.go
··· 49 49 vault secrets.Manager 50 50 } 51 51 52 - func Run(ctx context.Context) error { 52 + // New creates a new Spindle server with the provided configuration and engines. 53 + func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 53 54 logger := log.FromContext(ctx) 54 55 55 - cfg, err := config.Load(ctx) 56 - if err != nil { 57 - return fmt.Errorf("failed to load config: %w", err) 58 - } 59 - 60 56 d, err := db.Make(cfg.Server.DBPath) 61 57 if err != nil { 62 - return fmt.Errorf("failed to setup db: %w", err) 58 + return nil, fmt.Errorf("failed to setup db: %w", err) 63 59 } 64 60 65 61 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 66 62 if err != nil { 67 - return fmt.Errorf("failed to setup rbac enforcer: %w", err) 63 + return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 68 64 } 69 65 e.E.EnableAutoSave(true) 70 66 ··· 74 70 switch cfg.Server.Secrets.Provider { 75 71 case "openbao": 76 72 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 77 - return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 73 + return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 78 74 } 79 75 vault, err = secrets.NewOpenBaoManager( 80 76 cfg.Server.Secrets.OpenBao.ProxyAddr, ··· 82 78 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 83 79 ) 84 80 if err != nil { 85 - return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 81 + return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 86 82 } 87 83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 88 84 case "sqlite", "": 89 85 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 90 86 if err != nil { 91 - return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 87 + return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 92 88 } 93 89 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 94 90 default: 95 - return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 96 - } 97 - 98 - nixeryEng, err := nixery.New(ctx, cfg) 99 - if err != nil { 100 - return err 91 + return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 101 92 } 102 93 103 94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) ··· 110 101 } 111 102 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 112 103 if err != nil { 113 - return fmt.Errorf("failed to setup jetstream client: %w", err) 104 + return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 114 105 } 115 106 jc.AddDid(cfg.Server.Owner) 116 107 117 108 // Check if the spindle knows about any Dids; 118 109 dids, err := d.GetAllDids() 119 110 if err != nil { 120 - return fmt.Errorf("failed to get all dids: %w", err) 111 + return nil, fmt.Errorf("failed to get all dids: %w", err) 121 112 } 122 113 for _, d := range dids { 123 114 jc.AddDid(d) ··· 125 116 126 117 resolver := idresolver.DefaultResolver() 127 118 128 - spindle := Spindle{ 119 + spindle := &Spindle{ 129 120 jc: jc, 130 121 e: e, 131 122 db: d, 132 123 l: logger, 133 124 n: &n, 134 - engs: map[string]models.Engine{"nixery": nixeryEng}, 125 + engs: engines, 135 126 jq: jq, 136 127 cfg: cfg, 137 128 res: resolver, ··· 140 131 141 132 err = e.AddSpindle(rbacDomain) 142 133 if err != nil { 143 - return fmt.Errorf("failed to set rbac domain: %w", err) 134 + return nil, fmt.Errorf("failed to set rbac domain: %w", err) 144 135 } 145 136 err = spindle.configureOwner() 146 137 if err != nil { 147 - return err 138 + return nil, err 148 139 } 149 140 logger.Info("owner set", "did", cfg.Server.Owner) 150 141 151 - // starts a job queue runner in the background 152 - jq.Start() 153 - defer jq.Stop() 154 - 155 - // Stop vault token renewal if it implements Stopper 156 - if stopper, ok := vault.(secrets.Stopper); ok { 157 - defer stopper.Stop() 158 - } 159 - 160 142 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 161 143 if err != nil { 162 - return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 144 + return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 163 145 } 164 146 165 147 err = jc.StartJetstream(ctx, spindle.ingest()) 166 148 if err != nil { 167 - return fmt.Errorf("failed to start jetstream consumer: %w", err) 149 + return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 168 150 } 169 151 170 152 // for each incoming sh.tangled.pipeline, we execute ··· 177 159 ccfg.CursorStore = cursorStore 178 160 knownKnots, err := d.Knots() 179 161 if err != nil { 180 - return err 162 + return nil, err 181 163 } 182 164 for _, knot := range knownKnots { 183 165 logger.Info("adding source start", "knot", knot) ··· 185 167 } 186 168 spindle.ks = eventconsumer.NewConsumer(*ccfg) 187 169 170 + return spindle, nil 171 + } 172 + 173 + // DB returns the database instance. 174 + func (s *Spindle) DB() *db.DB { 175 + return s.db 176 + } 177 + 178 + // Queue returns the job queue instance. 179 + func (s *Spindle) Queue() *queue.Queue { 180 + return s.jq 181 + } 182 + 183 + // Engines returns the map of available engines. 184 + func (s *Spindle) Engines() map[string]models.Engine { 185 + return s.engs 186 + } 187 + 188 + // Vault returns the secrets manager instance. 189 + func (s *Spindle) Vault() secrets.Manager { 190 + return s.vault 191 + } 192 + 193 + // Notifier returns the notifier instance. 194 + func (s *Spindle) Notifier() *notifier.Notifier { 195 + return s.n 196 + } 197 + 198 + // Enforcer returns the RBAC enforcer instance. 199 + func (s *Spindle) Enforcer() *rbac.Enforcer { 200 + return s.e 201 + } 202 + 203 + // Start starts the Spindle server (blocking). 204 + func (s *Spindle) Start(ctx context.Context) error { 205 + // starts a job queue runner in the background 206 + s.jq.Start() 207 + defer s.jq.Stop() 208 + 209 + // Stop vault token renewal if it implements Stopper 210 + if stopper, ok := s.vault.(secrets.Stopper); ok { 211 + defer stopper.Stop() 212 + } 213 + 188 214 go func() { 189 - logger.Info("starting knot event consumer") 190 - spindle.ks.Start(ctx) 215 + s.l.Info("starting knot event consumer") 216 + s.ks.Start(ctx) 191 217 }() 192 218 193 - logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 194 - logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 219 + s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 220 + return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 221 + } 195 222 196 - return nil 223 + func Run(ctx context.Context) error { 224 + cfg, err := config.Load(ctx) 225 + if err != nil { 226 + return fmt.Errorf("failed to load config: %w", err) 227 + } 228 + 229 + nixeryEng, err := nixery.New(ctx, cfg) 230 + if err != nil { 231 + return err 232 + } 233 + 234 + s, err := New(ctx, cfg, map[string]models.Engine{ 235 + "nixery": nixeryEng, 236 + }) 237 + if err != nil { 238 + return err 239 + } 240 + 241 + return s.Start(ctx) 197 242 } 198 243 199 244 func (s *Spindle) Router() http.Handler {