A community-based topic aggregation platform built on atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 506 lines 16 kB view raw
1package integration 2 3import ( 4 "Coves/internal/atproto/jetstream" 5 "Coves/internal/core/communities" 6 "context" 7 "database/sql" 8 "fmt" 9 "testing" 10 "time" 11 12 postgresRepo "Coves/internal/db/postgres" 13) 14 15// TestSubscriptionIndexing_ContentVisibility tests that contentVisibility is properly indexed 16// from Jetstream events and stored in the AppView database 17func TestSubscriptionIndexing_ContentVisibility(t *testing.T) { 18 if testing.Short() { 19 t.Skip("Skipping integration test in short mode") 20 } 21 22 ctx := context.Background() 23 db := setupTestDB(t) 24 defer cleanupTestDB(t, db) 25 26 repo := createTestCommunityRepo(t, db) 27 // Skip verification in tests 28 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 29 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 30 31 // Create a test community first (with unique DID) 32 testDID := fmt.Sprintf("did:plc:test-community-%d", time.Now().UnixNano()) 33 community := createTestCommunity(t, repo, "test-community-visibility", testDID) 34 35 t.Run("indexes subscription with contentVisibility=5", func(t *testing.T) { 36 userDID := "did:plc:test-user-123" 37 rkey := "test-sub-1" 38 uri := "at://" + userDID + "/social.coves.community.subscription/" + rkey 39 40 // Simulate Jetstream CREATE event for subscription 41 event := &jetstream.JetstreamEvent{ 42 Did: userDID, 43 Kind: "commit", 44 TimeUS: time.Now().UnixMicro(), 45 Commit: &jetstream.CommitEvent{ 46 Rev: "test-rev-1", 47 Operation: "create", 48 Collection: "social.coves.community.subscription", // CORRECT collection name 49 RKey: rkey, 50 CID: "bafytest123", 51 Record: map[string]interface{}{ 52 "$type": "social.coves.community.subscription", 53 "subject": community.DID, 54 "createdAt": time.Now().Format(time.RFC3339), 55 "contentVisibility": float64(5), // JSON numbers decode as float64 56 }, 57 }, 58 } 59 60 // Process event through consumer 61 err := 
consumer.HandleEvent(ctx, event) 62 if err != nil { 63 t.Fatalf("Failed to handle subscription event: %v", err) 64 } 65 66 // Verify subscription was indexed with correct contentVisibility 67 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 68 if err != nil { 69 t.Fatalf("Failed to get subscription: %v", err) 70 } 71 72 if subscription.ContentVisibility != 5 { 73 t.Errorf("Expected contentVisibility=5, got %d", subscription.ContentVisibility) 74 } 75 76 if subscription.UserDID != userDID { 77 t.Errorf("Expected userDID=%s, got %s", userDID, subscription.UserDID) 78 } 79 80 if subscription.CommunityDID != community.DID { 81 t.Errorf("Expected communityDID=%s, got %s", community.DID, subscription.CommunityDID) 82 } 83 84 if subscription.RecordURI != uri { 85 t.Errorf("Expected recordURI=%s, got %s", uri, subscription.RecordURI) 86 } 87 88 t.Logf("✓ Subscription indexed with contentVisibility=5") 89 }) 90 91 t.Run("defaults to contentVisibility=3 when not provided", func(t *testing.T) { 92 userDID := "did:plc:test-user-default" 93 rkey := "test-sub-default" 94 95 // Simulate Jetstream CREATE event WITHOUT contentVisibility field 96 event := &jetstream.JetstreamEvent{ 97 Did: userDID, 98 Kind: "commit", 99 TimeUS: time.Now().UnixMicro(), 100 Commit: &jetstream.CommitEvent{ 101 Rev: "test-rev-default", 102 Operation: "create", 103 Collection: "social.coves.community.subscription", 104 RKey: rkey, 105 CID: "bafydefault", 106 Record: map[string]interface{}{ 107 "$type": "social.coves.community.subscription", 108 "subject": community.DID, 109 "createdAt": time.Now().Format(time.RFC3339), 110 // contentVisibility NOT provided 111 }, 112 }, 113 } 114 115 // Process event 116 err := consumer.HandleEvent(ctx, event) 117 if err != nil { 118 t.Fatalf("Failed to handle subscription event: %v", err) 119 } 120 121 // Verify defaults to 3 122 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 123 if err != nil { 124 t.Fatalf("Failed to get 
subscription: %v", err) 125 } 126 127 if subscription.ContentVisibility != 3 { 128 t.Errorf("Expected contentVisibility=3 (default), got %d", subscription.ContentVisibility) 129 } 130 131 t.Logf("✓ Subscription defaulted to contentVisibility=3") 132 }) 133 134 t.Run("clamps contentVisibility to valid range (1-5)", func(t *testing.T) { 135 testCases := []struct { 136 name string 137 input float64 138 expected int 139 }{ 140 {input: 0, expected: 1, name: "zero clamped to 1"}, 141 {input: -5, expected: 1, name: "negative clamped to 1"}, 142 {input: 10, expected: 5, name: "10 clamped to 5"}, 143 {input: 100, expected: 5, name: "100 clamped to 5"}, 144 {input: 1, expected: 1, name: "1 stays 1"}, 145 {input: 3, expected: 3, name: "3 stays 3"}, 146 {input: 5, expected: 5, name: "5 stays 5"}, 147 } 148 149 for i, tc := range testCases { 150 t.Run(tc.name, func(t *testing.T) { 151 userDID := fmt.Sprintf("did:plc:test-clamp-%d", i) 152 rkey := fmt.Sprintf("test-sub-clamp-%d", i) 153 154 event := &jetstream.JetstreamEvent{ 155 Did: userDID, 156 Kind: "commit", 157 TimeUS: time.Now().UnixMicro(), 158 Commit: &jetstream.CommitEvent{ 159 Rev: "test-rev-clamp", 160 Operation: "create", 161 Collection: "social.coves.community.subscription", 162 RKey: rkey, 163 CID: "bafyclamp", 164 Record: map[string]interface{}{ 165 "$type": "social.coves.community.subscription", 166 "subject": community.DID, 167 "createdAt": time.Now().Format(time.RFC3339), 168 "contentVisibility": tc.input, 169 }, 170 }, 171 } 172 173 err := consumer.HandleEvent(ctx, event) 174 if err != nil { 175 t.Fatalf("Failed to handle subscription event: %v", err) 176 } 177 178 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 179 if err != nil { 180 t.Fatalf("Failed to get subscription: %v", err) 181 } 182 183 if subscription.ContentVisibility != tc.expected { 184 t.Errorf("Input %.0f: expected %d, got %d", tc.input, tc.expected, subscription.ContentVisibility) 185 } 186 187 t.Logf("✓ Input %.0f 
clamped to %d", tc.input, subscription.ContentVisibility) 188 }) 189 } 190 }) 191 192 t.Run("idempotency: duplicate subscription events don't fail", func(t *testing.T) { 193 userDID := "did:plc:test-idempotent" 194 rkey := "test-sub-idempotent" 195 196 event := &jetstream.JetstreamEvent{ 197 Did: userDID, 198 Kind: "commit", 199 TimeUS: time.Now().UnixMicro(), 200 Commit: &jetstream.CommitEvent{ 201 Rev: "test-rev-idempotent", 202 Operation: "create", 203 Collection: "social.coves.community.subscription", 204 RKey: rkey, 205 CID: "bafyidempotent", 206 Record: map[string]interface{}{ 207 "$type": "social.coves.community.subscription", 208 "subject": community.DID, 209 "createdAt": time.Now().Format(time.RFC3339), 210 "contentVisibility": float64(4), 211 }, 212 }, 213 } 214 215 // Process first time 216 err := consumer.HandleEvent(ctx, event) 217 if err != nil { 218 t.Fatalf("Failed to handle first subscription event: %v", err) 219 } 220 221 // Process again (Jetstream replay scenario) 222 err = consumer.HandleEvent(ctx, event) 223 if err != nil { 224 t.Errorf("Idempotency failed: second event should not error, got: %v", err) 225 } 226 227 // Verify only one subscription exists 228 subscription, err := repo.GetSubscription(ctx, userDID, community.DID) 229 if err != nil { 230 t.Fatalf("Failed to get subscription: %v", err) 231 } 232 233 if subscription.ContentVisibility != 4 { 234 t.Errorf("Expected contentVisibility=4, got %d", subscription.ContentVisibility) 235 } 236 237 t.Logf("✓ Duplicate events handled idempotently") 238 }) 239} 240 241// TestSubscriptionIndexing_DeleteOperations tests unsubscribe (DELETE) event handling 242func TestSubscriptionIndexing_DeleteOperations(t *testing.T) { 243 if testing.Short() { 244 t.Skip("Skipping integration test in short mode") 245 } 246 247 ctx := context.Background() 248 db := setupTestDB(t) 249 defer cleanupTestDB(t, db) 250 251 repo := createTestCommunityRepo(t, db) 252 // Skip verification in tests 253 // Pass nil for 
identity resolver - not needed since consumer constructs handles from DIDs 254 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 255 256 // Create test community (with unique DID) 257 testDID := fmt.Sprintf("did:plc:test-unsub-%d", time.Now().UnixNano()) 258 community := createTestCommunity(t, repo, "test-unsubscribe", testDID) 259 260 t.Run("deletes subscription when DELETE event received", func(t *testing.T) { 261 userDID := "did:plc:test-user-delete" 262 rkey := "test-sub-delete" 263 264 // First, create a subscription 265 createEvent := &jetstream.JetstreamEvent{ 266 Did: userDID, 267 Kind: "commit", 268 TimeUS: time.Now().UnixMicro(), 269 Commit: &jetstream.CommitEvent{ 270 Rev: "test-rev-create", 271 Operation: "create", 272 Collection: "social.coves.community.subscription", 273 RKey: rkey, 274 CID: "bafycreate", 275 Record: map[string]interface{}{ 276 "$type": "social.coves.community.subscription", 277 "subject": community.DID, 278 "createdAt": time.Now().Format(time.RFC3339), 279 "contentVisibility": float64(3), 280 }, 281 }, 282 } 283 284 err := consumer.HandleEvent(ctx, createEvent) 285 if err != nil { 286 t.Fatalf("Failed to create subscription: %v", err) 287 } 288 289 // Verify subscription exists 290 _, err = repo.GetSubscription(ctx, userDID, community.DID) 291 if err != nil { 292 t.Fatalf("Subscription should exist: %v", err) 293 } 294 295 // Now send DELETE event (unsubscribe) 296 // IMPORTANT: DELETE operations don't include record data in Jetstream 297 deleteEvent := &jetstream.JetstreamEvent{ 298 Did: userDID, 299 Kind: "commit", 300 TimeUS: time.Now().UnixMicro(), 301 Commit: &jetstream.CommitEvent{ 302 Rev: "test-rev-delete", 303 Operation: "delete", 304 Collection: "social.coves.community.subscription", 305 RKey: rkey, 306 CID: "", // No CID on deletes 307 Record: nil, // No record data on deletes 308 }, 309 } 310 311 err = consumer.HandleEvent(ctx, deleteEvent) 312 if err != nil { 313 t.Fatalf("Failed to 
handle delete event: %v", err) 314 } 315 316 // Verify subscription was deleted 317 _, err = repo.GetSubscription(ctx, userDID, community.DID) 318 if err == nil { 319 t.Errorf("Subscription should have been deleted") 320 } 321 if !communities.IsNotFound(err) { 322 t.Errorf("Expected NotFound error, got: %v", err) 323 } 324 325 t.Logf("✓ Subscription deleted successfully") 326 }) 327 328 t.Run("idempotent delete: deleting non-existent subscription doesn't fail", func(t *testing.T) { 329 userDID := "did:plc:test-user-noexist" 330 rkey := "test-sub-noexist" 331 332 // Try to delete a subscription that doesn't exist 333 deleteEvent := &jetstream.JetstreamEvent{ 334 Did: userDID, 335 Kind: "commit", 336 TimeUS: time.Now().UnixMicro(), 337 Commit: &jetstream.CommitEvent{ 338 Rev: "test-rev-noexist", 339 Operation: "delete", 340 Collection: "social.coves.community.subscription", 341 RKey: rkey, 342 CID: "", 343 Record: nil, 344 }, 345 } 346 347 // Should not error (idempotent) 348 err := consumer.HandleEvent(ctx, deleteEvent) 349 if err != nil { 350 t.Errorf("Deleting non-existent subscription should not error, got: %v", err) 351 } 352 353 t.Logf("✓ Idempotent delete handled gracefully") 354 }) 355} 356 357// TestSubscriptionIndexing_SubscriberCount tests that subscriber counts are updated atomically 358func TestSubscriptionIndexing_SubscriberCount(t *testing.T) { 359 if testing.Short() { 360 t.Skip("Skipping integration test in short mode") 361 } 362 363 ctx := context.Background() 364 db := setupTestDB(t) 365 defer cleanupTestDB(t, db) 366 367 repo := createTestCommunityRepo(t, db) 368 // Skip verification in tests 369 // Pass nil for identity resolver - not needed since consumer constructs handles from DIDs 370 consumer := jetstream.NewCommunityEventConsumer(repo, "did:web:coves.local", true, nil) 371 372 // Create test community (with unique DID) 373 testDID := fmt.Sprintf("did:plc:test-subcount-%d", time.Now().UnixNano()) 374 community := createTestCommunity(t, repo, 
"test-subscriber-count", testDID) 375 376 // Verify initial subscriber count is 0 377 comm, err := repo.GetByDID(ctx, community.DID) 378 if err != nil { 379 t.Fatalf("Failed to get community: %v", err) 380 } 381 if comm.SubscriberCount != 0 { 382 t.Errorf("Initial subscriber count should be 0, got %d", comm.SubscriberCount) 383 } 384 385 t.Run("increments subscriber count on subscribe", func(t *testing.T) { 386 userDID := "did:plc:test-user-count1" 387 rkey := "test-sub-count1" 388 389 event := &jetstream.JetstreamEvent{ 390 Did: userDID, 391 Kind: "commit", 392 TimeUS: time.Now().UnixMicro(), 393 Commit: &jetstream.CommitEvent{ 394 Rev: "test-rev-count", 395 Operation: "create", 396 Collection: "social.coves.community.subscription", 397 RKey: rkey, 398 CID: "bafycount", 399 Record: map[string]interface{}{ 400 "$type": "social.coves.community.subscription", 401 "subject": community.DID, 402 "createdAt": time.Now().Format(time.RFC3339), 403 "contentVisibility": float64(3), 404 }, 405 }, 406 } 407 408 err := consumer.HandleEvent(ctx, event) 409 if err != nil { 410 t.Fatalf("Failed to handle subscription: %v", err) 411 } 412 413 // Check subscriber count incremented 414 comm, err := repo.GetByDID(ctx, community.DID) 415 if err != nil { 416 t.Fatalf("Failed to get community: %v", err) 417 } 418 419 if comm.SubscriberCount != 1 { 420 t.Errorf("Subscriber count should be 1, got %d", comm.SubscriberCount) 421 } 422 423 t.Logf("✓ Subscriber count incremented to 1") 424 }) 425 426 t.Run("decrements subscriber count on unsubscribe", func(t *testing.T) { 427 userDID := "did:plc:test-user-count1" // Same user from above 428 rkey := "test-sub-count1" 429 430 // Send DELETE event 431 deleteEvent := &jetstream.JetstreamEvent{ 432 Did: userDID, 433 Kind: "commit", 434 TimeUS: time.Now().UnixMicro(), 435 Commit: &jetstream.CommitEvent{ 436 Rev: "test-rev-unsub", 437 Operation: "delete", 438 Collection: "social.coves.community.subscription", 439 RKey: rkey, 440 CID: "", 441 Record: 
nil, 442 }, 443 } 444 445 err := consumer.HandleEvent(ctx, deleteEvent) 446 if err != nil { 447 t.Fatalf("Failed to handle unsubscribe: %v", err) 448 } 449 450 // Check subscriber count decremented back to 0 451 comm, err := repo.GetByDID(ctx, community.DID) 452 if err != nil { 453 t.Fatalf("Failed to get community: %v", err) 454 } 455 456 if comm.SubscriberCount != 0 { 457 t.Errorf("Subscriber count should be 0, got %d", comm.SubscriberCount) 458 } 459 460 t.Logf("✓ Subscriber count decremented to 0") 461 }) 462} 463 464// Helper functions 465 466func createTestCommunity(t *testing.T, repo communities.Repository, name, did string) *communities.Community { 467 t.Helper() 468 469 // Add timestamp to make handles unique across test runs 470 uniqueHandle := fmt.Sprintf("%s-%d.test.coves.social", name, time.Now().UnixNano()) 471 472 community := &communities.Community{ 473 DID: did, 474 Handle: uniqueHandle, 475 Name: name, 476 DisplayName: "Test Community " + name, 477 Description: "Test community for subscription indexing", 478 OwnerDID: did, 479 CreatedByDID: "did:plc:test-creator", 480 HostedByDID: "did:plc:test-instance", 481 Visibility: "public", 482 CreatedAt: time.Now(), 483 UpdatedAt: time.Now(), 484 } 485 486 created, err := repo.Create(context.Background(), community) 487 if err != nil { 488 t.Fatalf("Failed to create test community: %v", err) 489 } 490 491 return created 492} 493 494func createTestCommunityRepo(t *testing.T, db interface{}) communities.Repository { 495 t.Helper() 496 // Import the postgres package to create a repo 497 return postgresRepo.NewCommunityRepository(db.(*sql.DB)) 498} 499 500func cleanupTestDB(t *testing.T, db interface{}) { 501 t.Helper() 502 sqlDB := db.(*sql.DB) 503 if err := sqlDB.Close(); err != nil { 504 t.Logf("Failed to close database: %v", err) 505 } 506}