Signed-off-by: Will Andrews did:plc:dadhhalkfcq3gucaq25hjqon
+16
-1
consumer.go
+16
-1
consumer.go
···
47
47
return fmt.Errorf("failed to create client: %w", err)
48
48
}
49
49
50
-
cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
50
+
cursor, err := c.handler.store.GetCursor(ctx)
51
+
// if cursor can't be fetched, just start from a couple days ago.
52
+
if err != nil || cursor == 0 {
53
+
cursor = time.Now().Add(-time.Hour * 48).UnixMicro()
54
+
}
55
+
56
+
slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor)
51
57
52
58
if err := client.ConnectAndRead(ctx, &cursor); err != nil {
53
59
return fmt.Errorf("connect and read: %w", err)
···
59
65
60
66
type HandlerStore interface {
61
67
CreateStatus(status Status) error
68
+
SaveCursor(ctx context.Context, cursor int64) error
69
+
GetCursor(ctx context.Context) (int64, error)
62
70
}
63
71
64
72
type handler struct {
···
70
78
return nil
71
79
}
72
80
81
+
defer func() {
82
+
err := h.store.SaveCursor(ctx, event.TimeUS)
83
+
if err != nil {
84
+
slog.Error("failed to save cursor", "error", err)
85
+
}
86
+
}()
87
+
73
88
switch event.Commit.Operation {
74
89
case models.CommitOperationCreate:
75
90
return h.handleCreateEvent(ctx, event)
+5
database/database.go
+5
database/database.go
+58
database/jetstream_cursor.go
+58
database/jetstream_cursor.go
···
1
+
package database
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"fmt"
7
+
"log/slog"
8
+
)
9
+
10
+
func createJetstreamTable(db *sql.DB) error {
11
+
createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream (
12
+
"id" integer NOT NULL PRIMARY KEY,
13
+
"cursor" INTEGER,
14
+
UNIQUE(id)
15
+
);`
16
+
17
+
slog.Info("Create jetstream table...")
18
+
statement, err := db.Prepare(createJetstreamTableSQL)
19
+
if err != nil {
20
+
return fmt.Errorf("prepare DB statement to create jetstream table: %w", err)
21
+
}
22
+
_, err = statement.Exec()
23
+
if err != nil {
24
+
return fmt.Errorf("exec sql statement to create jetstream table: %w", err)
25
+
}
26
+
slog.Info("jetstream table created")
27
+
28
+
return nil
29
+
}
30
+
31
+
func (d *DB) SaveCursor(ctx context.Context, cursor int64) error {
32
+
sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;`
33
+
_, err := d.db.Exec(sql, cursor, cursor)
34
+
if err != nil {
35
+
return fmt.Errorf("exec insert or update cursor: %w", err)
36
+
}
37
+
38
+
return nil
39
+
}
40
+
41
+
func (d *DB) GetCursor(ctx context.Context) (int64, error) {
42
+
sql := "SELECT cursor FROM jetstream where id = 1;"
43
+
rows, err := d.db.Query(sql)
44
+
if err != nil {
45
+
return 0, fmt.Errorf("run query to get cursor: %w", err)
46
+
}
47
+
defer rows.Close()
48
+
49
+
cursor := 0
50
+
for rows.Next() {
51
+
if err := rows.Scan(&cursor); err != nil {
52
+
return 0, fmt.Errorf("scan row: %w", err)
53
+
}
54
+
55
+
return int64(cursor), nil
56
+
}
57
+
return 0, fmt.Errorf("not found")
58
+
}