Track Jetstream cursor #1

merged
opened by willdot.net targeting main from jetstream-cursor

Signed-off-by: Will Andrews did:plc:dadhhalkfcq3gucaq25hjqon

Changed files
+79 -1
database
+16 -1
consumer.go
··· 47 47 return fmt.Errorf("failed to create client: %w", err) 48 48 } 49 49 50 - cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 50 + cursor, err := c.handler.store.GetCursor(ctx) 51 + // if cursor can't be fetched, just start from a couple days ago. 52 + if err != nil || cursor == 0 { 53 + cursor = time.Now().Add(-time.Hour * 48).UnixMicro() 54 + } 55 + 56 + slog.Info("starting from cursor", "time", time.UnixMicro(cursor), "cursor", cursor) 51 57 52 58 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 53 59 return fmt.Errorf("connect and read: %w", err) ··· 59 65 60 66 type HandlerStore interface { 61 67 CreateStatus(status Status) error 68 + SaveCursor(ctx context.Context, cursor int64) error 69 + GetCursor(ctx context.Context) (int64, error) 62 70 } 63 71 64 72 type handler struct { ··· 70 78 return nil 71 79 } 72 80 81 + defer func() { 82 + err := h.store.SaveCursor(ctx, event.TimeUS) 83 + if err != nil { 84 + slog.Error("failed to save cursor", "error", err) 85 + } 86 + }() 87 + 73 88 switch event.Commit.Operation { 74 89 case models.CommitOperationCreate: 75 90 return h.handleCreateEvent(ctx, event)
+5
database/database.go
··· 52 52 return nil, fmt.Errorf("creating profile table: %w", err) 53 53 } 54 54 55 + err = createJetstreamTable(db) 56 + if err != nil { 57 + return nil, fmt.Errorf("creating jetstream table: %w", err) 58 + } 59 + 55 60 return &DB{db: db}, nil 56 61 } 57 62
+58
database/jetstream_cursor.go
··· 1 + package database 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "log/slog" 8 + ) 9 + 10 + func createJetstreamTable(db *sql.DB) error { 11 + createJetstreamTableSQL := `CREATE TABLE IF NOT EXISTS jetstream ( 12 + "id" integer NOT NULL PRIMARY KEY, 13 + "cursor" INTEGER, 14 + UNIQUE(id) 15 + );` 16 + 17 + slog.Info("Create jetstream table...") 18 + statement, err := db.Prepare(createJetstreamTableSQL) 19 + if err != nil { 20 + return fmt.Errorf("prepare DB statement to create jetstream table: %w", err) 21 + } 22 + _, err = statement.Exec() 23 + if err != nil { 24 + return fmt.Errorf("exec sql statement to create jetstream table: %w", err) 25 + } 26 + slog.Info("jetstream table created") 27 + 28 + return nil 29 + } 30 + 31 + func (d *DB) SaveCursor(ctx context.Context, cursor int64) error { 32 + sql := `INSERT INTO jetstream (id, cursor) VALUES (1, ?) ON CONFLICT(id) DO UPDATE SET cursor = ?;` 33 + _, err := d.db.Exec(sql, cursor, cursor) 34 + if err != nil { 35 + return fmt.Errorf("exec insert or update cursor: %w", err) 36 + } 37 + 38 + return nil 39 + } 40 + 41 + func (d *DB) GetCursor(ctx context.Context) (int64, error) { 42 + sql := "SELECT cursor FROM jetstream where id = 1;" 43 + rows, err := d.db.Query(sql) 44 + if err != nil { 45 + return 0, fmt.Errorf("run query to get cursor: %w", err) 46 + } 47 + defer rows.Close() 48 + 49 + cursor := 0 50 + for rows.Next() { 51 + if err := rows.Scan(&cursor); err != nil { 52 + return 0, fmt.Errorf("scan row: %w", err) 53 + } 54 + 55 + return int64(cursor), nil 56 + } 57 + return 0, fmt.Errorf("not found") 58 + }