a love letter to tangled (android, iOS, and a search API)
1package backfill
2
3import (
4 "context"
5 "errors"
6 "io"
7 "log/slog"
8 "os"
9 "path/filepath"
10 "strings"
11 "testing"
12
13 "tangled.org/desertthunder.dev/twister/internal/store"
14)
15
16type fakeStore struct {
17 collaborators map[string][]string
18 identities map[string]string
19 documents []*store.Document
20}
21
22func (f *fakeStore) GetRepoCollaborators(_ context.Context, did string) ([]string, error) {
23 return f.collaborators[did], nil
24}
25
26func (f *fakeStore) UpsertIdentityHandle(_ context.Context, did, handle string, _ bool, _ string) error {
27 if f.identities == nil {
28 f.identities = map[string]string{}
29 }
30 f.identities[did] = handle
31 return nil
32}
33
34func (f *fakeStore) UpsertDocument(_ context.Context, doc *store.Document) error {
35 f.documents = append(f.documents, doc)
36 return nil
37}
38
// fakeFollowFetcher is a test double that answers follow lookups from a
// fixed map.
type fakeFollowFetcher struct {
	// follows maps a DID to the subjects ListFollowSubjects returns for it.
	follows map[string][]string
}

// ListFollowSubjects returns the canned follow subjects for did. A DID with
// no entry yields a nil slice; the error is always nil.
func (f *fakeFollowFetcher) ListFollowSubjects(_ context.Context, did string) ([]string, error) {
	if subjects, ok := f.follows[did]; ok {
		return subjects, nil
	}
	return nil, nil
}
46
47type fakeTapAdmin struct {
48 statuses map[string]RepoStatus
49 statusErrs map[string]error
50 added [][]string
51 addReposError func(dids []string) error
52}
53
54func (f *fakeTapAdmin) RepoStatus(_ context.Context, did string) (RepoStatus, error) {
55 if err, ok := f.statusErrs[did]; ok {
56 return RepoStatus{}, err
57 }
58 if status, ok := f.statuses[did]; ok {
59 return status, nil
60 }
61 return RepoStatus{Found: false, Tracked: false}, nil
62}
63
64func (f *fakeTapAdmin) AddRepos(_ context.Context, dids []string) error {
65 if f.addReposError != nil {
66 if err := f.addReposError(dids); err != nil {
67 return err
68 }
69 }
70 batch := make([]string, len(dids))
71 copy(batch, dids)
72 f.added = append(f.added, batch)
73 return nil
74}
75
// fakeResolver is a test double that resolves handles to DIDs from a fixed
// map.
type fakeResolver struct {
	// mapping maps a handle to the DID Resolve returns for it.
	mapping map[string]string
}

// Resolve returns the DID configured for handle. A handle with no entry
// yields an empty string and io.EOF.
func (r *fakeResolver) Resolve(_ context.Context, handle string) (string, error) {
	did, known := r.mapping[handle]
	if !known {
		return "", io.EOF
	}
	return did, nil
}
86
87type fakeProfileFetcher struct {
88 profiles map[string]*ProfileRecord
89}
90
91func (f *fakeProfileFetcher) FetchProfile(_ context.Context, did string) (*ProfileRecord, error) {
92 if pr, ok := f.profiles[did]; ok {
93 return pr, nil
94 }
95 return &ProfileRecord{}, nil
96}
97
98type fakeRepoFetcher struct {
99 repos map[string][]RepoRecord
100}
101
102func (f *fakeRepoFetcher) ListRepos(_ context.Context, did string) ([]RepoRecord, error) {
103 if repos, ok := f.repos[did]; ok {
104 return repos, nil
105 }
106 return nil, nil
107}
108
// fakeLightrailRepoLister is a test double for the Lightrail repo lister. It
// returns a fixed DID list or error and remembers the arguments of the most
// recent call together with the total call count.
type fakeLightrailRepoLister struct {
	// dids is the list handed back (as a copy) on success.
	dids []string
	// err, when non-nil, is returned instead of dids.
	err error
	// calls counts invocations of ListReposByCollection.
	calls int
	// baseURL, collections and limit capture the latest call's arguments.
	baseURL     string
	collections []string
	limit       int
}

