+6
-65
cmd/monarch/cursors.go
+6
-65
cmd/monarch/cursors.go
···
2
2
3
3
import (
4
4
"context"
5
-
"fmt"
6
5
"log/slog"
7
-
"strconv"
8
6
"sync"
9
7
"time"
10
8
11
9
"gorm.io/gorm"
12
10
)
13
11
14
-
type cursorRecord struct {
15
-
ID uint `gorm:"primaryKey"`
16
-
Key string
17
-
Val string
18
-
}
19
-
20
12
type CursorService struct {
21
13
store *gorm.DB
22
14
23
15
firehoseLk sync.Mutex
24
16
firehoseSeq int64
25
-
26
-
reposLk sync.Mutex
27
-
reposSeq string
28
17
}
29
18
30
19
func NewCursorService(store *gorm.DB) *CursorService {
31
-
store.AutoMigrate(&cursorRecord{})
32
-
33
-
var rec cursorRecord
34
-
store.First(&rec, 1)
35
-
if rec.ID == 0 {
36
-
store.Create(&cursorRecord{ID: 1, Key: "firehose", Val: ""})
37
-
}
38
-
39
-
store.First(&rec, 2)
40
-
if rec.ID == 0 {
41
-
store.Create(&cursorRecord{ID: 2, Key: "repos", Val: ""})
42
-
}
20
+
store.AutoMigrate(&firehoseCursor{})
43
21
44
22
return &CursorService{
45
23
store: store,
46
24
}
47
25
}
48
26
49
-
func (cs *CursorService) Get(key string) (string, error) {
50
-
var rec cursorRecord
51
-
if err := cs.store.Where("key = ?", key).First(&rec).Error; err != nil {
52
-
return "", fmt.Errorf("error fetching cursor record: %w", err)
53
-
}
54
-
return rec.Val, nil
55
-
}
56
-
57
-
func (cs *CursorService) SetFirehoseCursor(seq int64) {
58
-
cs.firehoseLk.Lock()
59
-
cs.firehoseSeq = seq
60
-
cs.firehoseLk.Unlock()
61
-
}
62
-
63
-
func (cs *CursorService) SetReposCursor(value string) {
64
-
cs.reposLk.Lock()
65
-
cs.reposSeq = value
66
-
cs.reposLk.Unlock()
67
-
}
68
-
69
-
func (cs *CursorService) Flush() error {
70
-
cs.firehoseLk.Lock()
71
-
fcursor := strconv.Itoa(int(cs.firehoseSeq))
72
-
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "firehose").Update("val", fcursor).Error; err != nil {
73
-
return fmt.Errorf("error updating cursor record: %w", err)
74
-
}
75
-
cs.firehoseLk.Unlock()
76
-
77
-
cs.reposLk.Lock()
78
-
if err := cs.store.Model(&cursorRecord{}).Where("key = ?", "repos").Update("val", cs.reposSeq).Error; err != nil {
79
-
return fmt.Errorf("error updating cursor record: %w", err)
80
-
}
81
-
cs.reposLk.Unlock()
82
-
83
-
return nil
84
-
}
85
-
86
-
func (cs *CursorService) CheckpointCursors(ctx context.Context) {
27
+
func (cs *CursorService) Checkpoint(ctx context.Context) {
87
28
t := time.NewTicker(time.Second * 5)
88
29
defer t.Stop()
89
30
90
31
for {
91
32
select {
92
33
case <-ctx.Done():
93
-
slog.Info("stopping cursor checkpointer")
34
+
slog.Info("stopping cursor checkpointer", "err", ctx.Err())
94
35
return
95
36
case <-t.C:
96
37
}
97
38
98
-
slog.Info("flushing cursors", "firehose", cs.firehoseSeq, "repos", cs.reposSeq)
99
-
if err := cs.Flush(); err != nil {
100
-
slog.Error("error flushing cursors", "err", err)
39
+
slog.Info("persisting firehose cursor", "seq", cs.firehoseSeq)
40
+
if err := cs.PersistFirehoseCursor(); err != nil {
41
+
slog.Error("error persisting firehose cursor", "err", err)
101
42
return
102
43
}
103
44
}
+38
-3
cmd/monarch/firehose.go
+38
-3
cmd/monarch/firehose.go
···
11
11
12
12
func NewFirehoseConnection(ctx context.Context, cctx *cli.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
13
13
url := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos", cctx.String("relay-host"))
14
-
curs, _ := cursorSvc.Get("firehose")
15
-
if curs != "" {
16
-
url += "?cursor=" + curs
14
+
curs, err := cursorSvc.GetFirehoseCursor()
15
+
if err == nil { // reversed
16
+
url += fmt.Sprintf("?cursor=%d", curs)
17
17
}
18
18
19
19
conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, http.Header{
···
25
25
26
26
return conn, nil
27
27
}
28
+
29
+
type firehoseCursor struct {
30
+
ID int `gorm:"primaryKey"`
31
+
Key string
32
+
Val int64
33
+
}
34
+
35
+
func (cs *CursorService) GetFirehoseCursor() (int64, error) {
36
+
cs.firehoseLk.Lock()
37
+
defer cs.firehoseLk.Unlock()
38
+
39
+
var fcur firehoseCursor
40
+
if err := cs.store.Where("key = ?", "firehose").Attrs(firehoseCursor{Val: cs.firehoseSeq}).FirstOrCreate(&fcur).Error; err != nil {
41
+
return 0, fmt.Errorf("error getting firehose seq from DB: %w", err)
42
+
}
43
+
return fcur.Val, nil
44
+
}
45
+
46
+
func (cs *CursorService) SetFirehoseCursor(seq int64) {
47
+
cs.firehoseLk.Lock()
48
+
defer cs.firehoseLk.Unlock()
49
+
50
+
cs.firehoseSeq = seq
51
+
}
52
+
53
+
func (cs *CursorService) PersistFirehoseCursor() error {
54
+
cs.firehoseLk.Lock()
55
+
defer cs.firehoseLk.Unlock()
56
+
57
+
var fcur firehoseCursor
58
+
if err := cs.store.Where("key = ?", "firehose").Assign(firehoseCursor{Val: cs.firehoseSeq}).FirstOrCreate(&fcur).Error; err != nil {
59
+
return fmt.Errorf("error persisting firehose seq: %w", err)
60
+
}
61
+
return nil
62
+
}