+397
backfill/backfill.go
+397
backfill/backfill.go
···
1
+
package backfill
2
+
3
+
import (
4
+
"context"
5
+
"errors"
6
+
"fmt"
7
+
"net/http"
8
+
"sync"
9
+
"time"
10
+
11
+
// Blank import to register types for CBORGEN
12
+
_ "github.com/bluesky-social/indigo/api/bsky"
13
+
"github.com/bluesky-social/indigo/repo"
14
+
"github.com/bluesky-social/indigo/repomgr"
15
+
"github.com/ipfs/go-cid"
16
+
typegen "github.com/whyrusleeping/cbor-gen"
17
+
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
18
+
"go.opentelemetry.io/otel"
19
+
"go.uber.org/zap"
20
+
"golang.org/x/time/rate"
21
+
)
22
+
23
+
// Job is an interface for a backfill job
type Job interface {
	// Repo returns the DID of the repo this job backfills.
	Repo() string
	// State returns the job's current state string (see StateEnqueued etc.).
	State() string
	// SetState transitions the job to the given state; implementations backed
	// by durable storage persist the change.
	SetState(ctx context.Context, state string) error

	// FlushBufferedOps calls the given callback for each buffered operation
	// Once done it clears the buffer and marks the job as "complete"
	// Allowing the Job interface to abstract away the details of how buffered
	// operations are stored and/or locked
	FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error

	// ClearBufferedOps discards all buffered operations without applying them.
	ClearBufferedOps(ctx context.Context) error
}
37
+
38
+
// Store is an interface for a backfill store which holds Jobs
type Store interface {
	// BufferOp buffers an operation for a job and returns true if the operation was buffered
	// If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete)
	BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error)
	// GetJob returns the job for the given repo DID.
	GetJob(ctx context.Context, repo string) (Job, error)
	// GetNextEnqueuedJob returns some job still in the enqueued state, or nil
	// when there is no pending work.
	GetNextEnqueuedJob(ctx context.Context) (Job, error)
}
46
+
47
+
// Backfiller is a struct which handles backfilling a repo
type Backfiller struct {
	// Name labels this backfiller instance in logs and metrics.
	Name string
	// Handlers invoked for create/update/delete operations, both while
	// backfilling records and when flushing buffered ops.
	HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
	HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
	HandleDeleteRecord func(ctx context.Context, repo string, path string) error
	// Store persists jobs and their buffered operations.
	Store Store

	// Number of backfills to process in parallel
	ParallelBackfills int
	// Number of records to process in parallel for each backfill
	ParallelRecordCreates int
	// Prefix match for records to backfill i.e. app.bsky.feed.app/
	// If empty, all records will be backfilled
	NSIDFilter string
	// CheckoutPath is the XRPC com.atproto.sync.getCheckout endpoint used to
	// fetch repo CAR files.
	CheckoutPath string

	Logger *zap.SugaredLogger
	// syncLimiter throttles checkout requests against the upstream host.
	syncLimiter *rate.Limiter

	// Optional extra HTTP header attached to checkout requests
	// (presumably an auth/ratelimit-bypass header — confirm with operators).
	magicHeaderKey string
	magicHeaderVal string

	// stop carries an ack channel: Stop() sends a channel that the processor
	// loop closes once it has exited.
	stop chan chan struct{}
}
72
+
73
+
// Job lifecycle states. Failure states are free-form strings of the shape
// "failed (<reason>)" set by BackfillRepo.
var (
	// StateEnqueued is the state of a backfill job when it is first created
	StateEnqueued = "enqueued"
	// StateInProgress is the state of a backfill job when it is being processed
	StateInProgress = "in_progress"
	// StateComplete is the state of a backfill job when it has been processed
	StateComplete = "complete"
)
81
+
82
+
// ErrJobComplete is returned when trying to buffer an op for a job that is complete
var ErrJobComplete = errors.New("job is complete")

// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist
var ErrJobNotFound = errors.New("job not found")

