porting all github actions from bluesky-social/indigo to tangled CI
at main 14 kB view raw
1package testing 2 3import ( 4 "bytes" 5 "context" 6 "math/rand" 7 "strings" 8 "testing" 9 "time" 10 11 atproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/events" 13 "github.com/bluesky-social/indigo/repo" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/ipfs/go-cid" 16 car "github.com/ipld/go-car" 17 "github.com/stretchr/testify/assert" 18) 19 20func TestRelayBasic(t *testing.T) { 21 t.Helper() 22 testRelayBasic(t, true) 23} 24 25func TestRelayBasicNonArchive(t *testing.T) { 26 t.Helper() 27 testRelayBasic(t, false) 28} 29 30func testRelayBasic(t *testing.T, archive bool) { 31 if testing.Short() { 32 t.Skip("skipping Relay test in 'short' test mode") 33 } 34 assert := assert.New(t) 35 didr := TestPLC(t) 36 p1 := MustSetupPDS(t, ".tpds", didr) 37 p1.Run(t) 38 39 b1 := MustSetupRelay(t, didr, archive) 40 b1.Run(t) 41 42 b1.tr.TrialHosts = []string{p1.RawHost()} 43 44 p1.RequestScraping(t, b1) 45 p1.BumpLimits(t, b1) 46 47 time.Sleep(time.Millisecond * 50) 48 49 evts := b1.Events(t, -1) 50 defer evts.Cancel() 51 52 bob := p1.MustNewUser(t, "bob.tpds") 53 t.Log("event 1") 54 e1 := evts.Next() 55 assert.NotNil(e1.RepoCommit) 56 assert.Equal(e1.RepoCommit.Repo, bob.DID()) 57 58 alice := p1.MustNewUser(t, "alice.tpds") 59 t.Log("event 2") 60 e2 := evts.Next() 61 assert.NotNil(e2.RepoCommit) 62 assert.Equal(e2.RepoCommit.Repo, alice.DID()) 63 64 bp1 := bob.Post(t, "cats for cats") 65 ap1 := alice.Post(t, "no i like dogs") 66 67 _ = bp1 68 _ = ap1 69 70 t.Log("bob:", bob.DID()) 71 t.Log("event 3") 72 e3 := evts.Next() 73 assert.Equal(e3.RepoCommit.Repo, bob.DID()) 74 //assert.Equal(e3.RepoCommit.Ops[0].Kind, "createRecord") 75 76 t.Log("alice:", alice.DID()) 77 t.Log("event 4") 78 e4 := evts.Next() 79 assert.Equal(e4.RepoCommit.Repo, alice.DID()) 80 //assert.Equal(e4.RepoCommit.Ops[0].Kind, "createRecord") 81 82 // playback 83 pbevts := b1.Events(t, 2) 84 defer pbevts.Cancel() 85 86 t.Log("event 5") 87 pbe1 := pbevts.Next() 88 assert.Equal(*e3, *pbe1) 89} 90 91func randomFollows(t *testing.T, users []*TestUser) { 92 for n := 0; n < 3; n++ { 93 for i, u := range users { 94 oi := rand.Intn(len(users)) 95 if i == oi { 96 continue 97 } 98 99 u.Follow(t, users[oi].DID()) 100 } 101 } 102} 103 104func socialSim(t *testing.T, users []*TestUser, postiter, likeiter int) []*atproto.RepoStrongRef { 105 var posts []*atproto.RepoStrongRef 106 for i := 0; i < postiter; i++ { 107 for _, u := range users { 108 posts = append(posts, u.Post(t, MakeRandomPost())) 109 } 110 } 111 112 for i := 0; i < likeiter; i++ { 113 for _, u := range users { 114 u.Like(t, posts[rand.Intn(len(posts))]) 115 } 116 } 117 118 return posts 119} 120 121func TestRelayMultiPDS(t *testing.T) { 122 t.Helper() 123 testRelayMultiPDS(t, true) 124} 125 126func TestRelayMultiPDSNonArchive(t *testing.T) { 127 t.Helper() 128 testRelayMultiPDS(t, false) 129} 130 131func testRelayMultiPDS(t *testing.T, archive bool) { 132 if testing.Short() { 133 t.Skip("skipping Relay test in 'short' test mode") 134 } 135 //t.Skip("test too sleepy to run in CI for now") 136 137 assert := assert.New(t) 138 _ = assert 139 didr := TestPLC(t) 140 p1 := MustSetupPDS(t, ".pdsuno", didr) 141 p1.Run(t) 142 143 p2 := MustSetupPDS(t, ".pdsdos", didr) 144 p2.Run(t) 145 146 b1 := MustSetupRelay(t, didr, archive) 147 b1.Run(t) 148 149 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()} 150 151 p1.RequestScraping(t, b1) 152 p1.BumpLimits(t, b1) 153 time.Sleep(time.Millisecond * 100) 154 155 var users []*TestUser 156 for i := 0; i < 5; i++ { 157 users = append(users, p1.MustNewUser(t, usernames[i]+".pdsuno")) 158 } 159 160 randomFollows(t, users) 161 socialSim(t, users, 10, 10) 162 163 var users2 []*TestUser 164 for i := 0; i < 5; i++ { 165 users2 = append(users2, p2.MustNewUser(t, usernames[i+5]+".pdsdos")) 166 } 167 168 randomFollows(t, users2) 169 p2posts := socialSim(t, users2, 10, 10) 170 171 randomFollows(t, append(users, users2...)) 172 173 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life") 174 175 // now if we make posts on pds 2, the relay will not hear about those new posts 176 177 p2posts2 := socialSim(t, users2, 10, 10) 178 179 time.Sleep(time.Second) 180 181 p2.RequestScraping(t, b1) 182 p2.BumpLimits(t, b1) 183 time.Sleep(time.Millisecond * 50) 184 185 // Now, the relay will discover a gap, and have to catch up somehow 186 socialSim(t, users2, 1, 0) 187 188 // we expect the relay to learn about posts that it did not directly see from 189 // repos its already partially scraped, as long as its seen *something* after the missing post 190 // this is the 'catchup' process 191 _ = p2posts2 192 /* NOTE: BGS doesn't support indexing any more 193 time.Sleep(time.Second) 194 ctx := context.Background() 195 _, err := b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri) 196 if err != nil { 197 t.Fatal(err) 198 } 199 */ 200} 201 202func TestRelayMultiGap(t *testing.T) { 203 if testing.Short() { 204 t.Skip("skipping Relay test in 'short' test mode") 205 } 206 //t.Skip("test too sleepy to run in CI for now") 207 assert := assert.New(t) 208 _ = assert 209 didr := TestPLC(t) 210 p1 := MustSetupPDS(t, ".pdsuno", didr) 211 p1.Run(t) 212 213 p2 := MustSetupPDS(t, ".pdsdos", didr) 214 p2.Run(t) 215 216 b1 := MustSetupRelay(t, didr, true) 217 b1.Run(t) 218 219 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()} 220 221 p1.RequestScraping(t, b1) 222 p1.BumpLimits(t, b1) 223 time.Sleep(time.Millisecond * 250) 224 225 users := []*TestUser{p1.MustNewUser(t, usernames[0]+".pdsuno")} 226 227 socialSim(t, users, 10, 0) 228 229 users2 := []*TestUser{p2.MustNewUser(t, usernames[1]+".pdsdos")} 230 231 p2posts := socialSim(t, users2, 10, 0) 232 233 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life") 234 235 /* NOTE: BGS doesn't support indexing any more 236 time.Sleep(time.Second * 2) 237 ctx := context.Background() 238 _, err := b1.bgs.Index.GetPost(ctx, p2posts[3].Uri) 239 if err != nil { 240 t.Fatal(err) 241 } 242 */ 243 244 // now if we make posts on pds 2, the relay will not hear about those new posts 245 246 p2posts2 := socialSim(t, users2, 10, 0) 247 248 time.Sleep(time.Second) 249 250 p2.RequestScraping(t, b1) 251 p2.BumpLimits(t, b1) 252 time.Sleep(time.Second * 2) 253 254 // Now, the relay will discover a gap, and have to catch up somehow 255 socialSim(t, users2, 1, 0) 256 257 // we expect the relay to learn about posts that it did not directly see from 258 // repos its already partially scraped, as long as its seen *something* after the missing post 259 // this is the 'catchup' process 260 _ = p2posts2 261 /* NOTE: BGS doesn't support indexing any more 262 time.Sleep(time.Second * 2) 263 _, err = b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri) 264 if err != nil { 265 t.Fatal(err) 266 } 267 */ 268} 269 270func TestHandleChange(t *testing.T) { 271 //t.Skip("test too sleepy to run in CI for now") 272 assert := assert.New(t) 273 _ = assert 274 didr := TestPLC(t) 275 p1 := MustSetupPDS(t, ".pdsuno", didr) 276 p1.Run(t) 277 278 b1 := MustSetupRelay(t, didr, true) 279 b1.Run(t) 280 281 b1.tr.TrialHosts = []string{p1.RawHost()} 282 283 p1.RequestScraping(t, b1) 284 p1.BumpLimits(t, b1) 285 time.Sleep(time.Millisecond * 50) 286 287 evts := b1.Events(t, -1) 288 289 u := p1.MustNewUser(t, usernames[0]+".pdsuno") 290 291 // if the handle changes before the relay processes the first event, things 292 // get a little weird 293 time.Sleep(time.Millisecond * 50) 294 //socialSim(t, []*testUser{u}, 10, 0) 295 296 u.ChangeHandle(t, "catbear.pdsuno") 297 298 time.Sleep(time.Millisecond * 100) 299 300 initevt := evts.Next() 301 t.Log(initevt.RepoCommit) 302 idevt := evts.Next() 303 t.Log(idevt.RepoIdentity) 304} 305 306func TestAccountEvent(t *testing.T) { 307 assert := assert.New(t) 308 _ = assert 309 didr := TestPLC(t) 310 p1 := MustSetupPDS(t, ".pdsuno", didr) 311 p1.Run(t) 312 313 b1 := MustSetupRelay(t, didr, true) 314 b1.Run(t) 315 316 b1.tr.TrialHosts = []string{p1.RawHost()} 317 318 p1.RequestScraping(t, b1) 319 p1.BumpLimits(t, b1) 320 time.Sleep(time.Millisecond * 50) 321 322 evts := b1.Events(t, -1) 323 324 u := p1.MustNewUser(t, usernames[0]+".pdsuno") 325 326 // if the handle changes before the relay processes the first event, things 327 // get a little weird 328 time.Sleep(time.Millisecond * 50) 329 //socialSim(t, []*testUser{u}, 10, 0) 330 331 p1.TakedownRepo(t, u.DID()) 332 p1.ReactivateRepo(t, u.DID()) 333 p1.DeactivateRepo(t, u.DID()) 334 p1.ReactivateRepo(t, u.DID()) 335 p1.SuspendRepo(t, u.DID()) 336 p1.ReactivateRepo(t, u.DID()) 337 338 time.Sleep(time.Millisecond * 100) 339 340 initevt := evts.Next() 341 t.Log(initevt.RepoCommit) 342 343 // Takedown 344 acevt := evts.Next() 345 t.Log(acevt.RepoAccount) 346 assert.Equal(acevt.RepoAccount.Did, u.DID()) 347 assert.Equal(acevt.RepoAccount.Active, false) 348 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) 349 350 // Reactivate 351 acevt = evts.Next() 352 t.Log(acevt.RepoAccount) 353 assert.Equal(acevt.RepoAccount.Did, u.DID()) 354 assert.Equal(acevt.RepoAccount.Active, true) 355 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 356 357 // Deactivate 358 acevt = evts.Next() 359 t.Log(acevt.RepoAccount) 360 assert.Equal(acevt.RepoAccount.Did, u.DID()) 361 assert.Equal(acevt.RepoAccount.Active, false) 362 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusDeactivated) 363 364 // Reactivate 365 acevt = evts.Next() 366 t.Log(acevt.RepoAccount) 367 assert.Equal(acevt.RepoAccount.Did, u.DID()) 368 assert.Equal(acevt.RepoAccount.Active, true) 369 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 370 371 // Suspend 372 acevt = evts.Next() 373 t.Log(acevt.RepoAccount) 374 assert.Equal(acevt.RepoAccount.Did, u.DID()) 375 assert.Equal(acevt.RepoAccount.Active, false) 376 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusSuspended) 377 378 // Reactivate 379 acevt = evts.Next() 380 t.Log(acevt.RepoAccount) 381 assert.Equal(acevt.RepoAccount.Did, u.DID()) 382 assert.Equal(acevt.RepoAccount.Active, true) 383 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 384 385 // Takedown at Relay level, then emit active event and make sure relay overrides it 386 b1.bgs.TakeDownRepo(context.TODO(), u.DID()) 387 p1.ReactivateRepo(t, u.DID()) 388 389 time.Sleep(time.Millisecond * 20) 390 391 acevt = evts.Next() 392 t.Log(acevt.RepoAccount) 393 assert.Equal(acevt.RepoAccount.Did, u.DID()) 394 assert.Equal(acevt.RepoAccount.Active, false) 395 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) 396 397 // Reactivate at Relay level, then emit an active account event and make sure relay passes it through 398 b1.bgs.ReverseTakedown(context.TODO(), u.DID()) 399 p1.ReactivateRepo(t, u.DID()) 400 401 time.Sleep(time.Millisecond * 20) 402 403 acevt = evts.Next() 404 t.Log(acevt.RepoAccount) 405 assert.Equal(acevt.RepoAccount.Did, u.DID()) 406 assert.Equal(acevt.RepoAccount.Active, true) 407 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 408} 409 410func TestRelayTakedown(t *testing.T) { 411 testRelayTakedown(t, true) 412} 413 414func TestRelayTakedownNonArchive(t *testing.T) { 415 testRelayTakedown(t, false) 416} 417 418func testRelayTakedown(t *testing.T, archive bool) { 419 if testing.Short() { 420 t.Skip("skipping Relay test in 'short' test mode") 421 } 422 assert := assert.New(t) 423 _ = assert 424 425 didr := TestPLC(t) 426 p1 := MustSetupPDS(t, ".tpds", didr) 427 p1.Run(t) 428 429 b1 := MustSetupRelay(t, didr, true) 430 b1.Run(t) 431 432 b1.tr.TrialHosts = []string{p1.RawHost()} 433 434 p1.RequestScraping(t, b1) 435 p1.BumpLimits(t, b1) 436 437 time.Sleep(time.Millisecond * 50) 438 es1 := b1.Events(t, 0) 439 440 bob := p1.MustNewUser(t, "bob.tpds") 441 alice := p1.MustNewUser(t, "alice.tpds") 442 443 bob.Post(t, "cats for cats") 444 alice.Post(t, "no i like dogs") 445 bp2 := bob.Post(t, "im a bad person who deserves to be taken down") 446 bob.Like(t, bp2) 447 448 expCount := 6 449 evts1 := es1.WaitFor(expCount) 450 assert.Equal(expCount, len(evts1)) 451 452 assert.NoError(b1.bgs.TakeDownRepo(context.TODO(), bob.did)) 453 454 es2 := b1.Events(t, 0) 455 time.Sleep(time.Millisecond * 50) // wait for events to stream in and be collected 456 evts2 := es2.WaitFor(2) 457 458 assert.Equal(2, len(evts2)) 459 for _, e := range evts2 { 460 if e.RepoCommit.Repo == bob.did { 461 t.Fatal("events from bob were not removed") 462 } 463 } 464 465 bob.Post(t, "im gonna sneak through being banned") 466 time.Sleep(time.Millisecond * 50) 467 alice.Post(t, "im a normal person") 468 // ensure events from bob dont get through 469 470 last := es2.Next() 471 assert.Equal(alice.did, last.RepoCommit.Repo) 472} 473 474func commitFromSlice(t *testing.T, slice []byte, rcid cid.Cid) *repo.SignedCommit { 475 carr, err := car.NewCarReader(bytes.NewReader(slice)) 476 if err != nil { 477 t.Fatal(err) 478 } 479 480 for { 481 blk, err := carr.Next() 482 if err != nil { 483 t.Fatal(err) 484 } 485 486 if blk.Cid() == rcid { 487 488 var sc repo.SignedCommit 489 if err := sc.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil { 490 t.Fatal(err) 491 } 492 return &sc 493 } 494 } 495} 496 497func TestDomainBans(t *testing.T) { 498 if testing.Short() { 499 t.Skip("skipping Relay test in 'short' test mode") 500 } 501 didr := TestPLC(t) 502 503 b1 := MustSetupRelay(t, didr, true) 504 b1.Run(t) 505 506 b1.BanDomain(t, "foo.com") 507 508 c := &xrpc.Client{Host: "http://" + b1.Host()} 509 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.com"}); err == nil { 510 t.Fatal("domain should be banned") 511 } 512 513 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "pds.foo.com"}); err == nil { 514 t.Fatal("domain should be banned") 515 } 516 517 err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "app.pds.foo.com"}) 518 if err == nil { 519 t.Fatal("domain should be banned") 520 } 521 522 if !strings.Contains(err.Error(), "XRPC ERROR 401") { 523 t.Fatal("should have failed with a 401") 524 } 525 526 // should not be banned 527 err = atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.bar.com"}) 528 if err == nil { 529 t.Fatal("should still fail") 530 } 531 532 if !strings.Contains(err.Error(), "XRPC ERROR 400") { 533 t.Fatal("should have failed with a 400") 534 } 535} 536 537func TestRelayHandleEmptyEvent(t *testing.T) { 538 if testing.Short() { 539 t.Skip("skipping Relay test in 'short' test mode") 540 } 541 assert := assert.New(t) 542 didr := TestPLC(t) 543 p1 := MustSetupPDS(t, ".tpds", didr) 544 p1.Run(t) 545 546 b1 := MustSetupRelay(t, didr, true) 547 b1.Run(t) 548 549 b1.tr.TrialHosts = []string{p1.RawHost()} 550 551 p1.RequestScraping(t, b1) 552 p1.BumpLimits(t, b1) 553 554 time.Sleep(time.Millisecond * 50) 555 556 evts := b1.Events(t, -1) 557 defer evts.Cancel() 558 559 bob := p1.MustNewUser(t, "bob.tpds") 560 t.Log("event 1") 561 e1 := evts.Next() 562 assert.NotNil(e1.RepoCommit) 563 assert.Equal(e1.RepoCommit.Repo, bob.DID()) 564 t.Log(e1.RepoCommit.Ops[0]) 565 566 ctx := context.TODO() 567 rm := p1.server.Repoman() 568 if err := rm.BatchWrite(ctx, 1, nil); err != nil { 569 t.Fatal(err) 570 } 571 572 e2 := evts.Next() 573 //t.Log(e2.RepoCommit.Ops[0]) 574 assert.Equal(len(e2.RepoCommit.Ops), 0) 575 assert.Equal(e2.RepoCommit.Repo, bob.DID()) 576}