A community based topic aggregation platform built on atproto
at main 990 lines 32 kB view raw
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/atproto/pds" 6 "Coves/internal/atproto/utils" 7 "Coves/internal/core/comments" 8 "Coves/internal/db/postgres" 9 "context" 10 "database/sql" 11 "encoding/json" 12 "errors" 13 "fmt" 14 "io" 15 "net/http" 16 "os" 17 "strings" 18 "testing" 19 "time" 20 21 oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth" 22 "github.com/bluesky-social/indigo/atproto/syntax" 23 _ "github.com/lib/pq" 24 "github.com/pressly/goose/v3" 25) 26 27// TestCommentWrite_CreateTopLevelComment tests creating a comment on a post via E2E flow 28func TestCommentWrite_CreateTopLevelComment(t *testing.T) { 29 // Skip in short mode since this requires real PDS 30 if testing.Short() { 31 t.Skip("Skipping E2E test in short mode") 32 } 33 34 // Setup test database 35 dbURL := os.Getenv("TEST_DATABASE_URL") 36 if dbURL == "" { 37 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 38 } 39 40 db, err := sql.Open("postgres", dbURL) 41 if err != nil { 42 t.Fatalf("Failed to connect to test database: %v", err) 43 } 44 defer func() { 45 if closeErr := db.Close(); closeErr != nil { 46 t.Logf("Failed to close database: %v", closeErr) 47 } 48 }() 49 50 // Run migrations 51 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 52 t.Fatalf("Failed to set goose dialect: %v", dialectErr) 53 } 54 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 55 t.Fatalf("Failed to run migrations: %v", migrateErr) 56 } 57 58 // Check if PDS is running 59 pdsURL := getTestPDSURL() 60 healthResp, err := http.Get(pdsURL + "/xrpc/_health") 61 if err != nil { 62 t.Skipf("PDS not running at %s: %v", pdsURL, err) 63 } 64 func() { 65 if closeErr := healthResp.Body.Close(); closeErr != nil { 66 t.Logf("Failed to close health response: %v", closeErr) 67 } 68 }() 69 70 ctx := context.Background() 71 72 // Setup repositories 73 commentRepo := postgres.NewCommentRepository(db) 74 postRepo := postgres.NewPostRepository(db) 75 76 // Setup service with password-based PDS client factory for E2E testing 77 // CommentPDSClientFactory creates a PDS client for comment operations 78 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 79 if session.AccessToken == "" { 80 return nil, fmt.Errorf("session has no access token") 81 } 82 if session.HostURL == "" { 83 return nil, fmt.Errorf("session has no host URL") 84 } 85 86 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 87 } 88 89 commentService := comments.NewCommentServiceWithPDSFactory( 90 commentRepo, 91 nil, // userRepo not needed for write ops 92 postRepo, 93 nil, // communityRepo not needed for write ops 94 nil, // logger 95 commentPDSFactory, 96 ) 97 98 // Create test user on PDS 99 testUserHandle := fmt.Sprintf("cmw%d.local.coves.dev", time.Now().UnixNano()%1000000) 100 testUserEmail := fmt.Sprintf("commenter-%d@test.local", time.Now().Unix()) 101 testUserPassword := "test-password-123" 102 103 t.Logf("Creating test user on PDS: %s", testUserHandle) 104 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 105 if err != nil { 106 t.Fatalf("Failed to create test user on PDS: %v", err) 107 } 108 t.Logf("Test user created: DID=%s", userDID) 109 110 // Index user in AppView 111 testUser := createTestUser(t, db, testUserHandle, userDID) 112 113 // Create test community and post to comment on 114 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test") 115 if err != nil { 116 t.Fatalf("Failed to create test community: %v", err) 117 } 118 119 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 120 postCID := "bafypost123" 121 122 // Create mock OAuth session for service layer 123 mockStore := NewMockOAuthStore() 124 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 125 126 // ==================================================================================== 127 // TEST: Create top-level comment on post 128 // ==================================================================================== 129 t.Logf("\n📝 Creating top-level comment via service...") 130 131 commentReq := comments.CreateCommentRequest{ 132 Reply: comments.ReplyRef{ 133 Root: comments.StrongRef{ 134 URI: postURI, 135 CID: postCID, 136 }, 137 Parent: comments.StrongRef{ 138 URI: postURI, 139 CID: postCID, 140 }, 141 }, 142 Content: "This is a test comment on the post", 143 Langs: []string{"en"}, 144 } 145 146 // Get session from store 147 parsedDID, _ := parseTestDID(userDID) 148 session, err := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 149 if err != nil { 150 t.Fatalf("Failed to get session: %v", err) 151 } 152 153 commentResp, err := commentService.CreateComment(ctx, session, commentReq) 154 if err != nil { 155 t.Fatalf("Failed to create comment: %v", err) 156 } 157 158 t.Logf("✅ Comment created:") 159 t.Logf(" URI: %s", commentResp.URI) 160 t.Logf(" CID: %s", commentResp.CID) 161 162 // Verify comment record was written to PDS 163 t.Logf("\n🔍 Verifying comment record on PDS...") 164 rkey := utils.ExtractRKeyFromURI(commentResp.URI) 165 collection := "social.coves.community.comment" 166 167 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 168 pdsURL, userDID, collection, rkey)) 169 if pdsErr != nil { 170 t.Fatalf("Failed to fetch comment record from PDS: %v", pdsErr) 171 } 172 defer func() { 173 if closeErr := pdsResp.Body.Close(); closeErr != nil { 174 t.Logf("Failed to close PDS response: %v", closeErr) 175 } 176 }() 177 178 if pdsResp.StatusCode != http.StatusOK { 179 body, _ := io.ReadAll(pdsResp.Body) 180 t.Fatalf("Comment record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body)) 181 } 182 183 var pdsRecord struct { 184 Value map[string]interface{} `json:"value"` 185 CID string `json:"cid"` 186 } 187 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 188 t.Fatalf("Failed to decode PDS record: %v", decodeErr) 189 } 190 191 t.Logf("✅ Comment record found on PDS:") 192 t.Logf(" CID: %s", pdsRecord.CID) 193 t.Logf(" Content: %v", pdsRecord.Value["content"]) 194 195 // Verify content 196 if pdsRecord.Value["content"] != "This is a test comment on the post" { 197 t.Errorf("Expected content 'This is a test comment on the post', got %v", pdsRecord.Value["content"]) 198 } 199 200 // Simulate Jetstream consumer indexing the comment 201 t.Logf("\n🔄 Simulating Jetstream consumer indexing comment...") 202 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 203 204 commentEvent := jetstream.JetstreamEvent{ 205 Did: userDID, 206 TimeUS: time.Now().UnixMicro(), 207 Kind: "commit", 208 Commit: &jetstream.CommitEvent{ 209 Rev: "test-comment-rev", 210 Operation: "create", 211 Collection: "social.coves.community.comment", 212 RKey: rkey, 213 CID: pdsRecord.CID, 214 Record: map[string]interface{}{ 215 "$type": "social.coves.community.comment", 216 "reply": map[string]interface{}{ 217 "root": map[string]interface{}{ 218 "uri": postURI, 219 "cid": postCID, 220 }, 221 "parent": map[string]interface{}{ 222 "uri": postURI, 223 "cid": postCID, 224 }, 225 }, 226 "content": "This is a test comment on the post", 227 "createdAt": time.Now().Format(time.RFC3339), 228 }, 229 }, 230 } 231 232 if handleErr := commentConsumer.HandleEvent(ctx, &commentEvent); handleErr != nil { 233 t.Fatalf("Failed to handle comment event: %v", handleErr) 234 } 235 236 // Verify comment was indexed in AppView 237 t.Logf("\n🔍 Verifying comment indexed in AppView...") 238 indexedComment, err := commentRepo.GetByURI(ctx, commentResp.URI) 239 if err != nil { 240 t.Fatalf("Comment not indexed in AppView: %v", err) 241 } 242 243 t.Logf("✅ Comment indexed in AppView:") 244 t.Logf(" CommenterDID: %s", indexedComment.CommenterDID) 245 t.Logf(" Content: %s", indexedComment.Content) 246 t.Logf(" RootURI: %s", indexedComment.RootURI) 247 t.Logf(" ParentURI: %s", indexedComment.ParentURI) 248 249 // Verify comment details 250 if indexedComment.CommenterDID != userDID { 251 t.Errorf("Expected commenter_did %s, got %s", userDID, indexedComment.CommenterDID) 252 } 253 if indexedComment.RootURI != postURI { 254 t.Errorf("Expected root_uri %s, got %s", postURI, indexedComment.RootURI) 255 } 256 if indexedComment.ParentURI != postURI { 257 t.Errorf("Expected parent_uri %s, got %s", postURI, indexedComment.ParentURI) 258 } 259 if indexedComment.Content != "This is a test comment on the post" { 260 t.Errorf("Expected content 'This is a test comment on the post', got %s", indexedComment.Content) 261 } 262 263 // Verify post comment count updated 264 t.Logf("\n🔍 Verifying post comment count updated...") 265 updatedPost, err := postRepo.GetByURI(ctx, postURI) 266 if err != nil { 267 t.Fatalf("Failed to get updated post: %v", err) 268 } 269 270 if updatedPost.CommentCount != 1 { 271 t.Errorf("Expected comment_count = 1, got %d", updatedPost.CommentCount) 272 } 273 274 t.Logf("✅ TRUE E2E COMMENT CREATE FLOW COMPLETE:") 275 t.Logf(" Client → Service → PDS Write → Jetstream → Consumer → AppView ✓") 276 t.Logf(" ✓ Comment written to PDS") 277 t.Logf(" ✓ Comment indexed in AppView") 278 t.Logf(" ✓ Post comment count updated") 279} 280 281// TestCommentWrite_CreateNestedReply tests creating a reply to another comment 282func TestCommentWrite_CreateNestedReply(t *testing.T) { 283 if testing.Short() { 284 t.Skip("Skipping E2E test in short mode") 285 } 286 287 db := setupTestDB(t) 288 defer func() { _ = db.Close() }() 289 290 ctx := context.Background() 291 pdsURL := getTestPDSURL() 292 293 // Setup repositories and service 294 commentRepo := postgres.NewCommentRepository(db) 295 postRepo := postgres.NewPostRepository(db) 296 297 // CommentPDSClientFactory creates a PDS client for comment operations 298 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 299 if session.AccessToken == "" { 300 return nil, fmt.Errorf("session has no access token") 301 } 302 if session.HostURL == "" { 303 return nil, fmt.Errorf("session has no host URL") 304 } 305 306 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 307 } 308 309 commentService := comments.NewCommentServiceWithPDSFactory( 310 commentRepo, 311 nil, 312 postRepo, 313 nil, 314 nil, 315 commentPDSFactory, 316 ) 317 318 // Create test user 319 testUserHandle := fmt.Sprintf("rpl%d.local.coves.dev", time.Now().UnixNano()%1000000) 320 testUserEmail := fmt.Sprintf("replier-%d@test.local", time.Now().Unix()) 321 testUserPassword := "test-password-123" 322 323 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 324 if err != nil { 325 t.Skipf("PDS not available: %v", err) 326 } 327 328 testUser := createTestUser(t, db, testUserHandle, userDID) 329 330 // Create test post and parent comment 331 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "reply-community", "owner.test") 332 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now()) 333 postCID := "bafypost456" 334 335 // Create parent comment directly in DB (simulating already-indexed comment) 336 parentCommentURI := fmt.Sprintf("at://%s/social.coves.community.comment/parent123", userDID) 337 parentCommentCID := "bafyparent123" 338 _, err = db.ExecContext(ctx, ` 339 INSERT INTO comments (uri, cid, rkey, commenter_did, root_uri, root_cid, parent_uri, parent_cid, content, created_at) 340 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW()) 341 `, parentCommentURI, parentCommentCID, "parent123", userDID, postURI, postCID, postURI, postCID, "Parent comment") 342 if err != nil { 343 t.Fatalf("Failed to create parent comment: %v", err) 344 } 345 346 // Setup OAuth 347 mockStore := NewMockOAuthStore() 348 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 349 350 // Create nested reply 351 t.Logf("\n📝 Creating nested reply...") 352 replyReq := comments.CreateCommentRequest{ 353 Reply: comments.ReplyRef{ 354 Root: comments.StrongRef{ 355 URI: postURI, 356 CID: postCID, 357 }, 358 Parent: comments.StrongRef{ 359 URI: parentCommentURI, 360 CID: parentCommentCID, 361 }, 362 }, 363 Content: "This is a reply to the parent comment", 364 Langs: []string{"en"}, 365 } 366 367 parsedDID, _ := parseTestDID(userDID) 368 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 369 370 replyResp, err := commentService.CreateComment(ctx, session, replyReq) 371 if err != nil { 372 t.Fatalf("Failed to create reply: %v", err) 373 } 374 375 t.Logf("✅ Reply created: %s", replyResp.URI) 376 377 // Simulate Jetstream indexing 378 rkey := utils.ExtractRKeyFromURI(replyResp.URI) 379 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 380 381 replyEvent := jetstream.JetstreamEvent{ 382 Did: userDID, 383 TimeUS: time.Now().UnixMicro(), 384 Kind: "commit", 385 Commit: &jetstream.CommitEvent{ 386 Rev: "test-reply-rev", 387 Operation: "create", 388 Collection: "social.coves.community.comment", 389 RKey: rkey, 390 CID: replyResp.CID, 391 Record: map[string]interface{}{ 392 "$type": "social.coves.community.comment", 393 "reply": map[string]interface{}{ 394 "root": map[string]interface{}{ 395 "uri": postURI, 396 "cid": postCID, 397 }, 398 "parent": map[string]interface{}{ 399 "uri": parentCommentURI, 400 "cid": parentCommentCID, 401 }, 402 }, 403 "content": "This is a reply to the parent comment", 404 "createdAt": time.Now().Format(time.RFC3339), 405 }, 406 }, 407 } 408 409 if handleErr := commentConsumer.HandleEvent(ctx, &replyEvent); handleErr != nil { 410 t.Fatalf("Failed to handle reply event: %v", handleErr) 411 } 412 413 // Verify reply was indexed with correct parent 414 indexedReply, err := commentRepo.GetByURI(ctx, replyResp.URI) 415 if err != nil { 416 t.Fatalf("Reply not indexed: %v", err) 417 } 418 419 if indexedReply.RootURI != postURI { 420 t.Errorf("Expected root_uri %s, got %s", postURI, indexedReply.RootURI) 421 } 422 if indexedReply.ParentURI != parentCommentURI { 423 t.Errorf("Expected parent_uri %s, got %s", parentCommentURI, indexedReply.ParentURI) 424 } 425 426 t.Logf("✅ NESTED REPLY FLOW COMPLETE:") 427 t.Logf(" ✓ Reply created with correct parent reference") 428 t.Logf(" ✓ Reply indexed in AppView") 429} 430 431// TestCommentWrite_UpdateComment tests updating an existing comment 432func TestCommentWrite_UpdateComment(t *testing.T) { 433 if testing.Short() { 434 t.Skip("Skipping E2E test in short mode") 435 } 436 437 db := setupTestDB(t) 438 defer func() { _ = db.Close() }() 439 440 ctx := context.Background() 441 pdsURL := getTestPDSURL() 442 443 // Setup repositories and service 444 commentRepo := postgres.NewCommentRepository(db) 445 446 // CommentPDSClientFactory creates a PDS client for comment operations 447 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 448 if session.AccessToken == "" { 449 return nil, fmt.Errorf("session has no access token") 450 } 451 if session.HostURL == "" { 452 return nil, fmt.Errorf("session has no host URL") 453 } 454 455 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 456 } 457 458 commentService := comments.NewCommentServiceWithPDSFactory( 459 commentRepo, 460 nil, 461 nil, 462 nil, 463 nil, 464 commentPDSFactory, 465 ) 466 467 // Create test user 468 testUserHandle := fmt.Sprintf("upd%d.local.coves.dev", time.Now().UnixNano()%1000000) 469 testUserEmail := fmt.Sprintf("updater-%d@test.local", time.Now().Unix()) 470 testUserPassword := "test-password-123" 471 472 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 473 if err != nil { 474 t.Skipf("PDS not available: %v", err) 475 } 476 477 // Setup OAuth 478 mockStore := NewMockOAuthStore() 479 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 480 481 parsedDID, _ := parseTestDID(userDID) 482 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 483 484 // First, create a comment to update 485 t.Logf("\n📝 Creating initial comment...") 486 createReq := comments.CreateCommentRequest{ 487 Reply: comments.ReplyRef{ 488 Root: comments.StrongRef{ 489 URI: "at://did:plc:test/social.coves.community.post/test123", 490 CID: "bafypost", 491 }, 492 Parent: comments.StrongRef{ 493 URI: "at://did:plc:test/social.coves.community.post/test123", 494 CID: "bafypost", 495 }, 496 }, 497 Content: "Original content", 498 Langs: []string{"en"}, 499 } 500 501 createResp, err := commentService.CreateComment(ctx, session, createReq) 502 if err != nil { 503 t.Fatalf("Failed to create comment: %v", err) 504 } 505 506 t.Logf("✅ Initial comment created: %s", createResp.URI) 507 508 // Now update the comment 509 t.Logf("\n📝 Updating comment...") 510 updateReq := comments.UpdateCommentRequest{ 511 URI: createResp.URI, 512 Content: "Updated content - this has been edited", 513 } 514 515 updateResp, err := commentService.UpdateComment(ctx, session, updateReq) 516 if err != nil { 517 t.Fatalf("Failed to update comment: %v", err) 518 } 519 520 t.Logf("✅ Comment updated:") 521 t.Logf(" URI: %s", updateResp.URI) 522 t.Logf(" New CID: %s", updateResp.CID) 523 524 // Verify the update on PDS 525 rkey := utils.ExtractRKeyFromURI(updateResp.URI) 526 pdsResp, err := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=social.coves.community.comment&rkey=%s", 527 pdsURL, userDID, rkey)) 528 if err != nil { 529 t.Fatalf("Failed to get record from PDS: %v", err) 530 } 531 defer pdsResp.Body.Close() 532 533 var pdsRecord struct { 534 Value map[string]interface{} `json:"value"` 535 CID string `json:"cid"` 536 } 537 if err := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); err != nil { 538 t.Fatalf("Failed to decode PDS response: %v", err) 539 } 540 541 if pdsRecord.Value["content"] != "Updated content - this has been edited" { 542 t.Errorf("Expected updated content, got %v", pdsRecord.Value["content"]) 543 } 544 545 t.Logf("✅ UPDATE FLOW COMPLETE:") 546 t.Logf(" ✓ Comment updated on PDS") 547 t.Logf(" ✓ New CID generated") 548 t.Logf(" ✓ Content verified") 549} 550 551// TestCommentWrite_DeleteComment tests deleting a comment 552func TestCommentWrite_DeleteComment(t *testing.T) { 553 if testing.Short() { 554 t.Skip("Skipping E2E test in short mode") 555 } 556 557 db := setupTestDB(t) 558 defer func() { _ = db.Close() }() 559 560 ctx := context.Background() 561 pdsURL := getTestPDSURL() 562 563 // Setup repositories and service 564 commentRepo := postgres.NewCommentRepository(db) 565 566 // CommentPDSClientFactory creates a PDS client for comment operations 567 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 568 if session.AccessToken == "" { 569 return nil, fmt.Errorf("session has no access token") 570 } 571 if session.HostURL == "" { 572 return nil, fmt.Errorf("session has no host URL") 573 } 574 575 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 576 } 577 578 commentService := comments.NewCommentServiceWithPDSFactory( 579 commentRepo, 580 nil, 581 nil, 582 nil, 583 nil, 584 commentPDSFactory, 585 ) 586 587 // Create test user 588 testUserHandle := fmt.Sprintf("del%d.local.coves.dev", time.Now().UnixNano()%1000000) 589 testUserEmail := fmt.Sprintf("deleter-%d@test.local", time.Now().Unix()) 590 testUserPassword := "test-password-123" 591 592 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 593 if err != nil { 594 t.Skipf("PDS not available: %v", err) 595 } 596 597 // Setup OAuth 598 mockStore := NewMockOAuthStore() 599 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 600 601 parsedDID, _ := parseTestDID(userDID) 602 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 603 604 // First, create a comment to delete 605 t.Logf("\n📝 Creating comment to delete...") 606 createReq := comments.CreateCommentRequest{ 607 Reply: comments.ReplyRef{ 608 Root: comments.StrongRef{ 609 URI: "at://did:plc:test/social.coves.community.post/test123", 610 CID: "bafypost", 611 }, 612 Parent: comments.StrongRef{ 613 URI: "at://did:plc:test/social.coves.community.post/test123", 614 CID: "bafypost", 615 }, 616 }, 617 Content: "This comment will be deleted", 618 Langs: []string{"en"}, 619 } 620 621 createResp, err := commentService.CreateComment(ctx, session, createReq) 622 if err != nil { 623 t.Fatalf("Failed to create comment: %v", err) 624 } 625 626 t.Logf("✅ Comment created: %s", createResp.URI) 627 628 // Now delete the comment 629 t.Logf("\n📝 Deleting comment...") 630 deleteReq := comments.DeleteCommentRequest{ 631 URI: createResp.URI, 632 } 633 634 err = commentService.DeleteComment(ctx, session, deleteReq) 635 if err != nil { 636 t.Fatalf("Failed to delete comment: %v", err) 637 } 638 639 t.Logf("✅ Comment deleted") 640 641 // Verify deletion on PDS 642 rkey := utils.ExtractRKeyFromURI(createResp.URI) 643 pdsResp, err := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=social.coves.community.comment&rkey=%s", 644 pdsURL, userDID, rkey)) 645 if err != nil { 646 t.Fatalf("Failed to get record from PDS: %v", err) 647 } 648 defer pdsResp.Body.Close() 649 650 if pdsResp.StatusCode != http.StatusBadRequest && pdsResp.StatusCode != http.StatusNotFound { 651 t.Errorf("Expected 400 or 404 for deleted comment, got %d", pdsResp.StatusCode) 652 } 653 654 t.Logf("✅ DELETE FLOW COMPLETE:") 655 t.Logf(" ✓ Comment deleted from PDS") 656 t.Logf(" ✓ Record no longer accessible") 657} 658 659// TestCommentWrite_CannotUpdateOthersComment tests authorization for updates 660func TestCommentWrite_CannotUpdateOthersComment(t *testing.T) { 661 if testing.Short() { 662 t.Skip("Skipping E2E test in short mode") 663 } 664 665 db := setupTestDB(t) 666 defer func() { _ = db.Close() }() 667 668 ctx := context.Background() 669 pdsURL := getTestPDSURL() 670 671 // CommentPDSClientFactory creates a PDS client for comment operations 672 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 673 if session.AccessToken == "" { 674 return nil, fmt.Errorf("session has no access token") 675 } 676 if session.HostURL == "" { 677 return nil, fmt.Errorf("session has no host URL") 678 } 679 680 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 681 } 682 683 // Setup service 684 commentService := comments.NewCommentServiceWithPDSFactory( 685 nil, 686 nil, 687 nil, 688 nil, 689 nil, 690 commentPDSFactory, 691 ) 692 693 // Create first user (comment owner) 694 ownerHandle := fmt.Sprintf("own%d.local.coves.dev", time.Now().UnixNano()%1000000) 695 ownerEmail := fmt.Sprintf("owner-%d@test.local", time.Now().Unix()) 696 _, ownerDID, err := createPDSAccount(pdsURL, ownerHandle, ownerEmail, "password123") 697 if err != nil { 698 t.Skipf("PDS not available: %v", err) 699 } 700 701 // Create second user (attacker) 702 attackerHandle := fmt.Sprintf("atk%d.local.coves.dev", time.Now().UnixNano()%1000000) 703 attackerEmail := fmt.Sprintf("attacker-%d@test.local", time.Now().Unix()) 704 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123") 705 if err != nil { 706 t.Skipf("PDS not available: %v", err) 707 } 708 709 // Setup OAuth for attacker 710 mockStore := NewMockOAuthStore() 711 mockStore.AddSessionWithPDS(attackerDID, "session-"+attackerDID, attackerToken, pdsURL) 712 713 parsedDID, _ := parseTestDID(attackerDID) 714 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+attackerDID) 715 716 // Try to update comment owned by different user 717 t.Logf("\n🚨 Attempting to update another user's comment...") 718 updateReq := comments.UpdateCommentRequest{ 719 URI: fmt.Sprintf("at://%s/social.coves.community.comment/test123", ownerDID), 720 Content: "Malicious update attempt", 721 } 722 723 _, err = commentService.UpdateComment(ctx, session, updateReq) 724 725 // Verify authorization error 726 if err == nil { 727 t.Fatal("Expected authorization error, got nil") 728 } 729 if !errors.Is(err, comments.ErrNotAuthorized) { 730 t.Errorf("Expected ErrNotAuthorized, got: %v", err) 731 } 732 733 t.Logf("✅ AUTHORIZATION CHECK PASSED:") 734 t.Logf(" ✓ User cannot update others' comments") 735} 736 737// TestCommentWrite_CannotDeleteOthersComment tests authorization for deletes 738func TestCommentWrite_CannotDeleteOthersComment(t *testing.T) { 739 if testing.Short() { 740 t.Skip("Skipping E2E test in short mode") 741 } 742 743 db := setupTestDB(t) 744 defer func() { _ = db.Close() }() 745 746 ctx := context.Background() 747 pdsURL := getTestPDSURL() 748 749 // CommentPDSClientFactory creates a PDS client for comment operations 750 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 751 if session.AccessToken == "" { 752 return nil, fmt.Errorf("session has no access token") 753 } 754 if session.HostURL == "" { 755 return nil, fmt.Errorf("session has no host URL") 756 } 757 758 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 759 } 760 761 // Setup service 762 commentService := comments.NewCommentServiceWithPDSFactory( 763 nil, 764 nil, 765 nil, 766 nil, 767 nil, 768 commentPDSFactory, 769 ) 770 771 // Create first user (comment owner) 772 ownerHandle := fmt.Sprintf("own%d.local.coves.dev", time.Now().UnixNano()%1000000) 773 ownerEmail := fmt.Sprintf("owner-%d@test.local", time.Now().Unix()) 774 _, ownerDID, err := createPDSAccount(pdsURL, ownerHandle, ownerEmail, "password123") 775 if err != nil { 776 t.Skipf("PDS not available: %v", err) 777 } 778 779 // Create second user (attacker) 780 attackerHandle := fmt.Sprintf("atk%d.local.coves.dev", time.Now().UnixNano()%1000000) 781 attackerEmail := fmt.Sprintf("attacker-%d@test.local", time.Now().Unix()) 782 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123") 783 if err != nil { 784 t.Skipf("PDS not available: %v", err) 785 } 786 787 // Setup OAuth for attacker 788 mockStore := NewMockOAuthStore() 789 mockStore.AddSessionWithPDS(attackerDID, "session-"+attackerDID, attackerToken, pdsURL) 790 791 parsedDID, _ := parseTestDID(attackerDID) 792 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+attackerDID) 793 794 // Try to delete comment owned by different user 795 t.Logf("\n🚨 Attempting to delete another user's comment...") 796 deleteReq := comments.DeleteCommentRequest{ 797 URI: fmt.Sprintf("at://%s/social.coves.community.comment/test123", ownerDID), 798 } 799 800 err = commentService.DeleteComment(ctx, session, deleteReq) 801 802 // Verify authorization error 803 if err == nil { 804 t.Fatal("Expected authorization error, got nil") 805 } 806 if !errors.Is(err, comments.ErrNotAuthorized) { 807 t.Errorf("Expected ErrNotAuthorized, got: %v", err) 808 } 809 810 t.Logf("✅ AUTHORIZATION CHECK PASSED:") 811 t.Logf(" ✓ User cannot delete others' comments") 812} 813 814// Helper function to parse DID for testing 815func parseTestDID(did string) (syntax.DID, error) { 816 return syntax.ParseDID(did) 817} 818 819// TestCommentWrite_ConcurrentModificationDetection tests that PutRecord's swapRecord 820// CID validation correctly detects concurrent modifications. 821// This verifies the optimistic locking mechanism that prevents lost updates. 822func TestCommentWrite_ConcurrentModificationDetection(t *testing.T) { 823 if testing.Short() { 824 t.Skip("Skipping E2E test in short mode") 825 } 826 827 db := setupTestDB(t) 828 defer func() { _ = db.Close() }() 829 830 ctx := context.Background() 831 pdsURL := getTestPDSURL() 832 833 // Setup repositories and service 834 commentRepo := postgres.NewCommentRepository(db) 835 836 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 837 if session.AccessToken == "" { 838 return nil, fmt.Errorf("session has no access token") 839 } 840 if session.HostURL == "" { 841 return nil, fmt.Errorf("session has no host URL") 842 } 843 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 844 } 845 846 commentService := comments.NewCommentServiceWithPDSFactory( 847 commentRepo, 848 nil, 849 nil, 850 nil, 851 nil, 852 commentPDSFactory, 853 ) 854 855 // Create test user 856 testUserHandle := fmt.Sprintf("cnc%d.local.coves.dev", time.Now().UnixNano()%1000000) 857 testUserEmail := fmt.Sprintf("concurrency-%d@test.local", time.Now().Unix()) 858 testUserPassword := "test-password-123" 859 860 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 861 if err != nil { 862 t.Skipf("PDS not available: %v", err) 863 } 864 865 // Setup OAuth 866 mockStore := NewMockOAuthStore() 867 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 868 869 parsedDID, _ := parseTestDID(userDID) 870 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 871 872 // Step 1: Create a comment 873 t.Logf("\n📝 Step 1: Creating initial comment...") 874 createReq := comments.CreateCommentRequest{ 875 Reply: comments.ReplyRef{ 876 Root: comments.StrongRef{ 877 URI: "at://did:plc:test/social.coves.community.post/test123", 878 CID: "bafypost", 879 }, 880 Parent: comments.StrongRef{ 881 URI: "at://did:plc:test/social.coves.community.post/test123", 882 CID: "bafypost", 883 }, 884 }, 885 Content: "Original content for concurrency test", 886 Langs: []string{"en"}, 887 } 888 889 createResp, err := commentService.CreateComment(ctx, session, createReq) 890 if err != nil { 891 t.Fatalf("Failed to create comment: %v", err) 892 } 893 t.Logf("✅ Comment created: URI=%s, CID=%s", createResp.URI, createResp.CID) 894 originalCID := createResp.CID 895 896 // Step 2: Update the comment (this changes the CID) 897 t.Logf("\n📝 Step 2: Updating comment (this changes CID)...") 898 updateReq := comments.UpdateCommentRequest{ 899 URI: createResp.URI, 900 Content: "Updated content - CID has changed", 901 } 902 903 updateResp, err := commentService.UpdateComment(ctx, session, updateReq) 904 if err != nil { 905 t.Fatalf("Failed to update comment: %v", err) 906 } 907 t.Logf("✅ Comment updated: New CID=%s", updateResp.CID) 908 newCID := updateResp.CID 909 910 // Verify CIDs are different 911 if originalCID == newCID { 912 t.Fatalf("CIDs should be different after update: original=%s, new=%s", originalCID, newCID) 913 } 914 915 // Step 3: Simulate concurrent modification detection using direct PDS client 916 // Create a PDS client and attempt to update with the stale (original) CID 917 t.Logf("\n🔍 Step 3: Testing concurrent modification detection with stale CID...") 918 919 pdsClient, err := pds.NewFromAccessToken(pdsURL, userDID, pdsAccessToken) 920 if err != nil { 921 t.Fatalf("Failed to create PDS client: %v", err) 922 } 923 924 rkey := utils.ExtractRKeyFromURI(createResp.URI) 925 926 // Try to update with the ORIGINAL (now stale) CID - this should fail with 409 927 staleRecord := map[string]interface{}{ 928 "$type": "social.coves.community.comment", 929 "reply": map[string]interface{}{ 930 "root": map[string]interface{}{ 931 "uri": "at://did:plc:test/social.coves.community.post/test123", 932 "cid": "bafypost", 933 }, 934 "parent": map[string]interface{}{ 935 "uri": "at://did:plc:test/social.coves.community.post/test123", 936 "cid": "bafypost", 937 }, 938 }, 939 "content": "This update should fail - using stale CID", 940 "createdAt": time.Now().UTC().Format(time.RFC3339), 941 } 942 943 _, _, err = pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, staleRecord, originalCID) 944 945 // Verify we get an error indicating CID mismatch 946 // PDS returns 400 "bad request" with message "Record was at <cid>" when swap CID doesn't match 947 if err == nil { 948 t.Fatal("Expected error when updating with stale CID, got nil") 949 } 950 951 // Check for either ErrConflict (409) or CID mismatch error (400) 952 errMsg := err.Error() 953 isCIDMismatch := strings.Contains(errMsg, "Record was at") || errors.Is(err, pds.ErrConflict) 954 if !isCIDMismatch { 955 t.Errorf("Expected CID mismatch or ErrConflict, got: %v", err) 956 } 957 958 t.Logf("✅ Correctly detected concurrent modification!") 959 t.Logf(" Error: %v", err) 960 961 // Step 4: Verify that updating with the correct CID succeeds 962 t.Logf("\n📝 Step 4: Verifying update with correct CID succeeds...") 963 correctRecord := map[string]interface{}{ 964 "$type": "social.coves.community.comment", 965 "reply": map[string]interface{}{ 966 "root": map[string]interface{}{ 967 "uri": "at://did:plc:test/social.coves.community.post/test123", 968 "cid": "bafypost", 969 }, 970 "parent": map[string]interface{}{ 971 "uri": "at://did:plc:test/social.coves.community.post/test123", 972 "cid": "bafypost", 973 }, 974 }, 975 "content": "This update should succeed - using correct CID", 976 "createdAt": time.Now().UTC().Format(time.RFC3339), 977 } 978 979 _, finalCID, err := pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, correctRecord, newCID) 980 if err != nil { 981 t.Fatalf("Update with correct CID should succeed, got: %v", err) 982 } 983 984 t.Logf("✅ Update with correct CID succeeded: New CID=%s", finalCID) 985 986 t.Logf("\n✅ CONCURRENT MODIFICATION DETECTION TEST COMPLETE:") 987 t.Logf(" ✓ PutRecord with stale CID correctly returns ErrConflict") 988 t.Logf(" ✓ PutRecord with correct CID succeeds") 989 t.Logf(" ✓ Optimistic locking prevents lost updates") 990}