// tracer is the OpenTelemetry tracer for all backfiller spans.
var tracer = otel.Tracer("backfiller")
89
+
90
+
// BackfillOptions configures a Backfiller; see DefaultBackfillOptions for
// the standard values.
type BackfillOptions struct {
	// ParallelBackfills bounds how many repos are backfilled concurrently.
	ParallelBackfills int
	// ParallelRecordCreates bounds record-processing workers per repo.
	ParallelRecordCreates int
	// NSIDFilter restricts backfill to records whose path has this prefix.
	NSIDFilter string
	// SyncRequestsPerSecond rate-limits getCheckout requests.
	SyncRequestsPerSecond int
	// CheckoutPath is the getCheckout endpoint URL.
	CheckoutPath string
}
97
+
98
+
func DefaultBackfillOptions() *BackfillOptions {
99
+
return &BackfillOptions{
100
+
ParallelBackfills: 10,
101
+
ParallelRecordCreates: 100,
102
+
NSIDFilter: "",
103
+
SyncRequestsPerSecond: 2,
104
+
CheckoutPath: "https://bsky.social/xrpc/com.atproto.sync.getCheckout",
105
+
}
106
+
}
107
+
108
+
// NewBackfiller creates a new Backfiller
//
// name labels the instance in logs and metrics; store persists jobs; the
// three handlers are invoked for create/update/delete operations.
// If opts is nil, DefaultBackfillOptions is used.
func NewBackfiller(
	name string,
	store Store,
	handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
	handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
	handleDelete func(ctx context.Context, repo string, path string) error,
	logger *zap.SugaredLogger,
	opts *BackfillOptions,
) *Backfiller {
	if opts == nil {
		opts = DefaultBackfillOptions()
	}
	return &Backfiller{
		Name:                  name,
		Store:                 store,
		HandleCreateRecord:    handleCreate,
		HandleUpdateRecord:    handleUpdate,
		HandleDeleteRecord:    handleDelete,
		ParallelBackfills:     opts.ParallelBackfills,
		ParallelRecordCreates: opts.ParallelRecordCreates,
		NSIDFilter:            opts.NSIDFilter,
		Logger:                logger,
		// Burst of 1: checkout requests are spaced evenly at the configured rate.
		syncLimiter:  rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1),
		CheckoutPath: opts.CheckoutPath,
		stop:         make(chan chan struct{}),
	}
}
136
+
137
+
// Start starts the backfill processor routine
// It loops until Stop is called: it pulls the next enqueued job from the
// Store, marks it in-progress, and runs BackfillRepo in a worker goroutine,
// allowing at most ParallelBackfills workers at once.
func (b *Backfiller) Start() {
	ctx := context.Background()

	log := b.Logger.With("source", "backfiller_main")
	log.Info("starting backfill processor")

	// Counting semaphore bounding concurrent backfills.
	sem := make(chan struct{}, b.ParallelBackfills)

	for {
		// Non-blocking stop check; closing `stopped` acknowledges Stop().
		select {
		case stopped := <-b.stop:
			log.Info("stopping backfill processor")
			close(stopped)
			return
		default:
		}

		// Get the next job
		job, err := b.Store.GetNextEnqueuedJob(ctx)
		if err != nil {
			log.Errorf("failed to get next backfill: %+v", err)
			time.Sleep(1 * time.Second)
			continue
		} else if job == nil {
			// Nothing queued; poll again shortly.
			time.Sleep(1 * time.Second)
			continue
		}

		// Mark the backfill as "in progress"
		err = job.SetState(ctx, StateInProgress)
		if err != nil {
			log.Errorf("failed to set backfill state: %+v", err)
			continue
		}

		// Acquire a worker slot; the goroutine releases it when done.
		sem <- struct{}{}
		go func(j Job) {
			b.BackfillRepo(ctx, j)
			backfillJobsProcessed.WithLabelValues(b.Name).Inc()
			<-sem
		}(job)
	}
}
181
+
182
+
// Stop stops the backfill processor
183
+
func (b *Backfiller) Stop() {
184
+
b.Logger.Info("stopping backfill processor")
185
+
stopped := make(chan struct{})
186
+
b.stop <- stopped
187
+
<-stopped
188
+
b.Logger.Info("backfill processor stopped")
189
+
}
190
+
191
+
// FlushBuffer processes buffered operations for a job
// Each op is dispatched to the matching handler (create/update/delete); the
// buffered-ops gauge is decremented per op. Handler errors are logged but do
// not abort the flush. Returns the number of ops processed.
func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {
	ctx, span := tracer.Start(ctx, "FlushBuffer")
	defer span.End()
	log := b.Logger.With("source", "backfiller_buffer_flush", "repo", job.Repo())

	processed := 0

	repo := job.Repo()

	// Flush buffered operations, clear the buffer, and mark the job as "complete"
	// Clearning and marking are handled by the job interface
	err := job.FlushBufferedOps(ctx, func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
		switch repomgr.EventKind(kind) {
		case repomgr.EvtKindCreateRecord:
			err := b.HandleCreateRecord(ctx, repo, path, rec, cid)
			if err != nil {
				log.Errorf("failed to handle create record: %+v", err)
			}
		case repomgr.EvtKindUpdateRecord:
			err := b.HandleUpdateRecord(ctx, repo, path, rec, cid)
			if err != nil {
				log.Errorf("failed to handle update record: %+v", err)
			}
		case repomgr.EvtKindDeleteRecord:
			err := b.HandleDeleteRecord(ctx, repo, path)
			if err != nil {
				log.Errorf("failed to handle delete record: %+v", err)
			}
		}
		backfillOpsBuffered.WithLabelValues(b.Name).Dec()
		processed++
		return nil
	})
	if err != nil {
		log.Errorf("failed to flush buffered ops: %+v", err)
	}

	return processed
}
231
+
232
+
// recordQueueItem is a unit of work handed from the repo-walking producer to
// the record-processing consumers in BackfillRepo.
type recordQueueItem struct {
	recordPath string
	nodeCid    cid.Cid
}

