a love letter to tangled (android, iOS, and a search API)
at main 585 lines 17 kB view raw
1package backfill 2 3import ( 4 "context" 5 "errors" 6 "io" 7 "log/slog" 8 "os" 9 "path/filepath" 10 "strings" 11 "testing" 12 13 "tangled.org/desertthunder.dev/twister/internal/store" 14) 15 16type fakeStore struct { 17 collaborators map[string][]string 18 identities map[string]string 19 documents []*store.Document 20} 21 22func (f *fakeStore) GetRepoCollaborators(_ context.Context, did string) ([]string, error) { 23 return f.collaborators[did], nil 24} 25 26func (f *fakeStore) UpsertIdentityHandle(_ context.Context, did, handle string, _ bool, _ string) error { 27 if f.identities == nil { 28 f.identities = map[string]string{} 29 } 30 f.identities[did] = handle 31 return nil 32} 33 34func (f *fakeStore) UpsertDocument(_ context.Context, doc *store.Document) error { 35 f.documents = append(f.documents, doc) 36 return nil 37} 38 39type fakeFollowFetcher struct { 40 follows map[string][]string 41} 42 43func (f *fakeFollowFetcher) ListFollowSubjects(_ context.Context, did string) ([]string, error) { 44 return f.follows[did], nil 45} 46 47type fakeTapAdmin struct { 48 statuses map[string]RepoStatus 49 statusErrs map[string]error 50 added [][]string 51 addReposError func(dids []string) error 52} 53 54func (f *fakeTapAdmin) RepoStatus(_ context.Context, did string) (RepoStatus, error) { 55 if err, ok := f.statusErrs[did]; ok { 56 return RepoStatus{}, err 57 } 58 if status, ok := f.statuses[did]; ok { 59 return status, nil 60 } 61 return RepoStatus{Found: false, Tracked: false}, nil 62} 63 64func (f *fakeTapAdmin) AddRepos(_ context.Context, dids []string) error { 65 if f.addReposError != nil { 66 if err := f.addReposError(dids); err != nil { 67 return err 68 } 69 } 70 batch := make([]string, len(dids)) 71 copy(batch, dids) 72 f.added = append(f.added, batch) 73 return nil 74} 75 76type fakeResolver struct { 77 mapping map[string]string 78} 79 80func (r *fakeResolver) Resolve(_ context.Context, handle string) (string, error) { 81 if did, ok := r.mapping[handle]; ok { 82 return did, nil 83 } 84 return "", io.EOF 85} 86 87type fakeProfileFetcher struct { 88 profiles map[string]*ProfileRecord 89} 90 91func (f *fakeProfileFetcher) FetchProfile(_ context.Context, did string) (*ProfileRecord, error) { 92 if pr, ok := f.profiles[did]; ok { 93 return pr, nil 94 } 95 return &ProfileRecord{}, nil 96} 97 98type fakeRepoFetcher struct { 99 repos map[string][]RepoRecord 100} 101 102func (f *fakeRepoFetcher) ListRepos(_ context.Context, did string) ([]RepoRecord, error) { 103 if repos, ok := f.repos[did]; ok { 104 return repos, nil 105 } 106 return nil, nil 107} 108 109type fakeLightrailRepoLister struct { 110 dids []string 111 err error 112 calls int 113 baseURL string 114 collections []string 115 limit int 116} 117 118func (f *fakeLightrailRepoLister) ListReposByCollection( 119 _ context.Context, baseURL string, collections []string, limit int, 120) ([]string, error) { 121 f.calls++ 122 f.baseURL = baseURL 123 f.collections = append([]string(nil), collections...) 124 f.limit = limit 125 if f.err != nil { 126 return nil, f.err 127 } 128 return append([]string(nil), f.dids...), nil 129} 130 131func TestRunner_DiscoveryAndSubmit(t *testing.T) { 132 st := &fakeStore{ 133 collaborators: map[string][]string{ 134 "did:plc:seed": {"did:plc:c1"}, 135 }, 136 } 137 follows := &fakeFollowFetcher{follows: map[string][]string{"did:plc:seed": {"did:plc:f1"}}} 138 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{"did:plc:f1": {Found: true, Tracked: true, Backfilled: true}}} 139 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 140 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 141 r := NewRunnerWithDeps( 142 st, tap, resolver, follows, 143 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 144 &fakeRepoFetcher{}, 145 &fakeLightrailRepoLister{}, log, 146 ) 147 148 dir := t.TempDir() 149 seedsPath := filepath.Join(dir, "seeds.txt") 150 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 151 t.Fatalf("write seeds: %v", err) 152 } 153 154 err := r.Run(context.Background(), Options{ 155 SeedsPath: seedsPath, 156 MaxHops: 1, 157 Concurrency: 2, 158 BatchSize: 2, 159 Source: SourceGraph, 160 }) 161 if err != nil { 162 t.Fatalf("run backfill: %v", err) 163 } 164 165 if len(tap.added) != 1 { 166 t.Fatalf("expected one batch, got %d", len(tap.added)) 167 } 168 if len(tap.added[0]) != 2 { 169 t.Fatalf("expected 2 dids submitted, got %#v", tap.added[0]) 170 } 171} 172 173func TestRunner_DryRunSkipsMutations(t *testing.T) { 174 st := &fakeStore{collaborators: map[string][]string{}} 175 follows := &fakeFollowFetcher{follows: map[string][]string{}} 176 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}} 177 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 178 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 179 r := NewRunnerWithDeps( 180 st, tap, resolver, follows, 181 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 182 &fakeRepoFetcher{}, 183 &fakeLightrailRepoLister{}, log, 184 ) 185 186 dir := t.TempDir() 187 seedsPath := filepath.Join(dir, "seeds.txt") 188 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 189 t.Fatalf("write seeds: %v", err) 190 } 191 192 err := r.Run(context.Background(), Options{ 193 SeedsPath: seedsPath, 194 MaxHops: 0, 195 DryRun: true, 196 Concurrency: 1, 197 BatchSize: 10, 198 Source: SourceGraph, 199 }) 200 if err != nil { 201 t.Fatalf("run dry-run backfill: %v", err) 202 } 203 if len(tap.added) != 0 { 204 t.Fatalf("expected no tap submissions in dry-run, got %#v", tap.added) 205 } 206} 207 208func TestRunner_SkipsInProgressBackfills(t *testing.T) { 209 st := &fakeStore{collaborators: map[string][]string{}} 210 follows := &fakeFollowFetcher{follows: map[string][]string{}} 211 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{ 212 "did:plc:seed": {Found: true, Tracked: true, Backfilling: true}, 213 }} 214 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 215 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 216 r := NewRunnerWithDeps( 217 st, tap, resolver, follows, 218 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 219 &fakeRepoFetcher{}, 220 &fakeLightrailRepoLister{}, log, 221 ) 222 223 dir := t.TempDir() 224 seedsPath := filepath.Join(dir, "seeds.txt") 225 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 226 t.Fatalf("write seeds: %v", err) 227 } 228 229 err := r.Run(context.Background(), Options{ 230 SeedsPath: seedsPath, MaxHops: 0, Source: SourceGraph, 231 }) 232 if err != nil { 233 t.Fatalf("run backfill: %v", err) 234 } 235 if len(tap.added) != 0 { 236 t.Fatalf("expected no submission for in-progress did, got %#v", tap.added) 237 } 238} 239 240func TestRunner_ContinuesWhenRepoStatusFails(t *testing.T) { 241 st := &fakeStore{ 242 collaborators: map[string][]string{ 243 "did:plc:seed": {"did:plc:good", "did:plc:bad"}, 244 }, 245 } 246 follows := &fakeFollowFetcher{follows: map[string][]string{}} 247 tap := &fakeTapAdmin{ 248 statuses: map[string]RepoStatus{}, 249 statusErrs: map[string]error{"did:plc:bad": errors.New("tap info request failed: status 502")}, 250 } 251 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 252 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 253 r := NewRunnerWithDeps( 254 st, tap, resolver, follows, 255 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 256 &fakeRepoFetcher{}, 257 &fakeLightrailRepoLister{}, log, 258 ) 259 260 dir := t.TempDir() 261 seedsPath := filepath.Join(dir, "seeds.txt") 262 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 263 t.Fatalf("write seeds: %v", err) 264 } 265 266 err := r.Run(context.Background(), Options{ 267 SeedsPath: seedsPath, 268 MaxHops: 1, 269 Concurrency: 1, 270 BatchSize: 10, 271 Source: SourceGraph, 272 }) 273 if err != nil { 274 t.Fatalf("run backfill: %v", err) 275 } 276 277 if len(tap.added) != 1 { 278 t.Fatalf("expected one submission batch, got %d", len(tap.added)) 279 } 280 if len(tap.added[0]) != 2 { 281 t.Fatalf("expected seed and good DID submitted, got %#v", tap.added[0]) 282 } 283 for _, did := range tap.added[0] { 284 if did == "did:plc:bad" { 285 t.Fatalf("did with status error should have been skipped, got %#v", tap.added[0]) 286 } 287 } 288} 289 290func TestRunner_FallsBackToSingleRepoSubmissionOnBatchFailure(t *testing.T) { 291 st := &fakeStore{ 292 collaborators: map[string][]string{ 293 "did:plc:seed": {"did:plc:good", "did:plc:bad"}, 294 }, 295 } 296 follows := &fakeFollowFetcher{follows: map[string][]string{}} 297 tap := &fakeTapAdmin{ 298 statuses: map[string]RepoStatus{}, 299 addReposError: func(dids []string) error { 300 if len(dids) > 1 { 301 return errors.New("repos add failed: status 502") 302 } 303 if len(dids) == 1 && strings.Contains(dids[0], "bad") { 304 return errors.New("repos add failed: status 502") 305 } 306 return nil 307 }, 308 } 309 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 310 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 311 r := NewRunnerWithDeps( 312 st, tap, resolver, follows, 313 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}}, 314 &fakeRepoFetcher{}, 315 &fakeLightrailRepoLister{}, log, 316 ) 317 318 dir := t.TempDir() 319 seedsPath := filepath.Join(dir, "seeds.txt") 320 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 321 t.Fatalf("write seeds: %v", err) 322 } 323 324 err := r.Run(context.Background(), Options{ 325 SeedsPath: seedsPath, 326 MaxHops: 1, 327 Concurrency: 1, 328 BatchSize: 10, 329 Source: SourceGraph, 330 }) 331 if err != nil { 332 t.Fatalf("run backfill: %v", err) 333 } 334 335 if len(tap.added) != 2 { 336 t.Fatalf("expected successful individual fallbacks only, got %#v", tap.added) 337 } 338 for _, batch := range tap.added { 339 if len(batch) != 1 { 340 t.Fatalf("expected only single-DID successful submissions after batch fallback, got %#v", tap.added) 341 } 342 if batch[0] == "did:plc:bad" { 343 t.Fatalf("bad DID should not have been successfully submitted, got %#v", tap.added) 344 } 345 } 346} 347 348func TestRunner_IndexesProfilesAndHandles(t *testing.T) { 349 st := &fakeStore{collaborators: map[string][]string{}} 350 follows := &fakeFollowFetcher{follows: map[string][]string{}} 351 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}} 352 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 353 profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{ 354 "did:plc:seed": { 355 Record: map[string]any{ 356 "description": "Building cool stuff", 357 "location": "NYC", 358 }, 359 CID: "bafyabc123", 360 Handle: "alice.tangled.sh", 361 }, 362 }} 363 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 364 r := NewRunnerWithDeps( 365 st, tap, resolver, follows, profiles, &fakeRepoFetcher{}, &fakeLightrailRepoLister{}, log, 366 ) 367 368 dir := t.TempDir() 369 seedsPath := filepath.Join(dir, "seeds.txt") 370 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 371 t.Fatalf("write seeds: %v", err) 372 } 373 374 err := r.Run(context.Background(), Options{ 375 SeedsPath: seedsPath, MaxHops: 0, Source: SourceGraph, 376 }) 377 if err != nil { 378 t.Fatalf("run backfill: %v", err) 379 } 380 381 if st.identities["did:plc:seed"] != "alice.tangled.sh" { 382 t.Fatalf("expected identity handle for seed DID, got %#v", st.identities) 383 } 384 385 if len(st.documents) != 1 { 386 t.Fatalf("expected 1 profile document, got %d", len(st.documents)) 387 } 388 doc := st.documents[0] 389 if doc.Title != "alice.tangled.sh" { 390 t.Errorf("expected title to be handle, got %q", doc.Title) 391 } 392 if doc.AuthorHandle != "alice.tangled.sh" { 393 t.Errorf("expected author_handle to be handle, got %q", doc.AuthorHandle) 394 } 395 if doc.Body != "Building cool stuff" { 396 t.Errorf("expected body to be description, got %q", doc.Body) 397 } 398 if doc.RecordType != "profile" { 399 t.Errorf("expected record_type profile, got %q", doc.RecordType) 400 } 401 if !strings.Contains(doc.Summary, "NYC") { 402 t.Errorf("expected summary to contain location, got %q", doc.Summary) 403 } 404} 405 406func TestRunner_LightrailDryRunSkipsTapAndProfileIndexing(t *testing.T) { 407 st := &fakeStore{collaborators: map[string][]string{}} 408 tap := &fakeTapAdmin{ 409 statusErrs: map[string]error{"did:plc:a": errors.New("should not be called")}, 410 } 411 lightrail := &fakeLightrailRepoLister{dids: []string{"did:plc:b", "did:plc:a"}} 412 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 413 r := NewRunnerWithDeps( 414 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, &fakeRepoFetcher{}, 415 lightrail, log, 416 ) 417 418 err := r.Run(context.Background(), Options{ 419 Source: SourceLightrail, DryRun: true, 420 LightrailURL: "https://example.test", PageLimit: 500, 421 }) 422 if err != nil { 423 t.Fatalf("run lightrail dry-run: %v", err) 424 } 425 if len(tap.added) != 0 { 426 t.Fatalf("expected no Tap submissions, got %#v", tap.added) 427 } 428 if len(st.documents) != 0 { 429 t.Fatalf("expected no profile indexing, got %#v", st.documents) 430 } 431 if lightrail.calls != 1 { 432 t.Fatalf("expected one Lightrail call, got %d", lightrail.calls) 433 } 434 if len(lightrail.collections) != len(DefaultCollections) { 435 t.Fatalf("expected default collections, got %#v", lightrail.collections) 436 } 437} 438 439func TestRunner_LightrailSubmitsWithoutRepoStatusChecks(t *testing.T) { 440 st := &fakeStore{collaborators: map[string][]string{}} 441 tap := &fakeTapAdmin{ 442 statusErrs: map[string]error{ 443 "did:plc:a": errors.New("RepoStatus should not be called in lightrail mode"), 444 }, 445 } 446 lightrail := &fakeLightrailRepoLister{ 447 dids: []string{"did:plc:b", "did:plc:a", "did:plc:b"}, 448 } 449 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 450 r := NewRunnerWithDeps( 451 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, &fakeRepoFetcher{}, 452 lightrail, log, 453 ) 454 455 err := r.Run(context.Background(), Options{ 456 Source: SourceLightrail, BatchSize: 10, 457 Collections: []string{"sh.tangled.repo"}, 458 }) 459 if err != nil { 460 t.Fatalf("run lightrail backfill: %v", err) 461 } 462 if len(tap.added) != 1 { 463 t.Fatalf("expected one Tap batch, got %#v", tap.added) 464 } 465 if len(tap.added[0]) != 2 { 466 t.Fatalf("expected deduped DIDs, got %#v", tap.added) 467 } 468} 469 470func TestRunner_LightrailIndexesProfiles(t *testing.T) { 471 st := &fakeStore{collaborators: map[string][]string{}} 472 tap := &fakeTapAdmin{} 473 lightrail := &fakeLightrailRepoLister{ 474 dids: []string{"did:plc:xg2vq45muivyy3xwatcehspu"}, 475 } 476 profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{ 477 "did:plc:xg2vq45muivyy3xwatcehspu": { 478 Record: map[string]any{ 479 "description": "Twisted maintainer", 480 "location": "Chicago", 481 }, 482 CID: "bafydesert123", 483 Handle: "desertthunder.dev", 484 }, 485 }} 486 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 487 r := NewRunnerWithDeps( 488 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, profiles, &fakeRepoFetcher{}, lightrail, log, 489 ) 490 491 err := r.Run(context.Background(), Options{ 492 Source: SourceLightrail, 493 BatchSize: 10, 494 Concurrency: 1, 495 Collections: []string{"sh.tangled.repo"}, 496 }) 497 if err != nil { 498 t.Fatalf("run lightrail backfill: %v", err) 499 } 500 if st.identities["did:plc:xg2vq45muivyy3xwatcehspu"] != "desertthunder.dev" { 501 t.Fatalf("expected identity handle to be stored, got %#v", st.identities) 502 } 503 if len(st.documents) != 1 { 504 t.Fatalf("expected one profile document, got %d", len(st.documents)) 505 } 506 doc := st.documents[0] 507 if doc.RecordType != "profile" { 508 t.Fatalf("expected profile document, got %q", doc.RecordType) 509 } 510 if doc.AuthorHandle != "desertthunder.dev" { 511 t.Fatalf("expected author_handle desertthunder.dev, got %q", doc.AuthorHandle) 512 } 513 if doc.Title != "desertthunder.dev" { 514 t.Fatalf("expected title desertthunder.dev, got %q", doc.Title) 515 } 516} 517 518func TestRunner_LightrailIndexesReposDirectly(t *testing.T) { 519 st := &fakeStore{collaborators: map[string][]string{}} 520 tap := &fakeTapAdmin{} 521 lightrail := &fakeLightrailRepoLister{ 522 dids: []string{"did:plc:xg2vq45muivyy3xwatcehspu"}, 523 } 524 profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{ 525 "did:plc:xg2vq45muivyy3xwatcehspu": { 526 Handle: "desertthunder.dev", 527 Record: map[string]any{ 528 "description": "Twisted maintainer", 529 }, 530 CID: "bafydesert123", 531 }, 532 }} 533 repos := &fakeRepoFetcher{repos: map[string][]RepoRecord{ 534 "did:plc:xg2vq45muivyy3xwatcehspu": { 535 { 536 RKey: "3mho6hukiei22", 537 CID: "bafyreitwisted123", 538 Record: map[string]any{ 539 "name": "twisted", 540 "description": "A tangled mobile client", 541 "topics": []any{"go", "search"}, 542 "createdAt": "2026-03-01T00:00:00Z", 543 }, 544 }, 545 }, 546 }} 547 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 548 r := NewRunnerWithDeps( 549 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, profiles, repos, lightrail, log, 550 ) 551 552 err := r.Run(context.Background(), Options{ 553 Source: SourceLightrail, 554 BatchSize: 10, 555 Concurrency: 1, 556 Collections: []string{"sh.tangled.repo"}, 557 }) 558 if err != nil { 559 t.Fatalf("run lightrail backfill: %v", err) 560 } 561 562 if len(st.documents) != 2 { 563 t.Fatalf("expected profile and repo bootstrap documents, got %d", len(st.documents)) 564 } 565 566 var foundRepo *store.Document 567 for _, doc := range st.documents { 568 if doc.RecordType == "repo" { 569 foundRepo = doc 570 break 571 } 572 } 573 if foundRepo == nil { 574 t.Fatal("expected repo bootstrap document") 575 } 576 if foundRepo.Title != "twisted" { 577 t.Fatalf("expected repo title twisted, got %q", foundRepo.Title) 578 } 579 if foundRepo.AuthorHandle != "desertthunder.dev" { 580 t.Fatalf("expected repo author_handle desertthunder.dev, got %q", foundRepo.AuthorHandle) 581 } 582 if foundRepo.WebURL == "" { 583 t.Fatal("expected repo web_url to be populated") 584 } 585}