1package testing
2
3import (
4 "bytes"
5 "context"
6 "math/rand"
7 "strings"
8 "testing"
9 "time"
10
11 atproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/events"
13 "github.com/bluesky-social/indigo/repo"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/ipfs/go-cid"
16 car "github.com/ipld/go-car"
17 "github.com/stretchr/testify/assert"
18)
19
20func TestRelayBasic(t *testing.T) {
21 t.Helper()
22 testRelayBasic(t, true)
23}
24
25func TestRelayBasicNonArchive(t *testing.T) {
26 t.Helper()
27 testRelayBasic(t, false)
28}
29
30func testRelayBasic(t *testing.T, archive bool) {
31 if testing.Short() {
32 t.Skip("skipping Relay test in 'short' test mode")
33 }
34 assert := assert.New(t)
35 didr := TestPLC(t)
36 p1 := MustSetupPDS(t, ".tpds", didr)
37 p1.Run(t)
38
39 b1 := MustSetupRelay(t, didr, archive)
40 b1.Run(t)
41
42 b1.tr.TrialHosts = []string{p1.RawHost()}
43
44 p1.RequestScraping(t, b1)
45 p1.BumpLimits(t, b1)
46
47 time.Sleep(time.Millisecond * 50)
48
49 evts := b1.Events(t, -1)
50 defer evts.Cancel()
51
52 bob := p1.MustNewUser(t, "bob.tpds")
53 t.Log("event 1")
54 e1 := evts.Next()
55 assert.NotNil(e1.RepoCommit)
56 assert.Equal(e1.RepoCommit.Repo, bob.DID())
57
58 alice := p1.MustNewUser(t, "alice.tpds")
59 t.Log("event 2")
60 e2 := evts.Next()
61 assert.NotNil(e2.RepoCommit)
62 assert.Equal(e2.RepoCommit.Repo, alice.DID())
63
64 bp1 := bob.Post(t, "cats for cats")
65 ap1 := alice.Post(t, "no i like dogs")
66
67 _ = bp1
68 _ = ap1
69
70 t.Log("bob:", bob.DID())
71 t.Log("event 3")
72 e3 := evts.Next()
73 assert.Equal(e3.RepoCommit.Repo, bob.DID())
74 //assert.Equal(e3.RepoCommit.Ops[0].Kind, "createRecord")
75
76 t.Log("alice:", alice.DID())
77 t.Log("event 4")
78 e4 := evts.Next()
79 assert.Equal(e4.RepoCommit.Repo, alice.DID())
80 //assert.Equal(e4.RepoCommit.Ops[0].Kind, "createRecord")
81
82 // playback
83 pbevts := b1.Events(t, 2)
84 defer pbevts.Cancel()
85
86 t.Log("event 5")
87 pbe1 := pbevts.Next()
88 assert.Equal(*e3, *pbe1)
89}
90
91func randomFollows(t *testing.T, users []*TestUser) {
92 for n := 0; n < 3; n++ {
93 for i, u := range users {
94 oi := rand.Intn(len(users))
95 if i == oi {
96 continue
97 }
98
99 u.Follow(t, users[oi].DID())
100 }
101 }
102}
103
104func socialSim(t *testing.T, users []*TestUser, postiter, likeiter int) []*atproto.RepoStrongRef {
105 var posts []*atproto.RepoStrongRef
106 for i := 0; i < postiter; i++ {
107 for _, u := range users {
108 posts = append(posts, u.Post(t, MakeRandomPost()))
109 }
110 }
111
112 for i := 0; i < likeiter; i++ {
113 for _, u := range users {
114 u.Like(t, posts[rand.Intn(len(posts))])
115 }
116 }
117
118 return posts
119}
120
121func TestRelayMultiPDS(t *testing.T) {
122 t.Helper()
123 testRelayMultiPDS(t, true)
124}
125
126func TestRelayMultiPDSNonArchive(t *testing.T) {
127 t.Helper()
128 testRelayMultiPDS(t, false)
129}
130
131func testRelayMultiPDS(t *testing.T, archive bool) {
132 if testing.Short() {
133 t.Skip("skipping Relay test in 'short' test mode")
134 }
135 //t.Skip("test too sleepy to run in CI for now")
136
137 assert := assert.New(t)
138 _ = assert
139 didr := TestPLC(t)
140 p1 := MustSetupPDS(t, ".pdsuno", didr)
141 p1.Run(t)
142
143 p2 := MustSetupPDS(t, ".pdsdos", didr)
144 p2.Run(t)
145
146 b1 := MustSetupRelay(t, didr, archive)
147 b1.Run(t)
148
149 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()}
150
151 p1.RequestScraping(t, b1)
152 p1.BumpLimits(t, b1)
153 time.Sleep(time.Millisecond * 100)
154
155 var users []*TestUser
156 for i := 0; i < 5; i++ {
157 users = append(users, p1.MustNewUser(t, usernames[i]+".pdsuno"))
158 }
159
160 randomFollows(t, users)
161 socialSim(t, users, 10, 10)
162
163 var users2 []*TestUser
164 for i := 0; i < 5; i++ {
165 users2 = append(users2, p2.MustNewUser(t, usernames[i+5]+".pdsdos"))
166 }
167
168 randomFollows(t, users2)
169 p2posts := socialSim(t, users2, 10, 10)
170
171 randomFollows(t, append(users, users2...))
172
173 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life")
174
175 // now if we make posts on pds 2, the relay will not hear about those new posts
176
177 p2posts2 := socialSim(t, users2, 10, 10)
178
179 time.Sleep(time.Second)
180
181 p2.RequestScraping(t, b1)
182 p2.BumpLimits(t, b1)
183 time.Sleep(time.Millisecond * 50)
184
185 // Now, the relay will discover a gap, and have to catch up somehow
186 socialSim(t, users2, 1, 0)
187
188 // we expect the relay to learn about posts that it did not directly see from
189 // repos its already partially scraped, as long as its seen *something* after the missing post
190 // this is the 'catchup' process
191 _ = p2posts2
192 /* NOTE: BGS doesn't support indexing any more
193 time.Sleep(time.Second)
194 ctx := context.Background()
195 _, err := b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri)
196 if err != nil {
197 t.Fatal(err)
198 }
199 */
200}
201
202func TestRelayMultiGap(t *testing.T) {
203 if testing.Short() {
204 t.Skip("skipping Relay test in 'short' test mode")
205 }
206 //t.Skip("test too sleepy to run in CI for now")
207 assert := assert.New(t)
208 _ = assert
209 didr := TestPLC(t)
210 p1 := MustSetupPDS(t, ".pdsuno", didr)
211 p1.Run(t)
212
213 p2 := MustSetupPDS(t, ".pdsdos", didr)
214 p2.Run(t)
215
216 b1 := MustSetupRelay(t, didr, true)
217 b1.Run(t)
218
219 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()}
220
221 p1.RequestScraping(t, b1)
222 p1.BumpLimits(t, b1)
223 time.Sleep(time.Millisecond * 250)
224
225 users := []*TestUser{p1.MustNewUser(t, usernames[0]+".pdsuno")}
226
227 socialSim(t, users, 10, 0)
228
229 users2 := []*TestUser{p2.MustNewUser(t, usernames[1]+".pdsdos")}
230
231 p2posts := socialSim(t, users2, 10, 0)
232
233 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life")
234
235 /* NOTE: BGS doesn't support indexing any more
236 time.Sleep(time.Second * 2)
237 ctx := context.Background()
238 _, err := b1.bgs.Index.GetPost(ctx, p2posts[3].Uri)
239 if err != nil {
240 t.Fatal(err)
241 }
242 */
243
244 // now if we make posts on pds 2, the relay will not hear about those new posts
245
246 p2posts2 := socialSim(t, users2, 10, 0)
247
248 time.Sleep(time.Second)
249
250 p2.RequestScraping(t, b1)
251 p2.BumpLimits(t, b1)
252 time.Sleep(time.Second * 2)
253
254 // Now, the relay will discover a gap, and have to catch up somehow
255 socialSim(t, users2, 1, 0)
256
257 // we expect the relay to learn about posts that it did not directly see from
258 // repos its already partially scraped, as long as its seen *something* after the missing post
259 // this is the 'catchup' process
260 _ = p2posts2
261 /* NOTE: BGS doesn't support indexing any more
262 time.Sleep(time.Second * 2)
263 _, err = b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri)
264 if err != nil {
265 t.Fatal(err)
266 }
267 */
268}
269
270func TestHandleChange(t *testing.T) {
271 //t.Skip("test too sleepy to run in CI for now")
272 assert := assert.New(t)
273 _ = assert
274 didr := TestPLC(t)
275 p1 := MustSetupPDS(t, ".pdsuno", didr)
276 p1.Run(t)
277
278 b1 := MustSetupRelay(t, didr, true)
279 b1.Run(t)
280
281 b1.tr.TrialHosts = []string{p1.RawHost()}
282
283 p1.RequestScraping(t, b1)
284 p1.BumpLimits(t, b1)
285 time.Sleep(time.Millisecond * 50)
286
287 evts := b1.Events(t, -1)
288
289 u := p1.MustNewUser(t, usernames[0]+".pdsuno")
290
291 // if the handle changes before the relay processes the first event, things
292 // get a little weird
293 time.Sleep(time.Millisecond * 50)
294 //socialSim(t, []*testUser{u}, 10, 0)
295
296 u.ChangeHandle(t, "catbear.pdsuno")
297
298 time.Sleep(time.Millisecond * 100)
299
300 initevt := evts.Next()
301 t.Log(initevt.RepoCommit)
302 idevt := evts.Next()
303 t.Log(idevt.RepoIdentity)
304}
305
306func TestAccountEvent(t *testing.T) {
307 assert := assert.New(t)
308 _ = assert
309 didr := TestPLC(t)
310 p1 := MustSetupPDS(t, ".pdsuno", didr)
311 p1.Run(t)
312
313 b1 := MustSetupRelay(t, didr, true)
314 b1.Run(t)
315
316 b1.tr.TrialHosts = []string{p1.RawHost()}
317
318 p1.RequestScraping(t, b1)
319 p1.BumpLimits(t, b1)
320 time.Sleep(time.Millisecond * 50)
321
322 evts := b1.Events(t, -1)
323
324 u := p1.MustNewUser(t, usernames[0]+".pdsuno")
325
326 // if the handle changes before the relay processes the first event, things
327 // get a little weird
328 time.Sleep(time.Millisecond * 50)
329 //socialSim(t, []*testUser{u}, 10, 0)
330
331 p1.TakedownRepo(t, u.DID())
332 p1.ReactivateRepo(t, u.DID())
333 p1.DeactivateRepo(t, u.DID())
334 p1.ReactivateRepo(t, u.DID())
335 p1.SuspendRepo(t, u.DID())
336 p1.ReactivateRepo(t, u.DID())
337
338 time.Sleep(time.Millisecond * 100)
339
340 initevt := evts.Next()
341 t.Log(initevt.RepoCommit)
342
343 // Takedown
344 acevt := evts.Next()
345 t.Log(acevt.RepoAccount)
346 assert.Equal(acevt.RepoAccount.Did, u.DID())
347 assert.Equal(acevt.RepoAccount.Active, false)
348 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown)
349
350 // Reactivate
351 acevt = evts.Next()
352 t.Log(acevt.RepoAccount)
353 assert.Equal(acevt.RepoAccount.Did, u.DID())
354 assert.Equal(acevt.RepoAccount.Active, true)
355 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
356
357 // Deactivate
358 acevt = evts.Next()
359 t.Log(acevt.RepoAccount)
360 assert.Equal(acevt.RepoAccount.Did, u.DID())
361 assert.Equal(acevt.RepoAccount.Active, false)
362 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusDeactivated)
363
364 // Reactivate
365 acevt = evts.Next()
366 t.Log(acevt.RepoAccount)
367 assert.Equal(acevt.RepoAccount.Did, u.DID())
368 assert.Equal(acevt.RepoAccount.Active, true)
369 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
370
371 // Suspend
372 acevt = evts.Next()
373 t.Log(acevt.RepoAccount)
374 assert.Equal(acevt.RepoAccount.Did, u.DID())
375 assert.Equal(acevt.RepoAccount.Active, false)
376 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusSuspended)
377
378 // Reactivate
379 acevt = evts.Next()
380 t.Log(acevt.RepoAccount)
381 assert.Equal(acevt.RepoAccount.Did, u.DID())
382 assert.Equal(acevt.RepoAccount.Active, true)
383 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
384
385 // Takedown at Relay level, then emit active event and make sure relay overrides it
386 b1.bgs.TakeDownRepo(context.TODO(), u.DID())
387 p1.ReactivateRepo(t, u.DID())
388
389 time.Sleep(time.Millisecond * 20)
390
391 acevt = evts.Next()
392 t.Log(acevt.RepoAccount)
393 assert.Equal(acevt.RepoAccount.Did, u.DID())
394 assert.Equal(acevt.RepoAccount.Active, false)
395 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown)
396
397 // Reactivate at Relay level, then emit an active account event and make sure relay passes it through
398 b1.bgs.ReverseTakedown(context.TODO(), u.DID())
399 p1.ReactivateRepo(t, u.DID())
400
401 time.Sleep(time.Millisecond * 20)
402
403 acevt = evts.Next()
404 t.Log(acevt.RepoAccount)
405 assert.Equal(acevt.RepoAccount.Did, u.DID())
406 assert.Equal(acevt.RepoAccount.Active, true)
407 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
408}
409
410func TestRelayTakedown(t *testing.T) {
411 testRelayTakedown(t, true)
412}
413
414func TestRelayTakedownNonArchive(t *testing.T) {
415 testRelayTakedown(t, false)
416}
417
418func testRelayTakedown(t *testing.T, archive bool) {
419 if testing.Short() {
420 t.Skip("skipping Relay test in 'short' test mode")
421 }
422 assert := assert.New(t)
423 _ = assert
424
425 didr := TestPLC(t)
426 p1 := MustSetupPDS(t, ".tpds", didr)
427 p1.Run(t)
428
429 b1 := MustSetupRelay(t, didr, true)
430 b1.Run(t)
431
432 b1.tr.TrialHosts = []string{p1.RawHost()}
433
434 p1.RequestScraping(t, b1)
435 p1.BumpLimits(t, b1)
436
437 time.Sleep(time.Millisecond * 50)
438 es1 := b1.Events(t, 0)
439
440 bob := p1.MustNewUser(t, "bob.tpds")
441 alice := p1.MustNewUser(t, "alice.tpds")
442
443 bob.Post(t, "cats for cats")
444 alice.Post(t, "no i like dogs")
445 bp2 := bob.Post(t, "im a bad person who deserves to be taken down")
446 bob.Like(t, bp2)
447
448 expCount := 6
449 evts1 := es1.WaitFor(expCount)
450 assert.Equal(expCount, len(evts1))
451
452 assert.NoError(b1.bgs.TakeDownRepo(context.TODO(), bob.did))
453
454 es2 := b1.Events(t, 0)
455 time.Sleep(time.Millisecond * 50) // wait for events to stream in and be collected
456 evts2 := es2.WaitFor(2)
457
458 assert.Equal(2, len(evts2))
459 for _, e := range evts2 {
460 if e.RepoCommit.Repo == bob.did {
461 t.Fatal("events from bob were not removed")
462 }
463 }
464
465 bob.Post(t, "im gonna sneak through being banned")
466 time.Sleep(time.Millisecond * 50)
467 alice.Post(t, "im a normal person")
468 // ensure events from bob dont get through
469
470 last := es2.Next()
471 assert.Equal(alice.did, last.RepoCommit.Repo)
472}
473
474func commitFromSlice(t *testing.T, slice []byte, rcid cid.Cid) *repo.SignedCommit {
475 carr, err := car.NewCarReader(bytes.NewReader(slice))
476 if err != nil {
477 t.Fatal(err)
478 }
479
480 for {
481 blk, err := carr.Next()
482 if err != nil {
483 t.Fatal(err)
484 }
485
486 if blk.Cid() == rcid {
487
488 var sc repo.SignedCommit
489 if err := sc.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil {
490 t.Fatal(err)
491 }
492 return &sc
493 }
494 }
495}
496
497func TestDomainBans(t *testing.T) {
498 if testing.Short() {
499 t.Skip("skipping Relay test in 'short' test mode")
500 }
501 didr := TestPLC(t)
502
503 b1 := MustSetupRelay(t, didr, true)
504 b1.Run(t)
505
506 b1.BanDomain(t, "foo.com")
507
508 c := &xrpc.Client{Host: "http://" + b1.Host()}
509 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.com"}); err == nil {
510 t.Fatal("domain should be banned")
511 }
512
513 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "pds.foo.com"}); err == nil {
514 t.Fatal("domain should be banned")
515 }
516
517 err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "app.pds.foo.com"})
518 if err == nil {
519 t.Fatal("domain should be banned")
520 }
521
522 if !strings.Contains(err.Error(), "XRPC ERROR 401") {
523 t.Fatal("should have failed with a 401")
524 }
525
526 // should not be banned
527 err = atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.bar.com"})
528 if err == nil {
529 t.Fatal("should still fail")
530 }
531
532 if !strings.Contains(err.Error(), "XRPC ERROR 400") {
533 t.Fatal("should have failed with a 400")
534 }
535}
536
537func TestRelayHandleEmptyEvent(t *testing.T) {
538 if testing.Short() {
539 t.Skip("skipping Relay test in 'short' test mode")
540 }
541 assert := assert.New(t)
542 didr := TestPLC(t)
543 p1 := MustSetupPDS(t, ".tpds", didr)
544 p1.Run(t)
545
546 b1 := MustSetupRelay(t, didr, true)
547 b1.Run(t)
548
549 b1.tr.TrialHosts = []string{p1.RawHost()}
550
551 p1.RequestScraping(t, b1)
552 p1.BumpLimits(t, b1)
553
554 time.Sleep(time.Millisecond * 50)
555
556 evts := b1.Events(t, -1)
557 defer evts.Cancel()
558
559 bob := p1.MustNewUser(t, "bob.tpds")
560 t.Log("event 1")
561 e1 := evts.Next()
562 assert.NotNil(e1.RepoCommit)
563 assert.Equal(e1.RepoCommit.Repo, bob.DID())
564 t.Log(e1.RepoCommit.Ops[0])
565
566 ctx := context.TODO()
567 rm := p1.server.Repoman()
568 if err := rm.BatchWrite(ctx, 1, nil); err != nil {
569 t.Fatal(err)
570 }
571
572 e2 := evts.Next()
573 //t.Log(e2.RepoCommit.Ops[0])
574 assert.Equal(len(e2.RepoCommit.Ops), 0)
575 assert.Equal(e2.RepoCommit.Repo, bob.DID())
576}