// recordResult reports the outcome of processing one record; err is nil on
// success.
type recordResult struct {
	recordPath string
	err        error
}
241
+
242
+
// BackfillRepo backfills a repo
243
+
func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) {
244
+
ctx, span := tracer.Start(ctx, "BackfillRepo")
245
+
defer span.End()
246
+
247
+
start := time.Now()
248
+
249
+
repoDid := job.Repo()
250
+
251
+
log := b.Logger.With("source", "backfiller_backfill_repo", "repo", repoDid)
252
+
log.Infof("processing backfill for %s", repoDid)
253
+
254
+
var url = fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)
255
+
256
+
// GET and CAR decode the body
257
+
client := &http.Client{
258
+
Transport: otelhttp.NewTransport(http.DefaultTransport),
259
+
Timeout: 120 * time.Second,
260
+
}
261
+
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
262
+
if err != nil {
263
+
log.Errorf("Error creating request: %v", err)
264
+
return
265
+
}
266
+
267
+
req.Header.Set("Accept", "application/vnd.ipld.car")
268
+
req.Header.Set("User-Agent", fmt.Sprintf("atproto-backfill-%s/0.0.1", b.Name))
269
+
if b.magicHeaderKey != "" && b.magicHeaderVal != "" {
270
+
req.Header.Set(b.magicHeaderKey, b.magicHeaderVal)
271
+
}
272
+
273
+
b.syncLimiter.Wait(ctx)
274
+
275
+
resp, err := client.Do(req)
276
+
if err != nil {
277
+
log.Errorf("Error sending request: %v", err)
278
+
return
279
+
}
280
+
281
+
if resp.StatusCode != http.StatusOK {
282
+
log.Errorf("Error response: %v", resp.StatusCode)
283
+
reason := "unknown error"
284
+
if resp.StatusCode == http.StatusBadRequest {
285
+
reason = "repo not found"
286
+
}
287
+
state := fmt.Sprintf("failed (%s)", reason)
288
+
289
+
// Mark the job as "failed"
290
+
err := job.SetState(ctx, state)
291
+
if err != nil {
292
+
log.Errorf("failed to set job state: %+v", err)
293
+
}
294
+
295
+
// Clear buffered ops
296
+
err = job.ClearBufferedOps(ctx)
297
+
if err != nil {
298
+
log.Errorf("failed to clear buffered ops: %+v", err)
299
+
}
300
+
return
301
+
}
302
+
303
+
instrumentedReader := instrumentedReader{
304
+
source: resp.Body,
305
+
counter: backfillBytesProcessed.WithLabelValues(b.Name),
306
+
}
307
+
308
+
defer instrumentedReader.Close()
309
+
310
+
r, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
311
+
if err != nil {
312
+
log.Errorf("Error reading repo: %v", err)
313
+
314
+
state := "failed (couldn't read repo CAR from response body)"
315
+
316
+
// Mark the job as "failed"
317
+
err := job.SetState(ctx, state)
318
+
if err != nil {
319
+
log.Errorf("failed to set job state: %+v", err)
320
+
}
321
+
322
+
// Clear buffered ops
323
+
err = job.ClearBufferedOps(ctx)
324
+
if err != nil {
325
+
log.Errorf("failed to clear buffered ops: %+v", err)
326
+
}
327
+
return
328
+
}
329
+
330
+
numRecords := 0
331
+
numRoutines := b.ParallelRecordCreates
332
+
recordQueue := make(chan recordQueueItem, numRoutines)
333
+
recordResults := make(chan recordResult, numRoutines)
334
+
335
+
wg := sync.WaitGroup{}
336
+
337
+
// Producer routine
338
+
go func() {
339
+
defer close(recordQueue)
340
+
r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
341
+
numRecords++
342
+
recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
343
+
return nil
344
+
})
345
+
}()
346
+
347
+
// Consumer routines
348
+
for i := 0; i < numRoutines; i++ {
349
+
wg.Add(1)
350
+
go func() {
351
+
defer wg.Done()
352
+
for item := range recordQueue {
353
+
recordCid, rec, err := r.GetRecord(ctx, item.recordPath)
354
+
if err != nil {
355
+
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get record: %w", err)}
356
+
continue
357
+
}
358
+
359
+
// Verify that the record cid matches the cid in the event
360
+
if recordCid != item.nodeCid {
361
+
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("mismatch in record and op cid: %s != %s", recordCid, item.nodeCid)}
362
+
continue
363
+
}
364
+
365
+
err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &rec, &recordCid)
366
+
if err != nil {
367
+
recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
368
+
continue
369
+
}
370
+
371
+
backfillRecordsProcessed.WithLabelValues(b.Name).Inc()
372
+
recordResults <- recordResult{recordPath: item.recordPath, err: err}
373
+
}
374
+
}()
375
+
}
376
+
377
+
resultWG := sync.WaitGroup{}
378
+
resultWG.Add(1)
379
+
// Handle results
380
+
go func() {
381
+
defer resultWG.Done()
382
+
for result := range recordResults {
383
+
if result.err != nil {
384
+
log.Errorf("Error processing record %s: %v", result.recordPath, result.err)
385
+
}
386
+
}
387
+
}()
388
+
389
+
wg.Wait()
390
+
close(recordResults)
391
+
resultWG.Wait()
392
+
393
+
// Process buffered operations, marking the job as "complete" when done
394
+
numProcessed := b.FlushBuffer(ctx, job)
395
+
396
+
log.Infow("backfill complete", "buffered_records_processed", numProcessed, "records_backfilled", numRecords, "duration", time.Since(start))
397
+
}
+124
backfill/backfill_test.go
+124
backfill/backfill_test.go
···
1
+
package backfill_test
2
+
3
+
import (
4
+
"context"
5
+
"sync"
6
+
"testing"
7
+
"time"
8
+
9
+
"github.com/bluesky-social/indigo/backfill"
10
+
"github.com/ipfs/go-cid"
11
+
typegen "github.com/whyrusleeping/cbor-gen"
12
+
"go.uber.org/zap"
13
+
)
14
+
15
+
// logger is shared by the test handlers below; initialized in TestBackfill.
var logger *zap.SugaredLogger

