A social knowledge tool for researchers built on ATProto
at main 592 lines 22 kB view raw
import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis';
import Redis from 'ioredis';
import { BullMQEventPublisher } from '../../../../shared/infrastructure/events/BullMQEventPublisher';
import { BullMQEventSubscriber } from '../../../../shared/infrastructure/events/BullMQEventSubscriber';
import { CardAddedToLibraryEvent } from '../../domain/events/CardAddedToLibraryEvent';
import { CardAddedToCollectionEvent } from '../../domain/events/CardAddedToCollectionEvent';
import { CardId } from '../../domain/value-objects/CardId';
import { CuratorId } from '../../domain/value-objects/CuratorId';
import { CollectionId } from '../../domain/value-objects/CollectionId';
import { IEventHandler } from '../../../../shared/application/events/IEventSubscriber';
import { ok, err } from '../../../../shared/core/Result';
import { EventNames } from '../../../../shared/infrastructure/events/EventConfig';
import { Queue } from 'bullmq';
import { QueueNames } from 'src/shared/infrastructure/events/QueueConfig';
import { CardCollectionSaga } from '../../../feeds/application/sagas/CardCollectionSaga';
import { RedisSagaStateStore } from '../../../feeds/infrastructure/RedisSagaStateStore';

/**
 * Resolves after `ms` milliseconds.
 *
 * The tests below wait on wall-clock time (rather than polling) to give the
 * workers / saga timers a chance to run before asserting.
 */
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Integration tests for the BullMQ-backed event publisher/subscriber and the
 * Redis-backed CardCollectionSaga. Everything runs against a Redis instance
 * started through Testcontainers; a single connection, publisher and
 * FEEDS-queue subscriber are shared by all tests, and Redis is flushed before
 * each test.
 */
