A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/atproto/identity"
6 "Coves/internal/atproto/jetstream"
7 "Coves/internal/core/communities"
8 "Coves/internal/core/posts"
9 "Coves/internal/core/users"
10 "Coves/internal/db/postgres"
11 "context"
12 "database/sql"
13 "errors"
14 "fmt"
15 "net"
16 "net/http"
17 "os"
18 "strings"
19 "testing"
20 "time"
21
22 "github.com/gorilla/websocket"
23 _ "github.com/lib/pq"
24 "github.com/pressly/goose/v3"
25
26 oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth"
27 "github.com/bluesky-social/indigo/atproto/syntax"
28)
29
30// TestPostDeletion_JetstreamConsumer tests that the Jetstream consumer
31// correctly handles post deletion events by soft-deleting posts in the AppView database.
32func TestPostDeletion_JetstreamConsumer(t *testing.T) {
33 db := setupTestDB(t)
34 defer func() {
35 if err := db.Close(); err != nil {
36 t.Logf("Failed to close database: %v", err)
37 }
38 }()
39
40 ctx := context.Background()
41
42 // Cleanup old test data
43 _, _ = db.Exec("DELETE FROM posts WHERE community_did = 'did:plc:deltest123'")
44 _, _ = db.Exec("DELETE FROM communities WHERE did = 'did:plc:deltest123'")
45 _, _ = db.Exec("DELETE FROM users WHERE did = 'did:plc:delauthor123'")
46
47 // Setup repositories
48 userRepo := postgres.NewUserRepository(db)
49 communityRepo := postgres.NewCommunityRepository(db)
50 postRepo := postgres.NewPostRepository(db)
51
52 // Setup user service for post consumer
53 identityConfig := identity.DefaultConfig()
54 identityResolver := identity.NewResolver(db, identityConfig)
55 userService := users.NewUserService(userRepo, identityResolver, "http://localhost:3001")
56
57 // Create test user (author)
58 author := createTestUser(t, db, "delauthor.test", "did:plc:delauthor123")
59
60 // Create test community
61 community := &communities.Community{
62 DID: "did:plc:deltest123",
63 Handle: "c-deltest.test.coves.social",
64 Name: "deltest",
65 DisplayName: "Delete Test Community",
66 OwnerDID: "did:plc:deltest123",
67 CreatedByDID: author.DID,
68 HostedByDID: "did:web:coves.test",
69 Visibility: "public",
70 ModerationType: "moderator",
71 RecordURI: "at://did:plc:deltest123/social.coves.community.profile/self",
72 RecordCID: "fakecid123",
73 PDSAccessToken: "fake_token_for_testing",
74 PDSRefreshToken: "fake_refresh_token",
75 }
76 _, err := communityRepo.Create(ctx, community)
77 if err != nil {
78 t.Fatalf("Failed to create test community: %v", err)
79 }
80
81 consumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
82
83 t.Run("Create then delete post via Jetstream", func(t *testing.T) {
84 rkey := generateTID()
85 title := "Post to be deleted"
86 content := "This post will be deleted"
87
88 // Step 1: Create the post via Jetstream event
89 createEvent := jetstream.JetstreamEvent{
90 Did: community.DID,
91 Kind: "commit",
92 Commit: &jetstream.CommitEvent{
93 Operation: "create",
94 Collection: "social.coves.community.post",
95 RKey: rkey,
96 CID: "bafy2bzacedeltest1",
97 Record: map[string]interface{}{
98 "$type": "social.coves.community.post",
99 "community": community.DID,
100 "author": author.DID,
101 "title": title,
102 "content": content,
103 "createdAt": time.Now().Format(time.RFC3339),
104 },
105 },
106 }
107
108 err := consumer.HandleEvent(ctx, &createEvent)
109 if err != nil {
110 t.Fatalf("Failed to create post: %v", err)
111 }
112
113 // Verify post was created
114 postURI := fmt.Sprintf("at://%s/social.coves.community.post/%s", community.DID, rkey)
115 createdPost, err := postRepo.GetByURI(ctx, postURI)
116 if err != nil {
117 t.Fatalf("Post not indexed after create: %v", err)
118 }
119 if createdPost.DeletedAt != nil {
120 t.Fatal("Post should not be deleted initially")
121 }
122
123 t.Logf("✓ Post created: %s", postURI)
124
125 // Step 2: Delete the post via Jetstream event
126 deleteEvent := jetstream.JetstreamEvent{
127 Did: community.DID,
128 Kind: "commit",
129 Commit: &jetstream.CommitEvent{
130 Operation: "delete",
131 Collection: "social.coves.community.post",
132 RKey: rkey,
133 },
134 }
135
136 err = consumer.HandleEvent(ctx, &deleteEvent)
137 if err != nil {
138 t.Fatalf("Failed to delete post: %v", err)
139 }
140
141 // Step 3: Verify post was soft-deleted
142 deletedPost, err := postRepo.GetByURI(ctx, postURI)
143 if err != nil {
144 t.Fatalf("Post should still exist after soft delete: %v", err)
145 }
146 if deletedPost.DeletedAt == nil {
147 t.Fatal("Post should have deleted_at set after delete")
148 }
149
150 t.Logf("✓ Post soft-deleted: deleted_at=%v", deletedPost.DeletedAt)
151 t.Log("✅ Delete flow complete: Create → Delete → Verify soft-deleted")
152 })
153
154 t.Run("Delete is idempotent", func(t *testing.T) {
155 rkey := generateTID()
156
157 // Create a post first
158 createEvent := jetstream.JetstreamEvent{
159 Did: community.DID,
160 Kind: "commit",
161 Commit: &jetstream.CommitEvent{
162 Operation: "create",
163 Collection: "social.coves.community.post",
164 RKey: rkey,
165 CID: "bafy2bzaceidempotentdel",
166 Record: map[string]interface{}{
167 "$type": "social.coves.community.post",
168 "community": community.DID,
169 "author": author.DID,
170 "title": "Idempotent delete test",
171 "content": "Testing idempotent deletion",
172 "createdAt": time.Now().Format(time.RFC3339),
173 },
174 },
175 }
176 err := consumer.HandleEvent(ctx, &createEvent)
177 if err != nil {
178 t.Fatalf("Failed to create post: %v", err)
179 }
180
181 // Delete once
182 deleteEvent := jetstream.JetstreamEvent{
183 Did: community.DID,
184 Kind: "commit",
185 Commit: &jetstream.CommitEvent{
186 Operation: "delete",
187 Collection: "social.coves.community.post",
188 RKey: rkey,
189 },
190 }
191 err = consumer.HandleEvent(ctx, &deleteEvent)
192 if err != nil {
193 t.Fatalf("First delete failed: %v", err)
194 }
195
196 // Delete again (should be idempotent)
197 err = consumer.HandleEvent(ctx, &deleteEvent)
198 if err != nil {
199 t.Fatalf("Second delete should be idempotent, got error: %v", err)
200 }
201
202 t.Log("✓ Delete is idempotent - second delete did not fail")
203 })
204
205 t.Run("Delete non-existent post is idempotent", func(t *testing.T) {
206 // Try to delete a post that was never created
207 deleteEvent := jetstream.JetstreamEvent{
208 Did: community.DID,
209 Kind: "commit",
210 Commit: &jetstream.CommitEvent{
211 Operation: "delete",
212 Collection: "social.coves.community.post",
213 RKey: "nonexistent123",
214 },
215 }
216
217 err := consumer.HandleEvent(ctx, &deleteEvent)
218 if err != nil {
219 t.Fatalf("Delete of non-existent post should be idempotent, got error: %v", err)
220 }
221
222 t.Log("✓ Delete of non-existent post is idempotent")
223 })
224}
225
226// TestPostDeletion_Authorization tests that only the post author can delete their posts
227func TestPostDeletion_Authorization(t *testing.T) {
228 if testing.Short() {
229 t.Skip("Skipping authorization test in short mode")
230 }
231
232 db := setupTestDB(t)
233 defer func() { _ = db.Close() }()
234
235 ctx := context.Background()
236 pdsURL := getTestPDSURL()
237
238 // Setup repositories
239 communityRepo := postgres.NewCommunityRepository(db)
240 postRepo := postgres.NewPostRepository(db)
241
242 // Create a mock community service for testing
243 communityService := communities.NewCommunityServiceWithPDSFactory(
244 communityRepo,
245 pdsURL,
246 "did:web:test",
247 "test.coves.social",
248 nil, // No provisioner needed for this test
249 nil, // No PDS factory
250 nil, // No blob service
251 )
252
253 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL)
254
255 // Create test user (attacker trying to delete another user's post)
256 attackerHandle := fmt.Sprintf("attacker%d.local.coves.dev", time.Now().UnixNano()%1000000)
257 attackerEmail := fmt.Sprintf("attacker-%d@test.local", time.Now().Unix())
258 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123")
259 if err != nil {
260 t.Skipf("PDS not available: %v", err)
261 }
262
263 // Setup OAuth session for attacker
264 parsedDID, err := syntax.ParseDID(attackerDID)
265 if err != nil {
266 t.Fatalf("Failed to parse attacker DID: %v", err)
267 }
268 attackerSession := &oauthlib.ClientSessionData{
269 AccountDID: parsedDID,
270 AccessToken: attackerToken,
271 HostURL: pdsURL,
272 }
273
274 // Create post URI belonging to a DIFFERENT user (the owner)
275 ownerDID := "did:plc:owner123"
276 postURI := fmt.Sprintf("at://%s/social.coves.community.post/test123", ownerDID)
277
278 t.Run("Non-author cannot delete post - URI contains wrong DID", func(t *testing.T) {
279 // The post URI contains ownerDID in the community position
280 // This should fail because attacker is not the community owner
281 // and wouldn't have credentials to delete from that repo
282
283 deleteReq := posts.DeletePostRequest{
284 URI: postURI,
285 }
286
287 err := postService.DeletePost(ctx, attackerSession, deleteReq)
288
289 // We expect an error - either NotAuthorized or CommunityNotFound
290 // since the community doesn't exist in our test DB
291 if err == nil {
292 t.Fatal("Expected error when non-author tries to delete post, got nil")
293 }
294
295 t.Logf("✓ Non-author blocked from deleting post: %v", err)
296 })
297
298 t.Run("Invalid URI format returns error", func(t *testing.T) {
299 deleteReq := posts.DeletePostRequest{
300 URI: "invalid-uri-format",
301 }
302
303 err := postService.DeletePost(ctx, attackerSession, deleteReq)
304
305 if err == nil {
306 t.Fatal("Expected error for invalid URI, got nil")
307 }
308
309 // Should be a validation error
310 if !posts.IsValidationError(err) {
311 t.Logf("Got error (expected validation): %v", err)
312 }
313
314 t.Logf("✓ Invalid URI rejected: %v", err)
315 })
316
317 t.Run("Empty URI returns error", func(t *testing.T) {
318 deleteReq := posts.DeletePostRequest{
319 URI: "",
320 }
321
322 err := postService.DeletePost(ctx, attackerSession, deleteReq)
323
324 if err == nil {
325 t.Fatal("Expected error for empty URI, got nil")
326 }
327
328 t.Logf("✓ Empty URI rejected: %v", err)
329 })
330
331 t.Run("Nil session returns error", func(t *testing.T) {
332 deleteReq := posts.DeletePostRequest{
333 URI: postURI,
334 }
335
336 err := postService.DeletePost(ctx, nil, deleteReq)
337
338 if err == nil {
339 t.Fatal("Expected error for nil session, got nil")
340 }
341
342 t.Logf("✓ Nil session rejected: %v", err)
343 })
344}
345
346// TestPostDeletion_ServiceAuthorization tests the author verification logic in the service layer
347// This test requires a live PDS to fully test the authorization flow
348func TestPostDeletion_ServiceAuthorization_LivePDS(t *testing.T) {
349 if testing.Short() {
350 t.Skip("Skipping live PDS test in short mode")
351 }
352
353 db := setupTestDB(t)
354 defer func() { _ = db.Close() }()
355
356 ctx := context.Background()
357 pdsURL := getTestPDSURL()
358
359 // Check if PDS is available
360 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
361 if err != nil {
362 t.Skipf("PDS not available: %v", err)
363 }
364 _ = healthResp.Body.Close()
365
366 // Setup repositories
367 communityRepo := postgres.NewCommunityRepository(db)
368 postRepo := postgres.NewPostRepository(db)
369
370 // Get instance credentials to determine correct domain
371 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
372 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
373 if instanceHandle == "" {
374 instanceHandle = "testuser123.local.coves.dev"
375 }
376 if instancePassword == "" {
377 instancePassword = "test-password-123"
378 }
379
380 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
381 if err != nil {
382 t.Skipf("Failed to authenticate with PDS: %v", err)
383 }
384
385 var instanceDomain string
386 if strings.HasPrefix(instanceDID, "did:web:") {
387 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
388 } else {
389 instanceDomain = "local.coves.dev"
390 }
391
392 // Create provisioner for community creation
393 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL)
394
395 communityService := communities.NewCommunityServiceWithPDSFactory(
396 communityRepo,
397 pdsURL,
398 instanceDID,
399 instanceDomain,
400 provisioner,
401 nil,
402 nil,
403 )
404
405 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL)
406
407 // Create two test users
408 ownerHandle := fmt.Sprintf("postowner%d.local.coves.dev", time.Now().UnixNano()%1000000)
409 ownerEmail := fmt.Sprintf("postowner-%d@test.local", time.Now().Unix())
410 _, ownerDID, err := createPDSAccount(pdsURL, ownerHandle, ownerEmail, "password123")
411 if err != nil {
412 t.Skipf("Failed to create owner account: %v", err)
413 }
414 owner := createTestUser(t, db, ownerHandle, ownerDID)
415
416 attackerHandle := fmt.Sprintf("postattacker%d.local.coves.dev", time.Now().UnixNano()%1000000)
417 attackerEmail := fmt.Sprintf("postattacker-%d@test.local", time.Now().Unix())
418 attackerToken, attackerDID, err := createPDSAccount(pdsURL, attackerHandle, attackerEmail, "password123")
419 if err != nil {
420 t.Skipf("Failed to create attacker account: %v", err)
421 }
422 _ = createTestUser(t, db, attackerHandle, attackerDID)
423
424 // Setup attacker session
425 parsedAttackerDID, _ := syntax.ParseDID(attackerDID)
426 attackerSession := &oauthlib.ClientSessionData{
427 AccountDID: parsedAttackerDID,
428 AccessToken: attackerToken,
429 HostURL: pdsURL,
430 }
431
432 // Create a test community
433 communityName := fmt.Sprintf("delauth%d", time.Now().UnixNano()%1000000)
434 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{
435 Name: communityName,
436 DisplayName: "Delete Auth Test Community",
437 Description: "Testing post deletion authorization",
438 CreatedByDID: owner.DID,
439 Visibility: "public",
440 })
441 if err != nil {
442 t.Fatalf("Failed to create community: %v", err)
443 }
444
445 t.Logf("✓ Community created: %s (%s)", community.Name, community.DID)
446
447 // Create a post as the owner
448 title := "Owner's Post"
449 content := "This post belongs to the owner"
450 createResp, err := postService.CreatePost(
451 middleware.SetTestUserDID(ctx, owner.DID),
452 posts.CreatePostRequest{
453 Community: community.DID,
454 Title: &title,
455 Content: &content,
456 AuthorDID: owner.DID,
457 },
458 )
459 if err != nil {
460 t.Fatalf("Failed to create post: %v", err)
461 }
462
463 t.Logf("✓ Post created by owner: %s", createResp.URI)
464
465 t.Run("Attacker cannot delete owner's post", func(t *testing.T) {
466 deleteReq := posts.DeletePostRequest{
467 URI: createResp.URI,
468 }
469
470 err := postService.DeletePost(ctx, attackerSession, deleteReq)
471
472 if err == nil {
473 t.Fatal("Expected ErrNotAuthorized when attacker tries to delete owner's post")
474 }
475
476 if !errors.Is(err, posts.ErrNotAuthorized) {
477 t.Errorf("Expected ErrNotAuthorized, got: %v", err)
478 }
479
480 t.Logf("✅ Authorization check passed: attacker blocked with %v", err)
481 })
482}
483
484// TestPostE2E_DeleteWithJetstream tests post deletion with real PDS and Jetstream
485// This is a TRUE E2E test that follows the complete flow:
486// 1. Create real community on PDS
487// 2. Create real post on PDS
488// 3. Subscribe to Jetstream
489// 4. Delete post via service (which deletes from PDS)
490// 5. Receive delete event from Jetstream
491// 6. Verify post is soft-deleted in AppView DB
492func TestPostE2E_DeleteWithJetstream(t *testing.T) {
493 if testing.Short() {
494 t.Skip("Skipping E2E test in short mode")
495 }
496
497 // Setup test database
498 dbURL := os.Getenv("TEST_DATABASE_URL")
499 if dbURL == "" {
500 dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable"
501 }
502
503 db, err := sql.Open("postgres", dbURL)
504 if err != nil {
505 t.Fatalf("Failed to connect to test database: %v", err)
506 }
507 defer func() { _ = db.Close() }()
508
509 if dialectErr := goose.SetDialect("postgres"); dialectErr != nil {
510 t.Fatalf("Failed to set goose dialect: %v", dialectErr)
511 }
512 if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil {
513 t.Fatalf("Failed to run migrations: %v", migrateErr)
514 }
515
516 pdsURL := getTestPDSURL()
517 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
518 if err != nil {
519 t.Skipf("PDS not running at %s: %v", pdsURL, err)
520 }
521 _ = healthResp.Body.Close()
522
523 // Check Jetstream is available
524 pdsHostname := strings.TrimPrefix(pdsURL, "http://")
525 pdsHostname = strings.TrimPrefix(pdsHostname, "https://")
526 pdsHostname = strings.Split(pdsHostname, ":")[0]
527 jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.post", pdsHostname)
528
529 testConn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
530 if err != nil {
531 t.Skipf("Jetstream not running at %s: %v", jetstreamURL, err)
532 }
533 _ = testConn.Close()
534
535 ctx := context.Background()
536
537 // Setup repositories
538 userRepo := postgres.NewUserRepository(db)
539 communityRepo := postgres.NewCommunityRepository(db)
540 postRepo := postgres.NewPostRepository(db)
541
542 // Setup identity resolver for user service
543 identityConfig := identity.DefaultConfig()
544 plcURL := os.Getenv("PLC_DIRECTORY_URL")
545 if plcURL == "" {
546 plcURL = "http://localhost:3002"
547 }
548 identityConfig.PLCURL = plcURL
549 identityResolver := identity.NewResolver(db, identityConfig)
550 userService := users.NewUserService(userRepo, identityResolver, pdsURL)
551
552 // Setup community service with provisioner for real PDS
553 var instanceDomain string
554 instanceHandle := os.Getenv("PDS_INSTANCE_HANDLE")
555 instancePassword := os.Getenv("PDS_INSTANCE_PASSWORD")
556 if instanceHandle == "" {
557 instanceHandle = "testuser123.local.coves.dev"
558 }
559 if instancePassword == "" {
560 instancePassword = "test-password-123"
561 }
562
563 _, instanceDID, err := authenticateWithPDS(pdsURL, instanceHandle, instancePassword)
564 if err != nil {
565 t.Skipf("Failed to authenticate with PDS: %v", err)
566 }
567
568 if strings.HasPrefix(instanceDID, "did:web:") {
569 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
570 } else {
571 instanceDomain = "coves.social"
572 }
573
574 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, pdsURL)
575 communityService := communities.NewCommunityServiceWithPDSFactory(
576 communityRepo,
577 pdsURL,
578 instanceDID,
579 instanceDomain,
580 provisioner,
581 nil,
582 nil,
583 )
584
585 postService := posts.NewPostService(postRepo, communityService, nil, nil, nil, nil, pdsURL)
586
587 // Create test user
588 testID := fmt.Sprintf("%d", time.Now().UnixNano()%1000000)
589 testUserHandle := fmt.Sprintf("postdel%s.local.coves.dev", testID)
590 testUserEmail := fmt.Sprintf("postdel%s@test.local", testID)
591 testUserPassword := "test-password-123"
592
593 _, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword)
594 if err != nil {
595 t.Fatalf("Failed to create test user on PDS: %v", err)
596 }
597 testUser := createTestUser(t, db, testUserHandle, userDID)
598
599 // Create test community on real PDS
600 communityName := fmt.Sprintf("postdel%s", testID)
601 t.Logf("\n📝 Creating community on PDS: %s", communityName)
602
603 community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{
604 Name: communityName,
605 DisplayName: "Post Delete E2E Test Community",
606 Description: "Testing post deletion E2E flow",
607 CreatedByDID: testUser.DID,
608 Visibility: "public",
609 })
610 if err != nil {
611 t.Fatalf("Failed to create community: %v", err)
612 }
613
614 t.Logf("✅ Community created: %s (%s)", community.Name, community.DID)
615
616 // Setup Jetstream consumer
617 postConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
618
619 t.Run("delete post with real Jetstream indexing", func(t *testing.T) {
620 // Create post via service (writes to real PDS)
621 createEventChan := make(chan *jetstream.JetstreamEvent, 10)
622 createDone := make(chan bool)
623
624 go func() {
625 subscribeErr := subscribeToJetstreamForPostCreate(ctx, jetstreamURL, community.DID, postConsumer, createEventChan, createDone)
626 if subscribeErr != nil {
627 t.Logf("Create subscription error: %v", subscribeErr)
628 }
629 }()
630
631 time.Sleep(500 * time.Millisecond)
632
633 title := "Post to delete via E2E"
634 content := "This post will be deleted and we'll verify via Jetstream"
635 t.Logf("\n📝 Creating post on PDS...")
636
637 createResp, err := postService.CreatePost(
638 middleware.SetTestUserDID(ctx, testUser.DID),
639 posts.CreatePostRequest{
640 Community: community.DID,
641 Title: &title,
642 Content: &content,
643 AuthorDID: testUser.DID,
644 },
645 )
646 if err != nil {
647 t.Fatalf("Failed to create post: %v", err)
648 }
649
650 t.Logf("✅ Post created: %s", createResp.URI)
651
652 // Wait for create event from Jetstream
653 select {
654 case <-createEventChan:
655 t.Logf("✅ Create event received from Jetstream")
656 case <-time.After(30 * time.Second):
657 t.Fatalf("Timeout waiting for create event")
658 }
659 close(createDone)
660
661 // Verify post exists in AppView
662 createdPost, err := postRepo.GetByURI(ctx, createResp.URI)
663 if err != nil {
664 t.Fatalf("Post should exist after create: %v", err)
665 }
666 if createdPost.DeletedAt != nil {
667 t.Fatal("Post should not be deleted initially")
668 }
669
670 // Now delete the post
671 t.Logf("\n🗑️ Deleting post via service...")
672
673 deleteEventChan := make(chan *jetstream.JetstreamEvent, 10)
674 deleteDone := make(chan bool)
675
676 go func() {
677 subscribeErr := subscribeToJetstreamForPostDelete(ctx, jetstreamURL, community.DID, postConsumer, deleteEventChan, deleteDone)
678 if subscribeErr != nil {
679 t.Logf("Delete subscription error: %v", subscribeErr)
680 }
681 }()
682
683 time.Sleep(500 * time.Millisecond)
684
685 // Create OAuth session for the post author
686 parsedDID, _ := syntax.ParseDID(testUser.DID)
687 // Get fresh token for the user
688 freshToken, _, err := authenticateWithPDS(pdsURL, testUserHandle, testUserPassword)
689 if err != nil {
690 t.Fatalf("Failed to get fresh token: %v", err)
691 }
692 session := &oauthlib.ClientSessionData{
693 AccountDID: parsedDID,
694 AccessToken: freshToken,
695 HostURL: pdsURL,
696 }
697
698 err = postService.DeletePost(ctx, session, posts.DeletePostRequest{URI: createResp.URI})
699 if err != nil {
700 t.Fatalf("Failed to delete post: %v", err)
701 }
702
703 t.Logf("✅ Post delete request sent to PDS")
704
705 // Wait for delete event from Jetstream
706 t.Logf("\n⏳ Waiting for delete event from Jetstream...")
707
708 select {
709 case event := <-deleteEventChan:
710 t.Logf("✅ Received delete event from Jetstream!")
711 t.Logf(" Operation: %s", event.Commit.Operation)
712
713 if event.Commit.Operation != "delete" {
714 t.Errorf("Expected operation 'delete', got '%s'", event.Commit.Operation)
715 }
716
717 // Verify post is soft-deleted in AppView
718 deletedPost, err := postRepo.GetByURI(ctx, createResp.URI)
719 if err != nil {
720 t.Fatalf("Failed to get deleted post: %v", err)
721 }
722
723 if deletedPost.DeletedAt == nil {
724 t.Errorf("Expected post to be soft-deleted (deleted_at should be set)")
725 } else {
726 t.Logf("✅ Post soft-deleted in AppView at: %v", *deletedPost.DeletedAt)
727 }
728
729 close(deleteDone)
730
731 case <-time.After(30 * time.Second):
732 t.Fatalf("Timeout: No delete event received within 30 seconds")
733 }
734
735 t.Logf("\n✅ TRUE E2E POST DELETE FLOW COMPLETE:")
736 t.Logf(" Client → Service → PDS DeleteRecord → Jetstream → Consumer → AppView ✓")
737 })
738}
739
740// subscribeToJetstreamForPostCreate subscribes for post create events
741func subscribeToJetstreamForPostCreate(
742 ctx context.Context,
743 jetstreamURL string,
744 targetDID string,
745 consumer *jetstream.PostEventConsumer,
746 eventChan chan<- *jetstream.JetstreamEvent,
747 done <-chan bool,
748) error {
749 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
750 if err != nil {
751 return fmt.Errorf("failed to connect to Jetstream: %w", err)
752 }
753 defer func() { _ = conn.Close() }()
754
755 for {
756 select {
757 case <-done:
758 return nil
759 case <-ctx.Done():
760 return ctx.Err()
761 default:
762 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
763 return fmt.Errorf("failed to set read deadline: %w", err)
764 }
765
766 var event jetstream.JetstreamEvent
767 err := conn.ReadJSON(&event)
768 if err != nil {
769 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
770 return nil
771 }
772 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
773 continue
774 }
775 return fmt.Errorf("failed to read Jetstream message: %w", err)
776 }
777
778 if event.Did == targetDID && event.Kind == "commit" &&
779 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" &&
780 event.Commit.Operation == "create" {
781
782 if err := consumer.HandleEvent(ctx, &event); err != nil {
783 return fmt.Errorf("failed to process event: %w", err)
784 }
785
786 select {
787 case eventChan <- &event:
788 return nil
789 case <-time.After(1 * time.Second):
790 return fmt.Errorf("timeout sending event to channel")
791 }
792 }
793 }
794 }
795}
796
797// subscribeToJetstreamForPostDelete subscribes for post delete events
798func subscribeToJetstreamForPostDelete(
799 ctx context.Context,
800 jetstreamURL string,
801 targetDID string,
802 consumer *jetstream.PostEventConsumer,
803 eventChan chan<- *jetstream.JetstreamEvent,
804 done <-chan bool,
805) error {
806 conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil)
807 if err != nil {
808 return fmt.Errorf("failed to connect to Jetstream: %w", err)
809 }
810 defer func() { _ = conn.Close() }()
811
812 for {
813 select {
814 case <-done:
815 return nil
816 case <-ctx.Done():
817 return ctx.Err()
818 default:
819 if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
820 return fmt.Errorf("failed to set read deadline: %w", err)
821 }
822
823 var event jetstream.JetstreamEvent
824 err := conn.ReadJSON(&event)
825 if err != nil {
826 if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
827 return nil
828 }
829 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
830 continue
831 }
832 return fmt.Errorf("failed to read Jetstream message: %w", err)
833 }
834
835 if event.Did == targetDID && event.Kind == "commit" &&
836 event.Commit != nil && event.Commit.Collection == "social.coves.community.post" &&
837 event.Commit.Operation == "delete" {
838
839 if err := consumer.HandleEvent(ctx, &event); err != nil {
840 return fmt.Errorf("failed to process event: %w", err)
841 }
842
843 select {
844 case eventChan <- &event:
845 return nil
846 case <-time.After(1 * time.Second):
847 return fmt.Errorf("timeout sending event to channel")
848 }
849 }
850 }
851 }
852}