// ListReposByCollection bumps the call counter, stores the arguments (copying
// collections), and then returns either the configured error or a copy of the
// configured DIDs.
func (f *fakeLightrailRepoLister) ListReposByCollection(
	_ context.Context, baseURL string, collections []string, limit int,
) ([]string, error) {
	f.calls++
	f.baseURL, f.limit = baseURL, limit
	// Copy so the recorded value is independent of the caller's slice.
	f.collections = append([]string(nil), collections...)

	if failure := f.err; failure != nil {
		return nil, failure
	}
	result := append([]string(nil), f.dids...)
	return result, nil
}
130
131func TestRunner_DiscoveryAndSubmit(t *testing.T) {
132 st := &fakeStore{
133 collaborators: map[string][]string{
134 "did:plc:seed": {"did:plc:c1"},
135 },
136 }
137 follows := &fakeFollowFetcher{follows: map[string][]string{"did:plc:seed": {"did:plc:f1"}}}
138 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{"did:plc:f1": {Found: true, Tracked: true, Backfilled: true}}}
139 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
140 log := slog.New(slog.NewTextHandler(io.Discard, nil))
141 r := NewRunnerWithDeps(
142 st, tap, resolver, follows,
143 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
144 &fakeRepoFetcher{},
145 &fakeLightrailRepoLister{}, log,
146 )
147
148 dir := t.TempDir()
149 seedsPath := filepath.Join(dir, "seeds.txt")
150 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil {
151 t.Fatalf("write seeds: %v", err)
152 }
153
154 err := r.Run(context.Background(), Options{
155 SeedsPath: seedsPath,
156 MaxHops: 1,
157 Concurrency: 2,
158 BatchSize: 2,
159 Source: SourceGraph,
160 })
161 if err != nil {
162 t.Fatalf("run backfill: %v", err)
163 }
164
165 if len(tap.added) != 1 {
166 t.Fatalf("expected one batch, got %d", len(tap.added))
167 }
168 if len(tap.added[0]) != 2 {
169 t.Fatalf("expected 2 dids submitted, got %#v", tap.added[0])
170 }
171}
172
173func TestRunner_DryRunSkipsMutations(t *testing.T) {
174 st := &fakeStore{collaborators: map[string][]string{}}
175 follows := &fakeFollowFetcher{follows: map[string][]string{}}
176 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}}
177 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
178 log := slog.New(slog.NewTextHandler(io.Discard, nil))
179 r := NewRunnerWithDeps(
180 st, tap, resolver, follows,
181 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
182 &fakeRepoFetcher{},
183 &fakeLightrailRepoLister{}, log,
184 )
185
186 dir := t.TempDir()
187 seedsPath := filepath.Join(dir, "seeds.txt")
188 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil {
189 t.Fatalf("write seeds: %v", err)
190 }
191
192 err := r.Run(context.Background(), Options{
193 SeedsPath: seedsPath,
194 MaxHops: 0,
195 DryRun: true,
196 Concurrency: 1,
197 BatchSize: 10,
198 Source: SourceGraph,
199 })
200 if err != nil {
201 t.Fatalf("run dry-run backfill: %v", err)
202 }
203 if len(tap.added) != 0 {
204 t.Fatalf("expected no tap submissions in dry-run, got %#v", tap.added)
205 }
206}
207
208func TestRunner_SkipsInProgressBackfills(t *testing.T) {
209 st := &fakeStore{collaborators: map[string][]string{}}
210 follows := &fakeFollowFetcher{follows: map[string][]string{}}
211 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{
212 "did:plc:seed": {Found: true, Tracked: true, Backfilling: true},
213 }}
214 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
215 log := slog.New(slog.NewTextHandler(io.Discard, nil))
216 r := NewRunnerWithDeps(
217 st, tap, resolver, follows,
218 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
219 &fakeRepoFetcher{},
220 &fakeLightrailRepoLister{}, log,
221 )
222
223 dir := t.TempDir()
224 seedsPath := filepath.Join(dir, "seeds.txt")
225 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil {
226 t.Fatalf("write seeds: %v", err)
227 }
228
229 err := r.Run(context.Background(), Options{
230 SeedsPath: seedsPath, MaxHops: 0, Source: SourceGraph,
231 })
232 if err != nil {
233 t.Fatalf("run backfill: %v", err)
234 }
235 if len(tap.added) != 0 {
236 t.Fatalf("expected no submission for in-progress did, got %#v", tap.added)
237 }
238}
239
240func TestRunner_ContinuesWhenRepoStatusFails(t *testing.T) {
241 st := &fakeStore{
242 collaborators: map[string][]string{
243 "did:plc:seed": {"did:plc:good", "did:plc:bad"},
244 },
245 }
246 follows := &fakeFollowFetcher{follows: map[string][]string{}}
247 tap := &fakeTapAdmin{
248 statuses: map[string]RepoStatus{},
249 statusErrs: map[string]error{"did:plc:bad": errors.New("tap info request failed: status 502")},
250 }
251 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
252 log := slog.New(slog.NewTextHandler(io.Discard, nil))
253 r := NewRunnerWithDeps(
254 st, tap, resolver, follows,
255 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
256 &fakeRepoFetcher{},
257 &fakeLightrailRepoLister{}, log,
258 )
259
260 dir := t.TempDir()
261 seedsPath := filepath.Join(dir, "seeds.txt")
262 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil {
263 t.Fatalf("write seeds: %v", err)
264 }
265
266 err := r.Run(context.Background(), Options{
267 SeedsPath: seedsPath,
268 MaxHops: 1,
269 Concurrency: 1,
270 BatchSize: 10,
271 Source: SourceGraph,
272 })
273 if err != nil {
274 t.Fatalf("run backfill: %v", err)
275 }
276
277 if len(tap.added) != 1 {
278 t.Fatalf("expected one submission batch, got %d", len(tap.added))
279 }
280 if len(tap.added[0]) != 2 {
281 t.Fatalf("expected seed and good DID submitted, got %#v", tap.added[0])
282 }
283 for _, did := range tap.added[0] {
284 if did == "did:plc:bad" {
285 t.Fatalf("did with status error should have been skipped, got %#v", tap.added[0])
286 }
287 }
288}
289
290func TestRunner_FallsBackToSingleRepoSubmissionOnBatchFailure(t *testing.T) {
291 st := &fakeStore{
292 collaborators: map[string][]string{
293 "did:plc:seed": {"did:plc:good", "did:plc:bad"},
294 },
295 }
296 follows := &fakeFollowFetcher{follows: map[string][]string{}}
297 tap := &fakeTapAdmin{
298 statuses: map[string]RepoStatus{},
299 addReposError: func(dids []string) error {
300 if len(dids) > 1 {
301 return errors.New("repos add failed: status 502")
302 }
303 if len(dids) == 1 && strings.Contains(dids[0], "bad") {
304 return errors.New("repos add failed: status 502")
305 }
306 return nil
307 },
308 }
309 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
310 log := slog.New(slog.NewTextHandler(io.Discard, nil))
311 r := NewRunnerWithDeps(
312 st, tap, resolver, follows,
313 &fakeProfileFetcher{profiles: map[string]*ProfileRecord{}},
314 &fakeRepoFetcher{},
315 &fakeLightrailRepoLister{}, log,
316 )
317
318 dir := t.TempDir()
319 seedsPath := filepath.Join(dir, "seeds.txt")
320 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil {
321 t.Fatalf("write seeds: %v", err)
322 }
323
324 err := r.Run(context.Background(), Options{
325 SeedsPath: seedsPath,
326 MaxHops: 1,
327 Concurrency: 1,
328 BatchSize: 10,
329 Source: SourceGraph,
330 })
331 if err != nil {
332 t.Fatalf("run backfill: %v", err)
333 }
334
335 if len(tap.added) != 2 {
336 t.Fatalf("expected successful individual fallbacks only, got %#v", tap.added)
337 }
338 for _, batch := range tap.added {
339 if len(batch) != 1 {
340 t.Fatalf("expected only single-DID successful submissions after batch fallback, got %#v", tap.added)
341 }
342 if batch[0] == "did:plc:bad" {
343 t.Fatalf("bad DID should not have been successfully submitted, got %#v", tap.added)
344 }
345 }
346}
347
348func TestRunner_IndexesProfilesAndHandles(t *testing.T) {
349 st := &fakeStore{collaborators: map[string][]string{}}
350 follows := &fakeFollowFetcher{follows: map[string][]string{}}
351 tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}}
352 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}}
353 profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{
354 "did:plc:seed": {
355 Record: map[string]any{
356 "description": "Building cool stuff",
357 "location": "NYC",
358 },
359 CID: "bafyabc123",
360 Handle: "alice.tangled.sh",
361 },
362 }}
363 log := slog.New(slog.NewTextHandler(io.Discard, nil))
364 r := NewRunnerWithDeps(
365 st, tap, resolver, follows, profiles, &fakeRepoFetcher{}, &fakeLightrailRepoLister{}, log,
366 )
367
368 dir := t.TempDir()
369 seedsPath := filepath.Join(dir, "seeds.txt")
370 if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil {
371 t.Fatalf("write seeds: %v", err)
372 }
373
374 err := r.Run(context.Background(), Options{
375 SeedsPath: seedsPath, MaxHops: 0, Source: SourceGraph,
376 })
377 if err != nil {
378 t.Fatalf("run backfill: %v", err)
379 }
380
381 if st.identities["did:plc:seed"] != "alice.tangled.sh" {
382 t.Fatalf("expected identity handle for seed DID, got %#v", st.identities)
383 }
384
385 if len(st.documents) != 1 {
386 t.Fatalf("expected 1 profile document, got %d", len(st.documents))
387 }
388 doc := st.documents[0]
389 if doc.Title != "alice.tangled.sh" {
390 t.Errorf("expected title to be handle, got %q", doc.Title)
391 }
392 if doc.AuthorHandle != "alice.tangled.sh" {
393 t.Errorf("expected author_handle to be handle, got %q", doc.AuthorHandle)
394 }
395 if doc.Body != "Building cool stuff" {
396 t.Errorf("expected body to be description, got %q", doc.Body)
397 }
398 if doc.RecordType != "profile" {
399 t.Errorf("expected record_type profile, got %q", doc.RecordType)
400 }
401 if !strings.Contains(doc.Summary, "NYC") {
402 t.Errorf("expected summary to contain location, got %q", doc.Summary)
403 }
404}
405
406func TestRunner_LightrailDryRunSkipsTapAndProfileIndexing(t *testing.T) {
407 st := &fakeStore{collaborators: map[string][]string{}}
408 tap := &fakeTapAdmin{
409 statusErrs: map[string]error{"did:plc:a": errors.New("should not be called")},
410 }
411 lightrail := &fakeLightrailRepoLister{dids: []string{"did:plc:b", "did:plc:a"}}
412 log := slog.New(slog.NewTextHandler(io.Discard, nil))
413 r := NewRunnerWithDeps(
414 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, &fakeRepoFetcher{},
415 lightrail, log,
416 )
417
418 err := r.Run(context.Background(), Options{
419 Source: SourceLightrail, DryRun: true,
420 LightrailURL: "https://example.test", PageLimit: 500,
421 })
422 if err != nil {
423 t.Fatalf("run lightrail dry-run: %v", err)
424 }
425 if len(tap.added) != 0 {
426 t.Fatalf("expected no Tap submissions, got %#v", tap.added)
427 }
428 if len(st.documents) != 0 {
429 t.Fatalf("expected no profile indexing, got %#v", st.documents)
430 }
431 if lightrail.calls != 1 {
432 t.Fatalf("expected one Lightrail call, got %d", lightrail.calls)
433 }
434 if len(lightrail.collections) != len(DefaultCollections) {
435 t.Fatalf("expected default collections, got %#v", lightrail.collections)
436 }
437}
438
439func TestRunner_LightrailSubmitsWithoutRepoStatusChecks(t *testing.T) {
440 st := &fakeStore{collaborators: map[string][]string{}}
441 tap := &fakeTapAdmin{
442 statusErrs: map[string]error{
443 "did:plc:a": errors.New("RepoStatus should not be called in lightrail mode"),
444 },
445 }
446 lightrail := &fakeLightrailRepoLister{
447 dids: []string{"did:plc:b", "did:plc:a", "did:plc:b"},
448 }
449 log := slog.New(slog.NewTextHandler(io.Discard, nil))
450 r := NewRunnerWithDeps(
451 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, &fakeProfileFetcher{}, &fakeRepoFetcher{},
452 lightrail, log,
453 )
454
455 err := r.Run(context.Background(), Options{
456 Source: SourceLightrail, BatchSize: 10,
457 Collections: []string{"sh.tangled.repo"},
458 })
459 if err != nil {
460 t.Fatalf("run lightrail backfill: %v", err)
461 }
462 if len(tap.added) != 1 {
463 t.Fatalf("expected one Tap batch, got %#v", tap.added)
464 }
465 if len(tap.added[0]) != 2 {
466 t.Fatalf("expected deduped DIDs, got %#v", tap.added)
467 }
468}
469
470func TestRunner_LightrailIndexesProfiles(t *testing.T) {
471 st := &fakeStore{collaborators: map[string][]string{}}
472 tap := &fakeTapAdmin{}
473 lightrail := &fakeLightrailRepoLister{
474 dids: []string{"did:plc:xg2vq45muivyy3xwatcehspu"},
475 }
476 profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{
477 "did:plc:xg2vq45muivyy3xwatcehspu": {
478 Record: map[string]any{
479 "description": "Twisted maintainer",
480 "location": "Chicago",
481 },
482 CID: "bafydesert123",
483 Handle: "desertthunder.dev",
484 },
485 }}
486 log := slog.New(slog.NewTextHandler(io.Discard, nil))
487 r := NewRunnerWithDeps(
488 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, profiles, &fakeRepoFetcher{}, lightrail, log,
489 )
490
491 err := r.Run(context.Background(), Options{
492 Source: SourceLightrail,
493 BatchSize: 10,
494 Concurrency: 1,
495 Collections: []string{"sh.tangled.repo"},
496 })
497 if err != nil {
498 t.Fatalf("run lightrail backfill: %v", err)
499 }
500 if st.identities["did:plc:xg2vq45muivyy3xwatcehspu"] != "desertthunder.dev" {
501 t.Fatalf("expected identity handle to be stored, got %#v", st.identities)
502 }
503 if len(st.documents) != 1 {
504 t.Fatalf("expected one profile document, got %d", len(st.documents))
505 }
506 doc := st.documents[0]
507 if doc.RecordType != "profile" {
508 t.Fatalf("expected profile document, got %q", doc.RecordType)
509 }
510 if doc.AuthorHandle != "desertthunder.dev" {
511 t.Fatalf("expected author_handle desertthunder.dev, got %q", doc.AuthorHandle)
512 }
513 if doc.Title != "desertthunder.dev" {
514 t.Fatalf("expected title desertthunder.dev, got %q", doc.Title)
515 }
516}
517
518func TestRunner_LightrailIndexesReposDirectly(t *testing.T) {
519 st := &fakeStore{collaborators: map[string][]string{}}
520 tap := &fakeTapAdmin{}
521 lightrail := &fakeLightrailRepoLister{
522 dids: []string{"did:plc:xg2vq45muivyy3xwatcehspu"},
523 }
524 profiles := &fakeProfileFetcher{profiles: map[string]*ProfileRecord{
525 "did:plc:xg2vq45muivyy3xwatcehspu": {
526 Handle: "desertthunder.dev",
527 Record: map[string]any{
528 "description": "Twisted maintainer",
529 },
530 CID: "bafydesert123",
531 },
532 }}
533 repos := &fakeRepoFetcher{repos: map[string][]RepoRecord{
534 "did:plc:xg2vq45muivyy3xwatcehspu": {
535 {
536 RKey: "3mho6hukiei22",
537 CID: "bafyreitwisted123",
538 Record: map[string]any{
539 "name": "twisted",
540 "description": "A tangled mobile client",
541 "topics": []any{"go", "search"},
542 "createdAt": "2026-03-01T00:00:00Z",
543 },
544 },
545 },
546 }}
547 log := slog.New(slog.NewTextHandler(io.Discard, nil))
548 r := NewRunnerWithDeps(
549 st, tap, &fakeResolver{}, &fakeFollowFetcher{}, profiles, repos, lightrail, log,
550 )
551
552 err := r.Run(context.Background(), Options{
553 Source: SourceLightrail,
554 BatchSize: 10,
555 Concurrency: 1,
556 Collections: []string{"sh.tangled.repo"},
557 })
558 if err != nil {
559 t.Fatalf("run lightrail backfill: %v", err)
560 }
561
562 if len(st.documents) != 2 {
563 t.Fatalf("expected profile and repo bootstrap documents, got %d", len(st.documents))
564 }
565
566 var foundRepo *store.Document
567 for _, doc := range st.documents {
568 if doc.RecordType == "repo" {
569 foundRepo = doc
570 break
571 }
572 }
573 if foundRepo == nil {
574 t.Fatal("expected repo bootstrap document")
575 }
576 if foundRepo.Title != "twisted" {
577 t.Fatalf("expected repo title twisted, got %q", foundRepo.Title)
578 }
579 if foundRepo.AuthorHandle != "desertthunder.dev" {
580 t.Fatalf("expected repo author_handle desertthunder.dev, got %q", foundRepo.AuthorHandle)
581 }
582 if foundRepo.WebURL == "" {
583 t.Fatal("expected repo web_url to be populated")
584 }
585}