A community-based topic aggregation platform built on atproto
at main 484 lines 16 kB view raw
1package integration 2 3import ( 4 "Coves/internal/api/routes" 5 "Coves/internal/atproto/jetstream" 6 "Coves/internal/atproto/utils" 7 "Coves/internal/core/userblocks" 8 "Coves/internal/db/postgres" 9 "bytes" 10 "context" 11 "encoding/json" 12 "fmt" 13 "io" 14 "net/http" 15 "net/http/httptest" 16 "testing" 17 "time" 18 19 "github.com/go-chi/chi/v5" 20 _ "github.com/lib/pq" 21) 22 23// TestUserBlockE2E_BlockAndUnblock tests the full user block lifecycle with a real PDS. 24// Flow: Client -> XRPC -> PDS Write -> Verify on PDS -> Jetstream -> Consumer -> AppView 25// Then: Client -> XRPC Unblock -> PDS Delete -> Jetstream -> Consumer -> AppView removal 26func TestUserBlockE2E_BlockAndUnblock(t *testing.T) { 27 if testing.Short() { 28 t.Skip("Skipping E2E test in short mode") 29 } 30 31 db := setupTestDB(t) 32 defer func() { _ = db.Close() }() 33 34 ctx := context.Background() 35 pdsURL := getTestPDSURL() 36 37 // Health check PDS 38 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 39 if err != nil { 40 t.Skipf("PDS not running at %s: %v", pdsURL, err) 41 } 42 func() { 43 if closeErr := healthResp.Body.Close(); closeErr != nil { 44 t.Logf("Failed to close health response: %v", closeErr) 45 } 46 }() 47 48 // Clean up user_blocks at the end to avoid polluting other tests 49 defer func() { 50 _, _ = db.Exec("DELETE FROM user_blocks") 51 }() 52 53 // Setup repository 54 blockRepo := postgres.NewUserBlockRepository(db) 55 56 // Create User A (blocker) on real PDS 57 userAHandle := fmt.Sprintf("blka%d.local.coves.dev", time.Now().UnixNano()%1000000) 58 userAEmail := fmt.Sprintf("blockerA-%d@test.local", time.Now().Unix()) 59 userAPassword := "test-password-123" 60 61 t.Logf("Creating User A (blocker) on PDS: %s", userAHandle) 62 pdsAccessTokenA, userADID, err := createPDSAccount(pdsURL, userAHandle, userAEmail, userAPassword) 63 if err != nil { 64 t.Fatalf("Failed to create User A on PDS: %v", err) 65 } 66 t.Logf("User A created: DID=%s", userADID) 67 68 // Create User 
B (target) on real PDS 69 userBHandle := fmt.Sprintf("blkb%d.local.coves.dev", time.Now().UnixNano()%1000000) 70 userBEmail := fmt.Sprintf("blockerB-%d@test.local", time.Now().Unix()) 71 userBPassword := "test-password-123" 72 73 t.Logf("Creating User B (target) on PDS: %s", userBHandle) 74 _, userBDID, err := createPDSAccount(pdsURL, userBHandle, userBEmail, userBPassword) 75 if err != nil { 76 t.Fatalf("Failed to create User B on PDS: %v", err) 77 } 78 t.Logf("User B created: DID=%s", userBDID) 79 80 // Index both users in AppView 81 createTestUser(t, db, userAHandle, userADID) 82 createTestUser(t, db, userBHandle, userBDID) 83 84 // Setup OAuth middleware with User A's real PDS access token 85 e2eAuth := NewE2EOAuthMiddleware() 86 tokenA := e2eAuth.AddUserWithPDSToken(userADID, pdsAccessTokenA, pdsURL) 87 88 // Setup service with password-based PDS client factory 89 service := userblocks.NewServiceWithPDSFactory(blockRepo, nil, UserBlockPasswordAuthPDSClientFactory()) 90 91 // Setup HTTP server with XRPC routes 92 r := chi.NewRouter() 93 routes.RegisterUserBlockRoutes(r, service, e2eAuth.OAuthAuthMiddleware) 94 httpServer := httptest.NewServer(r) 95 defer httpServer.Close() 96 97 // Setup Jetstream consumer with block repo 98 userConsumer := jetstream.NewUserEventConsumer(nil, nil, "", "", 99 jetstream.WithUserBlockRepo(blockRepo)) 100 101 // ==================================================================================== 102 // BLOCK PHASE: User A blocks User B 103 // ==================================================================================== 104 t.Logf("\n--- BLOCK PHASE: User A blocks User B ---") 105 106 blockReq := map[string]interface{}{ 107 "subject": userBDID, 108 } 109 110 reqBody, marshalErr := json.Marshal(blockReq) 111 if marshalErr != nil { 112 t.Fatalf("Failed to marshal block request: %v", marshalErr) 113 } 114 115 req, err := http.NewRequest(http.MethodPost, 116 httpServer.URL+"/xrpc/social.coves.actor.blockUser", 117 
bytes.NewBuffer(reqBody)) 118 if err != nil { 119 t.Fatalf("Failed to create block request: %v", err) 120 } 121 req.Header.Set("Content-Type", "application/json") 122 req.Header.Set("Authorization", "Bearer "+tokenA) 123 124 resp, err := http.DefaultClient.Do(req) 125 if err != nil { 126 t.Fatalf("Failed to POST block: %v", err) 127 } 128 defer func() { _ = resp.Body.Close() }() 129 130 if resp.StatusCode != http.StatusOK { 131 body, readErr := io.ReadAll(resp.Body) 132 if readErr != nil { 133 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr) 134 } 135 t.Logf("XRPC Block Failed") 136 t.Logf(" Status: %d", resp.StatusCode) 137 t.Logf(" Response: %s", string(body)) 138 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body)) 139 } 140 141 var blockResp struct { 142 Block struct { 143 RecordURI string `json:"recordUri"` 144 RecordCID string `json:"recordCid"` 145 } `json:"block"` 146 } 147 148 if decodeErr := json.NewDecoder(resp.Body).Decode(&blockResp); decodeErr != nil { 149 t.Fatalf("Failed to decode block response: %v", decodeErr) 150 } 151 152 t.Logf("XRPC block response received:") 153 t.Logf(" RecordURI: %s", blockResp.Block.RecordURI) 154 t.Logf(" RecordCID: %s", blockResp.Block.RecordCID) 155 156 if blockResp.Block.RecordURI == "" { 157 t.Fatal("Block response missing recordUri") 158 } 159 if blockResp.Block.RecordCID == "" { 160 t.Fatal("Block response missing recordCid") 161 } 162 163 // Verify block record was written to PDS 164 t.Logf("\nVerifying block record on PDS...") 165 rkey := utils.ExtractRKeyFromURI(blockResp.Block.RecordURI) 166 collection := "social.coves.actor.block" 167 168 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 169 pdsURL, userADID, collection, rkey)) 170 if pdsErr != nil { 171 t.Fatalf("Failed to fetch block record from PDS: %v", pdsErr) 172 } 173 defer func() { 174 if closeErr := pdsResp.Body.Close(); closeErr != nil { 175 
t.Logf("Failed to close PDS response: %v", closeErr) 176 } 177 }() 178 179 if pdsResp.StatusCode != http.StatusOK { 180 body, _ := io.ReadAll(pdsResp.Body) 181 t.Fatalf("Block record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body)) 182 } 183 184 var pdsRecord struct { 185 Value map[string]interface{} `json:"value"` 186 CID string `json:"cid"` 187 } 188 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 189 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 190 } 191 192 t.Logf("Block record found on PDS:") 193 t.Logf(" CID: %s", pdsRecord.CID) 194 t.Logf(" Subject: %v", pdsRecord.Value["subject"]) 195 196 // Verify subject DID in PDS record 197 if pdsRecord.Value["subject"] != userBDID { 198 t.Errorf("Expected subject '%s', got %v", userBDID, pdsRecord.Value["subject"]) 199 } 200 201 // Simulate Jetstream CREATE event 202 t.Logf("\nSimulating Jetstream CREATE event for block...") 203 createEvent := jetstream.JetstreamEvent{ 204 Did: userADID, 205 TimeUS: time.Now().UnixMicro(), 206 Kind: "commit", 207 Commit: &jetstream.CommitEvent{ 208 Rev: "test-block-rev-create", 209 Operation: "create", 210 Collection: "social.coves.actor.block", 211 RKey: rkey, 212 CID: pdsRecord.CID, 213 Record: map[string]interface{}{ 214 "$type": "social.coves.actor.block", 215 "subject": userBDID, 216 "createdAt": time.Now().Format(time.RFC3339), 217 }, 218 }, 219 } 220 221 if handleErr := userConsumer.HandleEvent(ctx, &createEvent); handleErr != nil { 222 t.Fatalf("Failed to handle block create event: %v", handleErr) 223 } 224 225 // Verify block indexed in AppView via repo.GetBlock() 226 t.Logf("\nVerifying block indexed in AppView...") 227 indexedBlock, err := blockRepo.GetBlock(ctx, userADID, userBDID) 228 if err != nil { 229 t.Fatalf("Block not indexed in AppView: %v", err) 230 } 231 232 t.Logf("Block indexed in AppView:") 233 t.Logf(" BlockerDID: %s", indexedBlock.BlockerDID) 234 t.Logf(" BlockedDID: %s", 
indexedBlock.BlockedDID) 235 t.Logf(" RecordURI: %s", indexedBlock.RecordURI) 236 t.Logf(" RecordCID: %s", indexedBlock.RecordCID) 237 238 if indexedBlock.BlockerDID != userADID { 239 t.Errorf("Expected blocker_did %s, got %s", userADID, indexedBlock.BlockerDID) 240 } 241 if indexedBlock.BlockedDID != userBDID { 242 t.Errorf("Expected blocked_did %s, got %s", userBDID, indexedBlock.BlockedDID) 243 } 244 245 // Verify via IsBlocked 246 isBlocked, err := blockRepo.IsBlocked(ctx, userADID, userBDID) 247 if err != nil { 248 t.Fatalf("Failed to check IsBlocked: %v", err) 249 } 250 if !isBlocked { 251 t.Error("Expected IsBlocked to return true, got false") 252 } 253 254 t.Logf("BLOCK PHASE COMPLETE:") 255 t.Logf(" Client -> XRPC -> PDS Write -> Jetstream -> Consumer -> AppView") 256 t.Logf(" Block written to PDS") 257 t.Logf(" Block indexed in AppView") 258 t.Logf(" IsBlocked confirmed") 259 260 // ==================================================================================== 261 // UNBLOCK PHASE: User A unblocks User B 262 // ==================================================================================== 263 t.Logf("\n--- UNBLOCK PHASE: User A unblocks User B ---") 264 265 unblockReq := map[string]interface{}{ 266 "subject": userBDID, 267 } 268 269 unblockBody, marshalErr := json.Marshal(unblockReq) 270 if marshalErr != nil { 271 t.Fatalf("Failed to marshal unblock request: %v", marshalErr) 272 } 273 274 unblockHTTPReq, err := http.NewRequest(http.MethodPost, 275 httpServer.URL+"/xrpc/social.coves.actor.unblockUser", 276 bytes.NewBuffer(unblockBody)) 277 if err != nil { 278 t.Fatalf("Failed to create unblock request: %v", err) 279 } 280 unblockHTTPReq.Header.Set("Content-Type", "application/json") 281 unblockHTTPReq.Header.Set("Authorization", "Bearer "+tokenA) 282 283 unblockResp, err := http.DefaultClient.Do(unblockHTTPReq) 284 if err != nil { 285 t.Fatalf("Failed to POST unblock: %v", err) 286 } 287 defer func() { 288 if closeErr := 
unblockResp.Body.Close(); closeErr != nil { 289 t.Logf("Failed to close unblock response: %v", closeErr) 290 } 291 }() 292 293 if unblockResp.StatusCode != http.StatusOK { 294 body, _ := io.ReadAll(unblockResp.Body) 295 t.Fatalf("Unblock failed: status %d, body: %s", unblockResp.StatusCode, string(body)) 296 } 297 298 t.Logf("Unblock XRPC request succeeded") 299 300 // Verify record deleted from PDS (expect error/404) 301 t.Logf("\nVerifying block record deleted from PDS...") 302 pdsDeleteCheck, pdsDeleteErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 303 pdsURL, userADID, collection, rkey)) 304 if pdsDeleteErr != nil { 305 t.Fatalf("Failed to check PDS record after unblock: %v", pdsDeleteErr) 306 } 307 defer func() { 308 if closeErr := pdsDeleteCheck.Body.Close(); closeErr != nil { 309 t.Logf("Failed to close PDS delete check response: %v", closeErr) 310 } 311 }() 312 313 switch pdsDeleteCheck.StatusCode { 314 case http.StatusOK: 315 t.Error("Expected block record to be deleted from PDS, but it still exists") 316 case http.StatusBadRequest, http.StatusNotFound: 317 // Expected: PDS returns 400 (RecordNotFound) or 404 for deleted records 318 t.Logf("Block record confirmed deleted from PDS (status: %d)", pdsDeleteCheck.StatusCode) 319 default: 320 body, _ := io.ReadAll(pdsDeleteCheck.Body) 321 t.Errorf("Unexpected status when checking deleted record: %d, body: %s", pdsDeleteCheck.StatusCode, string(body)) 322 } 323 324 // Simulate Jetstream DELETE event 325 t.Logf("\nSimulating Jetstream DELETE event for unblock...") 326 deleteEvent := jetstream.JetstreamEvent{ 327 Did: userADID, 328 TimeUS: time.Now().UnixMicro(), 329 Kind: "commit", 330 Commit: &jetstream.CommitEvent{ 331 Rev: "test-block-rev-delete", 332 Operation: "delete", 333 Collection: "social.coves.actor.block", 334 RKey: rkey, 335 }, 336 } 337 338 if handleErr := userConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil { 339 t.Fatalf("Failed to handle 
block delete event: %v", handleErr) 340 } 341 342 // Verify block removed from AppView 343 t.Logf("\nVerifying block removed from AppView...") 344 _, err = blockRepo.GetBlock(ctx, userADID, userBDID) 345 if err == nil { 346 t.Error("Expected block to be deleted from AppView, but it still exists") 347 } 348 349 // Verify via IsBlocked 350 isBlockedAfter, err := blockRepo.IsBlocked(ctx, userADID, userBDID) 351 if err != nil { 352 t.Fatalf("Failed to check IsBlocked after unblock: %v", err) 353 } 354 if isBlockedAfter { 355 t.Error("Expected IsBlocked to return false after unblock, got true") 356 } 357 358 t.Logf("UNBLOCK PHASE COMPLETE:") 359 t.Logf(" Unblock request sent via XRPC") 360 t.Logf(" Block record deleted from PDS") 361 t.Logf(" Block removed from AppView") 362 t.Logf(" IsBlocked confirmed false") 363 364 t.Logf("\nTRUE E2E BLOCK/UNBLOCK FLOW COMPLETE:") 365 t.Logf(" Client -> XRPC -> PDS Write -> Jetstream -> Consumer -> AppView") 366 t.Logf(" Client -> XRPC Unblock -> PDS Delete -> Jetstream -> Consumer -> AppView removal") 367} 368 369// TestUserBlockE2E_SelfBlockPrevented tests that a user cannot block themselves. 370// This validates the self-block guard in the service layer with a real PDS. 
371func TestUserBlockE2E_SelfBlockPrevented(t *testing.T) { 372 if testing.Short() { 373 t.Skip("Skipping E2E test in short mode") 374 } 375 376 db := setupTestDB(t) 377 defer func() { _ = db.Close() }() 378 379 pdsURL := getTestPDSURL() 380 381 // Health check PDS 382 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 383 if err != nil { 384 t.Skipf("PDS not running at %s: %v", pdsURL, err) 385 } 386 func() { 387 if closeErr := healthResp.Body.Close(); closeErr != nil { 388 t.Logf("Failed to close health response: %v", closeErr) 389 } 390 }() 391 392 // Clean up user_blocks at the end 393 defer func() { 394 _, _ = db.Exec("DELETE FROM user_blocks") 395 }() 396 397 // Setup repository 398 blockRepo := postgres.NewUserBlockRepository(db) 399 400 // Create User A on real PDS 401 userAHandle := fmt.Sprintf("self%d.local.coves.dev", time.Now().UnixNano()%1000000) 402 userAEmail := fmt.Sprintf("selfblock-%d@test.local", time.Now().Unix()) 403 userAPassword := "test-password-123" 404 405 t.Logf("Creating User A on PDS: %s", userAHandle) 406 pdsAccessTokenA, userADID, err := createPDSAccount(pdsURL, userAHandle, userAEmail, userAPassword) 407 if err != nil { 408 t.Fatalf("Failed to create User A on PDS: %v", err) 409 } 410 t.Logf("User A created: DID=%s", userADID) 411 412 // Index user in AppView 413 createTestUser(t, db, userAHandle, userADID) 414 415 // Setup OAuth middleware with User A's real PDS access token 416 e2eAuth := NewE2EOAuthMiddleware() 417 tokenA := e2eAuth.AddUserWithPDSToken(userADID, pdsAccessTokenA, pdsURL) 418 419 // Setup service with password-based PDS client factory 420 service := userblocks.NewServiceWithPDSFactory(blockRepo, nil, UserBlockPasswordAuthPDSClientFactory()) 421 422 // Setup HTTP server 423 r := chi.NewRouter() 424 routes.RegisterUserBlockRoutes(r, service, e2eAuth.OAuthAuthMiddleware) 425 httpServer := httptest.NewServer(r) 426 defer httpServer.Close() 427 428 // Attempt to self-block 429 t.Logf("\nAttempting to block self 
(should fail)...") 430 431 selfBlockReq := map[string]interface{}{ 432 "subject": userADID, 433 } 434 435 reqBody, marshalErr := json.Marshal(selfBlockReq) 436 if marshalErr != nil { 437 t.Fatalf("Failed to marshal self-block request: %v", marshalErr) 438 } 439 440 req, err := http.NewRequest(http.MethodPost, 441 httpServer.URL+"/xrpc/social.coves.actor.blockUser", 442 bytes.NewBuffer(reqBody)) 443 if err != nil { 444 t.Fatalf("Failed to create self-block request: %v", err) 445 } 446 req.Header.Set("Content-Type", "application/json") 447 req.Header.Set("Authorization", "Bearer "+tokenA) 448 449 resp, err := http.DefaultClient.Do(req) 450 if err != nil { 451 t.Fatalf("Failed to POST self-block: %v", err) 452 } 453 defer func() { _ = resp.Body.Close() }() 454 455 // Should return 400 Bad Request 456 if resp.StatusCode != http.StatusBadRequest { 457 body, _ := io.ReadAll(resp.Body) 458 t.Fatalf("Expected 400 for self-block, got %d: %s", resp.StatusCode, string(body)) 459 } 460 461 // Parse error response 462 var errResp struct { 463 Error string `json:"error"` 464 Message string `json:"message"` 465 } 466 if decodeErr := json.NewDecoder(resp.Body).Decode(&errResp); decodeErr != nil { 467 t.Fatalf("Failed to decode error response: %v", decodeErr) 468 } 469 470 // Verify error message mentions self-blocking 471 if !contains(errResp.Message, "cannot block yourself") { 472 t.Errorf("Expected error about self-blocking, got: %s", errResp.Message) 473 } 474 475 t.Logf("Self-block correctly prevented:") 476 t.Logf(" Status: %d", resp.StatusCode) 477 t.Logf(" Error: %s", errResp.Error) 478 t.Logf(" Message: %s", errResp.Message) 479 480 t.Logf("\nSELF-BLOCK PREVENTION TEST COMPLETE:") 481 t.Logf(" User attempted to block themselves") 482 t.Logf(" Server returned 400 with 'cannot block yourself'") 483 t.Logf(" No record written to PDS") 484}