A community-based topic aggregation platform built on atproto
1package integration
2
3import (
4 "Coves/internal/api/routes"
5 "Coves/internal/atproto/jetstream"
6 "Coves/internal/atproto/utils"
7 "Coves/internal/core/userblocks"
8 "Coves/internal/db/postgres"
9 "bytes"
10 "context"
11 "encoding/json"
12 "fmt"
13 "io"
14 "net/http"
15 "net/http/httptest"
16 "testing"
17 "time"
18
19 "github.com/go-chi/chi/v5"
20 _ "github.com/lib/pq"
21)
22
23// TestUserBlockE2E_BlockAndUnblock tests the full user block lifecycle with a real PDS.
24// Flow: Client -> XRPC -> PDS Write -> Verify on PDS -> Jetstream -> Consumer -> AppView
25// Then: Client -> XRPC Unblock -> PDS Delete -> Jetstream -> Consumer -> AppView removal
26func TestUserBlockE2E_BlockAndUnblock(t *testing.T) {
27 if testing.Short() {
28 t.Skip("Skipping E2E test in short mode")
29 }
30
31 db := setupTestDB(t)
32 defer func() { _ = db.Close() }()
33
34 ctx := context.Background()
35 pdsURL := getTestPDSURL()
36
37 // Health check PDS
38 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
39 if err != nil {
40 t.Skipf("PDS not running at %s: %v", pdsURL, err)
41 }
42 func() {
43 if closeErr := healthResp.Body.Close(); closeErr != nil {
44 t.Logf("Failed to close health response: %v", closeErr)
45 }
46 }()
47
48 // Clean up user_blocks at the end to avoid polluting other tests
49 defer func() {
50 _, _ = db.Exec("DELETE FROM user_blocks")
51 }()
52
53 // Setup repository
54 blockRepo := postgres.NewUserBlockRepository(db)
55
56 // Create User A (blocker) on real PDS
57 userAHandle := fmt.Sprintf("blka%d.local.coves.dev", time.Now().UnixNano()%1000000)
58 userAEmail := fmt.Sprintf("blockerA-%d@test.local", time.Now().Unix())
59 userAPassword := "test-password-123"
60
61 t.Logf("Creating User A (blocker) on PDS: %s", userAHandle)
62 pdsAccessTokenA, userADID, err := createPDSAccount(pdsURL, userAHandle, userAEmail, userAPassword)
63 if err != nil {
64 t.Fatalf("Failed to create User A on PDS: %v", err)
65 }
66 t.Logf("User A created: DID=%s", userADID)
67
68 // Create User B (target) on real PDS
69 userBHandle := fmt.Sprintf("blkb%d.local.coves.dev", time.Now().UnixNano()%1000000)
70 userBEmail := fmt.Sprintf("blockerB-%d@test.local", time.Now().Unix())
71 userBPassword := "test-password-123"
72
73 t.Logf("Creating User B (target) on PDS: %s", userBHandle)
74 _, userBDID, err := createPDSAccount(pdsURL, userBHandle, userBEmail, userBPassword)
75 if err != nil {
76 t.Fatalf("Failed to create User B on PDS: %v", err)
77 }
78 t.Logf("User B created: DID=%s", userBDID)
79
80 // Index both users in AppView
81 createTestUser(t, db, userAHandle, userADID)
82 createTestUser(t, db, userBHandle, userBDID)
83
84 // Setup OAuth middleware with User A's real PDS access token
85 e2eAuth := NewE2EOAuthMiddleware()
86 tokenA := e2eAuth.AddUserWithPDSToken(userADID, pdsAccessTokenA, pdsURL)
87
88 // Setup service with password-based PDS client factory
89 service := userblocks.NewServiceWithPDSFactory(blockRepo, nil, UserBlockPasswordAuthPDSClientFactory())
90
91 // Setup HTTP server with XRPC routes
92 r := chi.NewRouter()
93 routes.RegisterUserBlockRoutes(r, service, e2eAuth.OAuthAuthMiddleware)
94 httpServer := httptest.NewServer(r)
95 defer httpServer.Close()
96
97 // Setup Jetstream consumer with block repo
98 userConsumer := jetstream.NewUserEventConsumer(nil, nil, "", "",
99 jetstream.WithUserBlockRepo(blockRepo))
100
101 // ====================================================================================
102 // BLOCK PHASE: User A blocks User B
103 // ====================================================================================
104 t.Logf("\n--- BLOCK PHASE: User A blocks User B ---")
105
106 blockReq := map[string]interface{}{
107 "subject": userBDID,
108 }
109
110 reqBody, marshalErr := json.Marshal(blockReq)
111 if marshalErr != nil {
112 t.Fatalf("Failed to marshal block request: %v", marshalErr)
113 }
114
115 req, err := http.NewRequest(http.MethodPost,
116 httpServer.URL+"/xrpc/social.coves.actor.blockUser",
117 bytes.NewBuffer(reqBody))
118 if err != nil {
119 t.Fatalf("Failed to create block request: %v", err)
120 }
121 req.Header.Set("Content-Type", "application/json")
122 req.Header.Set("Authorization", "Bearer "+tokenA)
123
124 resp, err := http.DefaultClient.Do(req)
125 if err != nil {
126 t.Fatalf("Failed to POST block: %v", err)
127 }
128 defer func() { _ = resp.Body.Close() }()
129
130 if resp.StatusCode != http.StatusOK {
131 body, readErr := io.ReadAll(resp.Body)
132 if readErr != nil {
133 t.Fatalf("Expected 200, got %d (failed to read body: %v)", resp.StatusCode, readErr)
134 }
135 t.Logf("XRPC Block Failed")
136 t.Logf(" Status: %d", resp.StatusCode)
137 t.Logf(" Response: %s", string(body))
138 t.Fatalf("Expected 200, got %d: %s", resp.StatusCode, string(body))
139 }
140
141 var blockResp struct {
142 Block struct {
143 RecordURI string `json:"recordUri"`
144 RecordCID string `json:"recordCid"`
145 } `json:"block"`
146 }
147
148 if decodeErr := json.NewDecoder(resp.Body).Decode(&blockResp); decodeErr != nil {
149 t.Fatalf("Failed to decode block response: %v", decodeErr)
150 }
151
152 t.Logf("XRPC block response received:")
153 t.Logf(" RecordURI: %s", blockResp.Block.RecordURI)
154 t.Logf(" RecordCID: %s", blockResp.Block.RecordCID)
155
156 if blockResp.Block.RecordURI == "" {
157 t.Fatal("Block response missing recordUri")
158 }
159 if blockResp.Block.RecordCID == "" {
160 t.Fatal("Block response missing recordCid")
161 }
162
163 // Verify block record was written to PDS
164 t.Logf("\nVerifying block record on PDS...")
165 rkey := utils.ExtractRKeyFromURI(blockResp.Block.RecordURI)
166 collection := "social.coves.actor.block"
167
168 pdsResp, pdsErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
169 pdsURL, userADID, collection, rkey))
170 if pdsErr != nil {
171 t.Fatalf("Failed to fetch block record from PDS: %v", pdsErr)
172 }
173 defer func() {
174 if closeErr := pdsResp.Body.Close(); closeErr != nil {
175 t.Logf("Failed to close PDS response: %v", closeErr)
176 }
177 }()
178
179 if pdsResp.StatusCode != http.StatusOK {
180 body, _ := io.ReadAll(pdsResp.Body)
181 t.Fatalf("Block record not found on PDS: status %d, body: %s", pdsResp.StatusCode, string(body))
182 }
183
184 var pdsRecord struct {
185 Value map[string]interface{} `json:"value"`
186 CID string `json:"cid"`
187 }
188 if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil {
189 t.Fatalf("Failed to decode PDS record: %v", decodeErr)
190 }
191
192 t.Logf("Block record found on PDS:")
193 t.Logf(" CID: %s", pdsRecord.CID)
194 t.Logf(" Subject: %v", pdsRecord.Value["subject"])
195
196 // Verify subject DID in PDS record
197 if pdsRecord.Value["subject"] != userBDID {
198 t.Errorf("Expected subject '%s', got %v", userBDID, pdsRecord.Value["subject"])
199 }
200
201 // Simulate Jetstream CREATE event
202 t.Logf("\nSimulating Jetstream CREATE event for block...")
203 createEvent := jetstream.JetstreamEvent{
204 Did: userADID,
205 TimeUS: time.Now().UnixMicro(),
206 Kind: "commit",
207 Commit: &jetstream.CommitEvent{
208 Rev: "test-block-rev-create",
209 Operation: "create",
210 Collection: "social.coves.actor.block",
211 RKey: rkey,
212 CID: pdsRecord.CID,
213 Record: map[string]interface{}{
214 "$type": "social.coves.actor.block",
215 "subject": userBDID,
216 "createdAt": time.Now().Format(time.RFC3339),
217 },
218 },
219 }
220
221 if handleErr := userConsumer.HandleEvent(ctx, &createEvent); handleErr != nil {
222 t.Fatalf("Failed to handle block create event: %v", handleErr)
223 }
224
225 // Verify block indexed in AppView via repo.GetBlock()
226 t.Logf("\nVerifying block indexed in AppView...")
227 indexedBlock, err := blockRepo.GetBlock(ctx, userADID, userBDID)
228 if err != nil {
229 t.Fatalf("Block not indexed in AppView: %v", err)
230 }
231
232 t.Logf("Block indexed in AppView:")
233 t.Logf(" BlockerDID: %s", indexedBlock.BlockerDID)
234 t.Logf(" BlockedDID: %s", indexedBlock.BlockedDID)
235 t.Logf(" RecordURI: %s", indexedBlock.RecordURI)
236 t.Logf(" RecordCID: %s", indexedBlock.RecordCID)
237
238 if indexedBlock.BlockerDID != userADID {
239 t.Errorf("Expected blocker_did %s, got %s", userADID, indexedBlock.BlockerDID)
240 }
241 if indexedBlock.BlockedDID != userBDID {
242 t.Errorf("Expected blocked_did %s, got %s", userBDID, indexedBlock.BlockedDID)
243 }
244
245 // Verify via IsBlocked
246 isBlocked, err := blockRepo.IsBlocked(ctx, userADID, userBDID)
247 if err != nil {
248 t.Fatalf("Failed to check IsBlocked: %v", err)
249 }
250 if !isBlocked {
251 t.Error("Expected IsBlocked to return true, got false")
252 }
253
254 t.Logf("BLOCK PHASE COMPLETE:")
255 t.Logf(" Client -> XRPC -> PDS Write -> Jetstream -> Consumer -> AppView")
256 t.Logf(" Block written to PDS")
257 t.Logf(" Block indexed in AppView")
258 t.Logf(" IsBlocked confirmed")
259
260 // ====================================================================================
261 // UNBLOCK PHASE: User A unblocks User B
262 // ====================================================================================
263 t.Logf("\n--- UNBLOCK PHASE: User A unblocks User B ---")
264
265 unblockReq := map[string]interface{}{
266 "subject": userBDID,
267 }
268
269 unblockBody, marshalErr := json.Marshal(unblockReq)
270 if marshalErr != nil {
271 t.Fatalf("Failed to marshal unblock request: %v", marshalErr)
272 }
273
274 unblockHTTPReq, err := http.NewRequest(http.MethodPost,
275 httpServer.URL+"/xrpc/social.coves.actor.unblockUser",
276 bytes.NewBuffer(unblockBody))
277 if err != nil {
278 t.Fatalf("Failed to create unblock request: %v", err)
279 }
280 unblockHTTPReq.Header.Set("Content-Type", "application/json")
281 unblockHTTPReq.Header.Set("Authorization", "Bearer "+tokenA)
282
283 unblockResp, err := http.DefaultClient.Do(unblockHTTPReq)
284 if err != nil {
285 t.Fatalf("Failed to POST unblock: %v", err)
286 }
287 defer func() {
288 if closeErr := unblockResp.Body.Close(); closeErr != nil {
289 t.Logf("Failed to close unblock response: %v", closeErr)
290 }
291 }()
292
293 if unblockResp.StatusCode != http.StatusOK {
294 body, _ := io.ReadAll(unblockResp.Body)
295 t.Fatalf("Unblock failed: status %d, body: %s", unblockResp.StatusCode, string(body))
296 }
297
298 t.Logf("Unblock XRPC request succeeded")
299
300 // Verify record deleted from PDS (expect error/404)
301 t.Logf("\nVerifying block record deleted from PDS...")
302 pdsDeleteCheck, pdsDeleteErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s",
303 pdsURL, userADID, collection, rkey))
304 if pdsDeleteErr != nil {
305 t.Fatalf("Failed to check PDS record after unblock: %v", pdsDeleteErr)
306 }
307 defer func() {
308 if closeErr := pdsDeleteCheck.Body.Close(); closeErr != nil {
309 t.Logf("Failed to close PDS delete check response: %v", closeErr)
310 }
311 }()
312
313 switch pdsDeleteCheck.StatusCode {
314 case http.StatusOK:
315 t.Error("Expected block record to be deleted from PDS, but it still exists")
316 case http.StatusBadRequest, http.StatusNotFound:
317 // Expected: PDS returns 400 (RecordNotFound) or 404 for deleted records
318 t.Logf("Block record confirmed deleted from PDS (status: %d)", pdsDeleteCheck.StatusCode)
319 default:
320 body, _ := io.ReadAll(pdsDeleteCheck.Body)
321 t.Errorf("Unexpected status when checking deleted record: %d, body: %s", pdsDeleteCheck.StatusCode, string(body))
322 }
323
324 // Simulate Jetstream DELETE event
325 t.Logf("\nSimulating Jetstream DELETE event for unblock...")
326 deleteEvent := jetstream.JetstreamEvent{
327 Did: userADID,
328 TimeUS: time.Now().UnixMicro(),
329 Kind: "commit",
330 Commit: &jetstream.CommitEvent{
331 Rev: "test-block-rev-delete",
332 Operation: "delete",
333 Collection: "social.coves.actor.block",
334 RKey: rkey,
335 },
336 }
337
338 if handleErr := userConsumer.HandleEvent(ctx, &deleteEvent); handleErr != nil {
339 t.Fatalf("Failed to handle block delete event: %v", handleErr)
340 }
341
342 // Verify block removed from AppView
343 t.Logf("\nVerifying block removed from AppView...")
344 _, err = blockRepo.GetBlock(ctx, userADID, userBDID)
345 if err == nil {
346 t.Error("Expected block to be deleted from AppView, but it still exists")
347 }
348
349 // Verify via IsBlocked
350 isBlockedAfter, err := blockRepo.IsBlocked(ctx, userADID, userBDID)
351 if err != nil {
352 t.Fatalf("Failed to check IsBlocked after unblock: %v", err)
353 }
354 if isBlockedAfter {
355 t.Error("Expected IsBlocked to return false after unblock, got true")
356 }
357
358 t.Logf("UNBLOCK PHASE COMPLETE:")
359 t.Logf(" Unblock request sent via XRPC")
360 t.Logf(" Block record deleted from PDS")
361 t.Logf(" Block removed from AppView")
362 t.Logf(" IsBlocked confirmed false")
363
364 t.Logf("\nTRUE E2E BLOCK/UNBLOCK FLOW COMPLETE:")
365 t.Logf(" Client -> XRPC -> PDS Write -> Jetstream -> Consumer -> AppView")
366 t.Logf(" Client -> XRPC Unblock -> PDS Delete -> Jetstream -> Consumer -> AppView removal")
367}
368
369// TestUserBlockE2E_SelfBlockPrevented tests that a user cannot block themselves.
370// This validates the self-block guard in the service layer with a real PDS.
371func TestUserBlockE2E_SelfBlockPrevented(t *testing.T) {
372 if testing.Short() {
373 t.Skip("Skipping E2E test in short mode")
374 }
375
376 db := setupTestDB(t)
377 defer func() { _ = db.Close() }()
378
379 pdsURL := getTestPDSURL()
380
381 // Health check PDS
382 healthResp, err := http.Get(pdsURL + "/xrpc/_health")
383 if err != nil {
384 t.Skipf("PDS not running at %s: %v", pdsURL, err)
385 }
386 func() {
387 if closeErr := healthResp.Body.Close(); closeErr != nil {
388 t.Logf("Failed to close health response: %v", closeErr)
389 }
390 }()
391
392 // Clean up user_blocks at the end
393 defer func() {
394 _, _ = db.Exec("DELETE FROM user_blocks")
395 }()
396
397 // Setup repository
398 blockRepo := postgres.NewUserBlockRepository(db)
399
400 // Create User A on real PDS
401 userAHandle := fmt.Sprintf("self%d.local.coves.dev", time.Now().UnixNano()%1000000)
402 userAEmail := fmt.Sprintf("selfblock-%d@test.local", time.Now().Unix())
403 userAPassword := "test-password-123"
404
405 t.Logf("Creating User A on PDS: %s", userAHandle)
406 pdsAccessTokenA, userADID, err := createPDSAccount(pdsURL, userAHandle, userAEmail, userAPassword)
407 if err != nil {
408 t.Fatalf("Failed to create User A on PDS: %v", err)
409 }
410 t.Logf("User A created: DID=%s", userADID)
411
412 // Index user in AppView
413 createTestUser(t, db, userAHandle, userADID)
414
415 // Setup OAuth middleware with User A's real PDS access token
416 e2eAuth := NewE2EOAuthMiddleware()
417 tokenA := e2eAuth.AddUserWithPDSToken(userADID, pdsAccessTokenA, pdsURL)
418
419 // Setup service with password-based PDS client factory
420 service := userblocks.NewServiceWithPDSFactory(blockRepo, nil, UserBlockPasswordAuthPDSClientFactory())
421
422 // Setup HTTP server
423 r := chi.NewRouter()
424 routes.RegisterUserBlockRoutes(r, service, e2eAuth.OAuthAuthMiddleware)
425 httpServer := httptest.NewServer(r)
426 defer httpServer.Close()
427
428 // Attempt to self-block
429 t.Logf("\nAttempting to block self (should fail)...")
430
431 selfBlockReq := map[string]interface{}{
432 "subject": userADID,
433 }
434
435 reqBody, marshalErr := json.Marshal(selfBlockReq)
436 if marshalErr != nil {
437 t.Fatalf("Failed to marshal self-block request: %v", marshalErr)
438 }
439
440 req, err := http.NewRequest(http.MethodPost,
441 httpServer.URL+"/xrpc/social.coves.actor.blockUser",
442 bytes.NewBuffer(reqBody))
443 if err != nil {
444 t.Fatalf("Failed to create self-block request: %v", err)
445 }
446 req.Header.Set("Content-Type", "application/json")
447 req.Header.Set("Authorization", "Bearer "+tokenA)
448
449 resp, err := http.DefaultClient.Do(req)
450 if err != nil {
451 t.Fatalf("Failed to POST self-block: %v", err)
452 }
453 defer func() { _ = resp.Body.Close() }()
454
455 // Should return 400 Bad Request
456 if resp.StatusCode != http.StatusBadRequest {
457 body, _ := io.ReadAll(resp.Body)
458 t.Fatalf("Expected 400 for self-block, got %d: %s", resp.StatusCode, string(body))
459 }
460
461 // Parse error response
462 var errResp struct {
463 Error string `json:"error"`
464 Message string `json:"message"`
465 }
466 if decodeErr := json.NewDecoder(resp.Body).Decode(&errResp); decodeErr != nil {
467 t.Fatalf("Failed to decode error response: %v", decodeErr)
468 }
469
470 // Verify error message mentions self-blocking
471 if !contains(errResp.Message, "cannot block yourself") {
472 t.Errorf("Expected error about self-blocking, got: %s", errResp.Message)
473 }
474
475 t.Logf("Self-block correctly prevented:")
476 t.Logf(" Status: %d", resp.StatusCode)
477 t.Logf(" Error: %s", errResp.Error)
478 t.Logf(" Message: %s", errResp.Message)
479
480 t.Logf("\nSELF-BLOCK PREVENTION TEST COMPLETE:")
481 t.Logf(" User attempted to block themselves")
482 t.Logf(" Server returned 400 with 'cannot block yourself'")
483 t.Logf(" No record written to PDS")
484}