A community based topic aggregation platform built on atproto
at main 852 lines 26 kB view raw
1package integration 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/atproto/identity" 6 "Coves/internal/atproto/jetstream" 7 "Coves/internal/core/communities" 8 "Coves/internal/core/posts" 9 "Coves/internal/core/users" 10 "Coves/internal/db/postgres" 11 "context" 12 "database/sql" 13 "errors" 14 "fmt" 15 "net" 16 "net/http" 17 "os" 18 "strings" 19 "testing" 20 "time" 21 22 "github.com/gorilla/websocket" 23 _ "github.com/lib/pq" 24 "github.com/pressly/goose/v3" 25 26 oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth" 27 "github.com/bluesky-social/indigo/atproto/syntax" 28) 29 30// TestPostDeletion_JetstreamConsumer tests that the Jetstream consumer 31// correctly handles post deletion events by soft-deleting posts in the AppView database. 32func TestPostDeletion_JetstreamConsumer(t *testing.T) { 33 db := setupTestDB(t) 34 defer func() { 35 if err := db.Close(); err != nil { 36 t.Logf("Failed to close database: %v", err) 37 } 38 }() 39 40 ctx := context.Background() 41 42 // Cleanup old test data 43 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:deltest123'") 44 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:deltest123'") 45 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:delauthor123'") 46 47 // Setup repositories 48 userRepo := postgres.NewUserRepository(db) 49 communityRepo := postgres.NewCommunityRepository(db) 50 postRepo := postgres.NewPostRepository(db) 51 52 // Setup user service for post consumer 53 identityConfig := identity.DefaultConfig() 54 identityResolver := identity.NewResolver(db, identityConfig) 55 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001") 56 57 // Create test user (author) 58 author := createTestUser(t, db, "delauthor.test", "did:plc:delauthor123") 59 60 // Create test community 61 community := &communities.Community{ 62 DID: "did:plc:deltest123", 63 Handle: "c-deltest.test.coves.social", 64 Name: "deltest", 65 DisplayName: "Delete Test Community", 66 OwnerDID: "did:plc:deltest123", 67 CreatedByDID: author.DID, 68 HostedByDID: "did:web:coves.test", 69 Visibility: "public", 70 ModerationType: "moderator", 71 RecordURI: "at://did:plc:deltest123/social.coves.community.profile/self", 72 RecordCID: "fakecid123", 73 PDSAccessToken: "fake_token_for_testing", 74 PDSRefreshToken: "fake_refresh_token", 75 } 76 _, err := communityRepo.Create(ctx, community) 77 if err != nil { 78 t.Fatalf("Failed to create test community: %v", err) 79 } 80 81 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 82 83 t.Run("Create then delete post via Jetstream", func(t *testing.T) { 84 rkey := generateTID() 85 title := "Post to be deleted" 86 content := "This post will be deleted" 87 88 // Step 1: Create the post via Jetstream event 89 createEvent := jetstream.JetstreamEvent{ 90 Did: community.DID, 91 Kind: "commit", 92 Commit: &jetstream.CommitEvent{ 93 Operation: "create", 94 Collection: "social.coves.community.post", 95 RKey: rkey, 96 CID: "bafy2bzacedeltest1", 97 Record: map[string]interface{}{ 98 "$type": "social.coves.community.post", 99 "community": community.DID, 100 "author": author.DID, 101 "title": title, 102 "content": content, 103 "createdAt": time.Now().Format(time.RFC3339), 104 }, 105 }, 106 } 107 108 err := consumer.HandleEvent(ctx, &createEvent) 109 if err != nil { 110 t.Fatalf("Failed to create post: %v", err) 111 } 112 113 // Verify post was created 114 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey) 115 createdPost, err := postRepo.GetByURI(ctx, postURI) 116 if err != nil { 117 t.Fatalf("Post not indexed after create: %v", err) 118 } 119 if createdPost.DeletedAt != nil { 120 t.Fatal("Post should not be deleted initially") 121 } 122 123 t.Logf("✓ Post created: %s", postURI) 124 125 // Step 2: Delete the post via Jetstream event 126 deleteEvent := jetstream.JetstreamEvent{ 127 Did: community.DID, 128 Kind: "commit", 129 Commit: &jetstream.CommitEvent{ 130 Operation: "delete", 131 Collection: "social.coves.community.post", 132 RKey: rkey, 133 }, 134 } 135 136 err = consumer.HandleEvent(ctx, &deleteEvent) 137 if err != nil { 138 t.Fatalf("Failed to delete post: %v", err) 139 } 140 141 // Step 3: Verify post was soft-deleted 142 deletedPost, err := postRepo.GetByURI(ctx, postURI) 143 if err != nil { 144 t.Fatalf("Post should still exist after soft delete: %v", err) 145 } 146 if deletedPost.DeletedAt == nil { 147 t.Fatal("Post should have deleted_at set after delete") 148 } 149 150 t.Logf("✓ Post soft-deleted: deleted_at=%v", deletedPost.DeletedAt) 151 t.Log("✅ Delete flow complete: Create → Delete → Verify soft-deleted") 152 }) 153 154 t.Run("Delete is idempotent", func(t *testing.T) { 155 rkey := generateTID() 156 157 // Create a post first 158 createEvent := jetstream.JetstreamEvent{ 159 Did: community.DID, 160 Kind: "commit", 161 Commit: &jetstream.CommitEvent{ 162 Operation: "create", 163 Collection: "social.coves.community.post", 164 RKey: rkey, 165 CID: "bafy2bzaceidempotentdel", 166 Record: map[string]interface{}{ 167 "$type": "social.coves.community.post", 168 "community": community.DID, 169 "author": author.DID, 170 "title": "Idempotent delete test", 171 "content": "Testing idempotent deletion", 172 "createdAt": time.Now().Format(time.RFC3339), 173 }, 174 }, 175 } 176 err := consumer.HandleEvent(ctx, &createEvent) 177 if err != nil { 178 t.Fatalf("Failed to create post: %v", err) 179 } 180 181 // Delete once 182 deleteEvent := jetstream.JetstreamEvent{ 183 Did: community.DID, 184 Kind: "commit", 185 Commit: &jetstream.CommitEvent{ 186 Operation: "delete", 187 Collection: "social.coves.community.post", 188 RKey: rkey, 189 }, 190 } 191 err = consumer.HandleEvent(ctx, &deleteEvent) 192 if err != nil { 193 t.Fatalf("First delete failed: %v", err) 194 } 195 196 // Delete again (should be idempotent) 197 err = consumer.HandleEvent(ctx, &deleteEvent) 198 if err != nil { 199 t.Fatalf("Second delete should be idempotent, got error: %v", err) 200 } 201 202 t.Log("✓ Delete is idempotent - second delete did not fail") 203 }) 204 205 t.Run("Delete non-existent post is idempotent", func(t *testing.T) { 206 // Try to delete a post that was never created 207 deleteEvent := jetstream.JetstreamEvent{ 208 Did: community.DID, 209 Kind: "commit", 210 Commit: &jetstream.CommitEvent{ 211 Operation: "delete", 212 Collection: "social.coves.community.post", 213 RKey: "nonexistent123", 214 }, 215 } 216 217 err := consumer.HandleEvent(ctx, &deleteEvent) 218 if err != nil { 219 t.Fatalf("Delete of non-existent post should be idempotent, got error: %v", err) 220 } 221 222 t.Log("✓ Delete of non-existent post is idempotent") 223 }) 224} 225 226// TestPostDeletion_Authorization tests that only the post author can delete their posts 227func TestPostDeletion_Authorization(t *testing.T) { 228 if testing.Short() { 229 t.Skip("Skipping authorization test in short mode") 230 } 231 232 db := setupTestDB(t) 233 defer func() { _ = db.Close() }() 234 235 ctx := context.Background() 236 pdsURL := getTestPDSURL() 237 238 // Setup repositories 239 communityRepo := postgres.NewCommunityRepository(db) 240 postRepo := postgres.NewPostRepository(db) 241 242 // Create a mock community service for testing 243 communityService := communities.NewCommunityServiceWithPDSFactory( 244 communityRepo, 245 pdsURL, 246 "did:web:test", 247 "test.coves.social", 248 nil, // No provisioner needed for this test 249 nil, // No PDS factory 250 nil, // No blob service 251 ) 252 253 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL) 254 255 // Create test user (attacker trying to delete another user's post) 256 attackerHandle := fmt.Sprintf("attacker%d.local.coves.dev", time.Now().UnixNano()%1000000) 257 attackerEmail := fmt.Sprintf("attacker-%d@test.local", time.Now().Unix()) 258 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123") 259 if err != nil { 260 t.Skipf("PDS not available: %v", err) 261 } 262 263 // Setup OAuth session for attacker 264 parsedDID, err := syntax.ParseDID(attackerDID) 265 if err != nil { 266 t.Fatalf("Failed to parse attacker DID: %v", err) 267 } 268 attackerSession := &oauthlib.ClientSessionData{ 269 AccountDID: parsedDID, 270 AccessToken: attackerToken, 271 HostURL: pdsURL, 272 } 273 274 // Create post URI belonging to a DIFFERENT user (the owner) 275 ownerDID := "did:plc:owner123" 276 postURI := fmt.Sprintf("at://%s/social.coves.community.post/test123", ownerDID) 277 278 t.Run("Non-author cannot delete post - URI contains wrong DID", func(t *testing.T) { 279 // The post URI contains ownerDID in the community position 280 // This should fail because attacker is not the community owner 281 // and wouldn't have credentials to delete from that repo 282 283 deleteReq := posts.DeletePostRequest{ 284 URI: postURI, 285 } 286 287 err := postService.DeletePost(ctx, attackerSession, deleteReq) 288 289 // We expect an error - either NotAuthorized or CommunityNotFound 290 // since the community doesn't exist in our test DB 291 if err == nil { 292 t.Fatal("Expected error when non-author tries to delete post, got nil") 293 } 294 295 t.Logf("✓ Non-author blocked from deleting post: %v", err) 296 }) 297 298 t.Run("Invalid URI format returns error", func(t *testing.T) { 299 deleteReq := posts.DeletePostRequest{ 300 URI: "invalid-uri-format", 301 } 302 303 err := postService.DeletePost(ctx, attackerSession, deleteReq) 304 305 if err == nil { 306 t.Fatal("Expected error for invalid URI, got nil") 307 } 308 309 // Should be a validation error 310 if !posts.IsValidationError(err) { 311 t.Logf("Got error (expected validation): %v", err) 312 } 313 314 t.Logf("✓ Invalid URI rejected: %v", err) 315 }) 316 317 t.Run("Empty URI returns error", func(t *testing.T) { 318 deleteReq := posts.DeletePostRequest{ 319 URI: "", 320 } 321 322 err := postService.DeletePost(ctx, attackerSession, deleteReq) 323 324 if err == nil { 325 t.Fatal("Expected error for empty URI, got nil") 326 } 327 328 t.Logf("✓ Empty URI rejected: %v", err) 329 }) 330 331 t.Run("Nil session returns error", func(t *testing.T) { 332 deleteReq := posts.DeletePostRequest{ 333 URI: postURI, 334 } 335 336 err := postService.DeletePost(ctx, nil, deleteReq) 337 338 if err == nil { 339 t.Fatal("Expected error for nil session, got nil") 340 } 341 342 t.Logf("✓ Nil session rejected: %v", err) 343 }) 344} 345 346// TestPostDeletion_ServiceAuthorization tests the author verification logic in the service layer 347// This test requires a live PDS to fully test the authorization flow 348func TestPostDeletion_ServiceAuthorization_LivePDS(t *testing.T) { 349 if testing.Short() { 350 t.Skip("Skipping live PDS test in short mode") 351 } 352 353 db := setupTestDB(t) 354 defer func() { _ = db.Close() }() 355 356 ctx := context.Background() 357 pdsURL := getTestPDSURL() 358 359 // Check if PDS is available 360 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 361 if err != nil { 362 t.Skipf("PDS not available: %v", err) 363 } 364 _ = healthResp.Body.Close() 365 366 // Setup repositories 367 communityRepo := postgres.NewCommunityRepository(db) 368 postRepo := postgres.NewPostRepository(db) 369 370 // Get instance credentials to determine correct domain 371 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 372 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 373 if instanceHandle == "" { 374 instanceHandle = "testuser123.local.coves.dev" 375 } 376 if instancePassword == "" { 377 instancePassword = "test-password-123" 378 } 379 380 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 381 if err != nil { 382 t.Skipf("Failed to authenticate with PDS: %v", err) 383 } 384 385 var instanceDomain string 386 if strings.HasPrefix(instanceDID, "did:web:") { 387 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 388 } else { 389 instanceDomain = "local.coves.dev" 390 } 391 392 // Create provisioner for community creation 393 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 394 395 communityService := communities.NewCommunityServiceWithPDSFactory( 396 communityRepo, 397 pdsURL, 398 instanceDID, 399 instanceDomain, 400 provisioner, 401 nil, 402 nil, 403 ) 404 405 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL) 406 407 // Create two test users 408 ownerHandle := fmt.Sprintf("postowner%d.local.coves.dev", time.Now().UnixNano()%1000000) 409 ownerEmail := fmt.Sprintf("postowner-%d@test.local", time.Now().Unix()) 410 _, ownerDID, err := createPDSAccount(pdsURL, ownerHandle, ownerEmail, "password123") 411 if err != nil { 412 t.Skipf("Failed to create owner account: %v", err) 413 } 414 owner := createTestUser(t, db, ownerHandle, ownerDID) 415 416 attackerHandle := fmt.Sprintf("postattacker%d.local.coves.dev", time.Now().UnixNano()%1000000) 417 attackerEmail := fmt.Sprintf("postattacker-%d@test.local", time.Now().Unix()) 418 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123") 419 if err != nil { 420 t.Skipf("Failed to create attacker account: %v", err) 421 } 422 _ = createTestUser(t, db, attackerHandle, attackerDID) 423 424 // Setup attacker session 425 parsedAttackerDID, _ := syntax.ParseDID(attackerDID) 426 attackerSession := &oauthlib.ClientSessionData{ 427 AccountDID: parsedAttackerDID, 428 AccessToken: attackerToken, 429 HostURL: pdsURL, 430 } 431 432 // Create a test community 433 communityName := fmt.Sprintf("delauth%d", time.Now().UnixNano()%1000000) 434 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 435 Name: communityName, 436 DisplayName: "Delete Auth Test Community", 437 Description: "Testing post deletion authorization", 438 CreatedByDID: owner.DID, 439 Visibility: "public", 440 }) 441 if err != nil { 442 t.Fatalf("Failed to create community: %v", err) 443 } 444 445 t.Logf("✓ Community created: %s (%s)", community.Name, community.DID) 446 447 // Create a post as the owner 448 title := "Owner's Post" 449 content := "This post belongs to the owner" 450 createResp, err := postService.CreatePost( 451 middleware.SetTestUserDID(ctx, owner.DID), 452 posts.CreatePostRequest{ 453 Community: community.DID, 454 Title: &title, 455 Content: &content, 456 AuthorDID: owner.DID, 457 }, 458 ) 459 if err != nil { 460 t.Fatalf("Failed to create post: %v", err) 461 } 462 463 t.Logf("✓ Post created by owner: %s", createResp.URI) 464 465 t.Run("Attacker cannot delete owner's post", func(t *testing.T) { 466 deleteReq := posts.DeletePostRequest{ 467 URI: createResp.URI, 468 } 469 470 err := postService.DeletePost(ctx, attackerSession, deleteReq) 471 472 if err == nil { 473 t.Fatal("Expected ErrNotAuthorized when attacker tries to delete owner's post") 474 } 475 476 if !errors.Is(err, posts.ErrNotAuthorized) { 477 t.Errorf("Expected ErrNotAuthorized, got: %v", err) 478 } 479 480 t.Logf("✅ Authorization check passed: attacker blocked with %v", err) 481 }) 482} 483 484// TestPostE2E_DeleteWithJetstream tests post deletion with real PDS and Jetstream 485// This is a TRUE E2E test that follows the complete flow: 486// 1. Create real community on PDS 487// 2. Create real post on PDS 488// 3. Subscribe to Jetstream 489// 4. Delete post via service (which deletes from PDS) 490// 5. Receive delete event from Jetstream 491// 6. Verify post is soft-deleted in AppView DB 492func TestPostE2E_DeleteWithJetstream(t *testing.T) { 493 if testing.Short() { 494 t.Skip("Skipping E2E test in short mode") 495 } 496 497 // Setup test database 498 dbURL := os.Getenv("TEST_DATABASE_URL") 499 if dbURL == "" { 500 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 501 } 502 503 db, err := sql.Open("postgres", dbURL) 504 if err != nil { 505 t.Fatalf("Failed to connect to test database: %v", err) 506 } 507 defer func() { _ = db.Close() }() 508 509 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 510 t.Fatalf("Failed to set goose dialect: %v", dialectErr) 511 } 512 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 513 t.Fatalf("Failed to run migrations: %v", migrateErr) 514 } 515 516 pdsURL := getTestPDSURL() 517 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 518 if err != nil { 519 t.Skipf("PDS not running at %s: %v", pdsURL, err) 520 } 521 _ = healthResp.Body.Close() 522 523 // Check Jetstream is available 524 pdsHostname := strings.TrimPrefix(pdsURL, "http://") 525 pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 526 pdsHostname = strings.Split(pdsHostname, ":")[0] 527 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.post", pdsHostname) 528 529 testConn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 530 if err != nil { 531 t.Skipf("Jetstream not running at %s: %v", jetstreamURL, err) 532 } 533 _ = testConn.Close() 534 535 ctx := context.Background() 536 537 // Setup repositories 538 userRepo := postgres.NewUserRepository(db) 539 communityRepo := postgres.NewCommunityRepository(db) 540 postRepo := postgres.NewPostRepository(db) 541 542 // Setup identity resolver for user service 543 identityConfig := identity.DefaultConfig() 544 plcURL := os.Getenv("PLC_DIRECTORY_URL") 545 if plcURL == "" { 546 plcURL = "http://localhost:3002" 547 } 548 identityConfig.PLCURL = plcURL 549 identityResolver := identity.NewResolver(db, identityConfig) 550 userService := users.NewUserService(userRepo, identityResolver, pdsURL) 551 552 // Setup community service with provisioner for real PDS 553 var instanceDomain string 554 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE") 555 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD") 556 if instanceHandle == "" { 557 instanceHandle = "testuser123.local.coves.dev" 558 } 559 if instancePassword == "" { 560 instancePassword = "test-password-123" 561 } 562 563 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword) 564 if err != nil { 565 t.Skipf("Failed to authenticate with PDS: %v", err) 566 } 567 568 if strings.HasPrefix(instanceDID, "did:web:") { 569 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 570 } else { 571 instanceDomain = "coves.social" 572 } 573 574 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL) 575 communityService := communities.NewCommunityServiceWithPDSFactory( 576 communityRepo, 577 pdsURL, 578 instanceDID, 579 instanceDomain, 580 provisioner, 581 nil, 582 nil, 583 ) 584 585 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL) 586 587 // Create test user 588 testID := fmt.Sprintf("%d", time.Now().UnixNano()%1000000) 589 testUserHandle := fmt.Sprintf("postdel%s.local.coves.dev", testID) 590 testUserEmail := fmt.Sprintf("postdel%s@test.local", testID) 591 testUserPassword := "test-password-123" 592 593 _, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 594 if err != nil { 595 t.Fatalf("Failed to create test user on PDS: %v", err) 596 } 597 testUser := createTestUser(t, db, testUserHandle, userDID) 598 599 // Create test community on real PDS 600 communityName := fmt.Sprintf("postdel%s", testID) 601 t.Logf("\n📝 Creating community on PDS: %s", communityName) 602 603 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 604 Name: communityName, 605 DisplayName: "Post Delete E2E Test Community", 606 Description: "Testing post deletion E2E flow", 607 CreatedByDID: testUser.DID, 608 Visibility: "public", 609 }) 610 if err != nil { 611 t.Fatalf("Failed to create community: %v", err) 612 } 613 614 t.Logf("✅ Community created: %s (%s)", community.Name, community.DID) 615 616 // Setup Jetstream consumer 617 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 618 619 t.Run("delete post with real Jetstream indexing", func(t *testing.T) { 620 // Create post via service (writes to real PDS) 621 createEventChan := make(chan *jetstream.JetstreamEvent, 10) 622 createDone := make(chan bool) 623 624 go func() { 625 subscribeErr := subscribeToJetstreamForPostCreate(ctx, jetstreamURL, community.DID, postConsumer, createEventChan, createDone) 626 if subscribeErr != nil { 627 t.Logf("Create subscription error: %v", subscribeErr) 628 } 629 }() 630 631 time.Sleep(500 * time.Millisecond) 632 633 title := "Post to delete via E2E" 634 content := "This post will be deleted and we'll verify via Jetstream" 635 t.Logf("\n📝 Creating post on PDS...") 636 637 createResp, err := postService.CreatePost( 638 middleware.SetTestUserDID(ctx, testUser.DID), 639 posts.CreatePostRequest{ 640 Community: community.DID, 641 Title: &title, 642 Content: &content, 643 AuthorDID: testUser.DID, 644 }, 645 ) 646 if err != nil { 647 t.Fatalf("Failed to create post: %v", err) 648 } 649 650 t.Logf("✅ Post created: %s", createResp.URI) 651 652 // Wait for create event from Jetstream 653 select { 654 case <-createEventChan: 655 t.Logf("✅ Create event received from Jetstream") 656 case <-time.After(30 * time.Second): 657 t.Fatalf("Timeout waiting for create event") 658 } 659 close(createDone) 660 661 // Verify post exists in AppView 662 createdPost, err := postRepo.GetByURI(ctx, createResp.URI) 663 if err != nil { 664 t.Fatalf("Post should exist after create: %v", err) 665 } 666 if createdPost.DeletedAt != nil { 667 t.Fatal("Post should not be deleted initially") 668 } 669 670 // Now delete the post 671 t.Logf("\n🗑️ Deleting post via service...") 672 673 deleteEventChan := make(chan *jetstream.JetstreamEvent, 10) 674 deleteDone := make(chan bool) 675 676 go func() { 677 subscribeErr := subscribeToJetstreamForPostDelete(ctx, jetstreamURL, community.DID, postConsumer, deleteEventChan, deleteDone) 678 if subscribeErr != nil { 679 t.Logf("Delete subscription error: %v", subscribeErr) 680 } 681 }() 682 683 time.Sleep(500 * time.Millisecond) 684 685 // Create OAuth session for the post author 686 parsedDID, _ := syntax.ParseDID(testUser.DID) 687 // Get fresh token for the user 688 freshToken, _, err := authenticateWithPDS(pdsURL, testUserHandle, testUserPassword) 689 if err != nil { 690 t.Fatalf("Failed to get fresh token: %v", err) 691 } 692 session := &oauthlib.ClientSessionData{ 693 AccountDID: parsedDID, 694 AccessToken: freshToken, 695 HostURL: pdsURL, 696 } 697 698 err = postService.DeletePost(ctx, session, posts.DeletePostRequest{URI: createResp.URI}) 699 if err != nil { 700 t.Fatalf("Failed to delete post: %v", err) 701 } 702 703 t.Logf("✅ Post delete request sent to PDS") 704 705 // Wait for delete event from Jetstream 706 t.Logf("\n⏳ Waiting for delete event from Jetstream...") 707 708 select { 709 case event := <-deleteEventChan: 710 t.Logf("✅ Received delete event from Jetstream!") 711 t.Logf(" Operation: %s", event.Commit.Operation) 712 713 if event.Commit.Operation != "delete" { 714 t.Errorf("Expected operation 'delete', got '%s'", event.Commit.Operation) 715 } 716 717 // Verify post is soft-deleted in AppView 718 deletedPost, err := postRepo.GetByURI(ctx, createResp.URI) 719 if err != nil { 720 t.Fatalf("Failed to get deleted post: %v", err) 721 } 722 723 if deletedPost.DeletedAt == nil { 724 t.Errorf("Expected post to be soft-deleted (deleted_at should be set)") 725 } else { 726 t.Logf("✅ Post soft-deleted in AppView at: %v", *deletedPost.DeletedAt) 727 } 728 729 close(deleteDone) 730 731 case <-time.After(30 * time.Second): 732 t.Fatalf("Timeout: No delete event received within 30 seconds") 733 } 734 735 t.Logf("\n✅ TRUE E2E POST DELETE FLOW COMPLETE:") 736 t.Logf(" Client → Service → PDS DeleteRecord → Jetstream → Consumer → AppView ✓") 737 }) 738} 739 740// subscribeToJetstreamForPostCreate subscribes for post create events 741func subscribeToJetstreamForPostCreate( 742 ctx context.Context, 743 jetstreamURL string, 744 targetDID string, 745 consumer *jetstream.PostEventConsumer, 746 eventChan chan<- *jetstream.JetstreamEvent, 747 done <-chan bool, 748) error { 749 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 750 if err != nil { 751 return fmt.Errorf("failed to connect to Jetstream: %w", err) 752 } 753 defer func() { _ = conn.Close() }() 754 755 for { 756 select { 757 case <-done: 758 return nil 759 case <-ctx.Done(): 760 return ctx.Err() 761 default: 762 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 763 return fmt.Errorf("failed to set read deadline: %w", err) 764 } 765 766 var event jetstream.JetstreamEvent 767 err := conn.ReadJSON(&event) 768 if err != nil { 769 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 770 return nil 771 } 772 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 773 continue 774 } 775 return fmt.Errorf("failed to read Jetstream message: %w", err) 776 } 777 778 if event.Did == targetDID && event.Kind == "commit" && 779 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" && 780 event.Commit.Operation == "create" { 781 782 if err := consumer.HandleEvent(ctx, &event); err != nil { 783 return fmt.Errorf("failed to process event: %w", err) 784 } 785 786 select { 787 case eventChan <- &event: 788 return nil 789 case <-time.After(1 * time.Second): 790 return fmt.Errorf("timeout sending event to channel") 791 } 792 } 793 } 794 } 795} 796 797// subscribeToJetstreamForPostDelete subscribes for post delete events 798func subscribeToJetstreamForPostDelete( 799 ctx context.Context, 800 jetstreamURL string, 801 targetDID string, 802 consumer *jetstream.PostEventConsumer, 803 eventChan chan<- *jetstream.JetstreamEvent, 804 done <-chan bool, 805) error { 806 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 807 if err != nil { 808 return fmt.Errorf("failed to connect to Jetstream: %w", err) 809 } 810 defer func() { _ = conn.Close() }() 811 812 for { 813 select { 814 case <-done: 815 return nil 816 case <-ctx.Done(): 817 return ctx.Err() 818 default: 819 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 820 return fmt.Errorf("failed to set read deadline: %w", err) 821 } 822 823 var event jetstream.JetstreamEvent 824 err := conn.ReadJSON(&event) 825 if err != nil { 826 if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 827 return nil 828 } 829 if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 830 continue 831 } 832 return fmt.Errorf("failed to read Jetstream message: %w", err) 833 } 834 835 if event.Did == targetDID && event.Kind == "commit" && 836 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" && 837 event.Commit.Operation == "delete" { 838 839 if err := consumer.HandleEvent(ctx, &event); err != nil { 840 return fmt.Errorf("failed to process event: %w", err) 841 } 842 843 select { 844 case eventChan <- &event: 845 return nil 846 case <-time.After(1 * time.Second): 847 return fmt.Errorf("timeout sending event to channel") 848 } 849 } 850 } 851 } 852}