// testState counts handler invocations so the test can poll for progress.
type testState struct {
	creates int
	updates int
	deletes int
	lk      sync.Mutex // guards the counters above
}
23
+
24
+
// TestBackfill runs the backfiller against a Memstore with three real repos,
// buffers ops against the first job once it is in progress, and polls until
// all buffered ops have been handled.
// NOTE(review): this test fetches live repos from the network and its wait
// loops have no timeout — a hang here fails only via the go test deadline.
func TestBackfill(t *testing.T) {
	ctx := context.Background()

	testRepos := []string{
		"did:plc:q6gjnaw2blty4crticxkmujt",
		"did:plc:f5f4diimystr7ima7nqvamhe",
		"did:plc:t7y4sud4dhptvzz7ibnv5cbt",
	}

	mem := backfill.NewMemstore()

	rawLog, err := zap.NewDevelopment()
	if err != nil {
		t.Fatal(err)
	}

	logger = rawLog.Sugar()

	ts := &testState{}

	opts := backfill.DefaultBackfillOptions()
	opts.NSIDFilter = "app.bsky.feed.follow/"

	bf := backfill.NewBackfiller(
		"backfill-test",
		mem,
		ts.handleCreate,
		ts.handleUpdate,
		ts.handleDelete,
		logger,
		opts,
	)

	logger.Info("starting backfiller")

	go bf.Start()

	for _, repo := range testRepos {
		mem.EnqueueJob(repo)
	}

	// Wait until job 0 is in progress
	for {
		s, err := mem.GetJob(ctx, testRepos[0])
		if err != nil {
			t.Fatal(err)
		}
		if s.State() == backfill.StateInProgress {
			// Ops only buffer while the job is in progress.
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef)
			mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef)

			mem.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef)

			mem.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef)

			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	// Poll until every buffered op above has been dispatched.
	for {
		ts.lk.Lock()
		if ts.deletes >= 5 && ts.creates >= 1 && ts.updates >= 1 {
			ts.lk.Unlock()
			break
		}
		ts.lk.Unlock()
		time.Sleep(100 * time.Millisecond)
	}

	bf.Stop()

	logger.Infof("shutting down")
}
101
+
102
+
func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
103
+
logger.Infof("got create: %s %s", repo, path)
104
+
ts.lk.Lock()
105
+
ts.creates++
106
+
ts.lk.Unlock()
107
+
return nil
108
+
}
109
+
110
+
func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error {
111
+
logger.Infof("got update: %s %s", repo, path)
112
+
ts.lk.Lock()
113
+
ts.updates++
114
+
ts.lk.Unlock()
115
+
return nil
116
+
}
117
+
118
+
func (ts *testState) handleDelete(ctx context.Context, repo string, path string) error {
119
+
logger.Infof("got delete: %s %s", repo, path)
120
+
ts.lk.Lock()
121
+
ts.deletes++
122
+
ts.lk.Unlock()
123
+
return nil
124
+
}
+215
backfill/gormstore.go
+215
backfill/gormstore.go
···
1
+
package backfill
2
+
3
+
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/ipfs/go-cid"
	typegen "github.com/whyrusleeping/cbor-gen"
	"gorm.io/gorm"
)
13
+
14
+
// Gormjob is the in-memory representation of a backfill job, kept in sync
// with its persisted GormDBJob row.
type Gormjob struct {
	repo  string
	state string
	lk    sync.Mutex // guards state and bufferedOps
	// bufferedOps maps record path -> ops that arrived during the backfill.
	bufferedOps map[string][]*bufferedOp

	dbj *GormDBJob // persisted row backing this job
	db  *gorm.DB

	createdAt time.Time
	updatedAt time.Time
}
26
+
27
+
// GormDBJob is the database row persisting a backfill job's repo and state.
type GormDBJob struct {
	gorm.Model
	Repo  string `gorm:"unique;index"`
	State string `gorm:"index"`
}
32
+
33
+
// Gormstore is a gorm-backed implementation of the Backfill Store interface
type Gormstore struct {
	lk sync.RWMutex // guards jobs
	// jobs is the in-memory cache of all known jobs, keyed by repo DID;
	// populated by LoadJobs and EnqueueJob.
	jobs map[string]*Gormjob
	db   *gorm.DB
}
39
+
40
+
func NewGormstore(db *gorm.DB) *Gormstore {
41
+
return &Gormstore{
42
+
jobs: make(map[string]*Gormjob),
43
+
db: db,
44
+
}
45
+
}
46
+
47
+
func (s *Gormstore) LoadJobs(ctx context.Context) error {
48
+
// Load all jobs from the database
49
+
var dbjobs []*GormDBJob
50
+
if err := s.db.Find(&dbjobs).Error; err != nil {
51
+
return err
52
+
}
53
+
54
+
s.lk.Lock()
55
+
defer s.lk.Unlock()
56
+
57
+
// Convert them to in-memory jobs
58
+
for i := range dbjobs {
59
+
dbj := dbjobs[i]
60
+
j := &Gormjob{
61
+
repo: dbj.Repo,
62
+
state: dbj.State,
63
+
bufferedOps: map[string][]*bufferedOp{},
64
+
createdAt: dbj.CreatedAt,
65
+
updatedAt: dbj.UpdatedAt,
66
+
67
+
dbj: dbj,
68
+
db: s.db,
69
+
}
70
+
s.jobs[dbj.Repo] = j
71
+
}
72
+
73
+
return nil
74
+
}
75
+
76
+
func (s *Gormstore) EnqueueJob(repo string) error {
77
+
// Persist the job to the database
78
+
dbj := &GormDBJob{
79
+
Repo: repo,
80
+
State: StateEnqueued,
81
+
}
82
+
if err := s.db.Create(dbj).Error; err != nil {
83
+
if err == gorm.ErrDuplicatedKey {
84
+
return nil
85
+
}
86
+
return err
87
+
}
88
+
89
+
s.lk.Lock()
90
+
defer s.lk.Unlock()
91
+
92
+
// Convert it to an in-memory job
93
+
if _, ok := s.jobs[repo]; ok {
94
+
// The DB create should have errored if the job already existed, but just in case
95
+
return fmt.Errorf("job already exists for repo %s", repo)
96
+
}
97
+
98
+
j := &Gormjob{
99
+
repo: repo,
100
+
createdAt: time.Now(),
101
+
updatedAt: time.Now(),
102
+
state: StateEnqueued,
103
+
bufferedOps: map[string][]*bufferedOp{},
104
+
105
+
dbj: dbj,
106
+
db: s.db,
107
+
}
108
+
s.jobs[repo] = j
109
+
110
+
return nil
111
+
}
112
+
113
+
// BufferOp buffers an operation for the given repo's in-progress job.
// Returns (true, nil) when buffered; (false, ErrJobNotFound) when no job
// exists; (false, ErrJobComplete) when the backfill already finished; and
// (false, nil) when the job exists but is not in progress.
func (s *Gormstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) {
	// Read lock only: the map itself is not mutated here.
	s.lk.RLock()

	// If the job doesn't exist, we can't buffer an op for it
	j, ok := s.jobs[repo]
	s.lk.RUnlock()
	if !ok {
		return false, ErrJobNotFound
	}

	// Per-job lock serializes buffering against state changes and flushing.
	j.lk.Lock()
	defer j.lk.Unlock()

	switch j.state {
	case StateComplete:
		return false, ErrJobComplete
	case StateInProgress:
		// keep going and buffer the op
	default:
		return false, nil
	}

	j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{
		kind: kind,
		rec:  rec,
		cid:  cid,
	})
	j.updatedAt = time.Now()
	return true, nil
}
143
+
144
+
func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) {
145
+
s.lk.RLock()
146
+
defer s.lk.RUnlock()
147
+
148
+
j, ok := s.jobs[repo]
149
+
if !ok || j == nil {
150
+
return nil, nil
151
+
}
152
+
return j, nil
153
+
}
154
+
155
+
func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) {
156
+
s.lk.RLock()
157
+
defer s.lk.RUnlock()
158
+
159
+
for _, j := range s.jobs {
160
+
if j.State() == StateEnqueued {
161
+
return j, nil
162
+
}
163
+
}
164
+
return nil, nil
165
+
}
166
+
167
+
// Repo returns the DID of the repo this job backfills.
func (j *Gormjob) Repo() string {
	return j.repo
}

