+6
knotclient/events.go
+6
knotclient/events.go
···
150
150
}
151
151
152
152
func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) {
153
+
// we are already listening to this source
154
+
if _, ok := c.cfg.Sources[s]; ok {
155
+
c.logger.Info("source already present", "source", s)
156
+
return
157
+
}
158
+
153
159
c.cfgMu.Lock()
154
160
c.cfg.Sources[s] = struct{}{}
155
161
c.wg.Add(1)
+7
-6
spindle/server.go
+7
-6
spindle/server.go
···
70
70
tangled.SpindleMemberNSID,
71
71
tangled.RepoNSID,
72
72
}
73
-
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, false, false)
73
+
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, true)
74
74
if err != nil {
75
75
return fmt.Errorf("failed to setup jetstream client: %w", err)
76
76
}
77
+
jc.AddDid(cfg.Server.Owner)
77
78
78
79
spindle := Spindle{
79
80
jc: jc,
···
118
119
ccfg.Dev = cfg.Server.Dev
119
120
ccfg.ProcessFunc = spindle.processPipeline
120
121
ccfg.CursorStore = cursorStore
121
-
knotstream := knotclient.NewEventConsumer(*ccfg)
122
122
knownKnots, err := d.Knots()
123
123
if err != nil {
124
124
return err
125
125
}
126
126
for _, knot := range knownKnots {
127
-
knotstream.AddSource(ctx, knotclient.NewEventSource(knot))
127
+
logger.Info("adding source start", "knot", knot)
128
+
ccfg.Sources[knotclient.EventSource{knot}] = struct{}{}
128
129
}
129
-
spindle.ks = knotstream
130
+
spindle.ks = knotclient.NewEventConsumer(*ccfg)
130
131
131
132
go func() {
132
-
logger.Info("starting knot event consumer", "knots")
133
-
knotstream.Start(ctx)
133
+
logger.Info("starting knot event consumer")
134
+
spindle.ks.Start(ctx)
134
135
}()
135
136
136
137
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)