A social knowledge tool for researchers built on ATProto
import { RedisContainer, StartedRedisContainer } from '@testcontainers/redis';
import Redis from 'ioredis';
import { BullMQEventPublisher } from '../../../../shared/infrastructure/events/BullMQEventPublisher';
import { BullMQEventSubscriber } from '../../../../shared/infrastructure/events/BullMQEventSubscriber';
import { CardAddedToLibraryEvent } from '../../domain/events/CardAddedToLibraryEvent';
import { CardId } from '../../domain/value-objects/CardId';
import { CuratorId } from '../../domain/value-objects/CuratorId';
import { IEventHandler } from '../../../../shared/application/events/IEventSubscriber';
import { ok, err } from '../../../../shared/core/Result';
import { EventNames } from '../../../../shared/infrastructure/events/EventConfig';
import { Queue } from 'bullmq';
import { QueueNames } from '../../../../shared/infrastructure/events/QueueConfig';

/** How often `waitUntil` re-evaluates its condition. */
const POLL_INTERVAL_MS = 50;

/** Resolves after `ms` milliseconds. */
const delay = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Polls `condition` until it returns true or `timeoutMs` has elapsed.
 *
 * Never rejects: on timeout it simply returns, so the `expect` calls that
 * follow report the failure with their own, more descriptive messages.
 *
 * @param condition - Synchronous predicate checked every POLL_INTERVAL_MS.
 * @param timeoutMs - Upper bound on the total time spent waiting.
 */
async function waitUntil(
  condition: () => boolean,
  timeoutMs: number,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (!condition() && Date.now() < deadline) {
    await delay(POLL_INTERVAL_MS);
  }
}

/**
 * Builds a mock handler that records every event it is given and reports
 * success.
 *
 * @returns The handler (its `handle` is a jest mock) together with the array
 *          the handler appends received events to.
 */
function createCollectingHandler(): {
  handler: IEventHandler<CardAddedToLibraryEvent>;
  receivedEvents: CardAddedToLibraryEvent[];
} {
  const receivedEvents: CardAddedToLibraryEvent[] = [];
  const handler: IEventHandler<CardAddedToLibraryEvent> = {
    handle: jest
      .fn()
      .mockImplementation(async (event: CardAddedToLibraryEvent) => {
        receivedEvents.push(event);
        return ok(undefined);
      }),
  };
  return { handler, receivedEvents };
}

/**
 * Integration tests for the BullMQ-backed event publisher/subscriber pair,
 * run against a real Redis instance started through Testcontainers.
 *
 * The container and the Redis connection are shared by the whole suite; the
 * publisher and subscriber are rebuilt for every test so that handlers
 * registered and subscribers started in one test are not carried into the
 * next.
 */