// State returns the job's current state under the job lock.
func (j *Gormjob) State() string {
	j.lk.Lock()
	defer j.lk.Unlock()

	return j.state
}
177
+
178
+
// SetState updates the in-memory state and persists it to the database.
func (j *Gormjob) SetState(ctx context.Context, state string) error {
	j.lk.Lock()
	defer j.lk.Unlock()

	j.state = state
	j.updatedAt = time.Now()

	// Persist the job to the database
	j.dbj.State = state
	return j.db.Save(j.dbj).Error
}
189
+
190
+
func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error {
191
+
j.lk.Lock()
192
+
defer j.lk.Unlock()
193
+
194
+
for path, ops := range j.bufferedOps {
195
+
for _, op := range ops {
196
+
if err := fn(op.kind, path, op.rec, op.cid); err != nil {
197
+
return err
198
+
}
199
+
}
200
+
}
201
+
202
+
j.bufferedOps = map[string][]*bufferedOp{}
203
+
j.state = StateComplete
204
+
205
+
return nil
206
+
}
207
+
208
+
// ClearBufferedOps discards all buffered operations without applying them
// (used when a backfill fails and the buffer is no longer meaningful).
func (j *Gormjob) ClearBufferedOps(ctx context.Context) error {
	j.lk.Lock()
	defer j.lk.Unlock()

	j.bufferedOps = map[string][]*bufferedOp{}
	j.updatedAt = time.Now()
	return nil
}
+159
backfill/memstore.go
+159
backfill/memstore.go
···
1
+
package backfill
2
+
3
+
import (
4
+
"context"
5
+
"fmt"
6
+
"sync"
7
+
"time"
8
+
9
+
"github.com/ipfs/go-cid"
10
+
typegen "github.com/whyrusleeping/cbor-gen"
11
+
)
12
+
13
+
// bufferedOp is one firehose operation held back while a repo backfill runs.
type bufferedOp struct {
	kind string // "create", "update", or "delete" (repomgr event kind string)
	rec  *typegen.CBORMarshaler
	cid  *cid.Cid
}

// Memjob is the in-memory-only counterpart of Gormjob.
type Memjob struct {
	repo  string
	state string
	lk    sync.Mutex // guards state and bufferedOps
	// bufferedOps maps record path -> ops that arrived during the backfill.
	bufferedOps map[string][]*bufferedOp

	createdAt time.Time
	updatedAt time.Time
}

// Memstore is a simple in-memory implementation of the Backfill Store interface
type Memstore struct {
	lk   sync.RWMutex // guards jobs
	jobs map[string]*Memjob
}
34
+
35
+
func NewMemstore() *Memstore {
36
+
return &Memstore{
37
+
jobs: make(map[string]*Memjob),
38
+
}
39
+
}
40
+
41
+
func (s *Memstore) EnqueueJob(repo string) error {
42
+
s.lk.Lock()
43
+
defer s.lk.Unlock()
44
+
45
+
if _, ok := s.jobs[repo]; ok {
46
+
return fmt.Errorf("job already exists for repo %s", repo)
47
+
}
48
+
49
+
j := &Memjob{
50
+
repo: repo,
51
+
createdAt: time.Now(),
52
+
updatedAt: time.Now(),
53
+
state: StateEnqueued,
54
+
bufferedOps: map[string][]*bufferedOp{},
55
+
}
56
+
s.jobs[repo] = j
57
+
return nil
58
+
}
59
+
60
+
func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) {
61
+
s.lk.Lock()
62
+
63
+
// If the job doesn't exist, we can't buffer an op for it
64
+
j, ok := s.jobs[repo]
65
+
s.lk.Unlock()
66
+
if !ok {
67
+
return false, ErrJobNotFound
68
+
}
69
+
70
+
j.lk.Lock()
71
+
defer j.lk.Unlock()
72
+
73
+
switch j.state {
74
+
case StateComplete:
75
+
return false, ErrJobComplete
76
+
case StateInProgress:
77
+
// keep going and buffer the op
78
+
default:
79
+
return false, nil
80
+
}
81
+
82
+
j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{
83
+
kind: kind,
84
+
rec: rec,
85
+
cid: cid,
86
+
})
87
+
j.updatedAt = time.Now()
88
+
return true, nil
89
+
}
90
+
91
+
func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error) {
92
+
s.lk.RLock()
93
+
defer s.lk.RUnlock()
94
+
95
+
j, ok := s.jobs[repo]
96
+
if !ok || j == nil {
97
+
return nil, nil
98
+
}
99
+
return j, nil
100
+
}
101
+
102
+
func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) {
103
+
s.lk.RLock()
104
+
defer s.lk.RUnlock()
105
+
106
+
for _, j := range s.jobs {
107
+
if j.State() == StateEnqueued {
108
+
return j, nil
109
+
}
110
+
}
111
+
return nil, nil
112
+
}
113
+
114
+
// Repo returns the DID of the repo this job backfills.
func (j *Memjob) Repo() string {
	return j.repo
}

// State returns the job's current state under the job lock.
func (j *Memjob) State() string {
	j.lk.Lock()
	defer j.lk.Unlock()

	return j.state
}

// SetState updates the in-memory state (Memjob has no durable storage).
func (j *Memjob) SetState(ctx context.Context, state string) error {
	j.lk.Lock()
	defer j.lk.Unlock()

	j.state = state
	j.updatedAt = time.Now()
	return nil
}
133
+
134
+
// FlushBufferedOps calls fn for every buffered operation, then clears the
// buffer and marks the job complete. On a callback error the buffer is left
// intact so the flush can be retried.
func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error {
	j.lk.Lock()
	defer j.lk.Unlock()

	for path, ops := range j.bufferedOps {
		for _, op := range ops {
			if err := fn(op.kind, path, op.rec, op.cid); err != nil {
				return err
			}
		}
	}

	j.bufferedOps = map[string][]*bufferedOp{}
	j.state = StateComplete

	return nil
}
151
+
152
+
// ClearBufferedOps discards all buffered operations without applying them.
func (j *Memjob) ClearBufferedOps(ctx context.Context) error {
	j.lk.Lock()
	defer j.lk.Unlock()

	j.bufferedOps = map[string][]*bufferedOp{}
	j.updatedAt = time.Now()
	return nil
}
+31
backfill/metrics.go
+31
backfill/metrics.go
···
1
+
package backfill
2
+
3
+
import (
4
+
"github.com/prometheus/client_golang/prometheus"
5
+
"github.com/prometheus/client_golang/prometheus/promauto"
6
+
)
7
+
8
+
// Prometheus metrics for the backfiller, all labeled by backfiller name.

var backfillJobsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "backfill_jobs_enqueued_total",
	Help: "The total number of backfill jobs enqueued",
}, []string{"backfiller_name"})

var backfillJobsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "backfill_jobs_processed_total",
	Help: "The total number of backfill jobs processed",
}, []string{"backfiller_name"})

var backfillRecordsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "backfill_records_processed_total",
	Help: "The total number of backfill records processed",
}, []string{"backfiller_name"})

// Gauge: incremented when an op is buffered, decremented when flushed.
var backfillOpsBuffered = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "backfill_ops_buffered",
	Help: "The number of backfill operations buffered",
}, []string{"backfiller_name"})

var backfillBytesProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "backfill_bytes_processed_total",
	Help: "The total number of backfill bytes processed",
}, []string{"backfiller_name"})
+33
backfill/util.go
+33
backfill/util.go
···
1
+
package backfill
2
+
3
+
import (
4
+
"io"
5
+
6
+
"github.com/prometheus/client_golang/prometheus"
7
+
)
8
+
9
+
// instrumentedReader wraps a ReadCloser and counts every byte read into a
// Prometheus counter.
type instrumentedReader struct {
	source  io.ReadCloser
	counter prometheus.Counter
}

// Read passes through to the source, adding the bytes read to the counter.
func (r instrumentedReader) Read(b []byte) (int, error) {
	n, err := r.source.Read(b)
	r.counter.Add(float64(n))
	return n, err
}
19
+
20
+
func (r instrumentedReader) Close() error {
21
+
var buf [32]byte
22
+
var n int
23
+
var err error
24
+
for err == nil {
25
+
n, err = r.source.Read(buf[:])
26
+
r.counter.Add(float64(n))
27
+
}
28
+
closeerr := r.source.Close()
29
+
if err != nil && err != io.EOF {
30
+
return err
31
+
}
32
+
return closeerr
33
+
}
+280
search/firehose.go
+280
search/firehose.go
···
1
+
package search
2
+
3
+
import (
4
+
"bytes"
5
+
"context"
6
+
"fmt"
7
+
"net/http"
8
+
"strings"
9
+
10
+
comatproto "github.com/bluesky-social/indigo/api/atproto"
11
+
bsky "github.com/bluesky-social/indigo/api/bsky"
12
+
"github.com/bluesky-social/indigo/backfill"
13
+
"github.com/bluesky-social/indigo/events"
14
+
"github.com/bluesky-social/indigo/events/schedulers/autoscaling"
15
+
lexutil "github.com/bluesky-social/indigo/lex/util"
16
+
"github.com/bluesky-social/indigo/repo"
17
+
"github.com/bluesky-social/indigo/repomgr"
18
+
"github.com/gorilla/websocket"
19
+
"github.com/ipfs/go-cid"
20
+
typegen "github.com/whyrusleeping/cbor-gen"
21
+
)
22
+
23
+
// getLastCursor returns the last persisted firehose sequence number,
// creating the row (with seq 0) on first run.
func (s *Server) getLastCursor() (int64, error) {
	var lastSeq LastSeq
	if err := s.db.Find(&lastSeq).Error; err != nil {
		return 0, err
	}

	// A zero ID means no row existed yet; create one and start from seq 0.
	if lastSeq.ID == 0 {
		return 0, s.db.Create(&lastSeq).Error
	}

	return lastSeq.Seq, nil
}
35
+
36
+
// updateLastCursor persists curs as the firehose resume point.
// NOTE(review): hard-codes "id = 1" — assumes the LastSeq row created by
// getLastCursor always gets id 1; confirm for non-empty databases.
func (s *Server) updateLastCursor(curs int64) error {
	return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error
}
39
+
40
+
func (s *Server) RunIndexer(ctx context.Context) error {
41
+
cur, err := s.getLastCursor()
42
+
if err != nil {
43
+
return fmt.Errorf("get last cursor: %w", err)
44
+
}
45
+
46
+
err = s.bfs.LoadJobs(ctx)
47
+
if err != nil {
48
+
return fmt.Errorf("loading backfill jobs: %w", err)
49
+
}
50
+
s.bf.Start()
51
+
52
+
d := websocket.DefaultDialer
53
+
con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{})
54
+
if err != nil {
55
+
return fmt.Errorf("events dial failed: %w", err)
56
+
}
57
+
58
+
rsc := &events.RepoStreamCallbacks{
59
+
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
60
+
defer func() {
61
+
if evt.Seq%50 == 0 {
62
+
if err := s.updateLastCursor(evt.Seq); err != nil {
63
+
log.Error("Failed to update cursor: ", err)
64
+
}
65
+
}
66
+
}()
67
+
if evt.TooBig && evt.Prev != nil {
68
+
log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq)
69
+
return nil
70
+
}
71
+
72
+
if evt.TooBig {
73
+
if err := s.processTooBigCommit(ctx, evt); err != nil {
74
+
log.Errorf("failed to process tooBig event: %s", err)
75
+
return nil
76
+
}
77
+
78
+
return nil
79
+
}
80
+
81
+
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
82
+
if err != nil {
83
+
log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err)
84
+
return nil
85
+
}
86
+
87
+
for _, op := range evt.Ops {
88
+
ek := repomgr.EventKind(op.Action)
89
+
switch ek {
90
+
case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
91
+
rc, rec, err := r.GetRecord(ctx, op.Path)
92
+
if err != nil {
93
+
e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
94
+
log.Error(e)
95
+
return nil
96
+
}
97
+
98
+
if lexutil.LexLink(rc) != *op.Cid {
99
+
log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
100
+
return nil
101
+
}
102
+
103
+
if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil {
104
+
log.Errorf("failed to handle op: %s", err)
105
+
return nil
106
+
}
107
+
108
+
case repomgr.EvtKindDeleteRecord:
109
+
if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil {
110
+
log.Errorf("failed to handle delete: %s", err)
111
+
return nil
112
+
}
113
+
}
114
+
}
115
+
116
+
return nil
117
+
118
+
},
119
+
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
120
+
if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil {
121
+
log.Errorf("failed to update user handle: %s", err)
122
+
}
123
+
return nil
124
+
},
125
+
}
126
+
127
+
return events.HandleRepoStream(
128
+
ctx, con, autoscaling.NewScheduler(
129
+
autoscaling.DefaultAutoscaleSettings(),
130
+
s.bgshost,
131
+
rsc.EventHandler,
132
+
),
133
+
)
134
+
}
135
+
136
+
func (s *Server) handleCreateOrUpdate(ctx context.Context, did string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error {
137
+
// Since this gets called in a backfill job, we need to check if the path is a post or profile
138
+
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
139
+
return nil
140
+
}
141
+
142
+
u, err := s.getOrCreateUser(ctx, did)
143
+
if err != nil {
144
+
return fmt.Errorf("checking user: %w", err)
145
+
}
146
+
rec := *recP
147
+
148
+
switch rec := rec.(type) {
149
+
case *bsky.FeedPost:
150
+
if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil {
151
+
return fmt.Errorf("indexing post: %w", err)
152
+
}
153
+
case *bsky.ActorProfile:
154
+
if err := s.indexProfile(ctx, u, rec); err != nil {
155
+
return fmt.Errorf("indexing profile: %w", err)
156
+
}
157
+
default:
158
+
}
159
+
return nil
160
+
}
161
+
162
+
func (s *Server) handleDelete(ctx context.Context, did string, path string) error {
163
+
// Since this gets called in a backfill job, we need to check if the path is a post or profile
164
+
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
165
+
return nil
166
+
}
167
+
168
+
u, err := s.getOrCreateUser(ctx, did)
169
+
if err != nil {
170
+
return err
171
+
}
172
+
173
+
switch {
174
+
// TODO: handle profile deletes, its an edge case, but worth doing still
175
+
case strings.Contains(path, "app.bsky.feed.post"):
176
+
if err := s.deletePost(ctx, u, path); err != nil {
177
+
return err
178
+
}
179
+
}
180
+
181
+
return nil
182
+
}
183
+
184
+
func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec typegen.CBORMarshaler) error {
185
+
var err error
186
+
if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
187
+
return nil
188
+
}
189
+
190
+
if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord {
191
+
log.Infof("handling create(%d): %s - %s", seq, did, path)
192
+
193
+
// Try to buffer the op, if it fails, we need to create a backfill job
194
+
_, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid)
195
+
if err == backfill.ErrJobNotFound {
196
+
log.Infof("no job found for repo %s, creating one", did)
197
+
198
+
if err := s.bfs.EnqueueJob(did); err != nil {
199
+
return fmt.Errorf("enqueueing job: %w", err)
200
+
}
201
+
202
+
// Try to buffer the op again so it gets picked up by the backfill job
203
+
_, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid)
204
+
if err != nil {
205
+
return fmt.Errorf("buffering op: %w", err)
206
+
}
207
+
} else if err == backfill.ErrJobComplete {
208
+
// Backfill is done for this repo so we can just index it now
209
+
err = s.handleCreateOrUpdate(ctx, did, path, &rec, rcid)
210
+
}
211
+
} else if op == repomgr.EvtKindDeleteRecord {
212
+
log.Infof("handling delete(%d): %s - %s", seq, did, path)
213
+
214
+
// Try to buffer the op, if it fails, we need to create a backfill job
215
+
_, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid)
216
+
if err == backfill.ErrJobNotFound {
217
+
log.Infof("no job found for repo %s, creating one", did)
218
+
219
+
if err := s.bfs.EnqueueJob(did); err != nil {
220
+
return fmt.Errorf("enqueueing job: %w", err)
221
+
}
222
+
223
+
// Try to buffer the op again so it gets picked up by the backfill job
224
+
_, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid)
225
+
if err != nil {
226
+
return fmt.Errorf("buffering op: %w", err)
227
+
}
228
+
} else if err == backfill.ErrJobComplete {
229
+
// Backfill is done for this repo so we can delete imemdiately
230
+
err = s.handleDelete(ctx, did, path)
231
+
}
232
+
}
233
+
234
+
if err != nil {
235
+
return fmt.Errorf("failed to handle op: %w", err)
236
+
}
237
+
238
+
return nil
239
+
}
240
+
241
+
func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
242
+
repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String())
243
+
if err != nil {
244
+
return err
245
+
}
246
+
247
+
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata))
248
+
if err != nil {
249
+
return err
250
+
}
251
+
252
+
u, err := s.getOrCreateUser(ctx, evt.Repo)
253
+
if err != nil {
254
+
return err
255
+
}
256
+
257
+
return r.ForEach(ctx, "", func(k string, v cid.Cid) error {
258
+
if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") {
259
+
rcid, rec, err := r.GetRecord(ctx, k)
260
+
if err != nil {
261
+
log.Errorf("failed to get record from repo checkout: %s", err)
262
+
return nil
263
+
}
264
+
265
+
switch rec := rec.(type) {
266
+
case *bsky.FeedPost:
267
+
if err := s.indexPost(ctx, u, rec, k, rcid); err != nil {
268
+
return fmt.Errorf("indexing post: %w", err)
269
+
}
270
+
case *bsky.ActorProfile:
271
+
if err := s.indexProfile(ctx, u, rec); err != nil {
272
+
return fmt.Errorf("indexing profile: %w", err)
273
+
}
274
+
default:
275
+
}
276
+
277
+
}
278
+
return nil
279
+
})
280
+
}
+18
-193
search/server.go
+18
-193
search/server.go
···
1
1
package search
2
2
3
3
import (
4
-
"bytes"
5
4
"context"
6
5
"encoding/base32"
7
6
"encoding/json"
8
7
"fmt"
9
-
"net/http"
10
8
"strconv"
11
9
"strings"
12
10
13
11
api "github.com/bluesky-social/indigo/api"
14
-
comatproto "github.com/bluesky-social/indigo/api/atproto"
15
12
bsky "github.com/bluesky-social/indigo/api/bsky"
16
-
"github.com/bluesky-social/indigo/events"
17
-
"github.com/bluesky-social/indigo/events/schedulers/autoscaling"
18
-
lexutil "github.com/bluesky-social/indigo/lex/util"
19
-
"github.com/bluesky-social/indigo/repo"
20
-
"github.com/bluesky-social/indigo/repomgr"
13
+
"github.com/bluesky-social/indigo/backfill"
21
14
"github.com/bluesky-social/indigo/util/version"
22
15
"github.com/bluesky-social/indigo/xrpc"
23
16
24
-
"github.com/gorilla/websocket"
25
17
lru "github.com/hashicorp/golang-lru"
26
-
"github.com/ipfs/go-cid"
27
18
flatfs "github.com/ipfs/go-ds-flatfs"
28
19
blockstore "github.com/ipfs/go-ipfs-blockstore"
29
20
logging "github.com/ipfs/go-log"
···
43
34
bgsxrpc *xrpc.Client
44
35
plc *api.PLCServer
45
36
echo *echo.Echo
37
+
38
+
bfs *backfill.Gormstore
39
+
bf *backfill.Backfiller
46
40
47
41
userCache *lru.Cache
48
42
}
···
72
66
db.AutoMigrate(&PostRef{})
73
67
db.AutoMigrate(&User{})
74
68
db.AutoMigrate(&LastSeq{})
69
+
db.AutoMigrate(&backfill.GormDBJob{})
75
70
76
71
// TODO: robust client
77
72
xc := &xrpc.Client{
···
102
97
plc: plc,
103
98
userCache: ucache,
104
99
}
105
-
return s, nil
106
-
}
107
-
108
-
func (s *Server) getLastCursor() (int64, error) {
109
-
var lastSeq LastSeq
110
-
if err := s.db.Find(&lastSeq).Error; err != nil {
111
-
return 0, err
112
-
}
113
-
114
-
if lastSeq.ID == 0 {
115
-
return 0, s.db.Create(&lastSeq).Error
116
-
}
117
-
118
-
return lastSeq.Seq, nil
119
-
}
120
-
121
-
func (s *Server) updateLastCursor(curs int64) error {
122
-
return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error
123
-
}
124
-
125
-
func (s *Server) RunIndexer(ctx context.Context) error {
126
-
cur, err := s.getLastCursor()
127
-
if err != nil {
128
-
return fmt.Errorf("get last cursor: %w", err)
129
-
}
130
-
131
-
d := websocket.DefaultDialer
132
-
con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{})
133
-
if err != nil {
134
-
return fmt.Errorf("events dial failed: %w", err)
135
-
}
136
-
137
-
rsc := &events.RepoStreamCallbacks{
138
-
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
139
-
if evt.TooBig && evt.Prev != nil {
140
-
log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq)
141
-
return nil
142
-
}
143
100
144
-
if evt.TooBig {
145
-
if err := s.processTooBigCommit(ctx, evt); err != nil {
146
-
log.Errorf("failed to process tooBig event: %s", err)
147
-
return nil
148
-
}
149
-
150
-
return nil
151
-
}
152
-
153
-
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
154
-
if err != nil {
155
-
log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err)
156
-
return nil
157
-
}
158
-
159
-
for _, op := range evt.Ops {
160
-
ek := repomgr.EventKind(op.Action)
161
-
switch ek {
162
-
case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
163
-
rc, rec, err := r.GetRecord(ctx, op.Path)
164
-
if err != nil {
165
-
e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
166
-
log.Error(e)
167
-
return nil
168
-
}
169
-
170
-
if lexutil.LexLink(rc) != *op.Cid {
171
-
log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
172
-
return nil
173
-
}
174
-
175
-
if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil {
176
-
log.Errorf("failed to handle op: %s", err)
177
-
return nil
178
-
}
179
-
180
-
case repomgr.EvtKindDeleteRecord:
181
-
if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil {
182
-
log.Errorf("failed to handle delete: %s", err)
183
-
return nil
184
-
}
185
-
}
186
-
}
187
-
188
-
return nil
189
-
190
-
},
191
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
192
-
if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil {
193
-
log.Errorf("failed to update user handle: %s", err)
194
-
}
195
-
return nil
196
-
},
197
-
}
198
-
199
-
return events.HandleRepoStream(
200
-
ctx, con, autoscaling.NewScheduler(
201
-
autoscaling.DefaultAutoscaleSettings(),
202
-
s.bgshost,
203
-
rsc.EventHandler,
204
-
),
101
+
bfstore := backfill.NewGormstore(db)
102
+
opts := backfill.DefaultBackfillOptions()
103
+
bf := backfill.NewBackfiller(
104
+
"search",
105
+
bfstore,
106
+
s.handleCreateOrUpdate,
107
+
s.handleCreateOrUpdate,
108
+
s.handleDelete,
109
+
log.Desugar().Sugar(),
110
+
opts,
205
111
)
206
-
}
207
112
208
-
func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error {
209
-
if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord {
113
+
s.bfs = bfstore
114
+
s.bf = bf
210
115
211
-
log.Infof("handling event(%d): %s - %s", seq, did, path)
212
-
u, err := s.getOrCreateUser(ctx, did)
213
-
if err != nil {
214
-
return fmt.Errorf("checking user: %w", err)
215
-
}
216
-
switch rec := rec.(type) {
217
-
case *bsky.FeedPost:
218
-
if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil {
219
-
return fmt.Errorf("indexing post: %w", err)
220
-
}
221
-
case *bsky.ActorProfile:
222
-
if err := s.indexProfile(ctx, u, rec); err != nil {
223
-
return fmt.Errorf("indexing profile: %w", err)
224
-
}
225
-
default:
226
-
}
227
-
228
-
} else if op == repomgr.EvtKindDeleteRecord {
229
-
u, err := s.getOrCreateUser(ctx, did)
230
-
if err != nil {
231
-
return err
232
-
}
233
-
234
-
switch {
235
-
// TODO: handle profile deletes, its an edge case, but worth doing still
236
-
case strings.Contains(path, "app.bsky.feed.post"):
237
-
if err := s.deletePost(ctx, u, path); err != nil {
238
-
return err
239
-
}
240
-
}
241
-
242
-
}
243
-
244
-
if seq%50 == 0 {
245
-
if err := s.updateLastCursor(seq); err != nil {
246
-
log.Error("Failed to update cursor: ", err)
247
-
}
248
-
}
249
-
250
-
return nil
251
-
}
252
-
253
-
func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
254
-
repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "", evt.Commit.String())
255
-
if err != nil {
256
-
return err
257
-
}
258
-
259
-
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata))
260
-
if err != nil {
261
-
return err
262
-
}
263
-
264
-
u, err := s.getOrCreateUser(ctx, evt.Repo)
265
-
if err != nil {
266
-
return err
267
-
}
268
-
269
-
return r.ForEach(ctx, "", func(k string, v cid.Cid) error {
270
-
if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") {
271
-
rcid, rec, err := r.GetRecord(ctx, k)
272
-
if err != nil {
273
-
log.Errorf("failed to get record from repo checkout: %s", err)
274
-
return nil
275
-
}
276
-
277
-
switch rec := rec.(type) {
278
-
case *bsky.FeedPost:
279
-
if err := s.indexPost(ctx, u, rec, k, rcid); err != nil {
280
-
return fmt.Errorf("indexing post: %w", err)
281
-
}
282
-
case *bsky.ActorProfile:
283
-
if err := s.indexProfile(ctx, u, rec); err != nil {
284
-
return fmt.Errorf("indexing profile: %w", err)
285
-
}
286
-
default:
287
-
}
288
-
289
-
}
290
-
return nil
291
-
})
116
+
return s, nil
292
117
}
293
118
294
119
func (s *Server) SearchPosts(ctx context.Context, srch string, offset, size int) ([]PostSearchResult, error) {