describe('BullMQ Event System Integration', () => {
  let redisContainer: StartedRedisContainer;
  let redis: Redis;
  let publisher: BullMQEventPublisher;
  let subscriber: BullMQEventSubscriber;

  beforeAll(async () => {
    // Start Redis container
    redisContainer = await new RedisContainer('redis:7-alpine')
      .withExposedPorts(6379)
      .start();

    // Create Redis connection
    const connectionUrl = redisContainer.getConnectionUrl();
    redis = new Redis(connectionUrl, { maxRetriesPerRequest: null });

    // Create publisher and subscriber
    publisher = new BullMQEventPublisher(redis);
    subscriber = new BullMQEventSubscriber(redis, {
      queueName: QueueNames.FEEDS,
    });
  }, 60000); // Increase timeout for container startup

  afterAll(async () => {
    // Clean up. Each resource is guarded because beforeAll may have failed
    // part-way through, leaving later ones unassigned.
    if (subscriber) {
      await subscriber.stop();
    }
    if (publisher) {
      await publisher.close();
    }
    if (redis) {
      await redis.quit();
    }
    if (redisContainer) {
      await redisContainer.stop();
    }
  });

  beforeEach(async () => {
    // Clear Redis data between tests
    await redis.flushall();
  });

  describe('Event Publishing and Subscription', () => {
    it('should publish and receive CardAddedToLibraryEvent', async () => {
      // Arrange
      const receivedEvents: CardAddedToLibraryEvent[] = [];
      const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
        handle: jest
          .fn()
          .mockImplementation(async (event: CardAddedToLibraryEvent) => {
            receivedEvents.push(event);
            return ok(undefined);
          }),
      };

      // Subscribe to events
      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
      await subscriber.start();

      // Create test event
      const cardId = CardId.createFromString('test-card-123').unwrap();
      const curatorId = CuratorId.create('did:plc:testuser123').unwrap();
      const event = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();

      // Act - Publish event
      const publishResult = await publisher.publishEvents([event]);

      // Assert - Publishing succeeded
      expect(publishResult.isOk()).toBe(true);

      // Wait for event processing
      await sleep(2000);

      // Assert - Event was received and processed
      expect(mockHandler.handle).toHaveBeenCalledTimes(1);
      expect(receivedEvents).toHaveLength(1);

      const receivedEvent = receivedEvents[0];
      expect(receivedEvent).toBeInstanceOf(CardAddedToLibraryEvent);
      expect(receivedEvent!.cardId.getStringValue()).toBe(
        cardId.getStringValue(),
      );
      expect(receivedEvent!.curatorId.value).toBe(curatorId.value);
      expect(receivedEvent!.dateTimeOccurred).toBeInstanceOf(Date);
    }, 15000);

    it('should publish and receive multiple events in sequence', async () => {
      // Arrange
      const receivedEvents: CardAddedToLibraryEvent[] = [];
      const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
        handle: jest
          .fn()
          .mockImplementation(async (event: CardAddedToLibraryEvent) => {
            receivedEvents.push(event);
            return ok(undefined);
          }),
      };

      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
      await subscriber.start();

      // Create multiple test events
      const events = [
        CardAddedToLibraryEvent.create(
          CardId.createFromString('card-1').unwrap(),
          CuratorId.create('did:plc:user1').unwrap(),
        ).unwrap(),
        CardAddedToLibraryEvent.create(
          CardId.createFromString('card-2').unwrap(),
          CuratorId.create('did:plc:user2').unwrap(),
        ).unwrap(),
        CardAddedToLibraryEvent.create(
          CardId.createFromString('card-3').unwrap(),
          CuratorId.create('did:plc:user3').unwrap(),
        ).unwrap(),
      ];

      // Act - Publish all events
      const publishResult = await publisher.publishEvents(events);

      // Assert - Publishing succeeded
      expect(publishResult.isOk()).toBe(true);

      // Wait for event processing
      await sleep(3000);

      // Assert - All events were received
      expect(mockHandler.handle).toHaveBeenCalledTimes(3);
      expect(receivedEvents).toHaveLength(3);

      // Verify each event was processed correctly (order is not asserted)
      const cardIds = receivedEvents.map((e) => e.cardId.getStringValue());
      expect(cardIds).toContain('card-1');
      expect(cardIds).toContain('card-2');
      expect(cardIds).toContain('card-3');
    }, 20000);

    it('should handle event processing failures gracefully', async () => {
      // Arrange - first call fails, subsequent calls succeed (for retry)
      const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
        handle: jest
          .fn()
          .mockResolvedValueOnce(err(new Error('Processing failed')))
          .mockResolvedValue(ok(undefined)),
      };

      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
      await subscriber.start();

      const event = CardAddedToLibraryEvent.create(
        CardId.createFromString('failing-card').unwrap(),
        CuratorId.create('did:plc:failuser').unwrap(),
      ).unwrap();

      // Act - Publish event that will initially fail
      const publishResult = await publisher.publishEvents([event]);

      // Assert - Publishing succeeded (failure happens during processing)
      expect(publishResult.isOk()).toBe(true);

      // Wait for initial processing and potential retries
      await sleep(5000);

      // Assert - Handler was called (at least once for initial attempt)
      expect(mockHandler.handle).toHaveBeenCalled();
    }, 25000);

    it('should not process events when no handler is registered', async () => {
      // Arrange - Start subscriber without registering any handlers
      await subscriber.start();

      const event = CardAddedToLibraryEvent.create(
        CardId.createFromString('unhandled-card').unwrap(),
        CuratorId.create('did:plc:unhandleduser').unwrap(),
      ).unwrap();

      // Act - Publish event
      const publishResult = await publisher.publishEvents([event]);

      // Assert - Publishing succeeded (no handler doesn't prevent publishing)
      expect(publishResult.isOk()).toBe(true);

      // Wait to ensure no processing occurs
      await sleep(2000);

      // No further assertions - test passes if no errors are thrown
    }, 10000);

    it('should maintain event data integrity during serialization/deserialization', async () => {
      // Arrange - collect into an array (same pattern as the tests above)
      // instead of a closure-assigned nullable local, which TypeScript
      // narrows to `null` at the assertion site.
      const receivedEvents: CardAddedToLibraryEvent[] = [];
      const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
        handle: jest
          .fn()
          .mockImplementation(async (event: CardAddedToLibraryEvent) => {
            receivedEvents.push(event);
            return ok(undefined);
          }),
      };

      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
      await subscriber.start();

      // Create event with specific data
      const originalCardId = CardId.createFromString(
        'integrity-test-card-456',
      ).unwrap();
      const originalCuratorId = CuratorId.create(
        'did:plc:integrityuser789',
      ).unwrap();
      const originalEvent = CardAddedToLibraryEvent.create(
        originalCardId,
        originalCuratorId,
      ).unwrap();
      const originalTimestamp = originalEvent.dateTimeOccurred;

      // Act
      await publisher.publishEvents([originalEvent]);
      await sleep(2000);

      // Assert - Event data was preserved through serialization/deserialization
      // (checks the most recently received event)
      const receivedEvent = receivedEvents[receivedEvents.length - 1];
      expect(receivedEvent).toBeDefined();
      expect(receivedEvent!.cardId.getStringValue()).toBe(
        originalCardId.getStringValue(),
      );
      expect(receivedEvent!.curatorId.value).toBe(originalCuratorId.value);
      expect(receivedEvent!.dateTimeOccurred.getTime()).toBe(
        originalTimestamp.getTime(),
      );
      expect(receivedEvent!.getAggregateId().toString()).toBe(
        originalCardId.getValue().toString(),
      );
    }, 15000);
  });

  describe('Queue Configuration', () => {
    it('should route events to the feeds queue', async () => {
      // This test verifies the queue routing logic by checking Redis directly
      const event = CardAddedToLibraryEvent.create(
        CardId.createFromString('queue-test-card').unwrap(),
        CuratorId.create('did:plc:queueuser').unwrap(),
      ).unwrap();

      await publisher.publishEvents([event]);

      // Create a Queue instance to check job counts
      const eventsQueue = new Queue(QueueNames.FEEDS, { connection: redis });

      try {
        // Wait for job to be added
        await sleep(100);

        // Check total number of jobs (regardless of state)
        const jobCounts = await eventsQueue.getJobCounts();
        const totalJobs = Object.values(jobCounts).reduce(
          (sum, count) => sum + count,
          0,
        );

        expect(totalJobs).toBeGreaterThanOrEqual(1);
      } finally {
        // Close the queue even when an assertion above fails
        await eventsQueue.close();
      }
    }, 10000);
  });

  describe('Redis-Based Saga Integration', () => {
    it('should handle distributed saga state across multiple workers', async () => {
      // Arrange - Create two saga instances (simulating multiple workers)
      const mockUseCase = {
        execute: jest
          .fn()
          .mockResolvedValue(ok({ activityId: 'test-activity' })),
      } as any;

      const stateStore = new RedisSagaStateStore(redis);
      const saga1 = new CardCollectionSaga(mockUseCase, stateStore);
      const saga2 = new CardCollectionSaga(mockUseCase, stateStore);

      // Create test events for same card/user (should be aggregated)
      const cardId = CardId.createFromString('saga-test-card').unwrap();
      const curatorId = CuratorId.create('did:plc:sagatest').unwrap();

      const libraryEvent = CardAddedToLibraryEvent.create(
        cardId,
        curatorId,
      ).unwrap();
      const collectionEvent = CardAddedToCollectionEvent.create(
        cardId,
        CollectionId.createFromString('test-collection').unwrap(),
        curatorId,
      ).unwrap();

      // Act - Process events with different saga instances
      const result1 = await saga1.handleCardEvent(libraryEvent);
      const result2 = await saga2.handleCardEvent(collectionEvent);

      // Assert - Both operations succeeded
      expect(result1.isOk()).toBe(true);
      expect(result2.isOk()).toBe(true);

      // Wait for aggregation window
      await sleep(3500);

      // Assert - Only one aggregated activity was created
      expect(mockUseCase.execute).toHaveBeenCalledTimes(1);

      const call = mockUseCase.execute.mock.calls[0][0];
      expect(call.cardId).toBe(cardId.getStringValue());
      expect(call.actorId).toBe(curatorId.value);
      expect(call.collectionIds).toContain('test-collection');
    }, 15000);

    it('should handle concurrent lock contention with retry mechanism', async () => {
      // Arrange - Create multiple saga instances
      const mockUseCase = {
        execute: jest
          .fn()
          .mockResolvedValue(ok({ activityId: 'test-activity' })),
      } as any;

      const stateStore = new RedisSagaStateStore(redis);
      const saga1 = new CardCollectionSaga(mockUseCase, stateStore);
      const saga2 = new CardCollectionSaga(mockUseCase, stateStore);
      const saga3 = new CardCollectionSaga(mockUseCase, stateStore);

      // Create events for same card/user (will compete for same lock)
      const cardId = CardId.createFromString('concurrent-test-card').unwrap();
      const curatorId = CuratorId.create('did:plc:concurrentuser').unwrap();
      const collectionId1 =
        CollectionId.createFromString('collection-1').unwrap();
      const collectionId2 =
        CollectionId.createFromString('collection-2').unwrap();

      const events = [
        CardAddedToLibraryEvent.create(cardId, curatorId).unwrap(),
        CardAddedToCollectionEvent.create(
          cardId,
          collectionId1,
          curatorId,
        ).unwrap(),
        CardAddedToCollectionEvent.create(
          cardId,
          collectionId2,
          curatorId,
        ).unwrap(),
      ];

      // Act - Process all events concurrently (not sequentially)
      const results = await Promise.all([
        saga1.handleCardEvent(events[0]!),
        saga2.handleCardEvent(events[1]!),
        saga3.handleCardEvent(events[2]!),
      ]);

      // Assert - All should succeed (no events dropped due to lock contention)
      results.forEach((result) => expect(result.isOk()).toBe(true));

      // Wait for aggregation window
      await sleep(3500);

      // Assert - Should create single activity with all collections
      expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
      const call = mockUseCase.execute.mock.calls[0][0];
      expect(call.cardId).toBe(cardId.getStringValue());
      expect(call.actorId).toBe(curatorId.value);
      expect(call.collectionIds).toHaveLength(2);
      expect(call.collectionIds).toContain('collection-1');
      expect(call.collectionIds).toContain('collection-2');
    }, 20000);

    it('should handle high concurrency with many simultaneous events', async () => {
      // Arrange - Create many saga instances
      const mockUseCase = {
        execute: jest
          .fn()
          .mockResolvedValue(ok({ activityId: 'test-activity' })),
      } as any;

      const stateStore = new RedisSagaStateStore(redis);
      const cardId = CardId.createFromString('high-concurrency-card').unwrap();
      const curatorId = CuratorId.create('did:plc:highconcurrency').unwrap();

      // Create 10 collection events that will all compete for the same lock,
      // each paired with its own saga instance
      const events = Array.from({ length: 10 }, (_, i) => {
        const saga = new CardCollectionSaga(mockUseCase, stateStore);
        const collectionId = CollectionId.createFromString(
          `collection-${i}`,
        ).unwrap();
        const event = CardAddedToCollectionEvent.create(
          cardId,
          collectionId,
          curatorId,
        ).unwrap();
        return { saga, event };
      });

      // Add one library event
      const librarySaga = new CardCollectionSaga(mockUseCase, stateStore);
      const libraryEvent = CardAddedToLibraryEvent.create(
        cardId,
        curatorId,
      ).unwrap();

      // Act - Process all events concurrently
      const allPromises = [
        librarySaga.handleCardEvent(libraryEvent),
        ...events.map(({ saga, event }) => saga.handleCardEvent(event)),
      ];

      const results = await Promise.all(allPromises);

      // Assert - All should succeed
      results.forEach((result) => expect(result.isOk()).toBe(true));

      // Wait longer for aggregation window and retry processing
      await sleep(8000);

      // Assert - Should create single activity with all 10 collections
      expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
      const call = mockUseCase.execute.mock.calls[0][0];
      expect(call.cardId).toBe(cardId.getStringValue());
      expect(call.actorId).toBe(curatorId.value);
      expect(call.collectionIds).toHaveLength(10);

      // Verify all collection IDs are present
      for (let i = 0; i < 10; i++) {
        expect(call.collectionIds).toContain(`collection-${i}`);
      }
    }, 35000);

    it('should recover when lock expires due to timeout', async () => {
      // Arrange
      const mockUseCase = {
        execute: jest
          .fn()
          .mockResolvedValue(ok({ activityId: 'test-activity' })),
      } as any;

      const stateStore = new RedisSagaStateStore(redis);
      const cardId = CardId.createFromString('timeout-test-card').unwrap();
      const curatorId = CuratorId.create('did:plc:timeoutuser').unwrap();

      // Manually acquire lock with short TTL to simulate crashed worker.
      // NOTE(review): this key is assumed to match the lock key the saga
      // derives for this card/curator - confirm against CardCollectionSaga.
      const lockKey = 'saga:feed:lock:timeout-test-card-did:plc:timeoutuser';
      await stateStore.set(lockKey, '1', 'EX', 2, 'NX'); // 2 second TTL

      // Create saga and event
      const saga = new CardCollectionSaga(mockUseCase, stateStore);
      const event = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();

      // Act - Try to process event (should initially be blocked by lock)
      // But should succeed after lock expires and retry mechanism kicks in
      const result = await saga.handleCardEvent(event);

      // Assert - Should succeed after lock expires
      expect(result.isOk()).toBe(true);

      // Wait longer for aggregation window
      await sleep(5000);

      // Assert - Activity should be created
      expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
    }, 20000);

    it('should handle retry mechanism under lock contention', async () => {
      // Arrange
      const mockUseCase = {
        execute: jest
          .fn()
          .mockResolvedValue(ok({ activityId: 'test-activity' })),
      } as any;

      const stateStore = new RedisSagaStateStore(redis);
      const cardId = CardId.createFromString('retry-test-card').unwrap();
      const curatorId = CuratorId.create('did:plc:retryuser').unwrap();

      // Create a saga that will hold the lock for a while
      const lockHoldingSaga = new CardCollectionSaga(mockUseCase, stateStore);
      const retryingSaga = new CardCollectionSaga(mockUseCase, stateStore);

      const event1 = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();
      const event2 = CardAddedToCollectionEvent.create(
        cardId,
        CollectionId.createFromString('retry-collection').unwrap(),
        curatorId,
      ).unwrap();

      // Act - Start first saga (will acquire lock)
      const promise1 = lockHoldingSaga.handleCardEvent(event1);

      // Immediately start second saga (will need to retry)
      const promise2 = retryingSaga.handleCardEvent(event2);

      const results = await Promise.all([promise1, promise2]);

      // Assert - Both should succeed
      expect(results[0].isOk()).toBe(true);
      expect(results[1].isOk()).toBe(true);

      // Wait for aggregation window
      await sleep(3500);

      // Assert - Should create single aggregated activity
      expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
      const call = mockUseCase.execute.mock.calls[0][0];
      expect(call.collectionIds).toContain('retry-collection');
    }, 20000);
  });

  describe('Multi-Queue Event Routing', () => {
    it('should route events to multiple queues', async () => {
      // Stop the shared subscriber to avoid interference
      await subscriber.stop();

      // Clear any pending jobs
      await redis.flushall();

      // Arrange - Create subscribers for different queues
      const feedsSubscriber = new BullMQEventSubscriber(redis, {
        queueName: QueueNames.FEEDS,
      });
      const searchSubscriber = new BullMQEventSubscriber(redis, {
        queueName: QueueNames.SEARCH,
      });

      const feedsHandler = {
        handle: jest.fn().mockResolvedValue(ok(undefined)),
      };
      const searchHandler = {
        handle: jest.fn().mockResolvedValue(ok(undefined)),
      };

      await feedsSubscriber.subscribe(
        EventNames.CARD_ADDED_TO_LIBRARY,
        feedsHandler,
      );
      await searchSubscriber.subscribe(
        EventNames.CARD_ADDED_TO_LIBRARY,
        searchHandler,
      );

      try {
        await feedsSubscriber.start();
        await searchSubscriber.start();

        // Give workers time to initialize
        await sleep(500);

        // Act - Publish single event
        const event = CardAddedToLibraryEvent.create(
          CardId.createFromString('multi-queue-card').unwrap(),
          CuratorId.create('did:plc:multiuser').unwrap(),
        ).unwrap();

        await publisher.publishEvents([event]);

        // Wait for processing
        await sleep(3000);

        // Assert - Event processed by both queues
        expect(feedsHandler.handle).toHaveBeenCalledTimes(1);
        expect(searchHandler.handle).toHaveBeenCalledTimes(1);
      } finally {
        // Cleanup - stop the per-test subscribers even when an assertion
        // above fails, so their workers do not outlive this test
        await feedsSubscriber.stop();
        await searchSubscriber.stop();
      }
    }, 15000);
  });
});