porting all github actions from bluesky-social/indigo to tangled CI

fix events tests add pebble test

+14 -15
events/dbpersist_test.go
··· 1 - package events_test 1 + package events 2 2 3 3 import ( 4 4 "context" ··· 11 11 atproto "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/api/bsky" 13 13 "github.com/bluesky-social/indigo/carstore" 14 - "github.com/bluesky-social/indigo/events" 15 14 lexutil "github.com/bluesky-social/indigo/lex/util" 16 15 "github.com/bluesky-social/indigo/models" 17 - "github.com/bluesky-social/indigo/pds" 16 + pds "github.com/bluesky-social/indigo/pds/data" 18 17 "github.com/bluesky-social/indigo/repomgr" 19 18 "github.com/bluesky-social/indigo/util" 20 - "github.com/ipfs/go-log/v2" 19 + logging "github.com/ipfs/go-log/v2" 21 20 "gorm.io/driver/sqlite" 22 21 "gorm.io/gorm" 23 22 ) 24 23 25 24 func init() { 26 - log.SetAllLoggers(log.LevelDebug) 25 + logging.SetAllLoggers(logging.LevelDebug) 27 26 } 28 27 29 28 func BenchmarkDBPersist(b *testing.B) { ··· 61 60 defer os.RemoveAll(tempPath) 62 61 63 62 // Initialize a DBPersister 64 - dbp, err := events.NewDbPersistence(db, cs, nil) 63 + dbp, err := NewDbPersistence(db, cs, nil) 65 64 if err != nil { 66 65 b.Fatal(err) 67 66 } 68 67 69 68 // Create a bunch of events 70 - evtman := events.NewEventManager(dbp) 69 + evtman := NewEventManager(dbp) 71 70 72 71 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 73 72 if err != nil { 74 73 b.Fatal(err) 75 74 } 76 75 77 - inEvts := make([]*events.XRPCStreamEvent, b.N) 76 + inEvts := make([]*XRPCStreamEvent, b.N) 78 77 for i := 0; i < b.N; i++ { 79 78 cidLink := lexutil.LexLink(cid) 80 79 headLink := lexutil.LexLink(userRepoHead) 81 - inEvts[i] = &events.XRPCStreamEvent{ 80 + inEvts[i] = &XRPCStreamEvent{ 82 81 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 83 82 Repo: "did:example:123", 84 83 Commit: headLink, ··· 136 135 137 136 b.StopTimer() 138 137 139 - dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 138 + dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 140 139 outEvtCount++ 141 140 return nil 142 141 }) ··· 183 182 defer os.RemoveAll(tempPath) 184 183 185 184 // Initialize a DBPersister 186 - dbp, err := events.NewDbPersistence(db, cs, nil) 185 + dbp, err := NewDbPersistence(db, cs, nil) 187 186 if err != nil { 188 187 b.Fatal(err) 189 188 } 190 189 191 190 // Create a bunch of events 192 - evtman := events.NewEventManager(dbp) 191 + evtman := NewEventManager(dbp) 193 192 194 193 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 195 194 if err != nil { 196 195 b.Fatal(err) 197 196 } 198 197 199 - inEvts := make([]*events.XRPCStreamEvent, n) 198 + inEvts := make([]*XRPCStreamEvent, n) 200 199 for i := 0; i < n; i++ { 201 200 cidLink := lexutil.LexLink(cid) 202 201 headLink := lexutil.LexLink(userRepoHead) 203 - inEvts[i] = &events.XRPCStreamEvent{ 202 + inEvts[i] = &XRPCStreamEvent{ 204 203 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 205 204 Repo: "did:example:123", 206 205 Commit: headLink, ··· 256 255 257 256 b.ResetTimer() 258 257 259 - dbp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 258 + dbp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 260 259 outEvtCount++ 261 260 return nil 262 261 })
+175 -29
events/diskpersist_test.go
··· 1 - package events_test 1 + package events 2 2 3 3 import ( 4 4 "context" ··· 14 14 atproto "github.com/bluesky-social/indigo/api/atproto" 15 15 "github.com/bluesky-social/indigo/api/bsky" 16 16 "github.com/bluesky-social/indigo/carstore" 17 - "github.com/bluesky-social/indigo/events" 18 17 lexutil "github.com/bluesky-social/indigo/lex/util" 19 18 "github.com/bluesky-social/indigo/models" 20 - "github.com/bluesky-social/indigo/pds" 19 + pds "github.com/bluesky-social/indigo/pds/data" 21 20 "github.com/bluesky-social/indigo/repomgr" 22 21 "github.com/bluesky-social/indigo/util" 23 22 "gorm.io/gorm" 24 23 ) 25 24 25 + func testPersister(t *testing.T, perisistenceFactory func(path string, db *gorm.DB) (EventPersistence, error)) { 26 + ctx := context.Background() 27 + 28 + db, _, cs, tempPath, err := setupDBs(t) 29 + if err != nil { 30 + t.Fatal(err) 31 + } 32 + 33 + db.AutoMigrate(&pds.User{}) 34 + db.AutoMigrate(&pds.Peering{}) 35 + db.AutoMigrate(&models.ActorInfo{}) 36 + 37 + db.Create(&models.ActorInfo{ 38 + Uid: 1, 39 + Did: "did:example:123", 40 + }) 41 + 42 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 43 + 44 + err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 45 + if err != nil { 46 + t.Fatal(err) 47 + } 48 + 49 + _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 50 + Text: "hello world", 51 + CreatedAt: time.Now().Format(util.ISO8601), 52 + }) 53 + if err != nil { 54 + t.Fatal(err) 55 + } 56 + 57 + defer os.RemoveAll(tempPath) 58 + 59 + // Initialize a persister 60 + dp, err := perisistenceFactory(tempPath, db) 61 + if err != nil { 62 + t.Fatal(err) 63 + } 64 + 65 + // Create a bunch of events 66 + evtman := NewEventManager(dp) 67 + 68 + userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 69 + if err != nil { 70 + t.Fatal(err) 71 + } 72 + 73 + n := 100 74 + inEvts := make([]*XRPCStreamEvent, n) 75 + for i := 0; i < n; i++ { 76 + cidLink := lexutil.LexLink(cid) 77 + headLink := lexutil.LexLink(userRepoHead) 78 + inEvts[i] = &XRPCStreamEvent{ 79 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 80 + Repo: "did:example:123", 81 + Commit: headLink, 82 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 83 + { 84 + Action: "add", 85 + Cid: &cidLink, 86 + Path: "path1", 87 + }, 88 + }, 89 + Time: time.Now().Format(util.ISO8601), 90 + Seq: int64(i), 91 + }, 92 + } 93 + } 94 + 95 + // Add events in parallel 96 + for i := 0; i < n; i++ { 97 + err = evtman.AddEvent(ctx, inEvts[i]) 98 + if err != nil { 99 + t.Fatal(err) 100 + } 101 + } 102 + 103 + if err := dp.Flush(ctx); err != nil { 104 + t.Fatal(err) 105 + } 106 + 107 + outEvtCount := 0 108 + expectedEvtCount := n 109 + 110 + dp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 111 + outEvtCount++ 112 + return nil 113 + }) 114 + 115 + if outEvtCount != expectedEvtCount { 116 + t.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount) 117 + } 118 + 119 + dp.Shutdown(ctx) 120 + 121 + time.Sleep(time.Millisecond * 100) 122 + 123 + dp2, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 124 + EventsPerFile: 10, 125 + UIDCacheSize: 100000, 126 + DIDCacheSize: 100000, 127 + }) 128 + if err != nil { 129 + t.Fatal(err) 130 + } 131 + 132 + evtman2 := NewEventManager(dp2) 133 + 134 + inEvts = make([]*XRPCStreamEvent, n) 135 + for i := 0; i < n; i++ { 136 + cidLink := lexutil.LexLink(cid) 137 + headLink := lexutil.LexLink(userRepoHead) 138 + inEvts[i] = &XRPCStreamEvent{ 139 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 140 + Repo: "did:example:123", 141 + Commit: headLink, 142 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 143 + { 144 + Action: "add", 145 + Cid: &cidLink, 146 + Path: "path1", 147 + }, 148 + }, 149 + Time: time.Now().Format(util.ISO8601), 150 + }, 151 + } 152 + } 153 + 154 + for i := 0; i < n; i++ { 155 + err = evtman2.AddEvent(ctx, inEvts[i]) 156 + if err != nil { 157 + t.Fatal(err) 158 + } 159 + } 160 + } 26 161 func TestDiskPersist(t *testing.T) { 162 + factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { 163 + return NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 164 + EventsPerFile: 10, 165 + UIDCacheSize: 100000, 166 + DIDCacheSize: 100000, 167 + }) 168 + } 169 + testPersister(t, factory) 170 + } 171 + 172 + func XTestDiskPersist(t *testing.T) { 27 173 ctx := context.Background() 28 174 29 175 db, _, cs, tempPath, err := setupDBs(t) ··· 59 205 60 206 // Initialize a DBPersister 61 207 62 - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 208 + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 63 209 EventsPerFile: 10, 64 210 UIDCacheSize: 100000, 65 211 DIDCacheSize: 100000, ··· 69 215 } 70 216 71 217 // Create a bunch of events 72 - evtman := events.NewEventManager(dp) 218 + evtman := NewEventManager(dp) 73 219 74 220 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 75 221 if err != nil { ··· 77 223 } 78 224 79 225 n := 100 80 - inEvts := make([]*events.XRPCStreamEvent, n) 226 + inEvts := make([]*XRPCStreamEvent, n) 81 227 for i := 0; i < n; i++ { 82 228 cidLink := lexutil.LexLink(cid) 83 229 headLink := lexutil.LexLink(userRepoHead) 84 - inEvts[i] = &events.XRPCStreamEvent{ 230 + inEvts[i] = &XRPCStreamEvent{ 85 231 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 86 232 Repo: "did:example:123", 87 233 Commit: headLink, ··· 112 258 outEvtCount := 0 113 259 expectedEvtCount := n 114 260 115 - dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 261 + dp.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 116 262 outEvtCount++ 117 263 return nil 118 264 }) ··· 125 271 126 272 time.Sleep(time.Millisecond * 100) 127 273 128 - dp2, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 274 + dp2, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 129 275 EventsPerFile: 10, 130 276 UIDCacheSize: 100000, 131 277 DIDCacheSize: 100000, ··· 134 280 t.Fatal(err) 135 281 } 136 282 137 - evtman2 := events.NewEventManager(dp2) 283 + evtman2 := NewEventManager(dp2) 138 284 139 - inEvts = make([]*events.XRPCStreamEvent, n) 285 + inEvts = make([]*XRPCStreamEvent, n) 140 286 for i := 0; i < n; i++ { 141 287 cidLink := lexutil.LexLink(cid) 142 288 headLink := lexutil.LexLink(userRepoHead) 143 - inEvts[i] = &events.XRPCStreamEvent{ 289 + inEvts[i] = &XRPCStreamEvent{ 144 290 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 145 291 Repo: "did:example:123", 146 292 Commit: headLink, ··· 174 320 175 321 // Initialize a DBPersister 176 322 177 - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 323 + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 178 324 EventsPerFile: 5000, 179 325 UIDCacheSize: 100000, 180 326 DIDCacheSize: 100000, ··· 187 333 188 334 } 189 335 190 - func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 336 + func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { 191 337 ctx := context.Background() 192 338 193 339 db.AutoMigrate(&pds.User{}) ··· 215 361 } 216 362 217 363 // Create a bunch of events 218 - evtman := events.NewEventManager(p) 364 + evtman := NewEventManager(p) 219 365 220 366 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 221 367 if err != nil { 222 368 b.Fatal(err) 223 369 } 224 370 225 - inEvts := make([]*events.XRPCStreamEvent, b.N) 371 + inEvts := make([]*XRPCStreamEvent, b.N) 226 372 for i := 0; i < b.N; i++ { 227 373 cidLink := lexutil.LexLink(cid) 228 374 headLink := lexutil.LexLink(userRepoHead) 229 - inEvts[i] = &events.XRPCStreamEvent{ 375 + inEvts[i] = &XRPCStreamEvent{ 230 376 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 231 377 Repo: "did:example:123", 232 378 Commit: headLink, ··· 290 436 291 437 // Initialize a DBPersister 292 438 293 - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 439 + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 294 440 EventsPerFile: 20, 295 441 UIDCacheSize: 100000, 296 442 DIDCacheSize: 100000, ··· 302 448 runEventManagerTest(t, cs, db, dp) 303 449 } 304 450 305 - func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 451 + func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { 306 452 ctx := context.Background() 307 453 308 454 db.AutoMigrate(&pds.User{}) ··· 329 475 t.Fatal(err) 330 476 } 331 477 332 - evtman := events.NewEventManager(p) 478 + evtman := NewEventManager(p) 333 479 334 480 userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 335 481 if err != nil { ··· 337 483 } 338 484 339 485 testSize := 100 // you can adjust this number as needed 340 - inEvts := make([]*events.XRPCStreamEvent, testSize) 486 + inEvts := make([]*XRPCStreamEvent, testSize) 341 487 for i := 0; i < testSize; i++ { 342 488 cidLink := lexutil.LexLink(cid) 343 489 headLink := lexutil.LexLink(userRepoHead) 344 - inEvts[i] = &events.XRPCStreamEvent{ 490 + inEvts[i] = &XRPCStreamEvent{ 345 491 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 346 492 Repo: "did:example:123", 347 493 Commit: headLink, ··· 368 514 } 369 515 370 516 outEvtCount := 0 371 - p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 517 + p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 372 518 // Check that the contents of the output events match the input events 373 519 // Clear cache, don't care if one has it and not the other 374 520 inEvts[outEvtCount].Preserialized = nil ··· 397 543 398 544 // Initialize a DBPersister 399 545 400 - dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 546 + dp, err := NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &DiskPersistOptions{ 401 547 EventsPerFile: 10, 402 548 UIDCacheSize: 100000, 403 549 DIDCacheSize: 100000, ··· 409 555 runTakedownTest(t, cs, db, dp) 410 556 } 411 557 412 - func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 558 + func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p EventPersistence) { 413 559 ctx := context.TODO() 414 560 415 561 db.AutoMigrate(&pds.User{}) ··· 439 585 } 440 586 } 441 587 442 - evtman := events.NewEventManager(p) 588 + evtman := NewEventManager(p) 443 589 444 590 testSize := 100 // you can adjust this number as needed 445 - inEvts := make([]*events.XRPCStreamEvent, testSize*userCount) 591 + inEvts := make([]*XRPCStreamEvent, testSize*userCount) 446 592 for i := 0; i < testSize*userCount; i++ { 447 593 user := users[i%userCount] 448 594 _, cid, err := mgr.CreateRecord(ctx, user.Uid, "app.bsky.feed.post", &bsky.FeedPost{ ··· 460 606 461 607 cidLink := lexutil.LexLink(cid) 462 608 headLink := lexutil.LexLink(userRepoHead) 463 - inEvts[i] = &events.XRPCStreamEvent{ 609 + inEvts[i] = &XRPCStreamEvent{ 464 610 RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 465 611 Repo: user.Did, 466 612 Commit: headLink, ··· 495 641 496 642 // Verify that the events of the user have been removed from the event stream 497 643 var evtsCount int 498 - if err := p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 644 + if err := p.Playback(ctx, 0, func(evt *XRPCStreamEvent) error { 499 645 evtsCount++ 500 646 if evt.RepoCommit.Repo == takeDownUser.Did { 501 647 t.Fatalf("found event for user %d after takedown", takeDownUser.Uid)
+19 -7
events/pebblepersist.go
··· 12 12 type PebblePersist struct { 13 13 broadcast func(*XRPCStreamEvent) 14 14 db *pebble.DB 15 + 16 + prevSeq int64 17 + prevSeqExtra uint32 15 18 } 16 19 17 20 func NewPebblePersistance(path string) (*PebblePersist, error) { ··· 32 35 blob := e.Preserialized 33 36 34 37 seq := e.Sequence() 38 + log.Infof("persist %d", seq) 39 + 35 40 if seq < 0 { 36 - // drop event 37 - // TODO: persist with longer key? {prev 8 byte key}{int32 extra counter} 38 - return nil 39 - } 41 + // persist with longer key {prev 8 byte key}{int32 extra counter} 42 + pp.prevSeqExtra++ 43 + var key [12]byte 44 + binary.BigEndian.PutUint64(key[:8], uint64(pp.prevSeq)) 45 + binary.BigEndian.PutUint32(key[8:], pp.prevSeqExtra) 40 46 41 - var key [8]byte 42 - binary.BigEndian.PutUint64(key[:], uint64(seq)) 47 + err = pp.db.Set(key[:], blob, pebble.Sync) 48 + return nil 49 + } else { 50 + pp.prevSeq = seq 51 + pp.prevSeqExtra = 0 52 + var key [8]byte 53 + binary.BigEndian.PutUint64(key[:], uint64(seq)) 43 54 44 - err = pp.db.Set(key[:], blob, pebble.Sync) 55 + err = pp.db.Set(key[:], blob, pebble.Sync) 56 + } 45 57 46 58 return err 47 59 }
+14
events/pebblepersist_test.go
··· 1 + package events 2 + 3 + import ( 4 + "gorm.io/gorm" 5 + "path/filepath" 6 + "testing" 7 + ) 8 + 9 + func TestPebblePersist(t *testing.T) { 10 + factory := func(tempPath string, db *gorm.DB) (EventPersistence, error) { 11 + return NewPebblePersistance(filepath.Join(tempPath, "pebble.db")) 12 + } 13 + testPersister(t, factory) 14 + }
+27
pds/data/types.go
··· 1 + package data 2 + 3 + import ( 4 + "github.com/bluesky-social/indigo/models" 5 + "gorm.io/gorm" 6 + "time" 7 + ) 8 + 9 + type User struct { 10 + ID models.Uid `gorm:"primarykey"` 11 + CreatedAt time.Time 12 + UpdatedAt time.Time 13 + DeletedAt gorm.DeletedAt `gorm:"index"` 14 + Handle string `gorm:"uniqueIndex"` 15 + Password string 16 + RecoveryKey string 17 + Email string 18 + Did string `gorm:"uniqueIndex"` 19 + PDS uint 20 + } 21 + 22 + type Peering struct { 23 + gorm.Model 24 + Host string 25 + Did string 26 + Approved bool 27 + }
+3 -18
pds/server.go
··· 21 21 lexutil "github.com/bluesky-social/indigo/lex/util" 22 22 "github.com/bluesky-social/indigo/models" 23 23 "github.com/bluesky-social/indigo/notifs" 24 + pdsdata "github.com/bluesky-social/indigo/pds/data" 24 25 "github.com/bluesky-social/indigo/plc" 25 26 "github.com/bluesky-social/indigo/repomgr" 26 27 "github.com/bluesky-social/indigo/util" ··· 456 457 return c.String(200, u.Did) 457 458 } 458 459 459 - type User struct { 460 - ID models.Uid `gorm:"primarykey"` 461 - CreatedAt time.Time 462 - UpdatedAt time.Time 463 - DeletedAt gorm.DeletedAt `gorm:"index"` 464 - Handle string `gorm:"uniqueIndex"` 465 - Password string 466 - RecoveryKey string 467 - Email string 468 - Did string `gorm:"uniqueIndex"` 469 - PDS uint 470 - } 460 + type User = pdsdata.User 471 461 472 462 type RefreshToken struct { 473 463 gorm.Model ··· 636 626 panic("nyi") 637 627 } 638 628 639 - type Peering struct { 640 - gorm.Model 641 - Host string 642 - Did string 643 - Approved bool 644 - } 629 + type Peering = pdsdata.Peering 645 630 646 631 func (s *Server) EventsHandler(c echo.Context) error { 647 632 conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), c.Response().Header(), 1<<10, 1<<10)