+33
-17
cmd/recordcollector/main.go
+33
-17
cmd/recordcollector/main.go
···
30
30
"app.bsky",
31
31
"chat.bsky",
32
32
"blue.flashes",
33
+
"social.pinksky",
33
34
"jp.5leaf",
34
35
}
35
36
···
113
114
log.Fatalf("failed to create jetstream client: %v", err)
114
115
}
115
116
116
-
var cursor int64
117
-
err = dbCnx.QueryRowContext(ctx, "select val from config where key = 'cursor'").Scan(&cursor)
118
-
if err == sql.ErrNoRows {
119
-
logger.Info("no persisted cursor found")
120
-
} else if err != nil {
121
-
logger.Error("failed obtaining past cursor", "err", err)
122
-
} else {
123
-
logger.Info("found cursor in db", "cursor", cursor)
124
-
}
117
+
// https://bsky.app/profile/icyphox.sh/post/3lkt5wpjbcc2f
118
+
for {
119
+
var cursor int64
120
+
err = dbCnx.QueryRowContext(ctx, "select val from config where key = 'cursor'").Scan(&cursor)
121
+
if err == sql.ErrNoRows {
122
+
logger.Info("no persisted cursor found")
123
+
} else if err != nil {
124
+
logger.Error("failed obtaining past cursor", "err", err)
125
+
} else {
126
+
logger.Info("found cursor in db", "cursor", cursor)
127
+
}
125
128
126
-
var connectCursor *int64
127
-
if cursor > 0 {
128
-
connectCursor = &cursor
129
-
}
129
+
var connectCursor *int64
130
+
if cursor > 0 {
131
+
connectCursor = &cursor
132
+
}
133
+
134
+
connCtx, cancel := context.WithCancel(ctx)
135
+
defer cancel()
136
+
if err := jetstreamClient.ConnectAndRead(connCtx, connectCursor); err != nil {
137
+
logger.Error("error in ConnectAndRead", "err", err)
138
+
cancel()
139
+
}
130
140
131
-
if err := jetstreamClient.ConnectAndRead(ctx, connectCursor); err != nil {
132
-
log.Fatalf("failed to connect: %v", err)
141
+
select {
142
+
case <-ctx.Done():
143
+
logger.Info("stopping")
144
+
return
145
+
case <-connCtx.Done():
146
+
logger.Info("restarting")
147
+
continue
148
+
}
133
149
}
134
150
135
151
logger.Info("shutdown")
···
238
254
h.count++
239
255
if h.count%5000 == 0 {
240
256
cursor := event.TimeUS
241
-
delta := time.Now().UTC().UnixMicro() - cursor
242
-
h.logger.Info("persisting cursor", "cursor", cursor, "delta", delta)
257
+
delta := (time.Now().UTC().UnixMicro() - cursor) / 1_000_000
258
+
h.logger.Info("persisting cursor", "cursor", cursor, "deltaSeconds", delta)
243
259
updateQuery := "insert into config (key, val) values ('cursor', ?) on conflict do update set val = ?"
244
260
if _, err := h.db.ExecContext(ctx, updateQuery, cursor, cursor); err != nil {
245
261
h.logger.Error("failed persisting cursor", "err", err)