+24
-1
consumer.go
+24
-1
consumer.go
···
34
34
logger: logger,
35
35
handler: handler{
36
36
store: store,
37
+
did: did,
37
38
},
38
39
}
39
40
}
···
47
48
return fmt.Errorf("failed to create client: %w", err)
48
49
}
49
50
50
-
cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
51
+
cursor, err := c.handler.store.GetCursor(ctx, c.handler.did)
52
+
// if error or not found set to be around the time this app was create so that it starts from the begining
53
+
// of when the type of records were first created
54
+
if err != nil || cursor == 0 {
55
+
cursor = time.Date(2025, time.October, 5, 12, 0, 0, 0, time.UTC).UnixMicro()
56
+
}
57
+
58
+
slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor)
51
59
52
60
if err := client.ConnectAndRead(ctx, &cursor); err != nil {
53
61
return fmt.Errorf("connect and read: %w", err)
···
60
68
type HandlerStore interface {
61
69
CreateURL(id, url, did, originHost string, createdAt int64) error
62
70
DeleteURL(id, did string) error
71
+
SaveCursor(ctx context.Context, did string, cursor int64) error
72
+
GetCursor(ctx context.Context, did string) (int64, error)
63
73
}
64
74
65
75
type handler struct {
66
76
store HandlerStore
77
+
did string
67
78
}
68
79
69
80
func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
81
+
if event == nil {
82
+
return nil
83
+
}
84
+
85
+
defer func() {
86
+
err := h.store.SaveCursor(ctx, h.did, event.TimeUS)
87
+
if err != nil {
88
+
slog.Error("failed to save cursor", "error", err)
89
+
}
90
+
}()
70
91
if event.Commit == nil {
71
92
return nil
72
93
}
94
+
95
+
slog.Info("handle event")
73
96
74
97
switch event.Commit.Operation {
75
98
case models.CommitOperationCreate:
+4
database/database.go
+4
database/database.go
+59
database/jetstream_cursor.go
+59
database/jetstream_cursor.go
···
1
+
package database
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"fmt"
7
+
"log/slog"
8
+
)
9
+
10
+
func createJetstreamTable(db *sql.DB) error {
11
+
createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream (
12
+
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
13
+
"did" TEXT,
14
+
"cursor" INTEGER,
15
+
UNIQUE(did)
16
+
);`
17
+
18
+
slog.Info("Create jetstream table...")
19
+
statement, err := db.Prepare(createJetstreamTableSQL)
20
+
if err != nil {
21
+
return fmt.Errorf("prepare DB statement to create jetstream table: %w", err)
22
+
}
23
+
_, err = statement.Exec()
24
+
if err != nil {
25
+
return fmt.Errorf("exec sql statement to create jetstream table: %w", err)
26
+
}
27
+
slog.Info("jetstream table created")
28
+
29
+
return nil
30
+
}
31
+
32
+
func (d *DB) SaveCursor(ctx context.Context, did string, cursor int64) error {
33
+
sql := `INSERT INTO jetstream (did, cursor) VALUES (?, ?) ON CONFLICT(did) DO UPDATE SET cursor = ?;`
34
+
_, err := d.db.Exec(sql, did, cursor, cursor)
35
+
if err != nil {
36
+
return fmt.Errorf("exec insert or update cursor: %w", err)
37
+
}
38
+
39
+
return nil
40
+
}
41
+
42
+
func (d *DB) GetCursor(ctx context.Context, did string) (int64, error) {
43
+
sql := "SELECT cursor FROM jetstream where did = ?;"
44
+
rows, err := d.db.Query(sql, did)
45
+
if err != nil {
46
+
return 0, fmt.Errorf("run query to get cursor: %w", err)
47
+
}
48
+
defer rows.Close()
49
+
50
+
cursor := 0
51
+
for rows.Next() {
52
+
if err := rows.Scan(&cursor); err != nil {
53
+
return 0, fmt.Errorf("scan row: %w", err)
54
+
}
55
+
56
+
return int64(cursor), nil
57
+
}
58
+
return 0, fmt.Errorf("not found")
59
+
}
+3
-2
docker-compose.yaml
+3
-2
docker-compose.yaml