+6
-1
cmd/hepa/consumer.go
+6
-1
cmd/hepa/consumer.go
···
22
22
func (s *Server) RunConsumer(ctx context.Context) error {
23
23
24
24
// TODO: persist cursor in a database or local disk
25
-
cur := 0
25
+
cur, err := s.ReadLastCursor(ctx)
26
+
if err != nil {
27
+
return err
28
+
}
26
29
27
30
dialer := websocket.DefaultDialer
28
31
u, err := url.Parse(s.bgshost)
···
43
46
44
47
rsc := &events.RepoStreamCallbacks{
45
48
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
49
+
s.lastSeq = evt.Seq
46
50
return s.HandleRepoCommit(ctx, evt)
47
51
},
48
52
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
53
+
s.lastSeq = evt.Seq
49
54
did, err := syntax.ParseDID(evt.Did)
50
55
if err != nil {
51
56
s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
+6
cmd/hepa/main.go
+6
cmd/hepa/main.go
···
166
166
}
167
167
}()
168
168
169
+
go func() {
170
+
if err := srv.RunPersistCursor(ctx); err != nil {
171
+
slog.Error("cursor routine failed", "err", err)
172
+
}
173
+
}()
174
+
169
175
// the main service loop
170
176
if err := srv.RunConsumer(ctx); err != nil {
171
177
return fmt.Errorf("failure consuming and processing firehose: %w", err)
+83
-9
cmd/hepa/server.go
+83
-9
cmd/hepa/server.go
···
17
17
"github.com/bluesky-social/indigo/xrpc"
18
18
19
19
"github.com/prometheus/client_golang/prometheus/promhttp"
20
+
"github.com/redis/go-redis/v9"
20
21
)
21
22
22
23
type Server struct {
23
24
bgshost string
24
25
logger *slog.Logger
25
26
engine *automod.Engine
27
+
rdb *redis.Client
28
+
lastSeq int64
26
29
}
27
30
28
31
type Config struct {
···
83
86
}
84
87
85
88
var counters automod.CountStore
89
+
var cache automod.CacheStore
90
+
var rdb *redis.Client
86
91
if config.RedisURL != "" {
87
-
c, err := automod.NewRedisCountStore(config.RedisURL)
92
+
// generic client, for cursor state
93
+
opt, err := redis.ParseURL(config.RedisURL)
88
94
if err != nil {
89
95
return nil, err
90
96
}
91
-
counters = c
92
-
} else {
93
-
counters = automod.NewMemCountStore()
94
-
}
97
+
rdb = redis.NewClient(opt)
98
+
// check redis connection
99
+
_, err = rdb.Ping(context.TODO()).Result()
100
+
if err != nil {
101
+
return nil, err
102
+
}
95
103
96
-
var cache automod.CacheStore
97
-
if config.RedisURL != "" {
98
-
c, err := automod.NewRedisCacheStore(config.RedisURL, 30*time.Minute)
104
+
cnt, err := automod.NewRedisCountStore(config.RedisURL)
99
105
if err != nil {
100
106
return nil, err
101
107
}
102
-
cache = c
108
+
counters = cnt
109
+
110
+
csh, err := automod.NewRedisCacheStore(config.RedisURL, 30*time.Minute)
111
+
if err != nil {
112
+
return nil, err
113
+
}
114
+
cache = csh
103
115
} else {
116
+
counters = automod.NewMemCountStore()
104
117
cache = automod.NewMemCacheStore(5_000, 30*time.Minute)
105
118
}
106
119
···
122
135
bgshost: config.BGSHost,
123
136
logger: logger,
124
137
engine: &engine,
138
+
rdb: rdb,
125
139
}
126
140
127
141
return s, nil
···
131
145
http.Handle("/metrics", promhttp.Handler())
132
146
return http.ListenAndServe(listen, nil)
133
147
}
148
+
149
+
var cursorKey = "hepa/seq"
150
+
151
+
func (s *Server) ReadLastCursor(ctx context.Context) (int64, error) {
152
+
// if redis isn't configured, just skip
153
+
if s.rdb == nil {
154
+
s.logger.Info("redis not configured, skipping cursor read")
155
+
return 0, nil
156
+
}
157
+
158
+
val, err := s.rdb.Get(ctx, cursorKey).Int64()
159
+
if err == redis.Nil {
160
+
s.logger.Info("no pre-existing cursor in redis")
161
+
return 0, nil
162
+
}
163
+
s.logger.Info("successfully found prior subscription cursor seq in redis", "seq", val)
164
+
return val, err
165
+
}
166
+
167
+
func (s *Server) PersistCursor(ctx context.Context) error {
168
+
// if redis isn't configured, just skip
169
+
if s.rdb == nil {
170
+
return nil
171
+
}
172
+
if s.lastSeq <= 0 {
173
+
return nil
174
+
}
175
+
err := s.rdb.Set(ctx, cursorKey, s.lastSeq, 14*24*time.Hour).Err()
176
+
return err
177
+
}
178
+
179
+
// this method runs in a loop, persisting the current cursor state every 5 seconds
180
+
func (s *Server) RunPersistCursor(ctx context.Context) error {
181
+
182
+
// if redis isn't configured, just skip
183
+
if s.rdb == nil {
184
+
return nil
185
+
}
186
+
ticker := time.NewTicker(5 * time.Second)
187
+
for {
188
+
select {
189
+
case <-ctx.Done():
190
+
if s.lastSeq >= 1 {
191
+
s.logger.Info("persisting final cursor seq value", "seq", s.lastSeq)
192
+
err := s.PersistCursor(ctx)
193
+
if err != nil {
194
+
s.logger.Error("failed to persist cursor", "err", err, "seq", s.lastSeq)
195
+
}
196
+
}
197
+
return nil
198
+
case <-ticker.C:
199
+
if s.lastSeq >= 1 {
200
+
err := s.PersistCursor(ctx)
201
+
if err != nil {
202
+
s.logger.Error("failed to persist cursor", "err", err, "seq", s.lastSeq)
203
+
}
204
+
}
205
+
}
206
+
}
207
+
}