a love letter to tangled (android, iOS, and a search API)
1package store_test
2
3import (
4 "context"
5 "os"
6 "path/filepath"
7 "sync"
8 "testing"
9 "time"
10
11 "tangled.org/desertthunder.dev/twister/internal/store"
12)
13
14func TestIntegration(t *testing.T) {
15 dir := t.TempDir()
16 dbPath := filepath.Join(dir, "test.db")
17 url := "file:" + dbPath
18
19 db, err := store.Open(url)
20 if err != nil {
21 t.Fatalf("open: %v", err)
22 }
23 t.Cleanup(func() {
24 db.Close()
25 os.Remove(dbPath)
26 })
27
28 if err := store.Migrate(db, url); err != nil {
29 t.Fatalf("migrate: %v", err)
30 }
31
32 st := store.New(url, db)
33 ctx := context.Background()
34
35 t.Run("upsert and get document", func(t *testing.T) {
36 doc := &store.Document{
37 ID: "did:plc:abc|sh.tangled.repo|abc123",
38 DID: "did:plc:abc",
39 Collection: "sh.tangled.repo",
40 RKey: "abc123",
41 ATURI: "at://did:plc:abc/sh.tangled.repo/abc123",
42 CID: "bafyreiabc",
43 RecordType: "repo",
44 Title: "my-repo",
45 Body: "A test repository",
46 RepoName: "my-repo",
47 }
48 if err := st.UpsertDocument(ctx, doc); err != nil {
49 t.Fatalf("upsert: %v", err)
50 }
51
52 got, err := st.GetDocument(ctx, doc.ID)
53 if err != nil {
54 t.Fatalf("get: %v", err)
55 }
56 if got == nil {
57 t.Fatal("expected document, got nil")
58 }
59 if got.Title != "my-repo" {
60 t.Errorf("title: got %q, want %q", got.Title, "my-repo")
61 }
62 if got.IndexedAt == "" {
63 t.Error("indexed_at should be set by upsert")
64 }
65 })
66
67 t.Run("upsert is idempotent", func(t *testing.T) {
68 doc := &store.Document{
69 ID: "did:plc:abc|sh.tangled.repo|abc123",
70 DID: "did:plc:abc",
71 Collection: "sh.tangled.repo",
72 RKey: "abc123",
73 ATURI: "at://did:plc:abc/sh.tangled.repo/abc123",
74 CID: "bafyreiabc2",
75 RecordType: "repo",
76 Title: "my-repo-v2",
77 }
78 if err := st.UpsertDocument(ctx, doc); err != nil {
79 t.Fatalf("upsert: %v", err)
80 }
81 got, err := st.GetDocument(ctx, doc.ID)
82 if err != nil {
83 t.Fatalf("get: %v", err)
84 }
85 if got.Title != "my-repo-v2" {
86 t.Errorf("title: got %q, want %q", got.Title, "my-repo-v2")
87 }
88 if got.CID != "bafyreiabc2" {
89 t.Errorf("cid: got %q, want updated CID", got.CID)
90 }
91 })
92
93 t.Run("tombstone sets deleted_at", func(t *testing.T) {
94 if err := st.MarkDeleted(ctx, "did:plc:abc|sh.tangled.repo|abc123"); err != nil {
95 t.Fatalf("mark deleted: %v", err)
96 }
97 got, err := st.GetDocument(ctx, "did:plc:abc|sh.tangled.repo|abc123")
98 if err != nil {
99 t.Fatalf("get: %v", err)
100 }
101 if got.DeletedAt == "" {
102 t.Error("deleted_at should be set after tombstone")
103 }
104 })
105
106 t.Run("get missing document returns nil", func(t *testing.T) {
107 got, err := st.GetDocument(ctx, "nonexistent")
108 if err != nil {
109 t.Fatalf("get: %v", err)
110 }
111 if got != nil {
112 t.Error("expected nil for missing document")
113 }
114 })
115
116 t.Run("sync state CRUD", func(t *testing.T) {
117 got, err := st.GetSyncState(ctx, "tap-consumer")
118 if err != nil {
119 t.Fatalf("get sync state: %v", err)
120 }
121 if got != nil {
122 t.Error("expected nil for missing sync state")
123 }
124
125 if err := st.SetSyncState(ctx, "tap-consumer", "cursor-001"); err != nil {
126 t.Fatalf("set sync state: %v", err)
127 }
128
129 got, err = st.GetSyncState(ctx, "tap-consumer")
130 if err != nil {
131 t.Fatalf("get sync state: %v", err)
132 }
133 if got == nil {
134 t.Fatal("expected sync state, got nil")
135 }
136 if got.Cursor != "cursor-001" {
137 t.Errorf("cursor: got %q, want %q", got.Cursor, "cursor-001")
138 }
139
140 if err := st.SetSyncState(ctx, "tap-consumer", "cursor-002"); err != nil {
141 t.Fatalf("update sync state: %v", err)
142 }
143 got, err = st.GetSyncState(ctx, "tap-consumer")
144 if err != nil {
145 t.Fatalf("get: %v", err)
146 }
147 if got.Cursor != "cursor-002" {
148 t.Errorf("cursor after update: got %q, want %q", got.Cursor, "cursor-002")
149 }
150 })
151
152 t.Run("record state upsert", func(t *testing.T) {
153 uri := "at://did:plc:abc/sh.tangled.repo.issue/1"
154 if err := st.UpdateRecordState(ctx, uri, "open"); err != nil {
155 t.Fatalf("update record state: %v", err)
156 }
157 if err := st.UpdateRecordState(ctx, uri, "closed"); err != nil {
158 t.Fatalf("update record state to closed: %v", err)
159 }
160 })
161
162 t.Run("identity handle upsert and get", func(t *testing.T) {
163 const did = "did:plc:identity1"
164
165 handle, err := st.GetIdentityHandle(ctx, did)
166 if err != nil {
167 t.Fatalf("get missing identity handle: %v", err)
168 }
169 if handle != "" {
170 t.Fatalf("expected empty missing handle, got %q", handle)
171 }
172
173 if err := st.UpsertIdentityHandle(ctx, did, "alice.tangled.org", true, "active"); err != nil {
174 t.Fatalf("upsert identity handle: %v", err)
175 }
176
177 handle, err = st.GetIdentityHandle(ctx, did)
178 if err != nil {
179 t.Fatalf("get identity handle: %v", err)
180 }
181 if handle != "alice.tangled.org" {
182 t.Fatalf("handle: got %q, want %q", handle, "alice.tangled.org")
183 }
184
185 if err := st.UpsertIdentityHandle(ctx, did, "alice2.tangled.org", false, "inactive"); err != nil {
186 t.Fatalf("update identity handle: %v", err)
187 }
188
189 handle, err = st.GetIdentityHandle(ctx, did)
190 if err != nil {
191 t.Fatalf("get identity handle after update: %v", err)
192 }
193 if handle != "alice2.tangled.org" {
194 t.Fatalf("handle after update: got %q, want %q", handle, "alice2.tangled.org")
195 }
196 })
197
198 t.Run("follow subject discovery query", func(t *testing.T) {
199 followDoc := &store.Document{
200 ID: "did:plc:owner|sh.tangled.graph.follow|f1",
201 DID: "did:plc:owner",
202 Collection: "sh.tangled.graph.follow",
203 RKey: "f1",
204 ATURI: "at://did:plc:owner/sh.tangled.graph.follow/f1",
205 CID: "cid-follow",
206 RecordType: "follow",
207 RepoDID: "did:plc:target",
208 }
209 if err := st.UpsertDocument(ctx, followDoc); err != nil {
210 t.Fatalf("upsert follow doc: %v", err)
211 }
212
213 subjects, err := st.GetFollowSubjects(ctx, "did:plc:owner")
214 if err != nil {
215 t.Fatalf("get follow subjects: %v", err)
216 }
217 if len(subjects) != 1 || subjects[0] != "did:plc:target" {
218 t.Fatalf("subjects: got %#v", subjects)
219 }
220 })
221
222 t.Run("repo collaborator discovery query", func(t *testing.T) {
223 docs := []*store.Document{
224 {
225 ID: "did:plc:collab1|sh.tangled.repo.issue|i1",
226 DID: "did:plc:collab1",
227 Collection: "sh.tangled.repo.issue",
228 RKey: "i1",
229 ATURI: "at://did:plc:collab1/sh.tangled.repo.issue/i1",
230 CID: "cid-c1",
231 RecordType: "issue",
232 RepoDID: "did:plc:owner",
233 },
234 {
235 ID: "did:plc:collab2|sh.tangled.repo.pull.comment|pc1",
236 DID: "did:plc:collab2",
237 Collection: "sh.tangled.repo.pull.comment",
238 RKey: "pc1",
239 ATURI: "at://did:plc:collab2/sh.tangled.repo.pull.comment/pc1",
240 CID: "cid-c2",
241 RecordType: "pull_comment",
242 RepoDID: "did:plc:owner",
243 },
244 {
245 ID: "did:plc:owner|sh.tangled.repo.issue|i-owner",
246 DID: "did:plc:owner",
247 Collection: "sh.tangled.repo.issue",
248 RKey: "i-owner",
249 ATURI: "at://did:plc:owner/sh.tangled.repo.issue/i-owner",
250 CID: "cid-owner",
251 RecordType: "issue",
252 RepoDID: "did:plc:owner",
253 },
254 }
255 for _, doc := range docs {
256 if err := st.UpsertDocument(ctx, doc); err != nil {
257 t.Fatalf("upsert collaborator doc %s: %v", doc.ID, err)
258 }
259 }
260
261 collaborators, err := st.GetRepoCollaborators(ctx, "did:plc:owner")
262 if err != nil {
263 t.Fatalf("get collaborators: %v", err)
264 }
265 if len(collaborators) != 2 {
266 t.Fatalf("collaborators length: got %d want 2 (%#v)", len(collaborators), collaborators)
267 }
268 got := map[string]bool{}
269 for _, did := range collaborators {
270 got[did] = true
271 }
272 if !got["did:plc:collab1"] || !got["did:plc:collab2"] {
273 t.Fatalf("collaborators: got %#v", collaborators)
274 }
275 })
276
277 t.Run("jetstream events insert and list", func(t *testing.T) {
278 evt := &store.JetstreamEvent{
279 TimeUS: 1_000_000,
280 DID: "did:plc:evt1",
281 Kind: "commit",
282 Collection: "sh.tangled.repo",
283 RKey: "rkey1",
284 Operation: "create",
285 Payload: `{"did":"did:plc:evt1","time_us":1000000,"kind":"commit"}`,
286 ReceivedAt: "2026-01-01T00:00:00Z",
287 }
288 if err := st.InsertJetstreamEvent(ctx, evt, 500); err != nil {
289 t.Fatalf("insert jetstream event: %v", err)
290 }
291
292 events, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{Limit: 10})
293 if err != nil {
294 t.Fatalf("list jetstream events: %v", err)
295 }
296 if len(events) < 1 {
297 t.Fatal("expected at least one jetstream event")
298 }
299 found := false
300 for _, e := range events {
301 if e.DID == "did:plc:evt1" && e.Collection == "sh.tangled.repo" {
302 found = true
303 if e.Operation != "create" {
304 t.Errorf("operation: got %q, want create", e.Operation)
305 }
306 if e.TimeUS != 1_000_000 {
307 t.Errorf("time_us: got %d, want 1000000", e.TimeUS)
308 }
309 }
310 }
311 if !found {
312 t.Error("inserted event not found in list")
313 }
314 })
315
316 t.Run("jetstream events filter by collection", func(t *testing.T) {
317 evt := &store.JetstreamEvent{
318 TimeUS: 2_000_000,
319 DID: "did:plc:evt2",
320 Kind: "commit",
321 Collection: "sh.tangled.repo.issue",
322 RKey: "rkey2",
323 Operation: "create",
324 Payload: `{"did":"did:plc:evt2","time_us":2000000,"kind":"commit"}`,
325 ReceivedAt: "2026-01-01T00:00:01Z",
326 }
327 if err := st.InsertJetstreamEvent(ctx, evt, 500); err != nil {
328 t.Fatalf("insert jetstream event: %v", err)
329 }
330
331 events, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{
332 Collection: "sh.tangled.repo.issue",
333 Limit: 10,
334 })
335 if err != nil {
336 t.Fatalf("list with collection filter: %v", err)
337 }
338 for _, e := range events {
339 if e.Collection != "sh.tangled.repo.issue" {
340 t.Errorf("filter leaked event with collection %q", e.Collection)
341 }
342 }
343 if len(events) == 0 {
344 t.Error("expected at least one event matching collection filter")
345 }
346 })
347
348 t.Run("jetstream events bounded by maxEvents", func(t *testing.T) {
349 // Insert 5 events with max=3; only the 3 most recent should survive.
350 for i := int64(1); i <= 5; i++ {
351 e := &store.JetstreamEvent{
352 TimeUS: 100 + i,
353 DID: "did:plc:bound",
354 Kind: "commit",
355 Collection: "sh.tangled.repo",
356 RKey: "rkey-bound",
357 Operation: "create",
358 Payload: `{}`,
359 ReceivedAt: "2026-01-01T00:00:00Z",
360 }
361 if err := st.InsertJetstreamEvent(ctx, e, 3); err != nil {
362 t.Fatalf("insert bounded event %d: %v", i, err)
363 }
364 }
365
366 all, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{
367 DID: "did:plc:bound",
368 Limit: 100,
369 })
370 if err != nil {
371 t.Fatalf("list bounded events: %v", err)
372 }
373 // After inserting 5 events total (including the one from the previous subtests),
374 // maxEvents=3 means at most 3 rows survive across all events.
375 if len(all) > 3 {
376 t.Errorf("expected at most 3 events after bounding, got %d", len(all))
377 }
378 })
379
380 t.Run("indexing jobs enqueue claim retry complete", func(t *testing.T) {
381 job := store.IndexingJobInput{
382 DocumentID: "did:plc:owner|sh.tangled.repo|repo1",
383 DID: "did:plc:owner",
384 Collection: "sh.tangled.repo",
385 RKey: "repo1",
386 CID: "cid-repo1",
387 RecordJSON: `{"name":"repo1"}`,
388 Source: store.IndexSourceReadThrough,
389 }
390 if err := st.EnqueueIndexingJob(ctx, job); err != nil {
391 t.Fatalf("enqueue indexing job: %v", err)
392 }
393 if err := st.EnqueueIndexingJob(ctx, job); err != nil {
394 t.Fatalf("enqueue indexing job second call: %v", err)
395 }
396
397 claimed, err := st.ClaimIndexingJob(ctx, "worker-a", time.Now().Add(time.Minute).Format(time.RFC3339))
398 if err != nil {
399 t.Fatalf("claim indexing job: %v", err)
400 }
401 if claimed == nil {
402 t.Fatal("expected claimed indexing job")
403 }
404 if claimed.DocumentID != job.DocumentID {
405 t.Fatalf("claimed document id: got %q want %q", claimed.DocumentID, job.DocumentID)
406 }
407
408 if err := st.RetryIndexingJob(ctx, job.DocumentID, "9999-12-31T23:59:59Z", "boom"); err != nil {
409 t.Fatalf("retry indexing job: %v", err)
410 }
411
412 none, err := st.ClaimIndexingJob(ctx, "worker-b", time.Now().Add(time.Minute).Format(time.RFC3339))
413 if err != nil {
414 t.Fatalf("claim delayed indexing job: %v", err)
415 }
416 if none != nil {
417 t.Fatalf("expected no claim before schedule time, got %#v", none)
418 }
419
420 if err := st.RetryIndexingJob(ctx, job.DocumentID, "1970-01-01T00:00:00Z", "retry-now"); err != nil {
421 t.Fatalf("retry indexing job now: %v", err)
422 }
423
424 claimed, err = st.ClaimIndexingJob(ctx, "worker-c", time.Now().Add(time.Minute).Format(time.RFC3339))
425 if err != nil {
426 t.Fatalf("claim retried indexing job: %v", err)
427 }
428 if claimed == nil {
429 t.Fatal("expected claimed retried indexing job")
430 }
431
432 if err := st.CompleteIndexingJob(ctx, job.DocumentID); err != nil {
433 t.Fatalf("complete indexing job: %v", err)
434 }
435
436 claimed, err = st.ClaimIndexingJob(ctx, "worker-d", time.Now().Add(time.Minute).Format(time.RFC3339))
437 if err != nil {
438 t.Fatalf("claim after complete: %v", err)
439 }
440 if claimed != nil {
441 t.Fatalf("expected no job after complete, got %#v", claimed)
442 }
443
444 got, err := st.GetIndexingJob(ctx, job.DocumentID)
445 if err != nil {
446 t.Fatalf("get completed job: %v", err)
447 }
448 if got == nil || got.Status != store.IndexingJobCompleted {
449 t.Fatalf("expected completed job row, got %#v", got)
450 }
451 })
452
453 t.Run("indexing claim is single winner", func(t *testing.T) {
454 job := store.IndexingJobInput{
455 DocumentID: "did:plc:owner|sh.tangled.repo|repo2",
456 DID: "did:plc:owner",
457 Collection: "sh.tangled.repo",
458 RKey: "repo2",
459 CID: "cid-repo2",
460 RecordJSON: `{"name":"repo2"}`,
461 Source: store.IndexSourceReadThrough,
462 }
463 if err := st.EnqueueIndexingJob(ctx, job); err != nil {
464 t.Fatalf("enqueue job: %v", err)
465 }
466
467 var wg sync.WaitGroup
468 results := make(chan *store.IndexingJob, 2)
469 errs := make(chan error, 2)
470 for _, worker := range []string{"worker-1", "worker-2"} {
471 wg.Add(1)
472 go func(worker string) {
473 defer wg.Done()
474 claimed, err := st.ClaimIndexingJob(
475 ctx, worker, time.Now().Add(time.Minute).Format(time.RFC3339),
476 )
477 errs <- err
478 results <- claimed
479 }(worker)
480 }
481 wg.Wait()
482 close(results)
483 close(errs)
484
485 claimedCount := 0
486 for err := range errs {
487 if err != nil {
488 t.Fatalf("claim concurrent job: %v", err)
489 }
490 }
491 for claimed := range results {
492 if claimed != nil {
493 claimedCount++
494 }
495 }
496 if claimedCount != 1 {
497 t.Fatalf("expected exactly one claimant, got %d", claimedCount)
498 }
499 })
500
501 t.Run("indexing audit and stats", func(t *testing.T) {
502 if err := st.AppendIndexingAudit(ctx, store.IndexingAuditInput{
503 Source: store.IndexSourceReadThrough,
504 DocumentID: "doc-audit",
505 Collection: "sh.tangled.repo",
506 CID: "cid-audit",
507 Decision: "enqueued",
508 }); err != nil {
509 t.Fatalf("append indexing audit: %v", err)
510 }
511 stats, err := st.GetIndexingJobStats(ctx)
512 if err != nil {
513 t.Fatalf("get indexing stats: %v", err)
514 }
515 if stats.Completed < 1 {
516 t.Fatalf("expected completed jobs in stats, got %#v", stats)
517 }
518 entries, err := st.ListIndexingAudit(ctx, store.IndexingAuditFilter{DocumentID: "doc-audit"})
519 if err != nil {
520 t.Fatalf("list indexing audit: %v", err)
521 }
522 if len(entries) != 1 || entries[0].Decision != "enqueued" {
523 t.Fatalf("unexpected audit rows: %#v", entries)
524 }
525 })
526}