a love letter to tangled (android, iOS, and a search API)
at main 526 lines 15 kB view raw
1package store_test 2 3import ( 4 "context" 5 "os" 6 "path/filepath" 7 "sync" 8 "testing" 9 "time" 10 11 "tangled.org/desertthunder.dev/twister/internal/store" 12) 13 14func TestIntegration(t *testing.T) { 15 dir := t.TempDir() 16 dbPath := filepath.Join(dir, "test.db") 17 url := "file:" + dbPath 18 19 db, err := store.Open(url) 20 if err != nil { 21 t.Fatalf("open: %v", err) 22 } 23 t.Cleanup(func() { 24 db.Close() 25 os.Remove(dbPath) 26 }) 27 28 if err := store.Migrate(db, url); err != nil { 29 t.Fatalf("migrate: %v", err) 30 } 31 32 st := store.New(url, db) 33 ctx := context.Background() 34 35 t.Run("upsert and get document", func(t *testing.T) { 36 doc := &store.Document{ 37 ID: "did:plc:abc|sh.tangled.repo|abc123", 38 DID: "did:plc:abc", 39 Collection: "sh.tangled.repo", 40 RKey: "abc123", 41 ATURI: "at://did:plc:abc/sh.tangled.repo/abc123", 42 CID: "bafyreiabc", 43 RecordType: "repo", 44 Title: "my-repo", 45 Body: "A test repository", 46 RepoName: "my-repo", 47 } 48 if err := st.UpsertDocument(ctx, doc); err != nil { 49 t.Fatalf("upsert: %v", err) 50 } 51 52 got, err := st.GetDocument(ctx, doc.ID) 53 if err != nil { 54 t.Fatalf("get: %v", err) 55 } 56 if got == nil { 57 t.Fatal("expected document, got nil") 58 } 59 if got.Title != "my-repo" { 60 t.Errorf("title: got %q, want %q", got.Title, "my-repo") 61 } 62 if got.IndexedAt == "" { 63 t.Error("indexed_at should be set by upsert") 64 } 65 }) 66 67 t.Run("upsert is idempotent", func(t *testing.T) { 68 doc := &store.Document{ 69 ID: "did:plc:abc|sh.tangled.repo|abc123", 70 DID: "did:plc:abc", 71 Collection: "sh.tangled.repo", 72 RKey: "abc123", 73 ATURI: "at://did:plc:abc/sh.tangled.repo/abc123", 74 CID: "bafyreiabc2", 75 RecordType: "repo", 76 Title: "my-repo-v2", 77 } 78 if err := st.UpsertDocument(ctx, doc); err != nil { 79 t.Fatalf("upsert: %v", err) 80 } 81 got, err := st.GetDocument(ctx, doc.ID) 82 if err != nil { 83 t.Fatalf("get: %v", err) 84 } 85 if got.Title != "my-repo-v2" { 86 t.Errorf("title: got %q, want %q", got.Title, "my-repo-v2") 87 } 88 if got.CID != "bafyreiabc2" { 89 t.Errorf("cid: got %q, want updated CID", got.CID) 90 } 91 }) 92 93 t.Run("tombstone sets deleted_at", func(t *testing.T) { 94 if err := st.MarkDeleted(ctx, "did:plc:abc|sh.tangled.repo|abc123"); err != nil { 95 t.Fatalf("mark deleted: %v", err) 96 } 97 got, err := st.GetDocument(ctx, "did:plc:abc|sh.tangled.repo|abc123") 98 if err != nil { 99 t.Fatalf("get: %v", err) 100 } 101 if got.DeletedAt == "" { 102 t.Error("deleted_at should be set after tombstone") 103 } 104 }) 105 106 t.Run("get missing document returns nil", func(t *testing.T) { 107 got, err := st.GetDocument(ctx, "nonexistent") 108 if err != nil { 109 t.Fatalf("get: %v", err) 110 } 111 if got != nil { 112 t.Error("expected nil for missing document") 113 } 114 }) 115 116 t.Run("sync state CRUD", func(t *testing.T) { 117 got, err := st.GetSyncState(ctx, "tap-consumer") 118 if err != nil { 119 t.Fatalf("get sync state: %v", err) 120 } 121 if got != nil { 122 t.Error("expected nil for missing sync state") 123 } 124 125 if err := st.SetSyncState(ctx, "tap-consumer", "cursor-001"); err != nil { 126 t.Fatalf("set sync state: %v", err) 127 } 128 129 got, err = st.GetSyncState(ctx, "tap-consumer") 130 if err != nil { 131 t.Fatalf("get sync state: %v", err) 132 } 133 if got == nil { 134 t.Fatal("expected sync state, got nil") 135 } 136 if got.Cursor != "cursor-001" { 137 t.Errorf("cursor: got %q, want %q", got.Cursor, "cursor-001") 138 } 139 140 if err := st.SetSyncState(ctx, "tap-consumer", "cursor-002"); err != nil { 141 t.Fatalf("update sync state: %v", err) 142 } 143 got, err = st.GetSyncState(ctx, "tap-consumer") 144 if err != nil { 145 t.Fatalf("get: %v", err) 146 } 147 if got.Cursor != "cursor-002" { 148 t.Errorf("cursor after update: got %q, want %q", got.Cursor, "cursor-002") 149 } 150 }) 151 152 t.Run("record state upsert", func(t *testing.T) { 153 uri := "at://did:plc:abc/sh.tangled.repo.issue/1" 154 if err := st.UpdateRecordState(ctx, uri, "open"); err != nil { 155 t.Fatalf("update record state: %v", err) 156 } 157 if err := st.UpdateRecordState(ctx, uri, "closed"); err != nil { 158 t.Fatalf("update record state to closed: %v", err) 159 } 160 }) 161 162 t.Run("identity handle upsert and get", func(t *testing.T) { 163 const did = "did:plc:identity1" 164 165 handle, err := st.GetIdentityHandle(ctx, did) 166 if err != nil { 167 t.Fatalf("get missing identity handle: %v", err) 168 } 169 if handle != "" { 170 t.Fatalf("expected empty missing handle, got %q", handle) 171 } 172 173 if err := st.UpsertIdentityHandle(ctx, did, "alice.tangled.org", true, "active"); err != nil { 174 t.Fatalf("upsert identity handle: %v", err) 175 } 176 177 handle, err = st.GetIdentityHandle(ctx, did) 178 if err != nil { 179 t.Fatalf("get identity handle: %v", err) 180 } 181 if handle != "alice.tangled.org" { 182 t.Fatalf("handle: got %q, want %q", handle, "alice.tangled.org") 183 } 184 185 if err := st.UpsertIdentityHandle(ctx, did, "alice2.tangled.org", false, "inactive"); err != nil { 186 t.Fatalf("update identity handle: %v", err) 187 } 188 189 handle, err = st.GetIdentityHandle(ctx, did) 190 if err != nil { 191 t.Fatalf("get identity handle after update: %v", err) 192 } 193 if handle != "alice2.tangled.org" { 194 t.Fatalf("handle after update: got %q, want %q", handle, "alice2.tangled.org") 195 } 196 }) 197 198 t.Run("follow subject discovery query", func(t *testing.T) { 199 followDoc := &store.Document{ 200 ID: "did:plc:owner|sh.tangled.graph.follow|f1", 201 DID: "did:plc:owner", 202 Collection: "sh.tangled.graph.follow", 203 RKey: "f1", 204 ATURI: "at://did:plc:owner/sh.tangled.graph.follow/f1", 205 CID: "cid-follow", 206 RecordType: "follow", 207 RepoDID: "did:plc:target", 208 } 209 if err := st.UpsertDocument(ctx, followDoc); err != nil { 210 t.Fatalf("upsert follow doc: %v", err) 211 } 212 213 subjects, err := st.GetFollowSubjects(ctx, "did:plc:owner") 214 if err != nil { 215 t.Fatalf("get follow subjects: %v", err) 216 } 217 if len(subjects) != 1 || subjects[0] != "did:plc:target" { 218 t.Fatalf("subjects: got %#v", subjects) 219 } 220 }) 221 222 t.Run("repo collaborator discovery query", func(t *testing.T) { 223 docs := []*store.Document{ 224 { 225 ID: "did:plc:collab1|sh.tangled.repo.issue|i1", 226 DID: "did:plc:collab1", 227 Collection: "sh.tangled.repo.issue", 228 RKey: "i1", 229 ATURI: "at://did:plc:collab1/sh.tangled.repo.issue/i1", 230 CID: "cid-c1", 231 RecordType: "issue", 232 RepoDID: "did:plc:owner", 233 }, 234 { 235 ID: "did:plc:collab2|sh.tangled.repo.pull.comment|pc1", 236 DID: "did:plc:collab2", 237 Collection: "sh.tangled.repo.pull.comment", 238 RKey: "pc1", 239 ATURI: "at://did:plc:collab2/sh.tangled.repo.pull.comment/pc1", 240 CID: "cid-c2", 241 RecordType: "pull_comment", 242 RepoDID: "did:plc:owner", 243 }, 244 { 245 ID: "did:plc:owner|sh.tangled.repo.issue|i-owner", 246 DID: "did:plc:owner", 247 Collection: "sh.tangled.repo.issue", 248 RKey: "i-owner", 249 ATURI: "at://did:plc:owner/sh.tangled.repo.issue/i-owner", 250 CID: "cid-owner", 251 RecordType: "issue", 252 RepoDID: "did:plc:owner", 253 }, 254 } 255 for _, doc := range docs { 256 if err := st.UpsertDocument(ctx, doc); err != nil { 257 t.Fatalf("upsert collaborator doc %s: %v", doc.ID, err) 258 } 259 } 260 261 collaborators, err := st.GetRepoCollaborators(ctx, "did:plc:owner") 262 if err != nil { 263 t.Fatalf("get collaborators: %v", err) 264 } 265 if len(collaborators) != 2 { 266 t.Fatalf("collaborators length: got %d want 2 (%#v)", len(collaborators), collaborators) 267 } 268 got := map[string]bool{} 269 for _, did := range collaborators { 270 got[did] = true 271 } 272 if !got["did:plc:collab1"] || !got["did:plc:collab2"] { 273 t.Fatalf("collaborators: got %#v", collaborators) 274 } 275 }) 276 277 t.Run("jetstream events insert and list", func(t *testing.T) { 278 evt := &store.JetstreamEvent{ 279 TimeUS: 1_000_000, 280 DID: "did:plc:evt1", 281 Kind: "commit", 282 Collection: "sh.tangled.repo", 283 RKey: "rkey1", 284 Operation: "create", 285 Payload: `{"did":"did:plc:evt1","time_us":1000000,"kind":"commit"}`, 286 ReceivedAt: "2026-01-01T00:00:00Z", 287 } 288 if err := st.InsertJetstreamEvent(ctx, evt, 500); err != nil { 289 t.Fatalf("insert jetstream event: %v", err) 290 } 291 292 events, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{Limit: 10}) 293 if err != nil { 294 t.Fatalf("list jetstream events: %v", err) 295 } 296 if len(events) < 1 { 297 t.Fatal("expected at least one jetstream event") 298 } 299 found := false 300 for _, e := range events { 301 if e.DID == "did:plc:evt1" && e.Collection == "sh.tangled.repo" { 302 found = true 303 if e.Operation != "create" { 304 t.Errorf("operation: got %q, want create", e.Operation) 305 } 306 if e.TimeUS != 1_000_000 { 307 t.Errorf("time_us: got %d, want 1000000", e.TimeUS) 308 } 309 } 310 } 311 if !found { 312 t.Error("inserted event not found in list") 313 } 314 }) 315 316 t.Run("jetstream events filter by collection", func(t *testing.T) { 317 evt := &store.JetstreamEvent{ 318 TimeUS: 2_000_000, 319 DID: "did:plc:evt2", 320 Kind: "commit", 321 Collection: "sh.tangled.repo.issue", 322 RKey: "rkey2", 323 Operation: "create", 324 Payload: `{"did":"did:plc:evt2","time_us":2000000,"kind":"commit"}`, 325 ReceivedAt: "2026-01-01T00:00:01Z", 326 } 327 if err := st.InsertJetstreamEvent(ctx, evt, 500); err != nil { 328 t.Fatalf("insert jetstream event: %v", err) 329 } 330 331 events, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{ 332 Collection: "sh.tangled.repo.issue", 333 Limit: 10, 334 }) 335 if err != nil { 336 t.Fatalf("list with collection filter: %v", err) 337 } 338 for _, e := range events { 339 if e.Collection != "sh.tangled.repo.issue" { 340 t.Errorf("filter leaked event with collection %q", e.Collection) 341 } 342 } 343 if len(events) == 0 { 344 t.Error("expected at least one event matching collection filter") 345 } 346 }) 347 348 t.Run("jetstream events bounded by maxEvents", func(t *testing.T) { 349 // Insert 5 events with max=3; only the 3 most recent should survive. 350 for i := int64(1); i <= 5; i++ { 351 e := &store.JetstreamEvent{ 352 TimeUS: 100 + i, 353 DID: "did:plc:bound", 354 Kind: "commit", 355 Collection: "sh.tangled.repo", 356 RKey: "rkey-bound", 357 Operation: "create", 358 Payload: `{}`, 359 ReceivedAt: "2026-01-01T00:00:00Z", 360 } 361 if err := st.InsertJetstreamEvent(ctx, e, 3); err != nil { 362 t.Fatalf("insert bounded event %d: %v", i, err) 363 } 364 } 365 366 all, err := st.ListJetstreamEvents(ctx, store.JetstreamEventFilter{ 367 DID: "did:plc:bound", 368 Limit: 100, 369 }) 370 if err != nil { 371 t.Fatalf("list bounded events: %v", err) 372 } 373 // After inserting 5 events total (including the one from the previous subtests), 374 // maxEvents=3 means at most 3 rows survive across all events. 375 if len(all) > 3 { 376 t.Errorf("expected at most 3 events after bounding, got %d", len(all)) 377 } 378 }) 379 380 t.Run("indexing jobs enqueue claim retry complete", func(t *testing.T) { 381 job := store.IndexingJobInput{ 382 DocumentID: "did:plc:owner|sh.tangled.repo|repo1", 383 DID: "did:plc:owner", 384 Collection: "sh.tangled.repo", 385 RKey: "repo1", 386 CID: "cid-repo1", 387 RecordJSON: `{"name":"repo1"}`, 388 Source: store.IndexSourceReadThrough, 389 } 390 if err := st.EnqueueIndexingJob(ctx, job); err != nil { 391 t.Fatalf("enqueue indexing job: %v", err) 392 } 393 if err := st.EnqueueIndexingJob(ctx, job); err != nil { 394 t.Fatalf("enqueue indexing job second call: %v", err) 395 } 396 397 claimed, err := st.ClaimIndexingJob(ctx, "worker-a", time.Now().Add(time.Minute).Format(time.RFC3339)) 398 if err != nil { 399 t.Fatalf("claim indexing job: %v", err) 400 } 401 if claimed == nil { 402 t.Fatal("expected claimed indexing job") 403 } 404 if claimed.DocumentID != job.DocumentID { 405 t.Fatalf("claimed document id: got %q want %q", claimed.DocumentID, job.DocumentID) 406 } 407 408 if err := st.RetryIndexingJob(ctx, job.DocumentID, "9999-12-31T23:59:59Z", "boom"); err != nil { 409 t.Fatalf("retry indexing job: %v", err) 410 } 411 412 none, err := st.ClaimIndexingJob(ctx, "worker-b", time.Now().Add(time.Minute).Format(time.RFC3339)) 413 if err != nil { 414 t.Fatalf("claim delayed indexing job: %v", err) 415 } 416 if none != nil { 417 t.Fatalf("expected no claim before schedule time, got %#v", none) 418 } 419 420 if err := st.RetryIndexingJob(ctx, job.DocumentID, "1970-01-01T00:00:00Z", "retry-now"); err != nil { 421 t.Fatalf("retry indexing job now: %v", err) 422 } 423 424 claimed, err = st.ClaimIndexingJob(ctx, "worker-c", time.Now().Add(time.Minute).Format(time.RFC3339)) 425 if err != nil { 426 t.Fatalf("claim retried indexing job: %v", err) 427 } 428 if claimed == nil { 429 t.Fatal("expected claimed retried indexing job") 430 } 431 432 if err := st.CompleteIndexingJob(ctx, job.DocumentID); err != nil { 433 t.Fatalf("complete indexing job: %v", err) 434 } 435 436 claimed, err = st.ClaimIndexingJob(ctx, "worker-d", time.Now().Add(time.Minute).Format(time.RFC3339)) 437 if err != nil { 438 t.Fatalf("claim after complete: %v", err) 439 } 440 if claimed != nil { 441 t.Fatalf("expected no job after complete, got %#v", claimed) 442 } 443 444 got, err := st.GetIndexingJob(ctx, job.DocumentID) 445 if err != nil { 446 t.Fatalf("get completed job: %v", err) 447 } 448 if got == nil || got.Status != store.IndexingJobCompleted { 449 t.Fatalf("expected completed job row, got %#v", got) 450 } 451 }) 452 453 t.Run("indexing claim is single winner", func(t *testing.T) { 454 job := store.IndexingJobInput{ 455 DocumentID: "did:plc:owner|sh.tangled.repo|repo2", 456 DID: "did:plc:owner", 457 Collection: "sh.tangled.repo", 458 RKey: "repo2", 459 CID: "cid-repo2", 460 RecordJSON: `{"name":"repo2"}`, 461 Source: store.IndexSourceReadThrough, 462 } 463 if err := st.EnqueueIndexingJob(ctx, job); err != nil { 464 t.Fatalf("enqueue job: %v", err) 465 } 466 467 var wg sync.WaitGroup 468 results := make(chan *store.IndexingJob, 2) 469 errs := make(chan error, 2) 470 for _, worker := range []string{"worker-1", "worker-2"} { 471 wg.Add(1) 472 go func(worker string) { 473 defer wg.Done() 474 claimed, err := st.ClaimIndexingJob( 475 ctx, worker, time.Now().Add(time.Minute).Format(time.RFC3339), 476 ) 477 errs <- err 478 results <- claimed 479 }(worker) 480 } 481 wg.Wait() 482 close(results) 483 close(errs) 484 485 claimedCount := 0 486 for err := range errs { 487 if err != nil { 488 t.Fatalf("claim concurrent job: %v", err) 489 } 490 } 491 for claimed := range results { 492 if claimed != nil { 493 claimedCount++ 494 } 495 } 496 if claimedCount != 1 { 497 t.Fatalf("expected exactly one claimant, got %d", claimedCount) 498 } 499 }) 500 501 t.Run("indexing audit and stats", func(t *testing.T) { 502 if err := st.AppendIndexingAudit(ctx, store.IndexingAuditInput{ 503 Source: store.IndexSourceReadThrough, 504 DocumentID: "doc-audit", 505 Collection: "sh.tangled.repo", 506 CID: "cid-audit", 507 Decision: "enqueued", 508 }); err != nil { 509 t.Fatalf("append indexing audit: %v", err) 510 } 511 stats, err := st.GetIndexingJobStats(ctx) 512 if err != nil { 513 t.Fatalf("get indexing stats: %v", err) 514 } 515 if stats.Completed < 1 { 516 t.Fatalf("expected completed jobs in stats, got %#v", stats) 517 } 518 entries, err := st.ListIndexingAudit(ctx, store.IndexingAuditFilter{DocumentID: "doc-audit"}) 519 if err != nil { 520 t.Fatalf("list indexing audit: %v", err) 521 } 522 if len(entries) != 1 || entries[0].Decision != "enqueued" { 523 t.Fatalf("unexpected audit rows: %#v", entries) 524 } 525 }) 526}