From 1f076c8bbf5cadfceb253985809ab530a84eae85 Mon Sep 17 00:00:00 2001 From: Will Andrews Date: Thu, 8 Jan 2026 06:12:13 +0000 Subject: [PATCH] track the jetstream cursor Signed-off-by: Will Andrews --- consumer.go | 17 ++++++++++- database/database.go | 5 ++++ database/jetstream_cursor.go | 58 ++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 database/jetstream_cursor.go diff --git a/consumer.go b/consumer.go index 1f5d99f..a0ff84a 100644 --- a/consumer.go +++ b/consumer.go @@ -47,7 +47,13 @@ func (c *consumer) Consume(ctx context.Context) error { return fmt.Errorf("failed to create client: %w", err) } - cursor := time.Now().Add(1 * -time.Minute).UnixMicro() + cursor, err := c.handler.store.GetCursor(ctx) + // if cursor can't be fetched, just start from a couple days ago. + if err != nil || cursor == 0 { + cursor = time.Now().Add(-time.Hour * 48).UnixMicro() + } + + slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor) if err := client.ConnectAndRead(ctx, &cursor); err != nil { return fmt.Errorf("connect and read: %w", err) @@ -59,6 +65,8 @@ func (c *consumer) Consume(ctx context.Context) error { type HandlerStore interface { CreateStatus(status Status) error + SaveCursor(ctx context.Context, cursor int64) error + GetCursor(ctx context.Context) (int64, error) } type handler struct { @@ -70,6 +78,13 @@ func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { return nil } + defer func() { + err := h.store.SaveCursor(ctx, event.TimeUS) + if err != nil { + slog.Error("failed to save cursor", "error", err) + } + }() + switch event.Commit.Operation { case models.CommitOperationCreate: return h.handleCreateEvent(ctx, event) diff --git a/database/database.go b/database/database.go index 799bd77..4d97fb5 100644 --- a/database/database.go +++ b/database/database.go @@ -52,6 +52,11 @@ func New(dbPath string) (*DB, error) { return nil, fmt.Errorf("creating profile table: %w", err) } + err = createJetstreamTable(db) + if err != nil { + return nil, fmt.Errorf("creating jetstream table: %w", err) + } + return &DB{db: db}, nil } diff --git a/database/jetstream_cursor.go b/database/jetstream_cursor.go new file mode 100644 index 0000000..6b522ed --- /dev/null +++ b/database/jetstream_cursor.go @@ -0,0 +1,58 @@ +package database + +import ( + "context" + "database/sql" + "fmt" + "log/slog" +) + +func createJetstreamTable(db *sql.DB) error { + createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream ( + "id" integer NOT NULL PRIMARY KEY, + "cursor" INTEGER, + UNIQUE(id) + );` + + slog.Info("Create jetstream table...") + statement, err := db.Prepare(createJetstreamTableSQL) + if err != nil { + return fmt.Errorf("prepare DB statement to create jetstream table: %w", err) + } + _, err = statement.Exec() + if err != nil { + return fmt.Errorf("exec sql statement to create jetstream table: %w", err) + } + slog.Info("jetstream table created") + + return nil +} + +func (d *DB) SaveCursor(ctx context.Context, cursor int64) error { + sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;` + _, err := d.db.Exec(sql, cursor, cursor) + if err != nil { + return fmt.Errorf("exec insert or update cursor: %w", err) + } + + return nil +} + +func (d *DB) GetCursor(ctx context.Context) (int64, error) { + sql := "SELECT cursor FROM jetstream where id = 1;" + rows, err := d.db.Query(sql) + if err != nil { + return 0, fmt.Errorf("run query to get cursor: %w", err) + } + defer rows.Close() + + cursor := 0 + for rows.Next() { + if err := rows.Scan(&cursor); err != nil { + return 0, fmt.Errorf("scan row: %w", err) + } + + return int64(cursor), nil + } + return 0, fmt.Errorf("not found") +} -- 2.43.0