forked from tangled.org/core
Monorepo for Tangled

spindle: configure initial set of knots dynamically

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 924bcc27 a58af28f

verified
Changed files
+14 -11
spindle
+1 -2
spindle/config/config.go
··· 16 16 } 17 17 18 18 type Config struct { 19 - Server Server `env:",prefix=SPINDLE_SERVER_"` 20 - Knots []string `env:"SPINDLE_SUBSCRIBED_KNOTS,required"` 19 + Server Server `env:",prefix=SPINDLE_SERVER_"` 21 20 } 22 21 23 22 func Load(ctx context.Context) (*Config, error) {
+13 -9
spindle/server.go
··· 35 35 eng *engine.Engine 36 36 jq *queue.Queue 37 37 cfg *config.Config 38 + ks *knotclient.EventConsumer 38 39 } 39 40 40 41 func Run(ctx context.Context) error { ··· 82 83 cfg: cfg, 83 84 } 84 85 85 - err = e.AddDomain(rbacDomain) 86 + err = e.AddKnot(rbacDomain) 86 87 if err != nil { 87 88 return fmt.Errorf("failed to set rbac domain: %w", err) 88 89 } ··· 109 110 // for each incoming sh.tangled.pipeline, we execute 110 111 // spindle.processPipeline, which in turn enqueues the pipeline 111 112 // job in the above registered queue. 112 - 113 113 ccfg := knotclient.NewConsumerConfig() 114 114 ccfg.Logger = logger 115 115 ccfg.Dev = cfg.Server.Dev 116 116 ccfg.ProcessFunc = spindle.processPipeline 117 117 ccfg.CursorStore = cursorStore 118 - for _, knot := range spindle.cfg.Knots { 119 - kes := knotclient.NewEventSource(knot) 120 - ccfg.AddEventSource(kes) 118 + knotstream := knotclient.NewEventConsumer(*ccfg) 119 + knownKnots, err := d.Knots() 120 + if err != nil { 121 + return err 121 122 } 122 - ec := knotclient.NewEventConsumer(*ccfg) 123 + for _, knot := range knownKnots { 124 + knotstream.AddSource(ctx, knotclient.NewEventSource(knot)) 125 + } 126 + spindle.ks = knotstream 123 127 124 128 go func() { 125 - logger.Info("starting knot event consumer", "knots", spindle.cfg.Knots) 126 - ec.Start(ctx) 129 + logger.Info("starting knot event consumer", "knots") 130 + knotstream.Start(ctx) 127 131 }() 128 132 129 133 logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) ··· 193 197 } 194 198 195 199 if len(serverOwner) == 0 { 196 - s.e.AddOwner(rbacDomain, cfgOwner) 200 + s.e.AddKnotOwner(rbacDomain, cfgOwner) 197 201 } else { 198 202 if serverOwner[0] != cfgOwner { 199 203 return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])