+1
-2
spindle/config/config.go
+1
-2
spindle/config/config.go
+13
-9
spindle/server.go
+13
-9
spindle/server.go
···
35
eng *engine.Engine
36
jq *queue.Queue
37
cfg *config.Config
38
}
39
40
func Run(ctx context.Context) error {
···
82
cfg: cfg,
83
}
84
85
-
err = e.AddDomain(rbacDomain)
86
if err != nil {
87
return fmt.Errorf("failed to set rbac domain: %w", err)
88
}
···
109
// for each incoming sh.tangled.pipeline, we execute
110
// spindle.processPipeline, which in turn enqueues the pipeline
111
// job in the above registered queue.
112
-
113
ccfg := knotclient.NewConsumerConfig()
114
ccfg.Logger = logger
115
ccfg.Dev = cfg.Server.Dev
116
ccfg.ProcessFunc = spindle.processPipeline
117
ccfg.CursorStore = cursorStore
118
-
for _, knot := range spindle.cfg.Knots {
119
-
kes := knotclient.NewEventSource(knot)
120
-
ccfg.AddEventSource(kes)
121
}
122
-
ec := knotclient.NewEventConsumer(*ccfg)
123
124
go func() {
125
-
logger.Info("starting knot event consumer", "knots", spindle.cfg.Knots)
126
-
ec.Start(ctx)
127
}()
128
129
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
···
193
}
194
195
if len(serverOwner) == 0 {
196
-
s.e.AddOwner(rbacDomain, cfgOwner)
197
} else {
198
if serverOwner[0] != cfgOwner {
199
return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])
···
35
eng *engine.Engine
36
jq *queue.Queue
37
cfg *config.Config
38
+
ks *knotclient.EventConsumer
39
}
40
41
func Run(ctx context.Context) error {
···
83
cfg: cfg,
84
}
85
86
+
err = e.AddKnot(rbacDomain)
87
if err != nil {
88
return fmt.Errorf("failed to set rbac domain: %w", err)
89
}
···
110
// for each incoming sh.tangled.pipeline, we execute
111
// spindle.processPipeline, which in turn enqueues the pipeline
112
// job in the above registered queue.
113
ccfg := knotclient.NewConsumerConfig()
114
ccfg.Logger = logger
115
ccfg.Dev = cfg.Server.Dev
116
ccfg.ProcessFunc = spindle.processPipeline
117
ccfg.CursorStore = cursorStore
118
+
knotstream := knotclient.NewEventConsumer(*ccfg)
119
+
knownKnots, err := d.Knots()
120
+
if err != nil {
121
+
return err
122
}
123
+
for _, knot := range knownKnots {
124
+
knotstream.AddSource(ctx, knotclient.NewEventSource(knot))
125
+
}
126
+
spindle.ks = knotstream
127
128
go func() {
129
+
logger.Info("starting knot event consumer", "knots")
130
+
knotstream.Start(ctx)
131
}()
132
133
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
···
197
}
198
199
if len(serverOwner) == 0 {
200
+
s.e.AddKnotOwner(rbacDomain, cfgOwner)
201
} else {
202
if serverOwner[0] != cfgOwner {
203
return fmt.Errorf("server owner mismatch: %s != %s", cfgOwner, serverOwner[0])