A community based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/atproto/jetstream"
5 "Coves/internal/atproto/pds"
6 "Coves/internal/atproto/utils"
7 "Coves/internal/core/comments"
8 "Coves/internal/db/postgres"
9 "context"
10 "database/sql"
11 "encoding/json"
12 "errors"
13 "fmt"
14 "io"
15 "net/http"
16 "os"
17 "strings"
18 "testing"
19 "time"
20
21 oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth"
22 "github.com/bluesky-social/indigo/atproto/syntax"
23 _ "github.com/lib/pq"
24 "github.com/pressly/goose/v3"
25)
26
27// TestCommentWrite_CreateTopLevelComment tests creating a comment on a post via E2E flow
28func TestCommentWrite_CreateTopLevelComment(t *testing.T) {
29 // Skip in short mode since this requires real PDS
30 if testing.Short() {
31 t.Skip("Skipping E2E test in short mode")
32 }
33
34 // Setup test database
35 dbURL := os.Getenv("TEST_DATABASE_URL")
36 if dbURL == "" {
37 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
38 }
39
40 db, err := sql.Open("postgres", dbURL)
41 if err != nil {
42 t.Fatalf("Failed to connect to test database: %v", err)
43 }
44 defer func() {
45 if closeErr := db.Close(); closeErr != nil {
46 t.Logf("Failed to close database: %v", closeErr)
47 }
48 }()
49
50 // Run migrations
51 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil {
52 t.Fatalf("Failed to set goose dialect: %v", dialectErr)
53 }
54 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil {
55 t.Fatalf("Failed to run migrations: %v", migrateErr)
56 }
57
58 // Check if PDS is running
59 pdsURL := getTestPDSURL()
60 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
61 if err != nil {
62 t.Skipf("PDS not running at %s: %v", pdsURL, err)
63 }
64 func() {
65 if closeErr := healthResp.Body.Close(); closeErr != nil {
66 t.Logf("Failed to close health response: %v", closeErr)
67 }
68 }()
69
70 ctx := context.Background()
71
72 // Setup repositories
73 commentRepo := postgres.NewCommentRepository(db)
74 postRepo := postgres.NewPostRepository(db)
75
76 // Setup service with password-based PDS client factory for E2E testing
77 // CommentPDSClientFactory creates a PDS client for comment operations
78 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
79 if session.AccessToken == "" {
80 return nil, fmt.Errorf("session has no access token")
81 }
82 if session.HostURL == "" {
83 return nil, fmt.Errorf("session has no host URL")
84 }
85
86 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
87 }
88
89 commentService := comments.NewCommentServiceWithPDSFactory(
90 commentRepo,
91 nil, // userRepo not needed for write ops
92 postRepo,
93 nil, // communityRepo not needed for write ops
94 nil, // logger
95 commentPDSFactory,
96 )
97
98 // Create test user on PDS
99 testUserHandle := fmt.Sprintf("cmw%d.local.coves.dev", time.Now().UnixNano()%1000000)
100 testUserEmail := fmt.Sprintf("commenter-%d@test.local", time.Now().Unix())
101 testUserPassword := "test-password-123"
102
103 t.Logf("Creating test user on PDS: %s", testUserHandle)
104 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
105 if err != nil {
106 t.Fatalf("Failed to create test user on PDS: %v", err)
107 }
108 t.Logf("Test user created: DID=%s", userDID)
109
110 // Index user in AppView
111 testUser := createTestUser(t, db, testUserHandle, userDID)
112
113 // Create test community and post to comment on
114 testCommunityDID, err := createFeedTestCommunity(db, ctx, "test-community", "owner.test")
115 if err != nil {
116 t.Fatalf("Failed to create test community: %v", err)
117 }
118
119 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
120 postCID := "bafypost123"
121
122 // Create mock OAuth session for service layer
123 mockStore := NewMockOAuthStore()
124 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL)
125
126 // ====================================================================================
127 // TEST: Create top-level comment on post
128 // ====================================================================================
129 t.Logf("\n📝 Creating top-level comment via service...")
130
131 commentReq := comments.CreateCommentRequest{
132 Reply: comments.ReplyRef{
133 Root: comments.StrongRef{
134 URI: postURI,
135 CID: postCID,
136 },
137 Parent: comments.StrongRef{
138 URI: postURI,
139 CID: postCID,
140 },
141 },
142 Content: "This is a test comment on the post",
143 Langs: []string{"en"},
144 }
145
146 // Get session from store
147 parsedDID, _ := parseTestDID(userDID)
148 session, err := mockStore.GetSession(ctx, parsedDID, "session-"+userDID)
149 if err != nil {
150 t.Fatalf("Failed to get session: %v", err)
151 }
152
153 commentResp, err := commentService.CreateComment(ctx, session, commentReq)
154 if err != nil {
155 t.Fatalf("Failed to create comment: %v", err)
156 }
157
158 t.Logf("✅ Comment created:")
159 t.Logf(" URI: %s", commentResp.URI)
160 t.Logf(" CID: %s", commentResp.CID)
161
162 // Verify comment record was written to PDS
163 t.Logf("\n🔍 Verifying comment record on PDS...")
164 rkey := utils.ExtractRKeyFromURI(commentResp.URI)
165 collection := "social.coves.community.comment"
166
167 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
168 pdsURL, userDID, collection, rkey))
169 if pdsErr != nil {
170 t.Fatalf("Failed to fetch comment record from PDS: %v", pdsErr)
171 }
172 defer func() {
173 if closeErr := pdsResp.Body.Close(); closeErr != nil {
174 t.Logf("Failed to close PDS response: %v", closeErr)
175 }
176 }()
177
178 if pdsResp.StatusCode != http.StatusOK {
179 body, _ := io.ReadAll(pdsResp.Body)
180 t.Fatalf("Comment record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body))
181 }
182
183 var pdsRecord struct {
184 Value map[string]interface{} `json:"value"`
185 CID string `json:"cid"`
186 }
187 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil {
188 t.Fatalf("Failed to decode PDS record: %v", decodeErr)
189 }
190
191 t.Logf("✅ Comment record found on PDS:")
192 t.Logf(" CID: %s", pdsRecord.CID)
193 t.Logf(" Content: %v", pdsRecord.Value["content"])
194
195 // Verify content
196 if pdsRecord.Value["content"] != "This is a test comment on the post" {
197 t.Errorf("Expected content 'This is a test comment on the post', got %v", pdsRecord.Value["content"])
198 }
199
200 // Simulate Jetstream consumer indexing the comment
201 t.Logf("\n🔄 Simulating Jetstream consumer indexing comment...")
202 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
203
204 commentEvent := jetstream.JetstreamEvent{
205 Did: userDID,
206 TimeUS: time.Now().UnixMicro(),
207 Kind: "commit",
208 Commit: &jetstream.CommitEvent{
209 Rev: "test-comment-rev",
210 Operation: "create",
211 Collection: "social.coves.community.comment",
212 RKey: rkey,
213 CID: pdsRecord.CID,
214 Record: map[string]interface{}{
215 "$type": "social.coves.community.comment",
216 "reply": map[string]interface{}{
217 "root": map[string]interface{}{
218 "uri": postURI,
219 "cid": postCID,
220 },
221 "parent": map[string]interface{}{
222 "uri": postURI,
223 "cid": postCID,
224 },
225 },
226 "content": "This is a test comment on the post",
227 "createdAt": time.Now().Format(time.RFC3339),
228 },
229 },
230 }
231
232 if handleErr := commentConsumer.HandleEvent(ctx, &commentEvent); handleErr != nil {
233 t.Fatalf("Failed to handle comment event: %v", handleErr)
234 }
235
236 // Verify comment was indexed in AppView
237 t.Logf("\n🔍 Verifying comment indexed in AppView...")
238 indexedComment, err := commentRepo.GetByURI(ctx, commentResp.URI)
239 if err != nil {
240 t.Fatalf("Comment not indexed in AppView: %v", err)
241 }
242
243 t.Logf("✅ Comment indexed in AppView:")
244 t.Logf(" CommenterDID: %s", indexedComment.CommenterDID)
245 t.Logf(" Content: %s", indexedComment.Content)
246 t.Logf(" RootURI: %s", indexedComment.RootURI)
247 t.Logf(" ParentURI: %s", indexedComment.ParentURI)
248
249 // Verify comment details
250 if indexedComment.CommenterDID != userDID {
251 t.Errorf("Expected commenter_did %s, got %s", userDID, indexedComment.CommenterDID)
252 }
253 if indexedComment.RootURI != postURI {
254 t.Errorf("Expected root_uri %s, got %s", postURI, indexedComment.RootURI)
255 }
256 if indexedComment.ParentURI != postURI {
257 t.Errorf("Expected parent_uri %s, got %s", postURI, indexedComment.ParentURI)
258 }
259 if indexedComment.Content != "This is a test comment on the post" {
260 t.Errorf("Expected content 'This is a test comment on the post', got %s", indexedComment.Content)
261 }
262
263 // Verify post comment count updated
264 t.Logf("\n🔍 Verifying post comment count updated...")
265 updatedPost, err := postRepo.GetByURI(ctx, postURI)
266 if err != nil {
267 t.Fatalf("Failed to get updated post: %v", err)
268 }
269
270 if updatedPost.CommentCount != 1 {
271 t.Errorf("Expected comment_count = 1, got %d", updatedPost.CommentCount)
272 }
273
274 t.Logf("✅ TRUE E2E COMMENT CREATE FLOW COMPLETE:")
275 t.Logf(" Client → Service → PDS Write → Jetstream → Consumer → AppView ✓")
276 t.Logf(" ✓ Comment written to PDS")
277 t.Logf(" ✓ Comment indexed in AppView")
278 t.Logf(" ✓ Post comment count updated")
279}
280
281// TestCommentWrite_CreateNestedReply tests creating a reply to another comment
282func TestCommentWrite_CreateNestedReply(t *testing.T) {
283 if testing.Short() {
284 t.Skip("Skipping E2E test in short mode")
285 }
286
287 db := setupTestDB(t)
288 defer func() { _ = db.Close() }()
289
290 ctx := context.Background()
291 pdsURL := getTestPDSURL()
292
293 // Setup repositories and service
294 commentRepo := postgres.NewCommentRepository(db)
295 postRepo := postgres.NewPostRepository(db)
296
297 // CommentPDSClientFactory creates a PDS client for comment operations
298 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
299 if session.AccessToken == "" {
300 return nil, fmt.Errorf("session has no access token")
301 }
302 if session.HostURL == "" {
303 return nil, fmt.Errorf("session has no host URL")
304 }
305
306 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
307 }
308
309 commentService := comments.NewCommentServiceWithPDSFactory(
310 commentRepo,
311 nil,
312 postRepo,
313 nil,
314 nil,
315 commentPDSFactory,
316 )
317
318 // Create test user
319 testUserHandle := fmt.Sprintf("rpl%d.local.coves.dev", time.Now().UnixNano()%1000000)
320 testUserEmail := fmt.Sprintf("replier-%d@test.local", time.Now().Unix())
321 testUserPassword := "test-password-123"
322
323 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
324 if err != nil {
325 t.Skipf("PDS not available: %v", err)
326 }
327
328 testUser := createTestUser(t, db, testUserHandle, userDID)
329
330 // Create test post and parent comment
331 testCommunityDID, _ := createFeedTestCommunity(db, ctx, "reply-community", "owner.test")
332 postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post", 0, time.Now())
333 postCID := "bafypost456"
334
335 // Create parent comment directly in DB (simulating already-indexed comment)
336 parentCommentURI := fmt.Sprintf("at://%s/social.coves.community.comment/parent123", userDID)
337 parentCommentCID := "bafyparent123"
338 _, err = db.ExecContext(ctx, `
339 INSERT INTO comments (uri, cid, rkey, commenter_did, root_uri, root_cid, parent_uri, parent_cid, content, created_at)
340 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
341 `, parentCommentURI, parentCommentCID, "parent123", userDID, postURI, postCID, postURI, postCID, "Parent comment")
342 if err != nil {
343 t.Fatalf("Failed to create parent comment: %v", err)
344 }
345
346 // Setup OAuth
347 mockStore := NewMockOAuthStore()
348 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL)
349
350 // Create nested reply
351 t.Logf("\n📝 Creating nested reply...")
352 replyReq := comments.CreateCommentRequest{
353 Reply: comments.ReplyRef{
354 Root: comments.StrongRef{
355 URI: postURI,
356 CID: postCID,
357 },
358 Parent: comments.StrongRef{
359 URI: parentCommentURI,
360 CID: parentCommentCID,
361 },
362 },
363 Content: "This is a reply to the parent comment",
364 Langs: []string{"en"},
365 }
366
367 parsedDID, _ := parseTestDID(userDID)
368 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID)
369
370 replyResp, err := commentService.CreateComment(ctx, session, replyReq)
371 if err != nil {
372 t.Fatalf("Failed to create reply: %v", err)
373 }
374
375 t.Logf("✅ Reply created: %s", replyResp.URI)
376
377 // Simulate Jetstream indexing
378 rkey := utils.ExtractRKeyFromURI(replyResp.URI)
379 commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
380
381 replyEvent := jetstream.JetstreamEvent{
382 Did: userDID,
383 TimeUS: time.Now().UnixMicro(),
384 Kind: "commit",
385 Commit: &jetstream.CommitEvent{
386 Rev: "test-reply-rev",
387 Operation: "create",
388 Collection: "social.coves.community.comment",
389 RKey: rkey,
390 CID: replyResp.CID,
391 Record: map[string]interface{}{
392 "$type": "social.coves.community.comment",
393 "reply": map[string]interface{}{
394 "root": map[string]interface{}{
395 "uri": postURI,
396 "cid": postCID,
397 },
398 "parent": map[string]interface{}{
399 "uri": parentCommentURI,
400 "cid": parentCommentCID,
401 },
402 },
403 "content": "This is a reply to the parent comment",
404 "createdAt": time.Now().Format(time.RFC3339),
405 },
406 },
407 }
408
409 if handleErr := commentConsumer.HandleEvent(ctx, &replyEvent); handleErr != nil {
410 t.Fatalf("Failed to handle reply event: %v", handleErr)
411 }
412
413 // Verify reply was indexed with correct parent
414 indexedReply, err := commentRepo.GetByURI(ctx, replyResp.URI)
415 if err != nil {
416 t.Fatalf("Reply not indexed: %v", err)
417 }
418
419 if indexedReply.RootURI != postURI {
420 t.Errorf("Expected root_uri %s, got %s", postURI, indexedReply.RootURI)
421 }
422 if indexedReply.ParentURI != parentCommentURI {
423 t.Errorf("Expected parent_uri %s, got %s", parentCommentURI, indexedReply.ParentURI)
424 }
425
426 t.Logf("✅ NESTED REPLY FLOW COMPLETE:")
427 t.Logf(" ✓ Reply created with correct parent reference")
428 t.Logf(" ✓ Reply indexed in AppView")
429}
430
431// TestCommentWrite_UpdateComment tests updating an existing comment
432func TestCommentWrite_UpdateComment(t *testing.T) {
433 if testing.Short() {
434 t.Skip("Skipping E2E test in short mode")
435 }
436
437 db := setupTestDB(t)
438 defer func() { _ = db.Close() }()
439
440 ctx := context.Background()
441 pdsURL := getTestPDSURL()
442
443 // Setup repositories and service
444 commentRepo := postgres.NewCommentRepository(db)
445
446 // CommentPDSClientFactory creates a PDS client for comment operations
447 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
448 if session.AccessToken == "" {
449 return nil, fmt.Errorf("session has no access token")
450 }
451 if session.HostURL == "" {
452 return nil, fmt.Errorf("session has no host URL")
453 }
454
455 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
456 }
457
458 commentService := comments.NewCommentServiceWithPDSFactory(
459 commentRepo,
460 nil,
461 nil,
462 nil,
463 nil,
464 commentPDSFactory,
465 )
466
467 // Create test user
468 testUserHandle := fmt.Sprintf("upd%d.local.coves.dev", time.Now().UnixNano()%1000000)
469 testUserEmail := fmt.Sprintf("updater-%d@test.local", time.Now().Unix())
470 testUserPassword := "test-password-123"
471
472 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
473 if err != nil {
474 t.Skipf("PDS not available: %v", err)
475 }
476
477 // Setup OAuth
478 mockStore := NewMockOAuthStore()
479 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL)
480
481 parsedDID, _ := parseTestDID(userDID)
482 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID)
483
484 // First, create a comment to update
485 t.Logf("\n📝 Creating initial comment...")
486 createReq := comments.CreateCommentRequest{
487 Reply: comments.ReplyRef{
488 Root: comments.StrongRef{
489 URI: "at://did:plc:test/social.coves.community.post/test123",
490 CID: "bafypost",
491 },
492 Parent: comments.StrongRef{
493 URI: "at://did:plc:test/social.coves.community.post/test123",
494 CID: "bafypost",
495 },
496 },
497 Content: "Original content",
498 Langs: []string{"en"},
499 }
500
501 createResp, err := commentService.CreateComment(ctx, session, createReq)
502 if err != nil {
503 t.Fatalf("Failed to create comment: %v", err)
504 }
505
506 t.Logf("✅ Initial comment created: %s", createResp.URI)
507
508 // Now update the comment
509 t.Logf("\n📝 Updating comment...")
510 updateReq := comments.UpdateCommentRequest{
511 URI: createResp.URI,
512 Content: "Updated content - this has been edited",
513 }
514
515 updateResp, err := commentService.UpdateComment(ctx, session, updateReq)
516 if err != nil {
517 t.Fatalf("Failed to update comment: %v", err)
518 }
519
520 t.Logf("✅ Comment updated:")
521 t.Logf(" URI: %s", updateResp.URI)
522 t.Logf(" New CID: %s", updateResp.CID)
523
524 // Verify the update on PDS
525 rkey := utils.ExtractRKeyFromURI(updateResp.URI)
526 pdsResp, err := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=social.coves.community.comment&rkey=%s",
527 pdsURL, userDID, rkey))
528 if err != nil {
529 t.Fatalf("Failed to get record from PDS: %v", err)
530 }
531 defer pdsResp.Body.Close()
532
533 var pdsRecord struct {
534 Value map[string]interface{} `json:"value"`
535 CID string `json:"cid"`
536 }
537 if err := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); err != nil {
538 t.Fatalf("Failed to decode PDS response: %v", err)
539 }
540
541 if pdsRecord.Value["content"] != "Updated content - this has been edited" {
542 t.Errorf("Expected updated content, got %v", pdsRecord.Value["content"])
543 }
544
545 t.Logf("✅ UPDATE FLOW COMPLETE:")
546 t.Logf(" ✓ Comment updated on PDS")
547 t.Logf(" ✓ New CID generated")
548 t.Logf(" ✓ Content verified")
549}
550
551// TestCommentWrite_DeleteComment tests deleting a comment
552func TestCommentWrite_DeleteComment(t *testing.T) {
553 if testing.Short() {
554 t.Skip("Skipping E2E test in short mode")
555 }
556
557 db := setupTestDB(t)
558 defer func() { _ = db.Close() }()
559
560 ctx := context.Background()
561 pdsURL := getTestPDSURL()
562
563 // Setup repositories and service
564 commentRepo := postgres.NewCommentRepository(db)
565
566 // CommentPDSClientFactory creates a PDS client for comment operations
567 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
568 if session.AccessToken == "" {
569 return nil, fmt.Errorf("session has no access token")
570 }
571 if session.HostURL == "" {
572 return nil, fmt.Errorf("session has no host URL")
573 }
574
575 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
576 }
577
578 commentService := comments.NewCommentServiceWithPDSFactory(
579 commentRepo,
580 nil,
581 nil,
582 nil,
583 nil,
584 commentPDSFactory,
585 )
586
587 // Create test user
588 testUserHandle := fmt.Sprintf("del%d.local.coves.dev", time.Now().UnixNano()%1000000)
589 testUserEmail := fmt.Sprintf("deleter-%d@test.local", time.Now().Unix())
590 testUserPassword := "test-password-123"
591
592 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
593 if err != nil {
594 t.Skipf("PDS not available: %v", err)
595 }
596
597 // Setup OAuth
598 mockStore := NewMockOAuthStore()
599 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL)
600
601 parsedDID, _ := parseTestDID(userDID)
602 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID)
603
604 // First, create a comment to delete
605 t.Logf("\n📝 Creating comment to delete...")
606 createReq := comments.CreateCommentRequest{
607 Reply: comments.ReplyRef{
608 Root: comments.StrongRef{
609 URI: "at://did:plc:test/social.coves.community.post/test123",
610 CID: "bafypost",
611 },
612 Parent: comments.StrongRef{
613 URI: "at://did:plc:test/social.coves.community.post/test123",
614 CID: "bafypost",
615 },
616 },
617 Content: "This comment will be deleted",
618 Langs: []string{"en"},
619 }
620
621 createResp, err := commentService.CreateComment(ctx, session, createReq)
622 if err != nil {
623 t.Fatalf("Failed to create comment: %v", err)
624 }
625
626 t.Logf("✅ Comment created: %s", createResp.URI)
627
628 // Now delete the comment
629 t.Logf("\n📝 Deleting comment...")
630 deleteReq := comments.DeleteCommentRequest{
631 URI: createResp.URI,
632 }
633
634 err = commentService.DeleteComment(ctx, session, deleteReq)
635 if err != nil {
636 t.Fatalf("Failed to delete comment: %v", err)
637 }
638
639 t.Logf("✅ Comment deleted")
640
641 // Verify deletion on PDS
642 rkey := utils.ExtractRKeyFromURI(createResp.URI)
643 pdsResp, err := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=social.coves.community.comment&rkey=%s",
644 pdsURL, userDID, rkey))
645 if err != nil {
646 t.Fatalf("Failed to get record from PDS: %v", err)
647 }
648 defer pdsResp.Body.Close()
649
650 if pdsResp.StatusCode != http.StatusBadRequest && pdsResp.StatusCode != http.StatusNotFound {
651 t.Errorf("Expected 400 or 404 for deleted comment, got %d", pdsResp.StatusCode)
652 }
653
654 t.Logf("✅ DELETE FLOW COMPLETE:")
655 t.Logf(" ✓ Comment deleted from PDS")
656 t.Logf(" ✓ Record no longer accessible")
657}
658
659// TestCommentWrite_CannotUpdateOthersComment tests authorization for updates
660func TestCommentWrite_CannotUpdateOthersComment(t *testing.T) {
661 if testing.Short() {
662 t.Skip("Skipping E2E test in short mode")
663 }
664
665 db := setupTestDB(t)
666 defer func() { _ = db.Close() }()
667
668 ctx := context.Background()
669 pdsURL := getTestPDSURL()
670
671 // CommentPDSClientFactory creates a PDS client for comment operations
672 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
673 if session.AccessToken == "" {
674 return nil, fmt.Errorf("session has no access token")
675 }
676 if session.HostURL == "" {
677 return nil, fmt.Errorf("session has no host URL")
678 }
679
680 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
681 }
682
683 // Setup service
684 commentService := comments.NewCommentServiceWithPDSFactory(
685 nil,
686 nil,
687 nil,
688 nil,
689 nil,
690 commentPDSFactory,
691 )
692
693 // Create first user (comment owner)
694 ownerHandle := fmt.Sprintf("own%d.local.coves.dev", time.Now().UnixNano()%1000000)
695 ownerEmail := fmt.Sprintf("owner-%d@test.local", time.Now().Unix())
696 _, ownerDID, err := createPDSAccount(pdsURL, ownerHandle, ownerEmail, "password123")
697 if err != nil {
698 t.Skipf("PDS not available: %v", err)
699 }
700
701 // Create second user (attacker)
702 attackerHandle := fmt.Sprintf("atk%d.local.coves.dev", time.Now().UnixNano()%1000000)
703 attackerEmail := fmt.Sprintf("attacker-%d@test.local", time.Now().Unix())
704 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123")
705 if err != nil {
706 t.Skipf("PDS not available: %v", err)
707 }
708
709 // Setup OAuth for attacker
710 mockStore := NewMockOAuthStore()
711 mockStore.AddSessionWithPDS(attackerDID, "session-"+attackerDID, attackerToken, pdsURL)
712
713 parsedDID, _ := parseTestDID(attackerDID)
714 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+attackerDID)
715
716 // Try to update comment owned by different user
717 t.Logf("\n🚨 Attempting to update another user's comment...")
718 updateReq := comments.UpdateCommentRequest{
719 URI: fmt.Sprintf("at://%s/social.coves.community.comment/test123", ownerDID),
720 Content: "Malicious update attempt",
721 }
722
723 _, err = commentService.UpdateComment(ctx, session, updateReq)
724
725 // Verify authorization error
726 if err == nil {
727 t.Fatal("Expected authorization error, got nil")
728 }
729 if !errors.Is(err, comments.ErrNotAuthorized) {
730 t.Errorf("Expected ErrNotAuthorized, got: %v", err)
731 }
732
733 t.Logf("✅ AUTHORIZATION CHECK PASSED:")
734 t.Logf(" ✓ User cannot update others' comments")
735}
736
737// TestCommentWrite_CannotDeleteOthersComment tests authorization for deletes
738func TestCommentWrite_CannotDeleteOthersComment(t *testing.T) {
739 if testing.Short() {
740 t.Skip("Skipping E2E test in short mode")
741 }
742
743 db := setupTestDB(t)
744 defer func() { _ = db.Close() }()
745
746 ctx := context.Background()
747 pdsURL := getTestPDSURL()
748
749 // CommentPDSClientFactory creates a PDS client for comment operations
750 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
751 if session.AccessToken == "" {
752 return nil, fmt.Errorf("session has no access token")
753 }
754 if session.HostURL == "" {
755 return nil, fmt.Errorf("session has no host URL")
756 }
757
758 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
759 }
760
761 // Setup service
762 commentService := comments.NewCommentServiceWithPDSFactory(
763 nil,
764 nil,
765 nil,
766 nil,
767 nil,
768 commentPDSFactory,
769 )
770
771 // Create first user (comment owner)
772 ownerHandle := fmt.Sprintf("own%d.local.coves.dev", time.Now().UnixNano()%1000000)
773 ownerEmail := fmt.Sprintf("owner-%d@test.local", time.Now().Unix())
774 _, ownerDID, err := createPDSAccount(pdsURL, ownerHandle, ownerEmail, "password123")
775 if err != nil {
776 t.Skipf("PDS not available: %v", err)
777 }
778
779 // Create second user (attacker)
780 attackerHandle := fmt.Sprintf("atk%d.local.coves.dev", time.Now().UnixNano()%1000000)
781 attackerEmail := fmt.Sprintf("attacker-%d@test.local", time.Now().Unix())
782 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123")
783 if err != nil {
784 t.Skipf("PDS not available: %v", err)
785 }
786
787 // Setup OAuth for attacker
788 mockStore := NewMockOAuthStore()
789 mockStore.AddSessionWithPDS(attackerDID, "session-"+attackerDID, attackerToken, pdsURL)
790
791 parsedDID, _ := parseTestDID(attackerDID)
792 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+attackerDID)
793
794 // Try to delete comment owned by different user
795 t.Logf("\n🚨 Attempting to delete another user's comment...")
796 deleteReq := comments.DeleteCommentRequest{
797 URI: fmt.Sprintf("at://%s/social.coves.community.comment/test123", ownerDID),
798 }
799
800 err = commentService.DeleteComment(ctx, session, deleteReq)
801
802 // Verify authorization error
803 if err == nil {
804 t.Fatal("Expected authorization error, got nil")
805 }
806 if !errors.Is(err, comments.ErrNotAuthorized) {
807 t.Errorf("Expected ErrNotAuthorized, got: %v", err)
808 }
809
810 t.Logf("✅ AUTHORIZATION CHECK PASSED:")
811 t.Logf(" ✓ User cannot delete others' comments")
812}
813
814// Helper function to parse DID for testing
815func parseTestDID(did string) (syntax.DID, error) {
816 return syntax.ParseDID(did)
817}
818
819// TestCommentWrite_ConcurrentModificationDetection tests that PutRecord's swapRecord
820// CID validation correctly detects concurrent modifications.
821// This verifies the optimistic locking mechanism that prevents lost updates.
822func TestCommentWrite_ConcurrentModificationDetection(t *testing.T) {
823 if testing.Short() {
824 t.Skip("Skipping E2E test in short mode")
825 }
826
827 db := setupTestDB(t)
828 defer func() { _ = db.Close() }()
829
830 ctx := context.Background()
831 pdsURL := getTestPDSURL()
832
833 // Setup repositories and service
834 commentRepo := postgres.NewCommentRepository(db)
835
836 commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) {
837 if session.AccessToken == "" {
838 return nil, fmt.Errorf("session has no access token")
839 }
840 if session.HostURL == "" {
841 return nil, fmt.Errorf("session has no host URL")
842 }
843 return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken)
844 }
845
846 commentService := comments.NewCommentServiceWithPDSFactory(
847 commentRepo,
848 nil,
849 nil,
850 nil,
851 nil,
852 commentPDSFactory,
853 )
854
855 // Create test user
856 testUserHandle := fmt.Sprintf("cnc%d.local.coves.dev", time.Now().UnixNano()%1000000)
857 testUserEmail := fmt.Sprintf("concurrency-%d@test.local", time.Now().Unix())
858 testUserPassword := "test-password-123"
859
860 pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
861 if err != nil {
862 t.Skipf("PDS not available: %v", err)
863 }
864
865 // Setup OAuth
866 mockStore := NewMockOAuthStore()
867 mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL)
868
869 parsedDID, _ := parseTestDID(userDID)
870 session, _ := mockStore.GetSession(ctx, parsedDID, "session-"+userDID)
871
872 // Step 1: Create a comment
873 t.Logf("\n📝 Step 1: Creating initial comment...")
874 createReq := comments.CreateCommentRequest{
875 Reply: comments.ReplyRef{
876 Root: comments.StrongRef{
877 URI: "at://did:plc:test/social.coves.community.post/test123",
878 CID: "bafypost",
879 },
880 Parent: comments.StrongRef{
881 URI: "at://did:plc:test/social.coves.community.post/test123",
882 CID: "bafypost",
883 },
884 },
885 Content: "Original content for concurrency test",
886 Langs: []string{"en"},
887 }
888
889 createResp, err := commentService.CreateComment(ctx, session, createReq)
890 if err != nil {
891 t.Fatalf("Failed to create comment: %v", err)
892 }
893 t.Logf("✅ Comment created: URI=%s, CID=%s", createResp.URI, createResp.CID)
894 originalCID := createResp.CID
895
896 // Step 2: Update the comment (this changes the CID)
897 t.Logf("\n📝 Step 2: Updating comment (this changes CID)...")
898 updateReq := comments.UpdateCommentRequest{
899 URI: createResp.URI,
900 Content: "Updated content - CID has changed",
901 }
902
903 updateResp, err := commentService.UpdateComment(ctx, session, updateReq)
904 if err != nil {
905 t.Fatalf("Failed to update comment: %v", err)
906 }
907 t.Logf("✅ Comment updated: New CID=%s", updateResp.CID)
908 newCID := updateResp.CID
909
910 // Verify CIDs are different
911 if originalCID == newCID {
912 t.Fatalf("CIDs should be different after update: original=%s, new=%s", originalCID, newCID)
913 }
914
915 // Step 3: Simulate concurrent modification detection using direct PDS client
916 // Create a PDS client and attempt to update with the stale (original) CID
917 t.Logf("\n🔍 Step 3: Testing concurrent modification detection with stale CID...")
918
919 pdsClient, err := pds.NewFromAccessToken(pdsURL, userDID, pdsAccessToken)
920 if err != nil {
921 t.Fatalf("Failed to create PDS client: %v", err)
922 }
923
924 rkey := utils.ExtractRKeyFromURI(createResp.URI)
925
926 // Try to update with the ORIGINAL (now stale) CID - this should fail with 409
927 staleRecord := map[string]interface{}{
928 "$type": "social.coves.community.comment",
929 "reply": map[string]interface{}{
930 "root": map[string]interface{}{
931 "uri": "at://did:plc:test/social.coves.community.post/test123",
932 "cid": "bafypost",
933 },
934 "parent": map[string]interface{}{
935 "uri": "at://did:plc:test/social.coves.community.post/test123",
936 "cid": "bafypost",
937 },
938 },
939 "content": "This update should fail - using stale CID",
940 "createdAt": time.Now().UTC().Format(time.RFC3339),
941 }
942
943 _, _, err = pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, staleRecord, originalCID)
944
945 // Verify we get an error indicating CID mismatch
946 // PDS returns 400 "bad request" with message "Record was at <cid>" when swap CID doesn't match
947 if err == nil {
948 t.Fatal("Expected error when updating with stale CID, got nil")
949 }
950
951 // Check for either ErrConflict (409) or CID mismatch error (400)
952 errMsg := err.Error()
953 isCIDMismatch := strings.Contains(errMsg, "Record was at") || errors.Is(err, pds.ErrConflict)
954 if !isCIDMismatch {
955 t.Errorf("Expected CID mismatch or ErrConflict, got: %v", err)
956 }
957
958 t.Logf("✅ Correctly detected concurrent modification!")
959 t.Logf(" Error: %v", err)
960
961 // Step 4: Verify that updating with the correct CID succeeds
962 t.Logf("\n📝 Step 4: Verifying update with correct CID succeeds...")
963 correctRecord := map[string]interface{}{
964 "$type": "social.coves.community.comment",
965 "reply": map[string]interface{}{
966 "root": map[string]interface{}{
967 "uri": "at://did:plc:test/social.coves.community.post/test123",
968 "cid": "bafypost",
969 },
970 "parent": map[string]interface{}{
971 "uri": "at://did:plc:test/social.coves.community.post/test123",
972 "cid": "bafypost",
973 },
974 },
975 "content": "This update should succeed - using correct CID",
976 "createdAt": time.Now().UTC().Format(time.RFC3339),
977 }
978
979 _, finalCID, err := pdsClient.PutRecord(ctx, "social.coves.community.comment", rkey, correctRecord, newCID)
980 if err != nil {
981 t.Fatalf("Update with correct CID should succeed, got: %v", err)
982 }
983
984 t.Logf("✅ Update with correct CID succeeded: New CID=%s", finalCID)
985
986 t.Logf("\n✅ CONCURRENT MODIFICATION DETECTION TEST COMPLETE:")
987 t.Logf(" ✓ PutRecord with stale CID correctly returns ErrConflict")
988 t.Logf(" ✓ PutRecord with correct CID succeeds")
989 t.Logf(" ✓ Optimistic locking prevents lost updates")
990}