+5
-3
cmd/bigsky/main.go
+5
-3
cmd/bigsky/main.go
···
19
19
"github.com/bluesky-social/indigo/carstore"
20
20
"github.com/bluesky-social/indigo/did"
21
21
"github.com/bluesky-social/indigo/events"
22
+
"github.com/bluesky-social/indigo/events/dbpersist"
23
+
"github.com/bluesky-social/indigo/events/diskpersist"
22
24
"github.com/bluesky-social/indigo/indexer"
23
25
"github.com/bluesky-social/indigo/plc"
24
26
"github.com/bluesky-social/indigo/repomgr"
···
426
428
if dpd := cctx.String("disk-persister-dir"); dpd != "" {
427
429
slog.Info("setting up disk persister")
428
430
429
-
pOpts := events.DefaultDiskPersistOptions()
431
+
pOpts := diskpersist.DefaultDiskPersistOptions()
430
432
pOpts.Retention = cctx.Duration("event-playback-ttl")
431
-
dp, err := events.NewDiskPersistence(dpd, "", db, pOpts)
433
+
dp, err := diskpersist.NewDiskPersistence(dpd, "", db, pOpts)
432
434
if err != nil {
433
435
return fmt.Errorf("setting up disk persister: %w", err)
434
436
}
435
437
persister = dp
436
438
} else {
437
-
dbp, err := events.NewDbPersistence(db, cstore, nil)
439
+
dbp, err := dbpersist.NewDbPersistence(db, cstore, nil)
438
440
if err != nil {
439
441
return fmt.Errorf("setting up db event persistence: %w", err)
440
442
}
+2
-2
cmd/rainbow/main.go
+2
-2
cmd/rainbow/main.go
···
9
9
"syscall"
10
10
"time"
11
11
12
-
"github.com/bluesky-social/indigo/events"
12
+
"github.com/bluesky-social/indigo/events/pebblepersist"
13
13
"github.com/bluesky-social/indigo/splitter"
14
14
15
15
"github.com/carlmjohnson/versioninfo"
···
154
154
var err error
155
155
if persistPath != "" {
156
156
log.Info("building splitter with storage at", "path", persistPath)
157
-
ppopts := events.PebblePersistOptions{
157
+
ppopts := pebblepersist.PebblePersistOptions{
158
158
DbPath: persistPath,
159
159
PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")),
160
160
GCPeriod: 5 * time.Minute,
+2
-1
cmd/supercollider/main.go
+2
-1
cmd/supercollider/main.go
···
24
24
"github.com/bluesky-social/indigo/carstore"
25
25
"github.com/bluesky-social/indigo/did"
26
26
"github.com/bluesky-social/indigo/events"
27
+
"github.com/bluesky-social/indigo/events/yolopersist"
27
28
"github.com/bluesky-social/indigo/indexer"
28
29
"github.com/bluesky-social/indigo/models"
29
30
"github.com/bluesky-social/indigo/plc"
···
205
206
logger.Info(fmt.Sprintf("Generating %d total events and writing them to %s",
206
207
cctx.Int("total-events"), cctx.String("output-file")))
207
208
208
-
em := events.NewEventManager(events.NewYoloPersister())
209
+
em := events.NewEventManager(yolopersist.NewYoloPersister())
209
210
210
211
// Try to read the key from disk
211
212
keyBytes, err := os.ReadFile(cctx.String("key-file"))
+28
-24
events/dbpersist.go
events/dbpersist/dbpersist.go
+28
-24
events/dbpersist.go
events/dbpersist/dbpersist.go
···
1
-
package events
1
+
package dbpersist
2
2
3
3
import (
4
4
"bytes"
5
5
"context"
6
6
"encoding/json"
7
7
"fmt"
8
+
"log/slog"
8
9
"sync"
9
10
"time"
10
11
11
12
comatproto "github.com/bluesky-social/indigo/api/atproto"
12
13
"github.com/bluesky-social/indigo/carstore"
14
+
"github.com/bluesky-social/indigo/events"
13
15
lexutil "github.com/bluesky-social/indigo/lex/util"
14
16
"github.com/bluesky-social/indigo/models"
15
17
"github.com/bluesky-social/indigo/util"
···
19
21
"gorm.io/gorm"
20
22
)
21
23
24
+
var log = slog.Default().With("system", "dbpersist")
25
+
22
26
type PersistenceBatchItem struct {
23
27
Record *RepoEventRecord
24
-
Event *XRPCStreamEvent
28
+
Event *events.XRPCStreamEvent
25
29
}
26
30
27
31
type Options struct {
···
55
59
56
60
lk sync.Mutex
57
61
58
-
broadcast func(*XRPCStreamEvent)
62
+
broadcast func(*events.XRPCStreamEvent)
59
63
60
64
batch []*PersistenceBatchItem
61
65
batchOptions Options
···
137
141
}
138
142
}
139
143
140
-
func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
144
+
// SetEventBroadcaster stores brc in p.broadcast, replacing any callback set earlier.
func (p *DbPersistence) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
141
145
p.broadcast = brc
142
146
}
143
147
···
187
191
return nil
188
192
}
189
193
190
-
func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error {
194
+
func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *events.XRPCStreamEvent) error {
191
195
p.lk.Lock()
192
196
defer p.lk.Unlock()
193
197
p.batch = append(p.batch, &PersistenceBatchItem{
···
204
208
return nil
205
209
}
206
210
207
-
func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error {
211
+
func (p *DbPersistence) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
208
212
var rer *RepoEventRecord
209
213
var err error
210
214
···
367
371
return &rer, nil
368
372
}
369
373
370
-
func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
374
+
func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
371
375
pageSize := 1000
372
376
373
377
for {
···
416
420
return nil
417
421
}
418
422
419
-
func (p *DbPersistence) hydrateBatch(ctx context.Context, batch []*RepoEventRecord, cb func(*XRPCStreamEvent) error) error {
420
-
events := make([]*XRPCStreamEvent, len(batch))
423
+
func (p *DbPersistence) hydrateBatch(ctx context.Context, batch []*RepoEventRecord, cb func(*events.XRPCStreamEvent) error) error {
424
+
evts := make([]*events.XRPCStreamEvent, len(batch))
421
425
422
426
type Result struct {
423
-
Event *XRPCStreamEvent
427
+
Event *events.XRPCStreamEvent
424
428
Index int
425
429
Err error
426
430
}
···
439
443
// release the semaphore at the end of the goroutine
440
444
defer func() { <-sem }()
441
445
442
-
var streamEvent *XRPCStreamEvent
446
+
var streamEvent *events.XRPCStreamEvent
443
447
var err error
444
448
445
449
switch {
···
473
477
return result.Err
474
478
}
475
479
476
-
events[result.Index] = result.Event
480
+
evts[result.Index] = result.Event
477
481
478
-
for ; cur < len(events) && events[cur] != nil; cur++ {
479
-
if err := cb(events[cur]); err != nil {
482
+
for ; cur < len(evts) && evts[cur] != nil; cur++ {
483
+
if err := cb(evts[cur]); err != nil {
480
484
return err
481
485
}
482
486
}
···
515
519
return u.Did, nil
516
520
}
517
521
518
-
func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) {
522
+
func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
519
523
if rer.NewHandle == nil {
520
524
return nil, fmt.Errorf("NewHandle is nil")
521
525
}
···
525
529
return nil, err
526
530
}
527
531
528
-
return &XRPCStreamEvent{
532
+
return &events.XRPCStreamEvent{
529
533
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
530
534
Did: did,
531
535
Handle: *rer.NewHandle,
···
534
538
}, nil
535
539
}
536
540
537
-
func (p *DbPersistence) hydrateIdentityEvent(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) {
541
+
func (p *DbPersistence) hydrateIdentityEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
538
542
did, err := p.didForUid(ctx, rer.Repo)
539
543
if err != nil {
540
544
return nil, err
541
545
}
542
546
543
-
return &XRPCStreamEvent{
547
+
return &events.XRPCStreamEvent{
544
548
RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
545
549
Did: did,
546
550
Time: rer.Time.Format(util.ISO8601),
···
548
552
}, nil
549
553
}
550
554
551
-
func (p *DbPersistence) hydrateAccountEvent(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) {
555
+
func (p *DbPersistence) hydrateAccountEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
552
556
did, err := p.didForUid(ctx, rer.Repo)
553
557
if err != nil {
554
558
return nil, err
555
559
}
556
560
557
-
return &XRPCStreamEvent{
561
+
return &events.XRPCStreamEvent{
558
562
RepoAccount: &comatproto.SyncSubscribeRepos_Account{
559
563
Did: did,
560
564
Time: rer.Time.Format(util.ISO8601),
···
564
568
}, nil
565
569
}
566
570
567
-
func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) {
571
+
func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
568
572
did, err := p.didForUid(ctx, rer.Repo)
569
573
if err != nil {
570
574
return nil, err
571
575
}
572
576
573
-
return &XRPCStreamEvent{
577
+
return &events.XRPCStreamEvent{
574
578
RepoTombstone: &comatproto.SyncSubscribeRepos_Tombstone{
575
579
Did: did,
576
580
Time: rer.Time.Format(util.ISO8601),
···
578
582
}, nil
579
583
}
580
584
581
-
func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) {
585
+
func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
582
586
if rer.Commit == nil {
583
587
return nil, fmt.Errorf("commit is nil")
584
588
}
···
632
636
out.Blocks = cs
633
637
}
634
638
635
-
return &XRPCStreamEvent{RepoCommit: out}, nil
639
+
return &events.XRPCStreamEvent{RepoCommit: out}, nil
636
640
}
637
641
638
642
func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
+10
-9
events/dbpersist_test.go
events/dbpersist/dbpersist_test.go
+10
-9
events/dbpersist_test.go
events/dbpersist/dbpersist_test.go
···
1
-
package events
1
+
package dbpersist
2
2
3
3
import (
4
4
"context"
···
11
11
atproto "github.com/bluesky-social/indigo/api/atproto"
12
12
"github.com/bluesky-social/indigo/api/bsky"
13
13
"github.com/bluesky-social/indigo/carstore"
14
+
"github.com/bluesky-social/indigo/events"
14
15
lexutil "github.com/bluesky-social/indigo/lex/util"
15
16
"github.com/bluesky-social/indigo/models"
16
17
pds "github.com/bluesky-social/indigo/pds/data"
···
61
62
}
62
63
63
64
// Create a bunch of events
64
-
evtman := NewEventManager(dbp)
65
+
evtman := events.NewEventManager(dbp)
65
66
66
67
userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
67
68
if err != nil {
68
69
b.Fatal(err)
69
70
}
70
71
71
-
inEvts := make([]*XRPCStreamEvent, b.N)
72
+
inEvts := make([]*events.XRPCStreamEvent, b.N)
72
73
for i := 0; i < b.N; i++ {
73
74
cidLink := lexutil.LexLink(cid)
74
75
headLink := lexutil.LexLink(userRepoHead)
75
-
inEvts[i] = &XRPCStreamEvent{
76
+
inEvts[i] = &events.XRPCStreamEvent{
76
77
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
77
78
Repo: "did:example:123",
78
79
Commit: headLink,
···
130
131
131
132
b.StopTimer()
132
133
133
-
dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error {
134
+
dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
134
135
outEvtCount++
135
136
return nil
136
137
})
···
183
184
}
184
185
185
186
// Create a bunch of events
186
-
evtman := NewEventManager(dbp)
187
+
evtman := events.NewEventManager(dbp)
187
188
188
189
userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
189
190
if err != nil {
190
191
b.Fatal(err)
191
192
}
192
193
193
-
inEvts := make([]*XRPCStreamEvent, n)
194
+
inEvts := make([]*events.XRPCStreamEvent, n)
194
195
for i := 0; i < n; i++ {
195
196
cidLink := lexutil.LexLink(cid)
196
197
headLink := lexutil.LexLink(userRepoHead)
197
-
inEvts[i] = &XRPCStreamEvent{
198
+
inEvts[i] = &events.XRPCStreamEvent{
198
199
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
199
200
Repo: "did:example:123",
200
201
Commit: headLink,
···
250
251
251
252
b.ResetTimer()
252
253
253
-
dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error {
254
+
dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
254
255
outEvtCount++
255
256
return nil
256
257
})
+18
-14
events/diskpersist.go
events/diskpersist/diskpersist.go
+18
-14
events/diskpersist.go
events/diskpersist/diskpersist.go
···
1
-
package events
1
+
package diskpersist
2
2
3
3
import (
4
4
"bufio"
···
8
8
"errors"
9
9
"fmt"
10
10
"io"
11
+
"log/slog"
11
12
"os"
12
13
"path/filepath"
13
14
"sync"
14
15
"time"
15
16
16
17
"github.com/bluesky-social/indigo/api/atproto"
18
+
"github.com/bluesky-social/indigo/events"
17
19
"github.com/bluesky-social/indigo/models"
18
20
arc "github.com/hashicorp/golang-lru/arc/v2"
19
21
"github.com/prometheus/client_golang/prometheus"
···
21
23
cbg "github.com/whyrusleeping/cbor-gen"
22
24
"gorm.io/gorm"
23
25
)
26
+
27
+
var log = slog.Default().With("system", "diskpersist")
24
28
25
29
type DiskPersistence struct {
26
30
primaryDir string
···
31
35
32
36
meta *gorm.DB
33
37
34
-
broadcast func(*XRPCStreamEvent)
38
+
broadcast func(*events.XRPCStreamEvent)
35
39
36
40
logfi *os.File
37
41
···
54
58
55
59
type persistJob struct {
56
60
Bytes []byte
57
-
Evt *XRPCStreamEvent
61
+
Evt *events.XRPCStreamEvent
58
62
Buffer *bytes.Buffer // so we can put it back in the pool when we're done
59
63
}
60
64
···
68
72
EvtFlagRebased
69
73
)
70
74
71
-
var _ (EventPersistence) = (*DiskPersistence)(nil)
75
+
var _ (events.EventPersistence) = (*DiskPersistence)(nil)
72
76
73
77
type DiskPersistOptions struct {
74
78
UIDCacheSize int
···
487
491
return nil
488
492
}
489
493
490
-
func (dp *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error {
494
+
func (dp *DiskPersistence) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
491
495
buffer := dp.buffers.Get().(*bytes.Buffer)
492
496
cw := dp.writers.Get().(*cbg.CborWriter)
493
497
cw.SetWriter(buffer)
···
631
635
return u.Uid, nil
632
636
}
633
637
634
-
func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
638
+
func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
635
639
base := since - (since % dp.eventsPerFile)
636
640
var logs []LogFileRef
637
641
if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", base).Error; err != nil {
···
658
662
return nil
659
663
}
660
664
661
-
func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) {
665
+
func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) {
662
666
for i, lf := range logFiles {
663
667
lastSeq, err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb)
664
668
if err != nil {
···
684
688
return false
685
689
}
686
690
687
-
func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) (*int64, error) {
691
+
func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*events.XRPCStreamEvent) error) (*int64, error) {
688
692
fi, err := os.OpenFile(fn, os.O_RDONLY, 0)
689
693
if err != nil {
690
694
return nil, err
···
738
742
return nil, err
739
743
}
740
744
evt.Seq = h.Seq
741
-
if err := cb(&XRPCStreamEvent{RepoCommit: &evt}); err != nil {
745
+
if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil {
742
746
return nil, err
743
747
}
744
748
case evtKindHandle:
···
747
751
return nil, err
748
752
}
749
753
evt.Seq = h.Seq
750
-
if err := cb(&XRPCStreamEvent{RepoHandle: &evt}); err != nil {
754
+
if err := cb(&events.XRPCStreamEvent{RepoHandle: &evt}); err != nil {
751
755
return nil, err
752
756
}
753
757
case evtKindIdentity:
···
756
760
return nil, err
757
761
}
758
762
evt.Seq = h.Seq
759
-
if err := cb(&XRPCStreamEvent{RepoIdentity: &evt}); err != nil {
763
+
if err := cb(&events.XRPCStreamEvent{RepoIdentity: &evt}); err != nil {
760
764
return nil, err
761
765
}
762
766
case evtKindAccount:
···
765
769
return nil, err
766
770
}
767
771
evt.Seq = h.Seq
768
-
if err := cb(&XRPCStreamEvent{RepoAccount: &evt}); err != nil {
772
+
if err := cb(&events.XRPCStreamEvent{RepoAccount: &evt}); err != nil {
769
773
return nil, err
770
774
}
771
775
case evtKindTombstone:
···
774
778
return nil, err
775
779
}
776
780
evt.Seq = h.Seq
777
-
if err := cb(&XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
781
+
if err := cb(&events.XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
778
782
return nil, err
779
783
}
780
784
default:
···
931
935
return nil
932
936
}
933
937
934
-
func (dp *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent)) {
938
+
// SetEventBroadcaster stores f in dp.broadcast, replacing any callback set earlier.
func (dp *DiskPersistence) SetEventBroadcaster(f func(*events.XRPCStreamEvent)) {
935
939
dp.broadcast = f
936
940
}
+67
-24
events/diskpersist_test.go
events/diskpersist/diskpersist_test.go
+67
-24
events/diskpersist_test.go
events/diskpersist/diskpersist_test.go
···
1
-
package events
1
+
package diskpersist
2
2
3
3
import (
4
4
"context"
···
14
14
atproto "github.com/bluesky-social/indigo/api/atproto"
15
15
"github.com/bluesky-social/indigo/api/bsky"
16
16
"github.com/bluesky-social/indigo/carstore"
17
+
"github.com/bluesky-social/indigo/events"
17
18
lexutil "github.com/bluesky-social/indigo/lex/util"
18
19
"github.com/bluesky-social/indigo/models"
19
20
pds "github.com/bluesky-social/indigo/pds/data"
20
21
"github.com/bluesky-social/indigo/repomgr"
21
22
"github.com/bluesky-social/indigo/util"
23
+
"gorm.io/driver/sqlite"
22
24
"gorm.io/gorm"
23
25
)
24
26
25
-
func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (EventPersistence, error)) {
27
+
func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (events.EventPersistence, error)) {
26
28
ctx := context.Background()
27
29
28
30
db, _, cs, tempPath, err := setupDBs(t)
···
63
65
}
64
66
65
67
// Create a bunch of events
66
-
evtman := NewEventManager(dp)
68
+
evtman := events.NewEventManager(dp)
67
69
68
70
userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
69
71
if err != nil {
···
71
73
}
72
74
73
75
n := 100
74
-
inEvts := make([]*XRPCStreamEvent, n)
76
+
inEvts := make([]*events.XRPCStreamEvent, n)
75
77
for i := 0; i < n; i++ {
76
78
cidLink := lexutil.LexLink(cid)
77
79
headLink := lexutil.LexLink(userRepoHead)
78
-
inEvts[i] = &XRPCStreamEvent{
80
+
inEvts[i] = &events.XRPCStreamEvent{
79
81
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
80
82
Repo: "did:example:123",
81
83
Commit: headLink,
···
107
109
outEvtCount := 0
108
110
expectedEvtCount := n
109
111
110
-
dp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error {
112
+
dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
111
113
outEvtCount++
112
114
return nil
113
115
})
···
129
131
t.Fatal(err)
130
132
}
131
133
132
-
evtman2 := NewEventManager(dp2)
134
+
evtman2 := events.NewEventManager(dp2)
133
135
134
-
inEvts = make([]*XRPCStreamEvent, n)
136
+
inEvts = make([]*events.XRPCStreamEvent, n)
135
137
for i := 0; i < n; i++ {
136
138
cidLink := lexutil.LexLink(cid)
137
139
headLink := lexutil.LexLink(userRepoHead)
138
-
inEvts[i] = &XRPCStreamEvent{
140
+
inEvts[i] = &events.XRPCStreamEvent{
139
141
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
140
142
Repo: "did:example:123",
141
143
Commit: headLink,
···
159
161
}
160
162
}
161
163
func TestDiskPersist(t *testing.T) {
162
-
factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) {
164
+
factory := func(tempPath string, db *gorm.DB) (events.EventPersistence, error) {
163
165
return NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{
164
166
EventsPerFile: 10,
165
167
UIDCacheSize: 100000,
···
192
194
193
195
}
194
196
195
-
func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p EventPersistence) {
197
+
func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
196
198
ctx := context.Background()
197
199
198
200
db.AutoMigrate(&pds.User{})
···
220
222
}
221
223
222
224
// Create a bunch of events
223
-
evtman := NewEventManager(p)
225
+
evtman := events.NewEventManager(p)
224
226
225
227
userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
226
228
if err != nil {
227
229
b.Fatal(err)
228
230
}
229
231
230
-
inEvts := make([]*XRPCStreamEvent, b.N)
232
+
inEvts := make([]*events.XRPCStreamEvent, b.N)
231
233
for i := 0; i < b.N; i++ {
232
234
cidLink := lexutil.LexLink(cid)
233
235
headLink := lexutil.LexLink(userRepoHead)
234
-
inEvts[i] = &XRPCStreamEvent{
236
+
inEvts[i] = &events.XRPCStreamEvent{
235
237
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
236
238
Repo: "did:example:123",
237
239
Commit: headLink,
···
307
309
runEventManagerTest(t, cs, db, dp)
308
310
}
309
311
310
-
func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) {
312
+
func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
311
313
ctx := context.Background()
312
314
313
315
db.AutoMigrate(&pds.User{})
···
334
336
t.Fatal(err)
335
337
}
336
338
337
-
evtman := NewEventManager(p)
339
+
evtman := events.NewEventManager(p)
338
340
339
341
userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
340
342
if err != nil {
···
342
344
}
343
345
344
346
testSize := 100 // you can adjust this number as needed
345
-
inEvts := make([]*XRPCStreamEvent, testSize)
347
+
inEvts := make([]*events.XRPCStreamEvent, testSize)
346
348
for i := 0; i < testSize; i++ {
347
349
cidLink := lexutil.LexLink(cid)
348
350
headLink := lexutil.LexLink(userRepoHead)
349
-
inEvts[i] = &XRPCStreamEvent{
351
+
inEvts[i] = &events.XRPCStreamEvent{
350
352
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
351
353
Repo: "did:example:123",
352
354
Commit: headLink,
···
373
375
}
374
376
375
377
outEvtCount := 0
376
-
p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error {
378
+
p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
377
379
// Check that the contents of the output events match the input events
378
380
// Clear cache, don't care if one has it and not the other
379
381
inEvts[outEvtCount].Preserialized = nil
···
414
416
runTakedownTest(t, cs, db, dp)
415
417
}
416
418
417
-
func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) {
419
+
func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) {
418
420
ctx := context.TODO()
419
421
420
422
db.AutoMigrate(&pds.User{})
···
444
446
}
445
447
}
446
448
447
-
evtman := NewEventManager(p)
449
+
evtman := events.NewEventManager(p)
448
450
449
451
testSize := 100 // you can adjust this number as needed
450
-
inEvts := make([]*XRPCStreamEvent, testSize*userCount)
452
+
inEvts := make([]*events.XRPCStreamEvent, testSize*userCount)
451
453
for i := 0; i < testSize*userCount; i++ {
452
454
user := users[i%userCount]
453
455
_, cid, err := mgr.CreateRecord(ctx, user.Uid, "app.bsky.feed.post", &bsky.FeedPost{
···
465
467
466
468
cidLink := lexutil.LexLink(cid)
467
469
headLink := lexutil.LexLink(userRepoHead)
468
-
inEvts[i] = &XRPCStreamEvent{
470
+
inEvts[i] = &events.XRPCStreamEvent{
469
471
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
470
472
Repo: user.Did,
471
473
Commit: headLink,
···
500
502
501
503
// Verify that the events of the user have been removed from the event stream
502
504
var evtsCount int
503
-
if err := p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error {
505
+
if err := p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
504
506
evtsCount++
505
507
if evt.RepoCommit.Repo == takeDownUser.Did {
506
508
t.Fatalf("found event for user %d after takedown", takeDownUser.Uid)
···
515
517
t.Fatalf("wrong number of events out: %d != %d", evtsCount, exp)
516
518
}
517
519
}
520
+
521
+
func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, error) {
522
+
dir, err := os.MkdirTemp("", "integtest")
523
+
if err != nil {
524
+
return nil, nil, nil, "", err
525
+
}
526
+
527
+
maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite?cache=shared&mode=rwc")))
528
+
if err != nil {
529
+
return nil, nil, nil, "", err
530
+
}
531
+
532
+
tx := maindb.Exec("PRAGMA journal_mode=WAL;")
533
+
if tx.Error != nil {
534
+
return nil, nil, nil, "", tx.Error
535
+
}
536
+
537
+
tx.Commit()
538
+
539
+
cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite?cache=shared&mode=rwc")))
540
+
if err != nil {
541
+
return nil, nil, nil, "", err
542
+
}
543
+
544
+
tx = cardb.Exec("PRAGMA journal_mode=WAL;")
545
+
if tx.Error != nil {
546
+
return nil, nil, nil, "", tx.Error
547
+
}
548
+
549
+
cspath := filepath.Join(dir, "carstore")
550
+
if err := os.Mkdir(cspath, 0775); err != nil {
551
+
return nil, nil, nil, "", err
552
+
}
553
+
554
+
cs, err := carstore.NewCarStore(cardb, []string{cspath})
555
+
if err != nil {
556
+
return nil, nil, nil, "", err
557
+
}
558
+
559
+
return maindb, cardb, cs, dir, nil
560
+
}
+12
-8
events/pebblepersist.go
events/pebblepersist/pebblepersist.go
+12
-8
events/pebblepersist.go
events/pebblepersist/pebblepersist.go
···
1
-
package events
1
+
package pebblepersist
2
2
3
3
import (
4
4
"bytes"
···
7
7
"encoding/hex"
8
8
"errors"
9
9
"fmt"
10
+
"log/slog"
10
11
"time"
11
12
13
+
"github.com/bluesky-social/indigo/events"
12
14
"github.com/bluesky-social/indigo/models"
13
15
"github.com/cockroachdb/pebble"
14
16
)
15
17
18
+
var log = slog.Default().With("system", "pebblepersist")
19
+
16
20
type PebblePersist struct {
17
-
broadcast func(*XRPCStreamEvent)
21
+
broadcast func(*events.XRPCStreamEvent)
18
22
db *pebble.DB
19
23
20
24
prevSeq int64
···
66
70
binary.BigEndian.PutUint64(key[8:16], uint64(millis))
67
71
}
68
72
69
-
func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error {
73
+
func (pp *PebblePersist) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
70
74
err := e.Preserialize()
71
75
if err != nil {
72
76
return err
···
101
105
return err
102
106
}
103
107
104
-
func eventFromPebbleIter(iter *pebble.Iterator) (*XRPCStreamEvent, error) {
108
+
func eventFromPebbleIter(iter *pebble.Iterator) (*events.XRPCStreamEvent, error) {
105
109
blob, err := iter.ValueAndErr()
106
110
if err != nil {
107
111
return nil, err
108
112
}
109
113
br := bytes.NewReader(blob)
110
-
evt := new(XRPCStreamEvent)
114
+
evt := new(events.XRPCStreamEvent)
111
115
err = evt.Deserialize(br)
112
116
if err != nil {
113
117
return nil, err
···
116
120
return evt, nil
117
121
}
118
122
119
-
func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
123
+
func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
120
124
var key [8]byte
121
125
binary.BigEndian.PutUint64(key[:], uint64(since))
122
126
···
156
160
return err
157
161
}
158
162
159
-
func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent)) {
163
+
// SetEventBroadcaster stores broadcast in pp.broadcast, replacing any callback set earlier.
func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*events.XRPCStreamEvent)) {
160
164
pp.broadcast = broadcast
161
165
}
162
166
163
167
var ErrNoLast = errors.New("no last event")
164
168
165
-
func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error) {
169
+
func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *events.XRPCStreamEvent, err error) {
166
170
iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{})
167
171
if err != nil {
168
172
return 0, 0, nil, err
+210
events/pebblepersist/pebblepersist_test.go
+210
events/pebblepersist/pebblepersist_test.go
···
1
+
package pebblepersist
2
+
3
+
import (
4
+
"context"
5
+
"os"
6
+
"path/filepath"
7
+
"testing"
8
+
"time"
9
+
10
+
atproto "github.com/bluesky-social/indigo/api/atproto"
11
+
"github.com/bluesky-social/indigo/api/bsky"
12
+
"github.com/bluesky-social/indigo/carstore"
13
+
"github.com/bluesky-social/indigo/events"
14
+
"github.com/bluesky-social/indigo/events/diskpersist"
15
+
lexutil "github.com/bluesky-social/indigo/lex/util"
16
+
"github.com/bluesky-social/indigo/models"
17
+
pds "github.com/bluesky-social/indigo/pds/data"
18
+
"github.com/bluesky-social/indigo/repomgr"
19
+
"github.com/bluesky-social/indigo/util"
20
+
21
+
"gorm.io/driver/sqlite"
22
+
"gorm.io/gorm"
23
+
)
24
+
25
+
func TestPebblePersist(t *testing.T) {
26
+
factory := func(tempPath string, db *gorm.DB) (events.EventPersistence, error) {
27
+
opts := DefaultPebblePersistOptions
28
+
opts.DbPath = filepath.Join(tempPath, "pebble.db")
29
+
return NewPebblePersistance(&opts)
30
+
}
31
+
testPersister(t, factory)
32
+
}
33
+
34
+
func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (events.EventPersistence, error)) {
35
+
ctx := context.Background()
36
+
37
+
db, _, cs, tempPath, err := setupDBs(t)
38
+
if err != nil {
39
+
t.Fatal(err)
40
+
}
41
+
42
+
db.AutoMigrate(&pds.User{})
43
+
db.AutoMigrate(&pds.Peering{})
44
+
db.AutoMigrate(&models.ActorInfo{})
45
+
46
+
db.Create(&models.ActorInfo{
47
+
Uid: 1,
48
+
Did: "did:example:123",
49
+
})
50
+
51
+
mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{})
52
+
53
+
err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "")
54
+
if err != nil {
55
+
t.Fatal(err)
56
+
}
57
+
58
+
_, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{
59
+
Text: "hello world",
60
+
CreatedAt: time.Now().Format(util.ISO8601),
61
+
})
62
+
if err != nil {
63
+
t.Fatal(err)
64
+
}
65
+
66
+
defer os.RemoveAll(tempPath)
67
+
68
+
// Initialize a persister
69
+
dp, err := perisistenceFactory(tempPath, db)
70
+
if err != nil {
71
+
t.Fatal(err)
72
+
}
73
+
74
+
// Create a bunch of events
75
+
evtman := events.NewEventManager(dp)
76
+
77
+
userRepoHead, err := mgr.GetRepoRoot(ctx, 1)
78
+
if err != nil {
79
+
t.Fatal(err)
80
+
}
81
+
82
+
n := 100
83
+
inEvts := make([]*events.XRPCStreamEvent, n)
84
+
for i := 0; i < n; i++ {
85
+
cidLink := lexutil.LexLink(cid)
86
+
headLink := lexutil.LexLink(userRepoHead)
87
+
inEvts[i] = &events.XRPCStreamEvent{
88
+
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
89
+
Repo: "did:example:123",
90
+
Commit: headLink,
91
+
Ops: []*atproto.SyncSubscribeRepos_RepoOp{
92
+
{
93
+
Action: "add",
94
+
Cid: &cidLink,
95
+
Path: "path1",
96
+
},
97
+
},
98
+
Time: time.Now().Format(util.ISO8601),
99
+
Seq: int64(i),
100
+
},
101
+
}
102
+
}
103
+
104
+
// Add events in parallel
105
+
for i := 0; i < n; i++ {
106
+
err = evtman.AddEvent(ctx, inEvts[i])
107
+
if err != nil {
108
+
t.Fatal(err)
109
+
}
110
+
}
111
+
112
+
if err := dp.Flush(ctx); err != nil {
113
+
t.Fatal(err)
114
+
}
115
+
116
+
outEvtCount := 0
117
+
expectedEvtCount := n
118
+
119
+
dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
120
+
outEvtCount++
121
+
return nil
122
+
})
123
+
124
+
if outEvtCount != expectedEvtCount {
125
+
t.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount)
126
+
}
127
+
128
+
dp.Shutdown(ctx)
129
+
130
+
time.Sleep(time.Millisecond * 100)
131
+
132
+
dp2, err := diskpersist.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &diskpersist.DiskPersistOptions{
133
+
EventsPerFile: 10,
134
+
UIDCacheSize: 100000,
135
+
DIDCacheSize: 100000,
136
+
})
137
+
if err != nil {
138
+
t.Fatal(err)
139
+
}
140
+
141
+
evtman2 := events.NewEventManager(dp2)
142
+
143
+
inEvts = make([]*events.XRPCStreamEvent, n)
144
+
for i := 0; i < n; i++ {
145
+
cidLink := lexutil.LexLink(cid)
146
+
headLink := lexutil.LexLink(userRepoHead)
147
+
inEvts[i] = &events.XRPCStreamEvent{
148
+
RepoCommit: &atproto.SyncSubscribeRepos_Commit{
149
+
Repo: "did:example:123",
150
+
Commit: headLink,
151
+
Ops: []*atproto.SyncSubscribeRepos_RepoOp{
152
+
{
153
+
Action: "add",
154
+
Cid: &cidLink,
155
+
Path: "path1",
156
+
},
157
+
},
158
+
Time: time.Now().Format(util.ISO8601),
159
+
},
160
+
}
161
+
}
162
+
163
+
for i := 0; i < n; i++ {
164
+
err = evtman2.AddEvent(ctx, inEvts[i])
165
+
if err != nil {
166
+
t.Fatal(err)
167
+
}
168
+
}
169
+
}
170
+
171
+
func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, error) {
172
+
dir, err := os.MkdirTemp("", "integtest")
173
+
if err != nil {
174
+
return nil, nil, nil, "", err
175
+
}
176
+
177
+
maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite?cache=shared&mode=rwc")))
178
+
if err != nil {
179
+
return nil, nil, nil, "", err
180
+
}
181
+
182
+
tx := maindb.Exec("PRAGMA journal_mode=WAL;")
183
+
if tx.Error != nil {
184
+
return nil, nil, nil, "", tx.Error
185
+
}
186
+
187
+
tx.Commit()
188
+
189
+
cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite?cache=shared&mode=rwc")))
190
+
if err != nil {
191
+
return nil, nil, nil, "", err
192
+
}
193
+
194
+
tx = cardb.Exec("PRAGMA journal_mode=WAL;")
195
+
if tx.Error != nil {
196
+
return nil, nil, nil, "", tx.Error
197
+
}
198
+
199
+
cspath := filepath.Join(dir, "carstore")
200
+
if err := os.Mkdir(cspath, 0775); err != nil {
201
+
return nil, nil, nil, "", err
202
+
}
203
+
204
+
cs, err := carstore.NewCarStore(cardb, []string{cspath})
205
+
if err != nil {
206
+
return nil, nil, nil, "", err
207
+
}
208
+
209
+
return maindb, cardb, cs, dir, nil
210
+
}
-16
events/pebblepersist_test.go
-16
events/pebblepersist_test.go
···
1
-
package events
2
-
3
-
import (
4
-
"gorm.io/gorm"
5
-
"path/filepath"
6
-
"testing"
7
-
)
8
-
9
-
func TestPebblePersist(t *testing.T) {
10
-
factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) {
11
-
opts := DefaultPebblePersistOptions
12
-
opts.DbPath = filepath.Join(tempPath, "pebble.db")
13
-
return NewPebblePersistance(&opts)
14
-
}
15
-
testPersister(t, factory)
16
-
}
+6
-5
events/yolopersist.go
events/yolopersist/yolopersist.go
+6
-5
events/yolopersist.go
events/yolopersist/yolopersist.go
···
1
-
package events
1
+
package yolopersist
2
2
3
3
import (
4
4
"context"
5
5
"fmt"
6
6
"sync"
7
7
8
+
"github.com/bluesky-social/indigo/events"
8
9
"github.com/bluesky-social/indigo/models"
9
10
)
10
11
···
13
14
lk sync.Mutex
14
15
seq int64
15
16
16
-
broadcast func(*XRPCStreamEvent)
17
+
broadcast func(*events.XRPCStreamEvent)
17
18
}
18
19
19
20
func NewYoloPersister() *YoloPersister {
20
21
return &YoloPersister{}
21
22
}
22
23
23
-
func (yp *YoloPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error {
24
+
func (yp *YoloPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error {
24
25
yp.lk.Lock()
25
26
defer yp.lk.Unlock()
26
27
yp.seq++
···
48
49
return nil
49
50
}
50
51
51
-
func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error {
52
+
func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
52
53
return fmt.Errorf("playback not supported by yolo persister, test usage only")
53
54
}
54
55
···
56
57
return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only")
57
58
}
58
59
59
-
func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
60
+
func (yp *YoloPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
60
61
yp.broadcast = brc
61
62
}
62
63
+8
-7
splitter/splitter.go
+8
-7
splitter/splitter.go
···
22
22
"github.com/bluesky-social/indigo/api/atproto"
23
23
comatproto "github.com/bluesky-social/indigo/api/atproto"
24
24
"github.com/bluesky-social/indigo/bgs"
25
-
events "github.com/bluesky-social/indigo/events"
25
+
"github.com/bluesky-social/indigo/events"
26
+
"github.com/bluesky-social/indigo/events/pebblepersist"
26
27
"github.com/bluesky-social/indigo/events/schedulers/sequential"
27
28
"github.com/bluesky-social/indigo/util"
28
29
"github.com/bluesky-social/indigo/xrpc"
···
36
37
37
38
type Splitter struct {
38
39
erb *EventRingBuffer
39
-
pp *events.PebblePersist
40
+
pp *pebblepersist.PebblePersist
40
41
events *events.EventManager
41
42
42
43
// Management of Socket Consumers
···
55
56
type SplitterConfig struct {
56
57
UpstreamHost string
57
58
CursorFile string
58
-
PebbleOptions *events.PebblePersistOptions
59
+
PebbleOptions *pebblepersist.PebblePersistOptions
59
60
}
60
61
61
62
func (sc *SplitterConfig) XrpcRootUrl() string {
···
103
104
s.erb = erb
104
105
s.events = events.NewEventManager(erb)
105
106
} else {
106
-
pp, err := events.NewPebblePersistance(conf.PebbleOptions)
107
+
pp, err := pebblepersist.NewPebblePersistance(conf.PebbleOptions)
107
108
if err != nil {
108
109
return nil, err
109
110
}
···
115
116
return s, nil
116
117
}
117
118
func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) {
118
-
ppopts := events.PebblePersistOptions{
119
+
ppopts := pebblepersist.PebblePersistOptions{
119
120
DbPath: path,
120
121
PersistDuration: time.Duration(float64(time.Hour) * persistHours),
121
122
GCPeriod: 5 * time.Minute,
···
126
127
CursorFile: "cursor-file",
127
128
PebbleOptions: &ppopts,
128
129
}
129
-
pp, err := events.NewPebblePersistance(&ppopts)
130
+
pp, err := pebblepersist.NewPebblePersistance(&ppopts)
130
131
if err != nil {
131
132
return nil, err
132
133
}
···
644
645
if err == nil {
645
646
s.log.Debug("got last cursor from pebble", "seq", seq, "millis", millis)
646
647
return seq, nil
647
-
} else if errors.Is(err, events.ErrNoLast) {
648
+
} else if errors.Is(err, pebblepersist.ErrNoLast) {
648
649
s.log.Info("pebble no last")
649
650
} else {
650
651
s.log.Error("pebble seq fail", "err", err)
+3
-2
testing/utils.go
+3
-2
testing/utils.go
···
25
25
"github.com/bluesky-social/indigo/bgs"
26
26
"github.com/bluesky-social/indigo/carstore"
27
27
"github.com/bluesky-social/indigo/events"
28
+
"github.com/bluesky-social/indigo/events/diskpersist"
28
29
"github.com/bluesky-social/indigo/events/schedulers/sequential"
29
30
"github.com/bluesky-social/indigo/indexer"
30
31
lexutil "github.com/bluesky-social/indigo/lex/util"
···
559
560
560
561
repoman := repomgr.NewRepoManager(cs, kmgr)
561
562
562
-
opts := events.DefaultDiskPersistOptions()
563
+
opts := diskpersist.DefaultDiskPersistOptions()
563
564
opts.EventsPerFile = 10
564
-
diskpersist, err := events.NewDiskPersistence(filepath.Join(dir, "dp-primary"), filepath.Join(dir, "dp-archive"), maindb, opts)
565
+
diskpersist, err := diskpersist.NewDiskPersistence(filepath.Join(dir, "dp-primary"), filepath.Join(dir, "dp-archive"), maindb, opts)
565
566
566
567
evtman := events.NewEventManager(diskpersist)
567
568
rf := indexer.NewRepoFetcher(maindb, repoman, 10)