+7
-1
spindle/server.go
+7
-1
spindle/server.go
···
11
"tangled.sh/tangled.sh/core/api/tangled"
12
"tangled.sh/tangled.sh/core/jetstream"
13
"tangled.sh/tangled.sh/core/knotclient"
14
"tangled.sh/tangled.sh/core/log"
15
"tangled.sh/tangled.sh/core/notifier"
16
"tangled.sh/tangled.sh/core/rbac"
···
78
// for each incoming sh.tangled.pipeline, we execute
79
// spindle.processPipeline, which in turn enqueues the pipeline
80
// job in the above registered queue.
81
go func() {
82
logger.Info("starting event consumer")
83
-
knotEventSource := knotclient.NewEventSource("localhost:5555")
84
85
ccfg := knotclient.NewConsumerConfig()
86
ccfg.Logger = logger
87
ccfg.Dev = cfg.Server.Dev
88
ccfg.ProcessFunc = spindle.processPipeline
89
ccfg.AddEventSource(knotEventSource)
90
91
ec := knotclient.NewEventConsumer(*ccfg)
···
11
"tangled.sh/tangled.sh/core/api/tangled"
12
"tangled.sh/tangled.sh/core/jetstream"
13
"tangled.sh/tangled.sh/core/knotclient"
14
+
"tangled.sh/tangled.sh/core/knotclient/cursor"
15
"tangled.sh/tangled.sh/core/log"
16
"tangled.sh/tangled.sh/core/notifier"
17
"tangled.sh/tangled.sh/core/rbac"
···
79
// for each incoming sh.tangled.pipeline, we execute
80
// spindle.processPipeline, which in turn enqueues the pipeline
81
// job in the above registered queue.
82
+
cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
83
+
if err != nil {
84
+
return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
85
+
}
86
go func() {
87
logger.Info("starting event consumer")
88
+
knotEventSource := knotclient.NewEventSource("localhost:6000")
89
90
ccfg := knotclient.NewConsumerConfig()
91
ccfg.Logger = logger
92
ccfg.Dev = cfg.Server.Dev
93
ccfg.ProcessFunc = spindle.processPipeline
94
+
ccfg.CursorStore = cursorStore
95
ccfg.AddEventSource(knotEventSource)
96
97
ec := knotclient.NewEventConsumer(*ccfg)