A social knowledge tool for researchers built on ATProto
1import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis';
2import Redis from 'ioredis';
3import { BullMQEventPublisher } from '../../../../shared/infrastructure/events/BullMQEventPublisher';
4import { BullMQEventSubscriber } from '../../../../shared/infrastructure/events/BullMQEventSubscriber';
5import { CardAddedToLibraryEvent } from '../../domain/events/CardAddedToLibraryEvent';
6import { CardId } from '../../domain/value-objects/CardId';
7import { CuratorId } from '../../domain/value-objects/CuratorId';
8import { IEventHandler } from '../../../../shared/application/events/IEventSubscriber';
9import { ok, err } from '../../../../shared/core/Result';
10import { EventNames } from '../../../../shared/infrastructure/events/EventConfig';
11import { Queue } from 'bullmq';
12import { QueueNames } from 'src/shared/infrastructure/events/QueueConfig';
13
14describe('BullMQ Event System Integration', () => {
15 let redisContainer: StartedRedisContainer;
16 let redis: Redis;
17 let publisher: BullMQEventPublisher;
18 let subscriber: BullMQEventSubscriber;
19
20 beforeAll(async () => {
21 // Start Redis container
22 redisContainer = await new RedisContainer('redis:7-alpine')
23 .withExposedPorts(6379)
24 .start();
25
26 // Create Redis connection
27 const connectionUrl = redisContainer.getConnectionUrl();
28 redis = new Redis(connectionUrl, { maxRetriesPerRequest: null });
29
30 // Create publisher and subscriber
31 publisher = new BullMQEventPublisher(redis);
32 subscriber = new BullMQEventSubscriber(redis, {
33 queueName: QueueNames.FEEDS,
34 });
35 }, 60000); // Increase timeout for container startup
36
37 afterAll(async () => {
38 // Clean up
39 if (subscriber) {
40 await subscriber.stop();
41 }
42 if (publisher) {
43 await publisher.close();
44 }
45 if (redis) {
46 await redis.quit();
47 }
48 if (redisContainer) {
49 await redisContainer.stop();
50 }
51 });
52
53 beforeEach(async () => {
54 // Clear Redis data between tests
55 await redis.flushall();
56 });
57
58 describe('Event Publishing and Subscription', () => {
59 it('should publish and receive CardAddedToLibraryEvent', async () => {
60 // Arrange
61 const receivedEvents: CardAddedToLibraryEvent[] = [];
62 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
63 handle: jest
64 .fn()
65 .mockImplementation(async (event: CardAddedToLibraryEvent) => {
66 receivedEvents.push(event);
67 return ok(undefined);
68 }),
69 };
70
71 // Subscribe to events
72 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
73 await subscriber.start();
74
75 // Create test event
76 const cardId = CardId.createFromString('test-card-123').unwrap();
77 const curatorId = CuratorId.create('did:plc:testuser123').unwrap();
78 const event = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();
79
80 // Act - Publish event
81 const publishResult = await publisher.publishEvents([event]);
82
83 // Assert - Publishing succeeded
84 expect(publishResult.isOk()).toBe(true);
85
86 // Wait for event processing
87 await new Promise((resolve) => setTimeout(resolve, 2000));
88
89 // Assert - Event was received and processed
90 expect(mockHandler.handle).toHaveBeenCalledTimes(1);
91 expect(receivedEvents).toHaveLength(1);
92
93 const receivedEvent = receivedEvents[0];
94 expect(receivedEvent).toBeInstanceOf(CardAddedToLibraryEvent);
95 expect(receivedEvent!.cardId.getStringValue()).toBe(
96 cardId.getStringValue(),
97 );
98 expect(receivedEvent!.curatorId.value).toBe(curatorId.value);
99 expect(receivedEvent!.dateTimeOccurred).toBeInstanceOf(Date);
100 }, 15000);
101
102 it('should publish and receive multiple events in sequence', async () => {
103 // Arrange
104 const receivedEvents: CardAddedToLibraryEvent[] = [];
105 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
106 handle: jest
107 .fn()
108 .mockImplementation(async (event: CardAddedToLibraryEvent) => {
109 receivedEvents.push(event);
110 return ok(undefined);
111 }),
112 };
113
114 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
115 await subscriber.start();
116
117 // Create multiple test events
118 const events = [
119 CardAddedToLibraryEvent.create(
120 CardId.createFromString('card-1').unwrap(),
121 CuratorId.create('did:plc:user1').unwrap(),
122 ).unwrap(),
123 CardAddedToLibraryEvent.create(
124 CardId.createFromString('card-2').unwrap(),
125 CuratorId.create('did:plc:user2').unwrap(),
126 ).unwrap(),
127 CardAddedToLibraryEvent.create(
128 CardId.createFromString('card-3').unwrap(),
129 CuratorId.create('did:plc:user3').unwrap(),
130 ).unwrap(),
131 ];
132
133 // Act - Publish all events
134 const publishResult = await publisher.publishEvents(events);
135
136 // Assert - Publishing succeeded
137 expect(publishResult.isOk()).toBe(true);
138
139 // Wait for event processing
140 await new Promise((resolve) => setTimeout(resolve, 3000));
141
142 // Assert - All events were received
143 expect(mockHandler.handle).toHaveBeenCalledTimes(3);
144 expect(receivedEvents).toHaveLength(3);
145
146 // Verify each event was processed correctly
147 const cardIds = receivedEvents.map((e) => e.cardId.getStringValue());
148 expect(cardIds).toContain('card-1');
149 expect(cardIds).toContain('card-2');
150 expect(cardIds).toContain('card-3');
151 }, 20000);
152
153 it('should handle event processing failures gracefully', async () => {
154 // Arrange
155 let callCount = 0;
156 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
157 handle: jest.fn().mockImplementation(async () => {
158 callCount++;
159 if (callCount === 1) {
160 // First call fails
161 return err(new Error('Processing failed'));
162 }
163 // Subsequent calls succeed (for retry)
164 return ok(undefined);
165 }),
166 };
167
168 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
169 await subscriber.start();
170
171 const event = CardAddedToLibraryEvent.create(
172 CardId.createFromString('failing-card').unwrap(),
173 CuratorId.create('did:plc:failuser').unwrap(),
174 ).unwrap();
175
176 // Act - Publish event that will initially fail
177 const publishResult = await publisher.publishEvents([event]);
178
179 // Assert - Publishing succeeded (failure happens during processing)
180 expect(publishResult.isOk()).toBe(true);
181
182 // Wait for initial processing and potential retries
183 await new Promise((resolve) => setTimeout(resolve, 5000));
184
185 // Assert - Handler was called (at least once for initial attempt)
186 expect(mockHandler.handle).toHaveBeenCalled();
187 }, 25000);
188
189 it('should not process events when no handler is registered', async () => {
190 // Arrange - Start subscriber without registering any handlers
191 await subscriber.start();
192
193 const event = CardAddedToLibraryEvent.create(
194 CardId.createFromString('unhandled-card').unwrap(),
195 CuratorId.create('did:plc:unhandleduser').unwrap(),
196 ).unwrap();
197
198 // Act - Publish event
199 const publishResult = await publisher.publishEvents([event]);
200
201 // Assert - Publishing succeeded (no handler doesn't prevent publishing)
202 expect(publishResult.isOk()).toBe(true);
203
204 // Wait to ensure no processing occurs
205 await new Promise((resolve) => setTimeout(resolve, 2000));
206
207 // No assertions needed - test passes if no errors are thrown
208 }, 10000);
209
210 it('should maintain event data integrity during serialization/deserialization', async () => {
211 // Arrange
212 let receivedEvent: CardAddedToLibraryEvent | null = null;
213 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
214 handle: jest
215 .fn()
216 .mockImplementation(async (event: CardAddedToLibraryEvent) => {
217 receivedEvent = event;
218 return ok(undefined);
219 }),
220 };
221
222 await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
223 await subscriber.start();
224
225 // Create event with specific data
226 const originalCardId = CardId.createFromString(
227 'integrity-test-card-456',
228 ).unwrap();
229 const originalCuratorId = CuratorId.create(
230 'did:plc:integrityuser789',
231 ).unwrap();
232 const originalEvent = CardAddedToLibraryEvent.create(
233 originalCardId,
234 originalCuratorId,
235 ).unwrap();
236 const originalTimestamp = originalEvent.dateTimeOccurred;
237
238 // Act
239 await publisher.publishEvents([originalEvent]);
240 await new Promise((resolve) => setTimeout(resolve, 2000));
241
242 // Assert - Event data was preserved through serialization/deserialization
243 expect(receivedEvent).not.toBeNull();
244 expect(receivedEvent!.cardId.getStringValue()).toBe(
245 originalCardId.getStringValue(),
246 );
247 expect(receivedEvent!.curatorId.value).toBe(originalCuratorId.value);
248 expect(receivedEvent!.dateTimeOccurred.getTime()).toBe(
249 originalTimestamp.getTime(),
250 );
251 expect(receivedEvent!.getAggregateId().toString()).toBe(
252 originalCardId.getValue().toString(),
253 );
254 }, 15000);
255 });
256
257 describe('Queue Configuration', () => {
258 it('should route events to the feeds queue', async () => {
259 // This test verifies the queue routing logic by checking Redis directly
260 const event = CardAddedToLibraryEvent.create(
261 CardId.createFromString('queue-test-card').unwrap(),
262 CuratorId.create('did:plc:queueuser').unwrap(),
263 ).unwrap();
264
265 await publisher.publishEvents([event]);
266
267 // Create a Queue instance to check job counts
268 const eventsQueue = new Queue(QueueNames.FEEDS, { connection: redis });
269
270 // Wait for job to be added
271 await new Promise((resolve) => setTimeout(resolve, 100));
272
273 // Check total number of jobs (regardless of state)
274 const jobCounts = await eventsQueue.getJobCounts();
275 const totalJobs = Object.values(jobCounts).reduce(
276 (sum, count) => sum + count,
277 0,
278 );
279
280 expect(totalJobs).toBeGreaterThanOrEqual(1);
281
282 await eventsQueue.close();
283 }, 10000);
284 });
285});