+1
-1
knotclient/events.go
+1
-1
knotclient/events.go
···
180
180
}
181
181
182
182
// update cursor
183
-
c.cfg.CursorStore.Set(j.source.Knot, time.Now().Unix())
183
+
c.cfg.CursorStore.Set(j.source.Knot, time.Now().UnixNano())
184
184
185
185
if err := c.cfg.ProcessFunc(ctx, j.source, msg); err != nil {
186
186
c.logger.Error("error processing message", "source", j.source, "err", err)
+1
-1
knotserver/events.go
+1
-1
knotserver/events.go
···
54
54
}
55
55
56
56
// complete backfill first before going to live data
57
-
l.Info("going through backfill", "cursor", cursor)
58
57
l.Debug("going through backfill", "cursor", cursor)
59
58
if err := h.streamOps(conn, &cursor); err != nil {
60
59
l.Error("failed to backfill", "err", err)
61
60
return
62
61
}
62
+
63
63
for {
64
64
// wait for new data or timeout
65
65
select {