forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

spindle/server: refactor spindle code to allow calling functions separate from Run()

Signed-off-by: Evan Jarrett <evan@evanjarrett.com>

evan.jarrett.net 596cca86 8692900c

verified
Changed files
+85 -40
spindle
+85 -40
spindle/server.go
··· 49 vault secrets.Manager 50 } 51 52 - func Run(ctx context.Context) error { 53 logger := log.FromContext(ctx) 54 - 55 - cfg, err := config.Load(ctx) 56 - if err != nil { 57 - return fmt.Errorf("failed to load config: %w", err) 58 - } 59 60 d, err := db.Make(cfg.Server.DBPath) 61 if err != nil { 62 - return fmt.Errorf("failed to setup db: %w", err) 63 } 64 65 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 66 if err != nil { 67 - return fmt.Errorf("failed to setup rbac enforcer: %w", err) 68 } 69 e.E.EnableAutoSave(true) 70 ··· 74 switch cfg.Server.Secrets.Provider { 75 case "openbao": 76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 77 - return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 78 } 79 vault, err = secrets.NewOpenBaoManager( 80 cfg.Server.Secrets.OpenBao.ProxyAddr, ··· 82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 83 ) 84 if err != nil { 85 - return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 86 } 87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 88 case "sqlite", "": 89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 90 if err != nil { 91 - return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 92 } 93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 94 default: 95 - return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 96 - } 97 - 98 - nixeryEng, err := nixery.New(ctx, cfg) 99 - if err != nil { 100 - return err 101 } 102 103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) ··· 110 } 111 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 112 if err != nil { 113 - return fmt.Errorf("failed to setup jetstream client: %w", err) 114 } 115 jc.AddDid(cfg.Server.Owner) 116 117 // Check if the spindle knows about any Dids; 118 dids, err := d.GetAllDids() 119 if err != nil { 120 - return fmt.Errorf("failed to get all dids: %w", err) 121 } 122 for _, d := range dids { 123 jc.AddDid(d) ··· 125 126 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 127 128 - spindle := Spindle{ 129 jc: jc, 130 e: e, 131 db: d, 132 l: logger, 133 n: &n, 134 - engs: map[string]models.Engine{"nixery": nixeryEng}, 135 jq: jq, 136 cfg: cfg, 137 res: resolver, ··· 140 141 err = e.AddSpindle(rbacDomain) 142 if err != nil { 143 - return fmt.Errorf("failed to set rbac domain: %w", err) 144 } 145 err = spindle.configureOwner() 146 if err != nil { 147 - return err 148 } 149 logger.Info("owner set", "did", cfg.Server.Owner) 150 151 - // starts a job queue runner in the background 152 - jq.Start() 153 - defer jq.Stop() 154 - 155 - // Stop vault token renewal if it implements Stopper 156 - if stopper, ok := vault.(secrets.Stopper); ok { 157 - defer stopper.Stop() 158 - } 159 - 160 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 161 if err != nil { 162 - return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 163 } 164 165 err = jc.StartJetstream(ctx, spindle.ingest()) 166 if err != nil { 167 - return fmt.Errorf("failed to start jetstream consumer: %w", err) 168 } 169 170 // for each incoming sh.tangled.pipeline, we execute ··· 177 ccfg.CursorStore = cursorStore 178 knownKnots, err := d.Knots() 179 if err != nil { 180 - return err 181 } 182 for _, knot := range knownKnots { 183 logger.Info("adding source start", "knot", knot) ··· 185 } 186 spindle.ks = eventconsumer.NewConsumer(*ccfg) 187 188 go func() { 189 - logger.Info("starting knot event consumer") 190 - spindle.ks.Start(ctx) 191 }() 192 193 - logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 194 - logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 195 196 - return nil 197 } 198 199 func (s *Spindle) Router() http.Handler {
··· 49 vault secrets.Manager 50 } 51 52 + // New creates a new Spindle server with the provided configuration and engines. 53 + func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 54 logger := log.FromContext(ctx) 55 56 d, err := db.Make(cfg.Server.DBPath) 57 if err != nil { 58 + return nil, fmt.Errorf("failed to setup db: %w", err) 59 } 60 61 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 62 if err != nil { 63 + return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 64 } 65 e.E.EnableAutoSave(true) 66 ··· 70 switch cfg.Server.Secrets.Provider { 71 case "openbao": 72 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 73 + return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 74 } 75 vault, err = secrets.NewOpenBaoManager( 76 cfg.Server.Secrets.OpenBao.ProxyAddr, ··· 78 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 79 ) 80 if err != nil { 81 + return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 82 } 83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 84 case "sqlite", "": 85 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 86 if err != nil { 87 + return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 88 } 89 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 90 default: 91 + return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 92 } 93 94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) ··· 101 } 102 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 103 if err != nil { 104 + return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 105 } 106 jc.AddDid(cfg.Server.Owner) 107 108 // Check if the spindle knows about any Dids; 109 dids, err := d.GetAllDids() 110 if err != nil { 111 + return nil, fmt.Errorf("failed to get all dids: %w", err) 112 } 113 for _, d := range dids { 114 jc.AddDid(d) ··· 116 117 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 118 119 + spindle := &Spindle{ 120 jc: jc, 121 e: e, 122 db: d, 123 l: logger, 124 n: &n, 125 + engs: engines, 126 jq: jq, 127 cfg: cfg, 128 res: resolver, ··· 131 132 err = e.AddSpindle(rbacDomain) 133 if err != nil { 134 + return nil, fmt.Errorf("failed to set rbac domain: %w", err) 135 } 136 err = spindle.configureOwner() 137 if err != nil { 138 + return nil, err 139 } 140 logger.Info("owner set", "did", cfg.Server.Owner) 141 142 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 143 if err != nil { 144 + return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 145 } 146 147 err = jc.StartJetstream(ctx, spindle.ingest()) 148 if err != nil { 149 + return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 150 } 151 152 // for each incoming sh.tangled.pipeline, we execute ··· 159 ccfg.CursorStore = cursorStore 160 knownKnots, err := d.Knots() 161 if err != nil { 162 + return nil, err 163 } 164 for _, knot := range knownKnots { 165 logger.Info("adding source start", "knot", knot) ··· 167 } 168 spindle.ks = eventconsumer.NewConsumer(*ccfg) 169 170 + return spindle, nil 171 + } 172 + 173 + // DB returns the database instance. 174 + func (s *Spindle) DB() *db.DB { 175 + return s.db 176 + } 177 + 178 + // Queue returns the job queue instance. 179 + func (s *Spindle) Queue() *queue.Queue { 180 + return s.jq 181 + } 182 + 183 + // Engines returns the map of available engines. 184 + func (s *Spindle) Engines() map[string]models.Engine { 185 + return s.engs 186 + } 187 + 188 + // Vault returns the secrets manager instance. 189 + func (s *Spindle) Vault() secrets.Manager { 190 + return s.vault 191 + } 192 + 193 + // Notifier returns the notifier instance. 194 + func (s *Spindle) Notifier() *notifier.Notifier { 195 + return s.n 196 + } 197 + 198 + // Enforcer returns the RBAC enforcer instance. 199 + func (s *Spindle) Enforcer() *rbac.Enforcer { 200 + return s.e 201 + } 202 + 203 + // Start starts the Spindle server (blocking). 204 + func (s *Spindle) Start(ctx context.Context) error { 205 + // starts a job queue runner in the background 206 + s.jq.Start() 207 + defer s.jq.Stop() 208 + 209 + // Stop vault token renewal if it implements Stopper 210 + if stopper, ok := s.vault.(secrets.Stopper); ok { 211 + defer stopper.Stop() 212 + } 213 + 214 go func() { 215 + s.l.Info("starting knot event consumer") 216 + s.ks.Start(ctx) 217 }() 218 219 + s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 220 + return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 221 + } 222 + 223 + func Run(ctx context.Context) error { 224 + cfg, err := config.Load(ctx) 225 + if err != nil { 226 + return fmt.Errorf("failed to load config: %w", err) 227 + } 228 + 229 + nixeryEng, err := nixery.New(ctx, cfg) 230 + if err != nil { 231 + return err 232 + } 233 + 234 + s, err := New(ctx, cfg, map[string]models.Engine{ 235 + "nixery": nixeryEng, 236 + }) 237 + if err != nil { 238 + return err 239 + } 240 241 + return s.Start(ctx) 242 } 243 244 func (s *Spindle) Router() http.Handler {