fork of indigo with slightly nicer lexgen

refactor persisters in to sub-packages

+28 -24
events/dbpersist.go events/dbpersist/dbpersist.go
··· 1 - package events 1 + package dbpersist 2 2 3 3 import ( 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 7 "fmt" 8 + "log/slog" 8 9 "sync" 9 10 "time" 10 11 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 13 "github.com/bluesky-social/indigo/carstore" 14 + "github.com/bluesky-social/indigo/events" 13 15 lexutil "github.com/bluesky-social/indigo/lex/util" 14 16 "github.com/bluesky-social/indigo/models" 15 17 "github.com/bluesky-social/indigo/util" ··· 19 21 "gorm.io/gorm" 20 22 ) 21 23 24 + var log = slog.Default().With("system", "dbpersist") 25 + 22 26 type PersistenceBatchItem struct { 23 27 Record *RepoEventRecord 24 - Event *XRPCStreamEvent 28 + Event *events.XRPCStreamEvent 25 29 } 26 30 27 31 type Options struct { ··· 55 59 56 60 lk sync.Mutex 57 61 58 - broadcast func(*XRPCStreamEvent) 62 + broadcast func(*events.XRPCStreamEvent) 59 63 60 64 batch []*PersistenceBatchItem 61 65 batchOptions Options ··· 137 141 } 138 142 } 139 143 140 - func (p *DbPersistence) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { 144 + func (p *DbPersistence) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 141 145 p.broadcast = brc 142 146 } 143 147 ··· 187 191 return nil 188 192 } 189 193 190 - func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *XRPCStreamEvent) error { 194 + func (p *DbPersistence) AddItemToBatch(ctx context.Context, rec *RepoEventRecord, evt *events.XRPCStreamEvent) error { 191 195 p.lk.Lock() 192 196 defer p.lk.Unlock() 193 197 p.batch = append(p.batch, &PersistenceBatchItem{ ··· 204 208 return nil 205 209 } 206 210 207 - func (p *DbPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 211 + func (p *DbPersistence) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 208 212 var rer *RepoEventRecord 209 213 var err error 210 214 ··· 367 371 return &rer, nil 368 372 } 369 373 370 - func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 374 + func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 371 375 pageSize := 1000 372 376 373 377 for { ··· 416 420 return nil 417 421 } 418 422 419 - func (p *DbPersistence) hydrateBatch(ctx context.Context, batch []*RepoEventRecord, cb func(*XRPCStreamEvent) error) error { 420 - events := make([]*XRPCStreamEvent, len(batch)) 423 + func (p *DbPersistence) hydrateBatch(ctx context.Context, batch []*RepoEventRecord, cb func(*events.XRPCStreamEvent) error) error { 424 + evts := make([]*events.XRPCStreamEvent, len(batch)) 421 425 422 426 type Result struct { 423 - Event *XRPCStreamEvent 427 + Event *events.XRPCStreamEvent 424 428 Index int 425 429 Err error 426 430 } ··· 439 443 // release the semaphore at the end of the goroutine 440 444 defer func() { <-sem }() 441 445 442 - var streamEvent *XRPCStreamEvent 446 + var streamEvent *events.XRPCStreamEvent 443 447 var err error 444 448 445 449 switch { ··· 473 477 return result.Err 474 478 } 475 479 476 - events[result.Index] = result.Event 480 + evts[result.Index] = result.Event 477 481 478 - for ; cur < len(events) && events[cur] != nil; cur++ { 479 - if err := cb(events[cur]); err != nil { 482 + for ; cur < len(evts) && evts[cur] != nil; cur++ { 483 + if err := cb(evts[cur]); err != nil { 480 484 return err 481 485 } 482 486 } ··· 515 519 return u.Did, nil 516 520 } 517 521 518 - func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 522 + func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 519 523 if rer.NewHandle == nil { 520 524 return nil, fmt.Errorf("NewHandle is nil") 521 525 } ··· 525 529 return nil, err 526 530 } 527 531 528 - return &XRPCStreamEvent{ 532 + return &events.XRPCStreamEvent{ 529 533 RepoHandle: &comatproto.SyncSubscribeRepos_Handle{ 530 534 Did: did, 531 535 Handle: *rer.NewHandle, ··· 534 538 }, nil 535 539 } 536 540 537 - func (p *DbPersistence) hydrateIdentityEvent(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 541 + func (p *DbPersistence) hydrateIdentityEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 538 542 did, err := p.didForUid(ctx, rer.Repo) 539 543 if err != nil { 540 544 return nil, err 541 545 } 542 546 543 - return &XRPCStreamEvent{ 547 + return &events.XRPCStreamEvent{ 544 548 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 545 549 Did: did, 546 550 Time: rer.Time.Format(util.ISO8601), ··· 548 552 }, nil 549 553 } 550 554 551 - func (p *DbPersistence) hydrateAccountEvent(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 555 + func (p *DbPersistence) hydrateAccountEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 552 556 did, err := p.didForUid(ctx, rer.Repo) 553 557 if err != nil { 554 558 return nil, err 555 559 } 556 560 557 - return &XRPCStreamEvent{ 561 + return &events.XRPCStreamEvent{ 558 562 RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 559 563 Did: did, 560 564 Time: rer.Time.Format(util.ISO8601), ··· 564 568 }, nil 565 569 } 566 570 567 - func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 571 + func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 568 572 did, err := p.didForUid(ctx, rer.Repo) 569 573 if err != nil { 570 574 return nil, err 571 575 } 572 576 573 - return &XRPCStreamEvent{ 577 + return &events.XRPCStreamEvent{ 574 578 RepoTombstone: &comatproto.SyncSubscribeRepos_Tombstone{ 575 579 Did: did, 576 580 Time: rer.Time.Format(util.ISO8601), ··· 578 582 }, nil 579 583 } 580 584 581 - func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) (*XRPCStreamEvent, error) { 585 + func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) { 582 586 if rer.Commit == nil { 583 587 return nil, fmt.Errorf("commit is nil") 584 588 } ··· 632 636 out.Blocks = cs 633 637 } 634 638 635 - return &XRPCStreamEvent{RepoCommit: out}, nil 639 + return &events.XRPCStreamEvent{RepoCommit: out}, nil 636 640 } 637 641 638 642 func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
+10 -9
events/dbpersist_test.go events/dbpersist/dbpersist_test.go
··· 1 - package events 1 + package dbpersist 2 2 3 3 import ( 4 4 "context" ··· 11 11 atproto "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/api/bsky" 13 13 "github.com/bluesky-social/indigo/carstore" 14 + "github.com/bluesky-social/indigo/events" 14 15 lexutil "github.com/bluesky-social/indigo/lex/util" 15 16 "github.com/bluesky-social/indigo/models" 16 17 pds "github.com/bluesky-social/indigo/pds/data" ··· 61 62 } 62 63 63 64 // Create a bunch of events 64 - evtman := NewEventManager(dbp) 65 + evtman := events.NewEventManager(dbp) 65 66 66 67 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 67 68 if err != nil { 68 69 b.Fatal(err) 69 70 } 70 71 71 - inEvts := make([]*XRPCStreamEvent, b.N) 72 + inEvts := make([]*events.XRPCStreamEvent, b.N) 72 73 for i := 0; i < b.N; i++ { 73 74 cidLink := lexutil.LexLink(cid) 74 75 headLink := lexutil.LexLink(userRepoHead) 75 - inEvts[i] = &XRPCStreamEvent{ 76 + inEvts[i] = &events.XRPCStreamEvent{ 76 77 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 77 78 Repo: "did:example:123", 78 79 Commit: headLink, ··· 130 131 131 132 b.StopTimer() 132 133 133 - dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 134 + dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 134 135 outEvtCount++ 135 136 return nil 136 137 }) ··· 183 184 } 184 185 185 186 // Create a bunch of events 186 - evtman := NewEventManager(dbp) 187 + evtman := events.NewEventManager(dbp) 187 188 188 189 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 189 190 if err != nil { 190 191 b.Fatal(err) 191 192 } 192 193 193 - inEvts := make([]*XRPCStreamEvent, n) 194 + inEvts := make([]*events.XRPCStreamEvent, n) 194 195 for i := 0; i < n; i++ { 195 196 cidLink := lexutil.LexLink(cid) 196 197 headLink := lexutil.LexLink(userRepoHead) 197 - inEvts[i] = &XRPCStreamEvent{ 198 + inEvts[i] = &events.XRPCStreamEvent{ 198 199 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 199 200 Repo: "did:example:123", 200 201 Commit: headLink, ··· 250 251 251 252 b.ResetTimer() 252 253 253 - dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 254 + dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 254 255 outEvtCount++ 255 256 return nil 256 257 })
+18 -14
events/diskpersist.go events/diskpersist/diskpersist.go
··· 1 - package events 1 + package diskpersist 2 2 3 3 import ( 4 4 "bufio" ··· 8 8 "errors" 9 9 "fmt" 10 10 "io" 11 + "log/slog" 11 12 "os" 12 13 "path/filepath" 13 14 "sync" 14 15 "time" 15 16 16 17 "github.com/bluesky-social/indigo/api/atproto" 18 + "github.com/bluesky-social/indigo/events" 17 19 "github.com/bluesky-social/indigo/models" 18 20 arc "github.com/hashicorp/golang-lru/arc/v2" 19 21 "github.com/prometheus/client_golang/prometheus" ··· 21 23 cbg "github.com/whyrusleeping/cbor-gen" 22 24 "gorm.io/gorm" 23 25 ) 26 + 27 + var log = slog.Default().With("system", "diskpersist") 24 28 25 29 type DiskPersistence struct { 26 30 primaryDir string ··· 31 35 32 36 meta *gorm.DB 33 37 34 - broadcast func(*XRPCStreamEvent) 38 + broadcast func(*events.XRPCStreamEvent) 35 39 36 40 logfi *os.File 37 41 ··· 54 58 55 59 type persistJob struct { 56 60 Bytes []byte 57 - Evt *XRPCStreamEvent 61 + Evt *events.XRPCStreamEvent 58 62 Buffer *bytes.Buffer // so we can put it back in the pool when we're done 59 63 } 60 64 ··· 68 72 EvtFlagRebased 69 73 ) 70 74 71 - var _ (EventPersistence) = (*DiskPersistence)(nil) 75 + var _ (events.EventPersistence) = (*DiskPersistence)(nil) 72 76 73 77 type DiskPersistOptions struct { 74 78 UIDCacheSize int ··· 487 491 return nil 488 492 } 489 493 490 - func (dp *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 494 + func (dp *DiskPersistence) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 491 495 buffer := dp.buffers.Get().(*bytes.Buffer) 492 496 cw := dp.writers.Get().(*cbg.CborWriter) 493 497 cw.SetWriter(buffer) ··· 631 635 return u.Uid, nil 632 636 } 633 637 634 - func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 638 + func (dp *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 635 639 base := since - (since % dp.eventsPerFile) 636 640 var logs []LogFileRef 637 641 if err := dp.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", base).Error; err != nil { ··· 658 662 return nil 659 663 } 660 664 661 - func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) { 665 + func (dp *DiskPersistence) PlaybackLogfiles(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error, logFiles []LogFileRef) (*int64, error) { 662 666 for i, lf := range logFiles { 663 667 lastSeq, err := dp.readEventsFrom(ctx, since, filepath.Join(dp.primaryDir, lf.Path), cb) 664 668 if err != nil { ··· 684 688 return false 685 689 } 686 690 687 - func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) (*int64, error) { 691 + func (dp *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*events.XRPCStreamEvent) error) (*int64, error) { 688 692 fi, err := os.OpenFile(fn, os.O_RDONLY, 0) 689 693 if err != nil { 690 694 return nil, err ··· 738 742 return nil, err 739 743 } 740 744 evt.Seq = h.Seq 741 - if err := cb(&XRPCStreamEvent{RepoCommit: &evt}); err != nil { 745 + if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil { 742 746 return nil, err 743 747 } 744 748 case evtKindHandle: ··· 747 751 return nil, err 748 752 } 749 753 evt.Seq = h.Seq 750 - if err := cb(&XRPCStreamEvent{RepoHandle: &evt}); err != nil { 754 + if err := cb(&events.XRPCStreamEvent{RepoHandle: &evt}); err != nil { 751 755 return nil, err 752 756 } 753 757 case evtKindIdentity: ··· 756 760 return nil, err 757 761 } 758 762 evt.Seq = h.Seq 759 - if err := cb(&XRPCStreamEvent{RepoIdentity: &evt}); err != nil { 763 + if err := cb(&events.XRPCStreamEvent{RepoIdentity: &evt}); err != nil { 760 764 return nil, err 761 765 } 762 766 case evtKindAccount: ··· 765 769 return nil, err 766 770 } 767 771 evt.Seq = h.Seq 768 - if err := cb(&XRPCStreamEvent{RepoAccount: &evt}); err != nil { 772 + if err := cb(&events.XRPCStreamEvent{RepoAccount: &evt}); err != nil { 769 773 return nil, err 770 774 } 771 775 case evtKindTombstone: ··· 774 778 return nil, err 775 779 } 776 780 evt.Seq = h.Seq 777 - if err := cb(&XRPCStreamEvent{RepoTombstone: &evt}); err != nil { 781 + if err := cb(&events.XRPCStreamEvent{RepoTombstone: &evt}); err != nil { 778 782 return nil, err 779 783 } 780 784 default: ··· 931 935 return nil 932 936 } 933 937 934 - func (dp *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent)) { 938 + func (dp *DiskPersistence) SetEventBroadcaster(f func(*events.XRPCStreamEvent)) { 935 939 dp.broadcast = f 936 940 }
+67 -24
events/diskpersist_test.go events/diskpersist/diskpersist_test.go
··· 1 - package events 1 + package diskpersist 2 2 3 3 import ( 4 4 "context" ··· 14 14 atproto "github.com/bluesky-social/indigo/api/atproto" 15 15 "github.com/bluesky-social/indigo/api/bsky" 16 16 "github.com/bluesky-social/indigo/carstore" 17 + "github.com/bluesky-social/indigo/events" 17 18 lexutil "github.com/bluesky-social/indigo/lex/util" 18 19 "github.com/bluesky-social/indigo/models" 19 20 pds "github.com/bluesky-social/indigo/pds/data" 20 21 "github.com/bluesky-social/indigo/repomgr" 21 22 "github.com/bluesky-social/indigo/util" 23 + "gorm.io/driver/sqlite" 22 24 "gorm.io/gorm" 23 25 ) 24 26 25 - func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (EventPersistence, error)) { 27 + func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (events.EventPersistence, error)) { 26 28 ctx := context.Background() 27 29 28 30 db, _, cs, tempPath, err := setupDBs(t) ··· 63 65 } 64 66 65 67 // Create a bunch of events 66 - evtman := NewEventManager(dp) 68 + evtman := events.NewEventManager(dp) 67 69 68 70 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 69 71 if err != nil { ··· 71 73 } 72 74 73 75 n := 100 74 - inEvts := make([]*XRPCStreamEvent, n) 76 + inEvts := make([]*events.XRPCStreamEvent, n) 75 77 for i := 0; i < n; i++ { 76 78 cidLink := lexutil.LexLink(cid) 77 79 headLink := lexutil.LexLink(userRepoHead) 78 - inEvts[i] = &XRPCStreamEvent{ 80 + inEvts[i] = &events.XRPCStreamEvent{ 79 81 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 80 82 Repo: "did:example:123", 81 83 Commit: headLink, ··· 107 109 outEvtCount := 0 108 110 expectedEvtCount := n 109 111 110 - dp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 112 + dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 111 113 outEvtCount++ 112 114 return nil 113 115 }) ··· 129 131 t.Fatal(err) 130 132 } 131 133 132 - evtman2 := NewEventManager(dp2) 134 + evtman2 := events.NewEventManager(dp2) 133 135 134 - inEvts = make([]*XRPCStreamEvent, n) 136 + inEvts = make([]*events.XRPCStreamEvent, n) 135 137 for i := 0; i < n; i++ { 136 138 cidLink := lexutil.LexLink(cid) 137 139 headLink := lexutil.LexLink(userRepoHead) 138 - inEvts[i] = &XRPCStreamEvent{ 140 + inEvts[i] = &events.XRPCStreamEvent{ 139 141 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 140 142 Repo: "did:example:123", 141 143 Commit: headLink, ··· 159 161 } 160 162 } 161 163 func TestDiskPersist(t *testing.T) { 162 - factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { 164 + factory := func(tempPath string, db *gorm.DB) (events.EventPersistence, error) { 163 165 return NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 164 166 EventsPerFile: 10, 165 167 UIDCacheSize: 100000, ··· 192 194 193 195 } 194 196 195 - func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { 197 + func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 196 198 ctx := context.Background() 197 199 198 200 db.AutoMigrate(&pds.User{}) ··· 220 222 } 221 223 222 224 // Create a bunch of events 223 - evtman := NewEventManager(p) 225 + evtman := events.NewEventManager(p) 224 226 225 227 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 226 228 if err != nil { 227 229 b.Fatal(err) 228 230 } 229 231 230 - inEvts := make([]*XRPCStreamEvent, b.N) 232 + inEvts := make([]*events.XRPCStreamEvent, b.N) 231 233 for i := 0; i < b.N; i++ { 232 234 cidLink := lexutil.LexLink(cid) 233 235 headLink := lexutil.LexLink(userRepoHead) 234 - inEvts[i] = &XRPCStreamEvent{ 236 + inEvts[i] = &events.XRPCStreamEvent{ 235 237 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 236 238 Repo: "did:example:123", 237 239 Commit: headLink, ··· 307 309 runEventManagerTest(t, cs, db, dp) 308 310 } 309 311 310 - func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { 312 + func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 311 313 ctx := context.Background() 312 314 313 315 db.AutoMigrate(&pds.User{}) ··· 334 336 t.Fatal(err) 335 337 } 336 338 337 - evtman := NewEventManager(p) 339 + evtman := events.NewEventManager(p) 338 340 339 341 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 340 342 if err != nil { ··· 342 344 } 343 345 344 346 testSize := 100 // you can adjust this number as needed 345 - inEvts := make([]*XRPCStreamEvent, testSize) 347 + inEvts := make([]*events.XRPCStreamEvent, testSize) 346 348 for i := 0; i < testSize; i++ { 347 349 cidLink := lexutil.LexLink(cid) 348 350 headLink := lexutil.LexLink(userRepoHead) 349 - inEvts[i] = &XRPCStreamEvent{ 351 + inEvts[i] = &events.XRPCStreamEvent{ 350 352 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 351 353 Repo: "did:example:123", 352 354 Commit: headLink, ··· 373 375 } 374 376 375 377 outEvtCount := 0 376 - p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 378 + p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 377 379 // Check that the contents of the output events match the input events 378 380 // Clear cache, don't care if one has it and not the other 379 381 inEvts[outEvtCount].Preserialized = nil ··· 414 416 runTakedownTest(t, cs, db, dp) 415 417 } 416 418 417 - func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { 419 + func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 418 420 ctx := context.TODO() 419 421 420 422 db.AutoMigrate(&pds.User{}) ··· 444 446 } 445 447 } 446 448 447 - evtman := NewEventManager(p) 449 + evtman := events.NewEventManager(p) 448 450 449 451 testSize := 100 // you can adjust this number as needed 450 - inEvts := make([]*XRPCStreamEvent, testSize*userCount) 452 + inEvts := make([]*events.XRPCStreamEvent, testSize*userCount) 451 453 for i := 0; i < testSize*userCount; i++ { 452 454 user := users[i%userCount] 453 455 _, cid, err := mgr.CreateRecord(ctx, user.Uid, "app.bsky.feed.post", &bsky.FeedPost{ ··· 465 467 466 468 cidLink := lexutil.LexLink(cid) 467 469 headLink := lexutil.LexLink(userRepoHead) 468 - inEvts[i] = &XRPCStreamEvent{ 470 + inEvts[i] = &events.XRPCStreamEvent{ 469 471 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 470 472 Repo: user.Did, 471 473 Commit: headLink, ··· 500 502 501 503 // Verify that the events of the user have been removed from the event stream 502 504 var evtsCount int 503 - if err := p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 505 + if err := p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 504 506 evtsCount++ 505 507 if evt.RepoCommit.Repo == takeDownUser.Did { 506 508 t.Fatalf("found event for user %d after takedown", takeDownUser.Uid) ··· 515 517 t.Fatalf("wrong number of events out: %d != %d", evtsCount, exp) 516 518 } 517 519 } 520 + 521 + func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, error) { 522 + dir, err := os.MkdirTemp("", "integtest") 523 + if err != nil { 524 + return nil, nil, nil, "", err 525 + } 526 + 527 + maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite?cache=shared&mode=rwc"))) 528 + if err != nil { 529 + return nil, nil, nil, "", err 530 + } 531 + 532 + tx := maindb.Exec("PRAGMA journal_mode=WAL;") 533 + if tx.Error != nil { 534 + return nil, nil, nil, "", tx.Error 535 + } 536 + 537 + tx.Commit() 538 + 539 + cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite?cache=shared&mode=rwc"))) 540 + if err != nil { 541 + return nil, nil, nil, "", err 542 + } 543 + 544 + tx = cardb.Exec("PRAGMA journal_mode=WAL;") 545 + if tx.Error != nil { 546 + return nil, nil, nil, "", tx.Error 547 + } 548 + 549 + cspath := filepath.Join(dir, "carstore") 550 + if err := os.Mkdir(cspath, 0775); err != nil { 551 + return nil, nil, nil, "", err 552 + } 553 + 554 + cs, err := carstore.NewCarStore(cardb, []string{cspath}) 555 + if err != nil { 556 + return nil, nil, nil, "", err 557 + } 558 + 559 + return maindb, cardb, cs, dir, nil 560 + }
+12 -8
events/pebblepersist.go events/pebblepersist/pebblepersist.go
··· 1 - package events 1 + package pebblepersist 2 2 3 3 import ( 4 4 "bytes" ··· 7 7 "encoding/hex" 8 8 "errors" 9 9 "fmt" 10 + "log/slog" 10 11 "time" 11 12 13 + "github.com/bluesky-social/indigo/events" 12 14 "github.com/bluesky-social/indigo/models" 13 15 "github.com/cockroachdb/pebble" 14 16 ) 15 17 18 + var log = slog.Default().With("system", "pebblepersist") 19 + 16 20 type PebblePersist struct { 17 - broadcast func(*XRPCStreamEvent) 21 + broadcast func(*events.XRPCStreamEvent) 18 22 db *pebble.DB 19 23 20 24 prevSeq int64 ··· 66 70 binary.BigEndian.PutUint64(key[8:16], uint64(millis)) 67 71 } 68 72 69 - func (pp *PebblePersist) Persist(ctx context.Context, e *XRPCStreamEvent) error { 73 + func (pp *PebblePersist) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 70 74 err := e.Preserialize() 71 75 if err != nil { 72 76 return err ··· 101 105 return err 102 106 } 103 107 104 - func eventFromPebbleIter(iter *pebble.Iterator) (*XRPCStreamEvent, error) { 108 + func eventFromPebbleIter(iter *pebble.Iterator) (*events.XRPCStreamEvent, error) { 105 109 blob, err := iter.ValueAndErr() 106 110 if err != nil { 107 111 return nil, err 108 112 } 109 113 br := bytes.NewReader(blob) 110 - evt := new(XRPCStreamEvent) 114 + evt := new(events.XRPCStreamEvent) 111 115 err = evt.Deserialize(br) 112 116 if err != nil { 113 117 return nil, err ··· 116 120 return evt, nil 117 121 } 118 122 119 - func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 123 + func (pp *PebblePersist) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 120 124 var key [8]byte 121 125 binary.BigEndian.PutUint64(key[:], uint64(since)) 122 126 ··· 156 160 return err 157 161 } 158 162 159 - func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*XRPCStreamEvent)) { 163 + func (pp *PebblePersist) SetEventBroadcaster(broadcast func(*events.XRPCStreamEvent)) { 160 164 pp.broadcast = broadcast 161 165 } 162 166 163 167 var ErrNoLast = errors.New("no last event") 164 168 165 - func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *XRPCStreamEvent, err error) { 169 + func (pp *PebblePersist) GetLast(ctx context.Context) (seq, millis int64, evt *events.XRPCStreamEvent, err error) { 166 170 iter, err := pp.db.NewIterWithContext(ctx, &pebble.IterOptions{}) 167 171 if err != nil { 168 172 return 0, 0, nil, err
+210
events/pebblepersist/pebblepersist_test.go
··· 1 + package pebblepersist 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "path/filepath" 7 + "testing" 8 + "time" 9 + 10 + atproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/api/bsky" 12 + "github.com/bluesky-social/indigo/carstore" 13 + "github.com/bluesky-social/indigo/events" 14 + "github.com/bluesky-social/indigo/events/diskpersist" 15 + lexutil "github.com/bluesky-social/indigo/lex/util" 16 + "github.com/bluesky-social/indigo/models" 17 + pds "github.com/bluesky-social/indigo/pds/data" 18 + "github.com/bluesky-social/indigo/repomgr" 19 + "github.com/bluesky-social/indigo/util" 20 + 21 + "gorm.io/driver/sqlite" 22 + "gorm.io/gorm" 23 + ) 24 + 25 + func TestPebblePersist(t *testing.T) { 26 + factory := func(tempPath string, db *gorm.DB) (events.EventPersistence, error) { 27 + opts := DefaultPebblePersistOptions 28 + opts.DbPath = filepath.Join(tempPath, "pebble.db") 29 + return NewPebblePersistance(&opts) 30 + } 31 + testPersister(t, factory) 32 + } 33 + 34 + func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (events.EventPersistence, error)) { 35 + ctx := context.Background() 36 + 37 + db, _, cs, tempPath, err := setupDBs(t) 38 + if err != nil { 39 + t.Fatal(err) 40 + } 41 + 42 + db.AutoMigrate(&pds.User{}) 43 + db.AutoMigrate(&pds.Peering{}) 44 + db.AutoMigrate(&models.ActorInfo{}) 45 + 46 + db.Create(&models.ActorInfo{ 47 + Uid: 1, 48 + Did: "did:example:123", 49 + }) 50 + 51 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 52 + 53 + err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 54 + if err != nil { 55 + t.Fatal(err) 56 + } 57 + 58 + _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 59 + Text: "hello world", 60 + CreatedAt: time.Now().Format(util.ISO8601), 61 + }) 62 + if err != nil { 63 + t.Fatal(err) 64 + } 65 + 66 + defer os.RemoveAll(tempPath) 67 + 68 + // Initialize a persister 69 + dp, err := perisistenceFactory(tempPath, db) 70 + if err != nil { 71 + t.Fatal(err) 72 + } 73 + 74 + // Create a bunch of events 75 + evtman := events.NewEventManager(dp) 76 + 77 + userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 78 + if err != nil { 79 + t.Fatal(err) 80 + } 81 + 82 + n := 100 83 + inEvts := make([]*events.XRPCStreamEvent, n) 84 + for i := 0; i < n; i++ { 85 + cidLink := lexutil.LexLink(cid) 86 + headLink := lexutil.LexLink(userRepoHead) 87 + inEvts[i] = &events.XRPCStreamEvent{ 88 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 89 + Repo: "did:example:123", 90 + Commit: headLink, 91 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 92 + { 93 + Action: "add", 94 + Cid: &cidLink, 95 + Path: "path1", 96 + }, 97 + }, 98 + Time: time.Now().Format(util.ISO8601), 99 + Seq: int64(i), 100 + }, 101 + } 102 + } 103 + 104 + // Add events in parallel 105 + for i := 0; i < n; i++ { 106 + err = evtman.AddEvent(ctx, inEvts[i]) 107 + if err != nil { 108 + t.Fatal(err) 109 + } 110 + } 111 + 112 + if err := dp.Flush(ctx); err != nil { 113 + t.Fatal(err) 114 + } 115 + 116 + outEvtCount := 0 117 + expectedEvtCount := n 118 + 119 + dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 120 + outEvtCount++ 121 + return nil 122 + }) 123 + 124 + if outEvtCount != expectedEvtCount { 125 + t.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount) 126 + } 127 + 128 + dp.Shutdown(ctx) 129 + 130 + time.Sleep(time.Millisecond * 100) 131 + 132 + dp2, err := diskpersist.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &diskpersist.DiskPersistOptions{ 133 + EventsPerFile: 10, 134 + UIDCacheSize: 100000, 135 + DIDCacheSize: 100000, 136 + }) 137 + if err != nil { 138 + t.Fatal(err) 139 + } 140 + 141 + evtman2 := events.NewEventManager(dp2) 142 + 143 + inEvts = make([]*events.XRPCStreamEvent, n) 144 + for i := 0; i < n; i++ { 145 + cidLink := lexutil.LexLink(cid) 146 + headLink := lexutil.LexLink(userRepoHead) 147 + inEvts[i] = &events.XRPCStreamEvent{ 148 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 149 + Repo: "did:example:123", 150 + Commit: headLink, 151 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 152 + { 153 + Action: "add", 154 + Cid: &cidLink, 155 + Path: "path1", 156 + }, 157 + }, 158 + Time: time.Now().Format(util.ISO8601), 159 + }, 160 + } 161 + } 162 + 163 + for i := 0; i < n; i++ { 164 + err = evtman2.AddEvent(ctx, inEvts[i]) 165 + if err != nil { 166 + t.Fatal(err) 167 + } 168 + } 169 + } 170 + 171 + func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, error) { 172 + dir, err := os.MkdirTemp("", "integtest") 173 + if err != nil { 174 + return nil, nil, nil, "", err 175 + } 176 + 177 + maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite?cache=shared&mode=rwc"))) 178 + if err != nil { 179 + return nil, nil, nil, "", err 180 + } 181 + 182 + tx := maindb.Exec("PRAGMA journal_mode=WAL;") 183 + if tx.Error != nil { 184 + return nil, nil, nil, "", tx.Error 185 + } 186 + 187 + tx.Commit() 188 + 189 + cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite?cache=shared&mode=rwc"))) 190 + if err != nil { 191 + return nil, nil, nil, "", err 192 + } 193 + 194 + tx = cardb.Exec("PRAGMA journal_mode=WAL;") 195 + if tx.Error != nil { 196 + return nil, nil, nil, "", tx.Error 197 + } 198 + 199 + cspath := filepath.Join(dir, "carstore") 200 + if err := os.Mkdir(cspath, 0775); err != nil { 201 + return nil, nil, nil, "", err 202 + } 203 + 204 + cs, err := carstore.NewCarStore(cardb, []string{cspath}) 205 + if err != nil { 206 + return nil, nil, nil, "", err 207 + } 208 + 209 + return maindb, cardb, cs, dir, nil 210 + }
-16
events/pebblepersist_test.go
··· 1 - package events 2 - 3 - import ( 4 - "gorm.io/gorm" 5 - "path/filepath" 6 - "testing" 7 - ) 8 - 9 - func TestPebblePersist(t *testing.T) { 10 - factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { 11 - opts := DefaultPebblePersistOptions 12 - opts.DbPath = filepath.Join(tempPath, "pebble.db") 13 - return NewPebblePersistance(&opts) 14 - } 15 - testPersister(t, factory) 16 - }
+6 -5
events/yolopersist.go events/yolopersist/yolopersist.go
··· 1 - package events 1 + package yolopersist 2 2 3 3 import ( 4 4 "context" 5 5 "fmt" 6 6 "sync" 7 7 8 + "github.com/bluesky-social/indigo/events" 8 9 "github.com/bluesky-social/indigo/models" 9 10 ) 10 11 ··· 13 14 lk sync.Mutex 14 15 seq int64 15 16 16 - broadcast func(*XRPCStreamEvent) 17 + broadcast func(*events.XRPCStreamEvent) 17 18 } 18 19 19 20 func NewYoloPersister() *YoloPersister { 20 21 return &YoloPersister{} 21 22 } 22 23 23 - func (yp *YoloPersister) Persist(ctx context.Context, e *XRPCStreamEvent) error { 24 + func (yp *YoloPersister) Persist(ctx context.Context, e *events.XRPCStreamEvent) error { 24 25 yp.lk.Lock() 25 26 defer yp.lk.Unlock() 26 27 yp.seq++ ··· 48 49 return nil 49 50 } 50 51 51 - func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 52 + func (mp *YoloPersister) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error { 52 53 return fmt.Errorf("playback not supported by yolo persister, test usage only") 53 54 } 54 55 ··· 56 57 return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only") 57 58 } 58 59 59 - func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { 60 + func (yp *YoloPersister) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) { 60 61 yp.broadcast = brc 61 62 } 62 63