A community based topic aggregation platform built on atproto

fix(tests): update tests for identity event update-only behavior

The Jetstream identity event consumer was intentionally changed to only
UPDATE existing users, not create new ones. This prevents indexing
millions of Bluesky users who never interact with Coves. Users are now
indexed during:
- OAuth login
- Signup (via RegisterAccount.IndexUser())

Test fixes:
- integration/jetstream_consumer_test.go: Pre-create users before
testing identity event handling; renamed tests to reflect behavior
- integration/community_e2e_test.go: Add test data cleanup to prevent
cross-test pollution affecting alphabetical sort test
- integration/user_test.go: Add comprehensive test data cleanup
(subscriptions, posts, communities) in setupTestDB
- e2e/error_recovery_test.go: Pre-create users in all identity event
tests (reconnection, malformed events, PDS unavailability, etc.)
- e2e/user_signup_test.go: Query AppView API instead of test database
to verify user creation; removed unused Jetstream consumer setup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+82 -17
tests/e2e/error_recovery_test.go
··· 86 86 t.Run("Events processed successfully after connection", func(t *testing.T) { 87 87 // Even though we can't easily test WebSocket reconnection in unit tests, 88 88 // we can verify that events are processed correctly after establishing connection 89 - consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 90 89 ctx := context.Background() 91 90 91 + // Pre-create user - identity events only update existing users 92 + _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 93 + DID: "did:plc:reconnect123", 94 + Handle: "reconnect.old.test", 95 + PDSURL: "http://localhost:3001", 96 + }) 97 + if err != nil { 98 + t.Fatalf("Failed to create test user: %v", err) 99 + } 100 + 101 + consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 102 + 103 + // Send identity event with new handle 92 104 event := jetstream.JetstreamEvent{ 93 105 Did: "did:plc:reconnect123", 94 106 Kind: "identity", ··· 100 112 }, 101 113 } 102 114 103 - err := consumer.HandleIdentityEventPublic(ctx, &event) 115 + err = consumer.HandleIdentityEventPublic(ctx, &event) 104 116 if err != nil { 105 117 t.Fatalf("Failed to process event: %v", err) 106 118 } ··· 217 229 218 230 // Verify consumer can still process valid events after malformed ones 219 231 t.Run("Valid event after malformed events", func(t *testing.T) { 220 - consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 221 232 ctx := context.Background() 222 233 234 + // Pre-create user - identity events only update existing users 235 + _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 236 + DID: "did:plc:recovery123", 237 + Handle: "recovery.old.test", 238 + PDSURL: "http://localhost:3001", 239 + }) 240 + if err != nil { 241 + t.Fatalf("Failed to create test user: %v", err) 242 + } 243 + 244 + consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 245 + 246 + // Send valid identity event with new handle 223 247 validEvent := jetstream.JetstreamEvent{ 224 248 Did: "did:plc:recovery123", 225 249 Kind: "identity", ··· 231 255 }, 232 256 } 233 257 234 - err := consumer.HandleIdentityEventPublic(ctx, &validEvent) 258 + err = consumer.HandleIdentityEventPublic(ctx, &validEvent) 235 259 if err != nil { 236 260 t.Fatalf("Failed to process valid event after malformed events: %v", err) 237 261 } 238 262 239 - // Verify user was indexed 263 + // Verify user handle was updated 240 264 user, err := userService.GetUserByDID(ctx, "did:plc:recovery123") 241 265 if err != nil { 242 - t.Fatalf("User not indexed after malformed events: %v", err) 266 + t.Fatalf("User not found after valid event: %v", err) 243 267 } 244 268 245 269 if user.Handle != "recovery.test" { ··· 362 386 ctx := context.Background() 363 387 364 388 t.Run("Indexing continues during PDS unavailability", func(t *testing.T) { 365 - // Even though PDS is "unavailable", we can still index events from Jetstream 389 + // Even though PDS is "unavailable", we can still update events from Jetstream 366 390 // because we don't need to contact PDS for identity events 391 + 392 + // Pre-create user - identity events only update existing users 393 + _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 394 + DID: "did:plc:pdsfail123", 395 + Handle: "pdsfail.old.test", 396 + PDSURL: mockPDS.URL, 397 + }) 398 + if err != nil { 399 + t.Fatalf("Failed to create test user: %v", err) 400 + } 401 + 367 402 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 368 403 369 404 event := jetstream.JetstreamEvent{ ··· 377 412 }, 378 413 } 379 414 380 - err := consumer.HandleIdentityEventPublic(ctx, &event) 415 + err = consumer.HandleIdentityEventPublic(ctx, &event) 381 416 if err != nil { 382 - t.Fatalf("Failed to index event during PDS unavailability: %v", err) 417 + t.Fatalf("Failed to process event during PDS unavailability: %v", err) 383 418 } 384 419 385 - // Verify user was indexed 420 + // Verify user handle was updated 386 421 user, err := userService.GetUserByDID(ctx, "did:plc:pdsfail123") 387 422 if err != nil { 388 423 t.Fatalf("Failed to get user during PDS unavailability: %v", err) ··· 392 427 t.Errorf("Expected handle pdsfail.test, got %s", user.Handle) 393 428 } 394 429 395 - t.Log("✓ Indexing continues successfully even when PDS is unavailable") 430 + t.Log("✓ Handle updates continue successfully even when PDS is unavailable") 396 431 }) 397 432 398 433 t.Run("System recovers when PDS comes back online", func(t *testing.T) { 399 434 // Mark PDS as available again 400 435 shouldFail.Store(false) 401 436 437 + // Pre-create user - identity events only update existing users 438 + _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 439 + DID: "did:plc:pdsrecovery123", 440 + Handle: "pdsrecovery.old.test", 441 + PDSURL: mockPDS.URL, 442 + }) 443 + if err != nil { 444 + t.Fatalf("Failed to create test user: %v", err) 445 + } 446 + 402 447 // Now operations that require PDS should work 403 448 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 404 449 ··· 413 458 }, 414 459 } 415 460 416 - err := consumer.HandleIdentityEventPublic(ctx, &event) 461 + err = consumer.HandleIdentityEventPublic(ctx, &event) 417 462 if err != nil { 418 - t.Fatalf("Failed to index event after PDS recovery: %v", err) 463 + t.Fatalf("Failed to process event after PDS recovery: %v", err) 419 464 } 420 465 421 466 user, err := userService.GetUserByDID(ctx, "did:plc:pdsrecovery123") ··· 449 494 t.Run("Handle updates arriving out of order", func(t *testing.T) { 450 495 did := "did:plc:outoforder123" 451 496 497 + // Pre-create user - identity events only update existing users 498 + _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 499 + DID: did, 500 + Handle: "initial.handle", 501 + PDSURL: "http://localhost:3001", 502 + }) 503 + if err != nil { 504 + t.Fatalf("Failed to create test user: %v", err) 505 + } 506 + 452 507 // Event 3: Latest handle 453 508 event3 := jetstream.JetstreamEvent{ 454 509 Did: did, ··· 511 566 }) 512 567 513 568 t.Run("Duplicate events at different times", func(t *testing.T) { 514 - did := "did:plc:duplicate123" 569 + did := "did:plc:dupevents123" 515 570 516 - // Create user 571 + // Pre-create user - identity events only update existing users 572 + _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 573 + DID: did, 574 + Handle: "duplicate.handle", 575 + PDSURL: "http://localhost:3001", 576 + }) 577 + if err != nil { 578 + t.Fatalf("Failed to create test user: %v", err) 579 + } 580 + 581 + // Send identity event 517 582 event1 := jetstream.JetstreamEvent{ 518 583 Did: did, 519 584 Kind: "identity", ··· 525 590 }, 526 591 } 527 592 528 - err := consumer.HandleIdentityEventPublic(ctx, &event1) 593 + err = consumer.HandleIdentityEventPublic(ctx, &event1) 529 594 if err != nil { 530 595 t.Fatalf("Failed to process first event: %v", err) 531 596 } ··· 536 601 t.Fatalf("Failed to process duplicate event: %v", err) 537 602 } 538 603 539 - // Verify still only one user 604 + // Verify still only one user with same handle 540 605 user, err := userService.GetUserByDID(ctx, did) 541 606 if err != nil { 542 607 t.Fatalf("Failed to get user: %v", err)
+66 -96
tests/e2e/user_signup_test.go
··· 1 1 package e2e 2 2 3 3 import ( 4 - "Coves/internal/atproto/identity" 5 - "Coves/internal/atproto/jetstream" 6 - "Coves/internal/core/users" 7 - "Coves/internal/db/postgres" 8 4 "bytes" 9 - "context" 10 5 "database/sql" 11 6 "encoding/json" 12 7 "fmt" ··· 33 28 } 34 29 35 30 // TestE2E_UserSignup tests the full user signup flow: 36 - // Third-party client → social.coves.actor.signup XRPC → PDS account creation → Jetstream → AppView indexing 31 + // Third-party client → social.coves.actor.signup XRPC → PDS account creation + AppView indexing 37 32 // 38 33 // This tests the same code path that a third-party client or UI would use. 34 + // Users are indexed directly by the signup endpoint (not via Jetstream). 35 + // Jetstream is only used for handle changes on existing users. 39 36 // 40 37 // Prerequisites: 41 38 // - AppView running on localhost:8081 42 39 // - PDS running on localhost:3001 43 - // - Jetstream running on localhost:6008 (consuming from PDS) 44 - // - Test database on localhost:5434 40 + // - Jetstream running on localhost:6008 (for handle change events, not required for signup) 45 41 // 46 42 // Run with: 47 43 // 48 44 // make e2e-up # Start infrastructure 49 45 // go run ./cmd/server & # Start AppView 50 - // go test ./tests/integration -run TestE2E_UserSignup -v 46 + // go test ./tests/e2e -run TestE2E_UserSignup -v 51 47 func TestE2E_UserSignup(t *testing.T) { 52 48 if testing.Short() { 53 49 t.Skip("Skipping E2E test in short mode") ··· 63 59 t.Skip("PDS not available at localhost:3001 - run 'make e2e-up' first") 64 60 } 65 61 66 - // Check if Jetstream is available 62 + // Check if Jetstream is available (needed for full E2E infrastructure) 67 63 if !isJetstreamAvailable(t) { 68 64 t.Skip("Jetstream not available at localhost:6008 - run 'make e2e-up' first") 69 65 } 70 66 71 - db := setupTestDB(t) 72 - defer func() { 73 - if err := db.Close(); err != nil { 74 - t.Logf("Failed to close database: %v", err) 75 - } 76 - }() 77 - 78 - // Set up services 79 - userRepo := postgres.NewUserRepository(db) 80 - resolver := identity.NewResolver(db, identity.DefaultConfig()) 81 - userService := users.NewUserService(userRepo, resolver, "http://localhost:3001") 82 - 83 - // Start Jetstream consumer 84 - consumer := jetstream.NewUserEventConsumer( 85 - userService, 86 - resolver, 87 - "ws://localhost:6008/subscribe", 88 - "", // No PDS filter 89 - ) 90 - 91 - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) 92 - defer cancel() 93 - 94 - // Start consumer in background 95 - consumerDone := make(chan error, 1) 96 - go func() { 97 - consumerDone <- consumer.Start(ctx) 98 - }() 99 - 100 - // Give Jetstream consumer a moment to connect (no need to wait long) 101 - t.Log("Waiting for Jetstream consumer to connect...") 102 - time.Sleep(500 * time.Millisecond) 103 - 104 67 // Test 1: Create account on PDS 105 68 t.Run("Create account on PDS and verify indexing", func(t *testing.T) { 106 69 handle := fmt.Sprintf("alice-%d.local.coves.dev", time.Now().Unix()) ··· 108 71 109 72 t.Logf("Creating account: %s", handle) 110 73 111 - // Create account via UserService (what UI would call) 112 - did, err := createPDSAccount(t, userService, handle, email, "test1234") 74 + // Create account via AppView signup endpoint (what UI would call) 75 + did, err := createPDSAccount(t, handle, email, "test1234") 113 76 if err != nil { 114 77 t.Fatalf("Failed to create PDS account: %v", err) 115 78 } 116 79 117 80 t.Logf("Account created with DID: %s", did) 118 81 119 - // Wait for Jetstream to process and AppView to index (with retry) 120 - t.Log("Waiting for Jetstream → AppView indexing...") 121 - var user *users.User 82 + // Verify user was indexed via AppView API (signup indexes immediately) 83 + t.Log("Verifying user via AppView API...") 84 + var userDID, userHandle string 122 85 deadline := time.Now().Add(10 * time.Second) 123 86 for time.Now().Before(deadline) { 124 - user, err = userService.GetUserByDID(ctx, did) 87 + userDID, userHandle, err = getProfileViaAPI(did) 125 88 if err == nil { 126 - break // Successfully indexed! 89 + break // Successfully found! 127 90 } 128 91 time.Sleep(500 * time.Millisecond) 129 92 } 130 93 if err != nil { 131 - t.Fatalf("User not indexed in AppView after 10s: %v", err) 94 + t.Fatalf("User not found in AppView after 10s: %v", err) 132 95 } 133 96 134 - if user.Handle != handle { 135 - t.Errorf("Expected handle %s, got %s", handle, user.Handle) 97 + if userHandle != handle { 98 + t.Errorf("Expected handle %s, got %s", handle, userHandle) 136 99 } 137 100 138 - if user.DID != did { 139 - t.Errorf("Expected DID %s, got %s", did, user.DID) 101 + if userDID != did { 102 + t.Errorf("Expected DID %s, got %s", did, userDID) 140 103 } 141 104 142 105 t.Logf("✅ User successfully indexed: %s → %s", handle, did) 143 106 }) 144 107 145 - // Test 2: Idempotency 108 + // Test 2: Idempotency (verify same user from multiple API calls) 146 109 t.Run("Idempotent indexing on duplicate events", func(t *testing.T) { 147 110 handle := fmt.Sprintf("bob-%d.local.coves.dev", time.Now().Unix()) 148 111 email := fmt.Sprintf("bob-%d@test.com", time.Now().Unix()) 149 112 150 - // Create account via UserService 151 - did, err := createPDSAccount(t, userService, handle, email, "test1234") 113 + // Create account via AppView signup endpoint 114 + did, err := createPDSAccount(t, handle, email, "test1234") 152 115 if err != nil { 153 116 t.Fatalf("Failed to create PDS account: %v", err) 154 117 } 155 118 156 - // Wait for indexing (with retry) 157 - var user1 *users.User 119 + // Wait for indexing via AppView API 120 + var userDID1 string 158 121 deadline := time.Now().Add(10 * time.Second) 159 122 for time.Now().Before(deadline) { 160 - user1, err = userService.GetUserByDID(ctx, did) 123 + userDID1, _, err = getProfileViaAPI(did) 161 124 if err == nil { 162 125 break 163 126 } 164 127 time.Sleep(500 * time.Millisecond) 165 128 } 166 129 if err != nil { 167 - t.Fatalf("User not indexed after 10s: %v", err) 130 + t.Fatalf("User not found after 10s: %v", err) 168 131 } 169 132 170 - // Manually trigger duplicate indexing (simulates Jetstream replay) 171 - _, err = userService.CreateUser(ctx, users.CreateUserRequest{ 172 - DID: did, 173 - Handle: handle, 174 - PDSURL: "http://localhost:3001", 175 - }) 133 + // Query again - should get same user 134 + userDID2, _, err := getProfileViaAPI(did) 176 135 if err != nil { 177 - t.Fatalf("Idempotent CreateUser should not error: %v", err) 136 + t.Fatalf("Failed to get user on second query: %v", err) 178 137 } 179 138 180 - // Verify still only one user 181 - user2, err := userService.GetUserByDID(ctx, did) 182 - if err != nil { 183 - t.Fatalf("Failed to get user after duplicate: %v", err) 184 - } 185 - 186 - if user1.CreatedAt != user2.CreatedAt { 187 - t.Errorf("Duplicate indexing created new user (timestamps differ)") 139 + if userDID1 != userDID2 { 140 + t.Errorf("Got different DIDs on repeated queries: %s vs %s", userDID1, userDID2) 188 141 } 189 142 190 - t.Logf("✅ Idempotency verified: duplicate events handled gracefully") 143 + t.Logf("✅ Idempotency verified: repeated queries return same user") 191 144 }) 192 145 193 146 // Test 3: Multiple users 194 147 t.Run("Index multiple users concurrently", func(t *testing.T) { 195 148 const numUsers = 3 196 149 dids := make([]string, numUsers) 150 + handles := make([]string, numUsers) 197 151 198 152 for i := 0; i < numUsers; i++ { 199 153 handle := fmt.Sprintf("user%d-%d.local.coves.dev", i, time.Now().Unix()) 200 154 email := fmt.Sprintf("user%d-%d@test.com", i, time.Now().Unix()) 201 155 202 - did, err := createPDSAccount(t, userService, handle, email, "test1234") 156 + did, err := createPDSAccount(t, handle, email, "test1234") 203 157 if err != nil { 204 158 t.Fatalf("Failed to create account %d: %v", i, err) 205 159 } 206 160 dids[i] = did 161 + handles[i] = handle 207 162 t.Logf("Created user %d: %s", i, did) 208 163 209 164 // Small delay between creations 210 165 time.Sleep(500 * time.Millisecond) 211 166 } 212 167 213 - // Verify all indexed (with retry for each user) 168 + // Verify all indexed via AppView API (with retry for each user) 214 169 t.Log("Waiting for all users to be indexed...") 215 170 for i, did := range dids { 216 - var user *users.User 171 + var userHandle string 217 172 var err error 218 173 deadline := time.Now().Add(15 * time.Second) 219 174 for time.Now().Before(deadline) { 220 - user, err = userService.GetUserByDID(ctx, did) 175 + _, userHandle, err = getProfileViaAPI(did) 221 176 if err == nil { 222 177 break 223 178 } 224 179 time.Sleep(500 * time.Millisecond) 225 180 } 226 181 if err != nil { 227 - t.Errorf("User %d not indexed after 15s: %v", i, err) 182 + t.Errorf("User %d not found after 15s: %v", i, err) 228 183 continue 229 184 } 230 - t.Logf("✅ User %d indexed: %s", i, user.Handle) 185 + t.Logf("✅ User %d indexed: %s", i, userHandle) 231 186 } 232 187 }) 233 - 234 - // Clean shutdown 235 - cancel() 236 - select { 237 - case err := <-consumerDone: 238 - if err != nil && err != context.Canceled { 239 - t.Logf("Consumer stopped with error: %v", err) 240 - } 241 - case <-time.After(5 * time.Second): 242 - t.Log("Consumer shutdown timeout") 243 - } 244 188 } 245 189 246 190 // generateInviteCode generates a single-use invite code via PDS admin API ··· 295 239 296 240 // createPDSAccount creates an account via the coves.user.signup XRPC endpoint 297 241 // This is the same code path that a third-party client or UI would use 298 - func createPDSAccount(t *testing.T, userService users.UserService, handle, email, password string) (string, error) { 242 + func createPDSAccount(t *testing.T, handle, email, password string) (string, error) { 299 243 // Generate fresh invite code for each account 300 244 inviteCode, err := generateInviteCode(t) 301 245 if err != nil { ··· 433 377 434 378 return db 435 379 } 380 + 381 + // getProfileViaAPI queries the AppView API to get a user profile by DID 382 + func getProfileViaAPI(did string) (string, string, error) { 383 + resp, err := http.Get(fmt.Sprintf("http://localhost:8081/xrpc/social.coves.actor.getprofile?actor=%s", did)) 384 + if err != nil { 385 + return "", "", fmt.Errorf("failed to call getprofile: %w", err) 386 + } 387 + defer func() { _ = resp.Body.Close() }() 388 + 389 + if resp.StatusCode != http.StatusOK { 390 + return "", "", fmt.Errorf("getprofile returned status %d", resp.StatusCode) 391 + } 392 + 393 + var result struct { 394 + DID string `json:"did"` 395 + Profile struct { 396 + Handle string `json:"handle"` 397 + } `json:"profile"` 398 + } 399 + 400 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 401 + return "", "", fmt.Errorf("failed to decode response: %w", err) 402 + } 403 + 404 + return result.DID, result.Profile.Handle, nil 405 + }
+14
tests/integration/community_e2e_test.go
··· 68 68 t.Fatalf("Failed to run migrations: %v", migrateErr) 69 69 } 70 70 71 + // Clean up test data from previous runs (order matters due to FK constraints) 72 + // Delete subscriptions first (references communities and users) 73 + if _, cleanErr := db.Exec("DELETE FROM subscriptions"); cleanErr != nil { 74 + t.Logf("Warning: Failed to clean up subscriptions: %v", cleanErr) 75 + } 76 + // Delete posts (references communities) 77 + if _, cleanErr := db.Exec("DELETE FROM posts"); cleanErr != nil { 78 + t.Logf("Warning: Failed to clean up posts: %v", cleanErr) 79 + } 80 + // Delete communities 81 + if _, cleanErr := db.Exec("DELETE FROM communities"); cleanErr != nil { 82 + t.Logf("Warning: Failed to clean up communities: %v", cleanErr) 83 + } 84 + 71 85 // Check if PDS is running 72 86 pdsURL := os.Getenv("PDS_URL") 73 87 if pdsURL == "" {
+43 -35
tests/integration/jetstream_consumer_test.go
··· 25 25 26 26 ctx := context.Background() 27 27 28 - t.Run("Index new user from identity event", func(t *testing.T) { 29 - // Simulate an identity event from Jetstream 28 + t.Run("Skip identity event for non-existent user", func(t *testing.T) { 29 + // Identity events for users not in our database should be silently skipped 30 + // Users are only indexed during OAuth login/signup, not from Jetstream events 30 31 event := jetstream.JetstreamEvent{ 31 - Did: "did:plc:jetstream123", 32 + Did: "did:plc:nonexistent123", 32 33 Kind: "identity", 33 34 Identity: &jetstream.IdentityEvent{ 34 - Did: "did:plc:jetstream123", 35 - Handle: "alice.jetstream.test", 35 + Did: "did:plc:nonexistent123", 36 + Handle: "nonexistent.jetstream.test", 36 37 Seq: 12345, 37 38 Time: time.Now().Format(time.RFC3339), 38 39 }, ··· 40 41 41 42 consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 42 43 43 - // Handle the event 44 + // Handle the event - should return nil (skip silently, not error) 44 45 err := consumer.HandleIdentityEventPublic(ctx, &event) 45 46 if err != nil { 46 - t.Fatalf("failed to handle identity event: %v", err) 47 + t.Fatalf("expected nil error for non-existent user, got: %v", err) 47 48 } 48 49 49 - // Verify user was indexed 50 - user, err := userService.GetUserByDID(ctx, "did:plc:jetstream123") 51 - if err != nil { 52 - t.Fatalf("failed to get indexed user: %v", err) 53 - } 54 - 55 - if user.DID != "did:plc:jetstream123" { 56 - t.Errorf("expected DID did:plc:jetstream123, got %s", user.DID) 57 - } 58 - 59 - if user.Handle != "alice.jetstream.test" { 60 - t.Errorf("expected handle alice.jetstream.test, got %s", user.Handle) 50 + // Verify user was NOT created 51 + _, err = userService.GetUserByDID(ctx, "did:plc:nonexistent123") 52 + if err == nil { 53 + t.Fatal("expected user to NOT be created, but found in database") 61 54 } 62 55 }) 63 56 ··· 103 96 } 104 97 }) 105 98 106 - t.Run("Index multiple users", func(t *testing.T) { 107 - consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 108 - 109 - users := []struct { 110 - did string 111 - handle string 99 + t.Run("Update multiple existing users via identity events", func(t *testing.T) { 100 + // Pre-create users - identity events only update existing users 101 + testUsers := []struct { 102 + did string 103 + oldHandle string 104 + newHandle string 112 105 }{ 113 - {"did:plc:multi1", "user1.test"}, 114 - {"did:plc:multi2", "user2.test"}, 115 - {"did:plc:multi3", "user3.test"}, 106 + {"did:plc:multi1", "user1.old.test", "user1.new.test"}, 107 + {"did:plc:multi2", "user2.old.test", "user2.new.test"}, 108 + {"did:plc:multi3", "user3.old.test", "user3.new.test"}, 109 + } 110 + 111 + // Create users first 112 + for _, u := range testUsers { 113 + _, err := userService.CreateUser(ctx, users.CreateUserRequest{ 114 + DID: u.did, 115 + Handle: u.oldHandle, 116 + PDSURL: "https://bsky.social", 117 + }) 118 + if err != nil { 119 + t.Fatalf("failed to create user %s: %v", u.oldHandle, err) 120 + } 116 121 } 117 122 118 - for _, u := range users { 123 + consumer := jetstream.NewUserEventConsumer(userService, resolver, "", "") 124 + 125 + // Send identity events with new handles 126 + for _, u := range testUsers { 119 127 event := jetstream.JetstreamEvent{ 120 128 Did: u.did, 121 129 Kind: "identity", 122 130 Identity: &jetstream.IdentityEvent{ 123 131 Did: u.did, 124 - Handle: u.handle, 132 + Handle: u.newHandle, 125 133 Seq: 12345, 126 134 Time: time.Now().Format(time.RFC3339), 127 135 }, ··· 129 137 130 138 err := consumer.HandleIdentityEventPublic(ctx, &event) 131 139 if err != nil { 132 - t.Fatalf("failed to index user %s: %v", u.handle, err) 140 + t.Fatalf("failed to handle identity event for %s: %v", u.newHandle, err) 133 141 } 134 142 } 135 143 136 - // Verify all users indexed 137 - for _, u := range users { 144 + // Verify all users have updated handles 145 + for _, u := range testUsers { 138 146 user, err := userService.GetUserByDID(ctx, u.did) 139 147 if err != nil { 140 148 t.Fatalf("user %s not found: %v", u.did, err) 141 149 } 142 150 143 - if user.Handle != u.handle { 144 - t.Errorf("expected handle %s, got %s", u.handle, user.Handle) 151 + if user.Handle != u.newHandle { 152 + t.Errorf("expected handle %s, got %s", u.newHandle, user.Handle) 145 153 } 146 154 } 147 155 })
+18 -2
tests/integration/user_test.go
··· 78 78 t.Fatalf("Failed to run migrations: %v", migrateErr) 79 79 } 80 80 81 - // Clean up any existing test data 81 + // Clean up any existing test data (order matters due to FK constraints) 82 + // Delete subscriptions first (references communities and users) 83 + _, err = db.Exec("DELETE FROM subscriptions") 84 + if err != nil { 85 + t.Logf("Warning: Failed to clean up subscriptions: %v", err) 86 + } 87 + // Delete posts (references communities) 88 + _, err = db.Exec("DELETE FROM posts") 89 + if err != nil { 90 + t.Logf("Warning: Failed to clean up posts: %v", err) 91 + } 92 + // Delete communities 93 + _, err = db.Exec("DELETE FROM communities") 94 + if err != nil { 95 + t.Logf("Warning: Failed to clean up communities: %v", err) 96 + } 97 + // Delete users 82 98 _, err = db.Exec("DELETE FROM users WHERE handle LIKE '%.test'") 83 99 if err != nil { 84 - t.Logf("Warning: Failed to clean up test data: %v", err) 100 + t.Logf("Warning: Failed to clean up test users: %v", err) 85 101 } 86 102 87 103 return db