A social knowledge tool for researchers built on ATProto
1import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis';
2import Redis from 'ioredis';
3import { BullMQEventPublisher } from '../../../../shared/infrastructure/events/BullMQEventPublisher';
4import { BullMQEventSubscriber } from '../../../../shared/infrastructure/events/BullMQEventSubscriber';
5import { CardAddedToLibraryEvent } from '../../domain/events/CardAddedToLibraryEvent';
6import { CardAddedToCollectionEvent } from '../../domain/events/CardAddedToCollectionEvent';
7import { CardId } from '../../domain/value-objects/CardId';
8import { CuratorId } from '../../domain/value-objects/CuratorId';
9import { CollectionId } from '../../domain/value-objects/CollectionId';
10import { IEventHandler } from '../../../../shared/application/events/IEventSubscriber';
11import { ok, err } from '../../../../shared/core/Result';
12import { EventNames } from '../../../../shared/infrastructure/events/EventConfig';
13import { Queue } from 'bullmq';
14import { QueueNames } from 'src/shared/infrastructure/events/QueueConfig';
15import { CardCollectionSaga } from '../../../feeds/application/sagas/CardCollectionSaga';
16import { RedisSagaStateStore } from '../../../feeds/infrastructure/RedisSagaStateStore';
17
18describe('BullMQ Event System Integration', () => {
19 let redisContainer: StartedRedisContainer;
20 let redis: Redis;
21 let publisher: BullMQEventPublisher;
22 let subscriber: BullMQEventSubscriber;
23
24 beforeAll(async () => {
25 // Start Redis container
26 redisContainer = await new RedisContainer('redis:7-alpine')
27 .withExposedPorts(6379)
28 .start();
29
30 // Create Redis connection
31 const connectionUrl = redisContainer.getConnectionUrl();
32 redis = new Redis(connectionUrl, { maxRetriesPerRequest: null });
33
34 // Create publisher and subscriber
35 publisher = new BullMQEventPublisher(redis);
36 subscriber = new BullMQEventSubscriber(redis, {
37 queueName: QueueNames.FEEDS,
38 });
39 }, 60000); // Increase timeout for container startup
40
41 afterAll(async () => {
42 // Clean up
43 if (subscriber) {
44 await subscriber.stop();
45 }
46 if (publisher) {
47 await publisher.close();
48 }
49 if (redis) {
50 await redis.quit();
51 }
52 if (redisContainer) {
53 await redisContainer.stop();
54 }
55 });
56
57 beforeEach(async () => {
58 // Clear Redis data between tests
59 await redis.flushall();
60 });
61
62 describe('Event Publishing and Subscription', () => {
63 it('should publish and receive CardAddedToLibraryEvent', async () => {
64 // Arrange
65 const receivedEvents: CardAddedToLibraryEvent[] = [];
66 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
67 handle: jest
68 .fn()
69 .mockImplementation(async (event: CardAddedToLibraryEvent) => {
70 receivedEvents.push(event);
71 return ok(undefined);
72 }),
73 };
74
75 // Subscribe to events
76 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
77 await subscriber.start();
78
79 // Create test event
80 const cardId = CardId.createFromString('test-card-123').unwrap();
81 const curatorId = CuratorId.create('did:plc:testuser123').unwrap();
82 const event = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();
83
84 // Act - Publish event
85 const publishResult = await publisher.publishEvents([event]);
86
87 // Assert - Publishing succeeded
88 expect(publishResult.isOk()).toBe(true);
89
90 // Wait for event processing
91 await new Promise((resolve) => setTimeout(resolve, 2000));
92
93 // Assert - Event was received and processed
94 expect(mockHandler.handle).toHaveBeenCalledTimes(1);
95 expect(receivedEvents).toHaveLength(1);
96
97 const receivedEvent = receivedEvents[0];
98 expect(receivedEvent).toBeInstanceOf(CardAddedToLibraryEvent);
99 expect(receivedEvent!.cardId.getStringValue()).toBe(
100 cardId.getStringValue(),
101 );
102 expect(receivedEvent!.curatorId.value).toBe(curatorId.value);
103 expect(receivedEvent!.dateTimeOccurred).toBeInstanceOf(Date);
104 }, 15000);
105
106 it('should publish and receive multiple events in sequence', async () => {
107 // Arrange
108 const receivedEvents: CardAddedToLibraryEvent[] = [];
109 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
110 handle: jest
111 .fn()
112 .mockImplementation(async (event: CardAddedToLibraryEvent) => {
113 receivedEvents.push(event);
114 return ok(undefined);
115 }),
116 };
117
118 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
119 await subscriber.start();
120
121 // Create multiple test events
122 const events = [
123 CardAddedToLibraryEvent.create(
124 CardId.createFromString('card-1').unwrap(),
125 CuratorId.create('did:plc:user1').unwrap(),
126 ).unwrap(),
127 CardAddedToLibraryEvent.create(
128 CardId.createFromString('card-2').unwrap(),
129 CuratorId.create('did:plc:user2').unwrap(),
130 ).unwrap(),
131 CardAddedToLibraryEvent.create(
132 CardId.createFromString('card-3').unwrap(),
133 CuratorId.create('did:plc:user3').unwrap(),
134 ).unwrap(),
135 ];
136
137 // Act - Publish all events
138 const publishResult = await publisher.publishEvents(events);
139
140 // Assert - Publishing succeeded
141 expect(publishResult.isOk()).toBe(true);
142
143 // Wait for event processing
144 await new Promise((resolve) => setTimeout(resolve, 3000));
145
146 // Assert - All events were received
147 expect(mockHandler.handle).toHaveBeenCalledTimes(3);
148 expect(receivedEvents).toHaveLength(3);
149
150 // Verify each event was processed correctly
151 const cardIds = receivedEvents.map((e) => e.cardId.getStringValue());
152 expect(cardIds).toContain('card-1');
153 expect(cardIds).toContain('card-2');
154 expect(cardIds).toContain('card-3');
155 }, 20000);
156
157 it('should handle event processing failures gracefully', async () => {
158 // Arrange
159 let callCount = 0;
160 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
161 handle: jest.fn().mockImplementation(async () => {
162 callCount++;
163 if (callCount === 1) {
164 // First call fails
165 return err(new Error('Processing failed'));
166 }
167 // Subsequent calls succeed (for retry)
168 return ok(undefined);
169 }),
170 };
171
172 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
173 await subscriber.start();
174
175 const event = CardAddedToLibraryEvent.create(
176 CardId.createFromString('failing-card').unwrap(),
177 CuratorId.create('did:plc:failuser').unwrap(),
178 ).unwrap();
179
180 // Act - Publish event that will initially fail
181 const publishResult = await publisher.publishEvents([event]);
182
183 // Assert - Publishing succeeded (failure happens during processing)
184 expect(publishResult.isOk()).toBe(true);
185
186 // Wait for initial processing and potential retries
187 await new Promise((resolve) => setTimeout(resolve, 5000));
188
189 // Assert - Handler was called (at least once for initial attempt)
190 expect(mockHandler.handle).toHaveBeenCalled();
191 }, 25000);
192
193 it('should not process events when no handler is registered', async () => {
194 // Arrange - Start subscriber without registering any handlers
195 await subscriber.start();
196
197 const event = CardAddedToLibraryEvent.create(
198 CardId.createFromString('unhandled-card').unwrap(),
199 CuratorId.create('did:plc:unhandleduser').unwrap(),
200 ).unwrap();
201
202 // Act - Publish event
203 const publishResult = await publisher.publishEvents([event]);
204
205 // Assert - Publishing succeeded (no handler doesn't prevent publishing)
206 expect(publishResult.isOk()).toBe(true);
207
208 // Wait to ensure no processing occurs
209 await new Promise((resolve) => setTimeout(resolve, 2000));
210
211 // No assertions needed - test passes if no errors are thrown
212 }, 10000);
213
214 it('should maintain event data integrity during serialization/deserialization', async () => {
215 // Arrange
216 let receivedEvent: CardAddedToLibraryEvent | null = null;
217 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
218 handle: jest
219 .fn()
220 .mockImplementation(async (event: CardAddedToLibraryEvent) => {
221 receivedEvent = event;
222 return ok(undefined);
223 }),
224 };
225
226 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
227 await subscriber.start();
228
229 // Create event with specific data
230 const originalCardId = CardId.createFromString(
231 'integrity-test-card-456',
232 ).unwrap();
233 const originalCuratorId = CuratorId.create(
234 'did:plc:integrityuser789',
235 ).unwrap();
236 const originalEvent = CardAddedToLibraryEvent.create(
237 originalCardId,
238 originalCuratorId,
239 ).unwrap();
240 const originalTimestamp = originalEvent.dateTimeOccurred;
241
242 // Act
243 await publisher.publishEvents([originalEvent]);
244 await new Promise((resolve) => setTimeout(resolve, 2000));
245
246 // Assert - Event data was preserved through serialization/deserialization
247 expect(receivedEvent).not.toBeNull();
248 expect(receivedEvent!.cardId.getStringValue()).toBe(
249 originalCardId.getStringValue(),
250 );
251 expect(receivedEvent!.curatorId.value).toBe(originalCuratorId.value);
252 expect(receivedEvent!.dateTimeOccurred.getTime()).toBe(
253 originalTimestamp.getTime(),
254 );
255 expect(receivedEvent!.getAggregateId().toString()).toBe(
256 originalCardId.getValue().toString(),
257 );
258 }, 15000);
259 });
260
261 describe('Queue Configuration', () => {
262 it('should route events to the feeds queue', async () => {
263 // This test verifies the queue routing logic by checking Redis directly
264 const event = CardAddedToLibraryEvent.create(
265 CardId.createFromString('queue-test-card').unwrap(),
266 CuratorId.create('did:plc:queueuser').unwrap(),
267 ).unwrap();
268
269 await publisher.publishEvents([event]);
270
271 // Create a Queue instance to check job counts
272 const eventsQueue = new Queue(QueueNames.FEEDS, { connection: redis });
273
274 // Wait for job to be added
275 await new Promise((resolve) => setTimeout(resolve, 100));
276
277 // Check total number of jobs (regardless of state)
278 const jobCounts = await eventsQueue.getJobCounts();
279 const totalJobs = Object.values(jobCounts).reduce(
280 (sum, count) => sum + count,
281 0,
282 );
283
284 expect(totalJobs).toBeGreaterThanOrEqual(1);
285
286 await eventsQueue.close();
287 }, 10000);
288 });
289
290 describe('Redis-Based Saga Integration', () => {
291 it('should handle distributed saga state across multiple workers', async () => {
292 // Arrange - Create two saga instances (simulating multiple workers)
293 const mockUseCase = {
294 execute: jest
295 .fn()
296 .mockResolvedValue(ok({ activityId: 'test-activity' })),
297 } as any;
298
299 const stateStore = new RedisSagaStateStore(redis);
300 const saga1 = new CardCollectionSaga(mockUseCase, stateStore);
301 const saga2 = new CardCollectionSaga(mockUseCase, stateStore);
302
303 // Create test events for same card/user (should be aggregated)
304 const cardId = CardId.createFromString('saga-test-card').unwrap();
305 const curatorId = CuratorId.create('did:plc:sagatest').unwrap();
306
307 const libraryEvent = CardAddedToLibraryEvent.create(
308 cardId,
309 curatorId,
310 ).unwrap();
311 const collectionEvent = CardAddedToCollectionEvent.create(
312 cardId,
313 CollectionId.createFromString('test-collection').unwrap(),
314 curatorId,
315 ).unwrap();
316
317 // Act - Process events with different saga instances
318 const result1 = await saga1.handleCardEvent(libraryEvent);
319 const result2 = await saga2.handleCardEvent(collectionEvent);
320
321 // Assert - Both operations succeeded
322 expect(result1.isOk()).toBe(true);
323 expect(result2.isOk()).toBe(true);
324
325 // Wait for aggregation window
326 await new Promise((resolve) => setTimeout(resolve, 3500));
327
328 // Assert - Only one aggregated activity was created
329 expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
330
331 const call = mockUseCase.execute.mock.calls[0][0];
332 expect(call.cardId).toBe(cardId.getStringValue());
333 expect(call.actorId).toBe(curatorId.value);
334 expect(call.collectionIds).toContain('test-collection');
335 }, 15000);
336
337 it('should handle concurrent lock contention with retry mechanism', async () => {
338 // Arrange - Create multiple saga instances
339 const mockUseCase = {
340 execute: jest
341 .fn()
342 .mockResolvedValue(ok({ activityId: 'test-activity' })),
343 } as any;
344
345 const stateStore = new RedisSagaStateStore(redis);
346 const saga1 = new CardCollectionSaga(mockUseCase, stateStore);
347 const saga2 = new CardCollectionSaga(mockUseCase, stateStore);
348 const saga3 = new CardCollectionSaga(mockUseCase, stateStore);
349
350 // Create events for same card/user (will compete for same lock)
351 const cardId = CardId.createFromString('concurrent-test-card').unwrap();
352 const curatorId = CuratorId.create('did:plc:concurrentuser').unwrap();
353 const collectionId1 =
354 CollectionId.createFromString('collection-1').unwrap();
355 const collectionId2 =
356 CollectionId.createFromString('collection-2').unwrap();
357
358 const events = [
359 CardAddedToLibraryEvent.create(cardId, curatorId).unwrap(),
360 CardAddedToCollectionEvent.create(
361 cardId,
362 collectionId1,
363 curatorId,
364 ).unwrap(),
365 CardAddedToCollectionEvent.create(
366 cardId,
367 collectionId2,
368 curatorId,
369 ).unwrap(),
370 ];
371
372 // Act - Process all events concurrently (not sequentially)
373 const results = await Promise.all([
374 saga1.handleCardEvent(events[0]!),
375 saga2.handleCardEvent(events[1]!),
376 saga3.handleCardEvent(events[2]!),
377 ]);
378
379 // Assert - All should succeed (no events dropped due to lock contention)
380 results.forEach((result) => expect(result.isOk()).toBe(true));
381
382 // Wait for aggregation window
383 await new Promise((resolve) => setTimeout(resolve, 3500));
384
385 // Assert - Should create single activity with all collections
386 expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
387 const call = mockUseCase.execute.mock.calls[0][0];
388 expect(call.cardId).toBe(cardId.getStringValue());
389 expect(call.actorId).toBe(curatorId.value);
390 expect(call.collectionIds).toHaveLength(2);
391 expect(call.collectionIds).toContain('collection-1');
392 expect(call.collectionIds).toContain('collection-2');
393 }, 20000);
394
395 it('should handle high concurrency with many simultaneous events', async () => {
396 // Arrange - Create many saga instances
397 const mockUseCase = {
398 execute: jest
399 .fn()
400 .mockResolvedValue(ok({ activityId: 'test-activity' })),
401 } as any;
402
403 const stateStore = new RedisSagaStateStore(redis);
404 const cardId = CardId.createFromString('high-concurrency-card').unwrap();
405 const curatorId = CuratorId.create('did:plc:highconcurrency').unwrap();
406
407 // Create 10 collection events that will all compete for the same lock
408 const events = Array.from({ length: 10 }, (_, i) => {
409 const saga = new CardCollectionSaga(mockUseCase, stateStore);
410 const collectionId = CollectionId.createFromString(
411 `collection-${i}`,
412 ).unwrap();
413 const event = CardAddedToCollectionEvent.create(
414 cardId,
415 collectionId,
416 curatorId,
417 ).unwrap();
418 return { saga, event };
419 });
420
421 // Add one library event
422 const librarySaga = new CardCollectionSaga(mockUseCase, stateStore);
423 const libraryEvent = CardAddedToLibraryEvent.create(
424 cardId,
425 curatorId,
426 ).unwrap();
427
428 // Act - Process all events concurrently
429 const allPromises = [
430 librarySaga.handleCardEvent(libraryEvent),
431 ...events.map(({ saga, event }) => saga.handleCardEvent(event)),
432 ];
433
434 const results = await Promise.all(allPromises);
435
436 // Assert - All should succeed
437 results.forEach((result) => expect(result.isOk()).toBe(true));
438
439 // Wait longer for aggregation window and retry processing
440 await new Promise((resolve) => setTimeout(resolve, 8000)); // Increased from 3500ms
441
442 // Assert - Should create single activity with all 10 collections
443 expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
444 const call = mockUseCase.execute.mock.calls[0][0];
445 expect(call.cardId).toBe(cardId.getStringValue());
446 expect(call.actorId).toBe(curatorId.value);
447 expect(call.collectionIds).toHaveLength(10);
448
449 // Verify all collection IDs are present
450 for (let i = 0; i < 10; i++) {
451 expect(call.collectionIds).toContain(`collection-${i}`);
452 }
453 }, 35000); // Increased timeout from 25000ms
454
455 it('should recover when lock expires due to timeout', async () => {
456 // Arrange
457 const mockUseCase = {
458 execute: jest
459 .fn()
460 .mockResolvedValue(ok({ activityId: 'test-activity' })),
461 } as any;
462
463 const stateStore = new RedisSagaStateStore(redis);
464 const cardId = CardId.createFromString('timeout-test-card').unwrap();
465 const curatorId = CuratorId.create('did:plc:timeoutuser').unwrap();
466
467 // Manually acquire lock with short TTL to simulate crashed worker
468 const lockKey = 'saga:feed:lock:timeout-test-card-did:plc:timeoutuser';
469 await stateStore.set(lockKey, '1', 'EX', 2, 'NX'); // 2 second TTL (increased from 1)
470
471 // Create saga and event
472 const saga = new CardCollectionSaga(mockUseCase, stateStore);
473 const event = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();
474
475 // Act - Try to process event (should initially be blocked by lock)
476 // But should succeed after lock expires and retry mechanism kicks in
477 const result = await saga.handleCardEvent(event);
478
479 // Assert - Should succeed after lock expires
480 expect(result.isOk()).toBe(true);
481
482 // Wait longer for aggregation window
483 await new Promise((resolve) => setTimeout(resolve, 5000)); // Increased from 3500ms
484
485 // Assert - Activity should be created
486 expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
487 }, 20000); // Increased timeout from 15000ms
488
489 it('should handle retry mechanism under lock contention', async () => {
490 // Arrange
491 const mockUseCase = {
492 execute: jest
493 .fn()
494 .mockResolvedValue(ok({ activityId: 'test-activity' })),
495 } as any;
496
497 const stateStore = new RedisSagaStateStore(redis);
498 const cardId = CardId.createFromString('retry-test-card').unwrap();
499 const curatorId = CuratorId.create('did:plc:retryuser').unwrap();
500
501 // Create a saga that will hold the lock for a while
502 const lockHoldingSaga = new CardCollectionSaga(mockUseCase, stateStore);
503 const retryingSaga = new CardCollectionSaga(mockUseCase, stateStore);
504
505 const event1 = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();
506 const event2 = CardAddedToCollectionEvent.create(
507 cardId,
508 CollectionId.createFromString('retry-collection').unwrap(),
509 curatorId,
510 ).unwrap();
511
512 // Act - Start first saga (will acquire lock)
513 const promise1 = lockHoldingSaga.handleCardEvent(event1);
514
515 // Immediately start second saga (will need to retry)
516 const promise2 = retryingSaga.handleCardEvent(event2);
517
518 const results = await Promise.all([promise1, promise2]);
519
520 // Assert - Both should succeed
521 expect(results[0].isOk()).toBe(true);
522 expect(results[1].isOk()).toBe(true);
523
524 // Wait for aggregation window
525 await new Promise((resolve) => setTimeout(resolve, 3500));
526
527 // Assert - Should create single aggregated activity
528 expect(mockUseCase.execute).toHaveBeenCalledTimes(1);
529 const call = mockUseCase.execute.mock.calls[0][0];
530 expect(call.collectionIds).toContain('retry-collection');
531 }, 20000);
532 });
533
534 describe('Multi-Queue Event Routing', () => {
535 it('should route events to multiple queues', async () => {
536 // Stop the shared subscriber to avoid interference
537 await subscriber.stop();
538
539 // Clear any pending jobs
540 await redis.flushall();
541
542 // Arrange - Create subscribers for different queues
543 const feedsSubscriber = new BullMQEventSubscriber(redis, {
544 queueName: QueueNames.FEEDS,
545 });
546 const searchSubscriber = new BullMQEventSubscriber(redis, {
547 queueName: QueueNames.SEARCH,
548 });
549
550 const feedsHandler = {
551 handle: jest.fn().mockResolvedValue(ok(undefined)),
552 };
553 const searchHandler = {
554 handle: jest.fn().mockResolvedValue(ok(undefined)),
555 };
556
557 await feedsSubscriber.subscribe(
558 EventNames.CARD_ADDED_TO_LIBRARY,
559 feedsHandler,
560 );
561 await searchSubscriber.subscribe(
562 EventNames.CARD_ADDED_TO_LIBRARY,
563 searchHandler,
564 );
565
566 await feedsSubscriber.start();
567 await searchSubscriber.start();
568
569 // Give workers time to initialize
570 await new Promise((resolve) => setTimeout(resolve, 500));
571
572 // Act - Publish single event
573 const event = CardAddedToLibraryEvent.create(
574 CardId.createFromString('multi-queue-card').unwrap(),
575 CuratorId.create('did:plc:multiuser').unwrap(),
576 ).unwrap();
577
578 await publisher.publishEvents([event]);
579
580 // Wait for processing
581 await new Promise((resolve) => setTimeout(resolve, 3000));
582
583 // Assert - Event processed by both queues
584 expect(feedsHandler.handle).toHaveBeenCalledTimes(1);
585 expect(searchHandler.handle).toHaveBeenCalledTimes(1);
586
587 // Cleanup
588 await feedsSubscriber.stop();
589 await searchSubscriber.stop();
590 }, 15000);
591 });
592});