describe('BullMQ Event System Integration', () => {
  let redisContainer: StartedRedisContainer;
  let redis: Redis;
  let publisher: BullMQEventPublisher;
  let subscriber: BullMQEventSubscriber;

  beforeAll(async () => {
    // Start Redis container
    redisContainer = await new RedisContainer('redis:7-alpine')
      .withExposedPorts(6379)
      .start();

    // Create the Redis connection shared by every publisher/subscriber
    const connectionUrl = redisContainer.getConnectionUrl();
    redis = new Redis(connectionUrl, { maxRetriesPerRequest: null });
  }, 60000); // Increase timeout for container startup

  afterAll(async () => {
    if (redis) {
      await redis.quit();
    }
    if (redisContainer) {
      await redisContainer.stop();
    }
  }, 30000); // Stopping the container can exceed Jest's default hook timeout

  beforeEach(async () => {
    // Clear Redis data between tests
    await redis.flushall();

    // Fresh instances per test: nothing registered or started by an earlier
    // test is still attached when the next one begins.
    publisher = new BullMQEventPublisher(redis);
    subscriber = new BullMQEventSubscriber(redis, {
      queueName: QueueNames.FEEDS,
    });
  });

  afterEach(async () => {
    if (subscriber) {
      await subscriber.stop();
    }
    if (publisher) {
      await publisher.close();
    }
  }, 10000);

  describe('Event Publishing and Subscription', () => {
    it('should publish and receive CardAddedToLibraryEvent', async () => {
      // Arrange
      const { handler, receivedEvents } = createCollectingHandler();

      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, handler);
      await subscriber.start();

      const cardId = CardId.createFromString('test-card-123').unwrap();
      const curatorId = CuratorId.create('did:plc:testuser123').unwrap();
      const event = CardAddedToLibraryEvent.create(cardId, curatorId).unwrap();

      // Act - Publish event
      const publishResult = await publisher.publishEvents([event]);

      // Assert - Publishing succeeded
      expect(publishResult.isOk()).toBe(true);

      // Wait for event processing (returns as soon as the event arrives)
      await waitUntil(() => receivedEvents.length >= 1, 10000);

      // Assert - Event was received and processed
      expect(handler.handle).toHaveBeenCalledTimes(1);
      expect(receivedEvents).toHaveLength(1);

      const receivedEvent = receivedEvents[0];
      expect(receivedEvent).toBeInstanceOf(CardAddedToLibraryEvent);
      expect(receivedEvent!.cardId.getStringValue()).toBe(
        cardId.getStringValue(),
      );
      expect(receivedEvent!.curatorId.value).toBe(curatorId.value);
      expect(receivedEvent!.dateTimeOccurred).toBeInstanceOf(Date);
    }, 15000);

    it('should publish and receive multiple events in sequence', async () => {
      // Arrange
      const { handler, receivedEvents } = createCollectingHandler();

      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, handler);
      await subscriber.start();

      // Create multiple test events
      const events = [
        CardAddedToLibraryEvent.create(
          CardId.createFromString('card-1').unwrap(),
          CuratorId.create('did:plc:user1').unwrap(),
        ).unwrap(),
        CardAddedToLibraryEvent.create(
          CardId.createFromString('card-2').unwrap(),
          CuratorId.create('did:plc:user2').unwrap(),
        ).unwrap(),
        CardAddedToLibraryEvent.create(
          CardId.createFromString('card-3').unwrap(),
          CuratorId.create('did:plc:user3').unwrap(),
        ).unwrap(),
      ];

      // Act - Publish all events
      const publishResult = await publisher.publishEvents(events);

      // Assert - Publishing succeeded
      expect(publishResult.isOk()).toBe(true);

      // Wait for event processing
      await waitUntil(() => receivedEvents.length >= events.length, 15000);

      // Assert - All events were received
      expect(handler.handle).toHaveBeenCalledTimes(3);
      expect(receivedEvents).toHaveLength(3);

      // Verify each event was processed correctly (order is not asserted)
      const cardIds = receivedEvents.map((e) => e.cardId.getStringValue());
      expect(cardIds).toContain('card-1');
      expect(cardIds).toContain('card-2');
      expect(cardIds).toContain('card-3');
    }, 20000);

    it('should handle event processing failures gracefully', async () => {
      // Arrange
      let callCount = 0;
      const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
        handle: jest.fn().mockImplementation(async () => {
          callCount++;
          if (callCount === 1) {
            // First call fails
            return err(new Error('Processing failed'));
          }
          // Subsequent calls succeed (for retry)
          return ok(undefined);
        }),
      };

      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, mockHandler);
      await subscriber.start();

      const event = CardAddedToLibraryEvent.create(
        CardId.createFromString('failing-card').unwrap(),
        CuratorId.create('did:plc:failuser').unwrap(),
      ).unwrap();

      // Act - Publish event that will initially fail
      const publishResult = await publisher.publishEvents([event]);

      // Assert - Publishing succeeded (failure happens during processing)
      expect(publishResult.isOk()).toBe(true);

      // Wait for the initial processing attempt
      await waitUntil(() => callCount >= 1, 15000);

      // Assert - Handler was called (at least once for initial attempt)
      expect(mockHandler.handle).toHaveBeenCalled();
    }, 25000);

    it('should not process events when no handler is registered', async () => {
      // Arrange - Start subscriber without registering any handlers
      await subscriber.start();

      const event = CardAddedToLibraryEvent.create(
        CardId.createFromString('unhandled-card').unwrap(),
        CuratorId.create('did:plc:unhandleduser').unwrap(),
      ).unwrap();

      // Act - Publish event
      const publishResult = await publisher.publishEvents([event]);

      // Assert - Publishing succeeded (no handler doesn't prevent publishing)
      expect(publishResult.isOk()).toBe(true);

      // There is nothing to poll for here, so give the subscriber a fixed
      // window in which to pick the job up.
      await delay(2000);

      // No further assertions - test passes if no errors are thrown
    }, 10000);

    it('should maintain event data integrity during serialization/deserialization', async () => {
      // Arrange
      const { handler, receivedEvents } = createCollectingHandler();

      await subscriber.subscribe(EventNames.CARD_ADDED_TO_LIBRARY, handler);
      await subscriber.start();

      // Create event with specific data
      const originalCardId = CardId.createFromString(
        'integrity-test-card-456',
      ).unwrap();
      const originalCuratorId = CuratorId.create(
        'did:plc:integrityuser789',
      ).unwrap();
      const originalEvent = CardAddedToLibraryEvent.create(
        originalCardId,
        originalCuratorId,
      ).unwrap();
      const originalTimestamp = originalEvent.dateTimeOccurred;

      // Act
      const publishResult = await publisher.publishEvents([originalEvent]);
      expect(publishResult.isOk()).toBe(true);
      await waitUntil(() => receivedEvents.length >= 1, 10000);

      // Assert - Event data was preserved through serialization/deserialization
      expect(receivedEvents).toHaveLength(1);
      const receivedEvent = receivedEvents[0];
      expect(receivedEvent!.cardId.getStringValue()).toBe(
        originalCardId.getStringValue(),
      );
      expect(receivedEvent!.curatorId.value).toBe(originalCuratorId.value);
      expect(receivedEvent!.dateTimeOccurred.getTime()).toBe(
        originalTimestamp.getTime(),
      );
      expect(receivedEvent!.getAggregateId().toString()).toBe(
        originalCardId.getValue().toString(),
      );
    }, 15000);
  });

  describe('Queue Configuration', () => {
    it('should route events to the feeds queue', async () => {
      // This test verifies the queue routing logic by checking Redis directly
      const event = CardAddedToLibraryEvent.create(
        CardId.createFromString('queue-test-card').unwrap(),
        CuratorId.create('did:plc:queueuser').unwrap(),
      ).unwrap();

      const publishResult = await publisher.publishEvents([event]);
      expect(publishResult.isOk()).toBe(true);

      // Create a Queue instance to check job counts
      const eventsQueue = new Queue(QueueNames.FEEDS, { connection: redis });

      try {
        // Wait for job to be added
        await delay(100);

        // Check total number of jobs (regardless of state)
        const jobCounts = await eventsQueue.getJobCounts();
        const totalJobs = Object.values(jobCounts).reduce(
          (sum, count) => sum + count,
          0,
        );

        expect(totalJobs).toBeGreaterThanOrEqual(1);
      } finally {
        // Close the queue even when an assertion above throws, so the test
        // does not leave an open handle behind.
        await eventsQueue.close();
      }
    }, 10000);
  });
});