A social knowledge tool for researchers built on ATProto
1# Domain Event Abstractions in a Clean Architecture
2
3This document explains how to implement a clean, layered domain event system that adheres to clean architecture principles, with clear separation between domain, application, and infrastructure layers.
4
5## Overview
6
7Our domain event system follows a simple, direct approach where:
8
91. **Domain layer** raises events through aggregates
102. **Application layer** defines interfaces for event publishing
113. **Infrastructure layer** provides concrete implementations
124. **Use cases** explicitly publish events after successful operations
13
14## Core Principle: Dependency Inversion
15
16The key principle is **dependency inversion** - higher layers define interfaces, lower layers implement them:
17
18- **Domain Layer**: Pure business logic, no dependencies
19- **Application Layer**: Defines event publishing interfaces
20- **Infrastructure Layer**: Implements interfaces with concrete technologies
21- **Use Cases**: Orchestrate domain logic and event publishing
22
23## Architecture Layers
24
25```
26┌─────────────────────────────────────────────────────────────┐
27│ Domain Layer │
28│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
29│ │ Aggregates │ │ Domain Events │ │ Event Types │ │
30│ │ │ │ │ │ │ │
31│ │ Card.addToLib() │ │ IDomainEvent │ │ CardAdded... │ │
32│ │ ↓ addDomainEvent│ │ │ │ │ │
33│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
34└─────────────────────────────────────────────────────────────┘
35 │
36 ▼
37┌─────────────────────────────────────────────────────────────┐
38│ Application Layer │
39│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
40│ │ Use Cases │ │ Interfaces │ │ Event Handlers│ │
41│ │ │ │ │ │ │ │
42│ │ AddCardToLib... │ │ IEventPublisher │ │ Notification │ │
43│ │ ↓ publishEvents │ │ IEventSubscriber│ │ Feed Handler │ │
44│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
45└─────────────────────────────────────────────────────────────┘
46 │
47 ▼
48┌─────────────────────────────────────────────────────────────┐
49│ Infrastructure Layer │
50│ ┌─────────────────┐ ┌─────────────────┐ ┌──────────────┐ │
51│ │ BullMQPublisher │ │ BullMQSubscriber│ │ DomainEvents │ │
52│ │ (implements │ │ (implements │ │ (simplified) │ │
53│ │ IEventPublisher)│ │ IEventSubscriber│ │ │ │
54│ └─────────────────┘ └─────────────────┘ └──────────────┘ │
55└─────────────────────────────────────────────────────────────┘
56```
57
58## Layer Responsibilities
59
60### 1. Domain Layer (Pure Business Logic)
61
62The domain layer remains completely pure and has no dependencies on infrastructure:
63
64```typescript
65// src/shared/domain/events/IDomainEvent.ts - NO CHANGES
66export interface IDomainEvent {
67 dateTimeOccurred: Date;
68 getAggregateId(): UniqueEntityID;
69}
70```
71
72```typescript
73// src/modules/cards/domain/events/CardAddedToLibraryEvent.ts - NO CHANGES
74export class CardAddedToLibraryEvent implements IDomainEvent {
75 public readonly dateTimeOccurred: Date;
76
77 constructor(
78 public readonly cardId: CardId,
79 public readonly curatorId: CuratorId,
80 ) {
81 this.dateTimeOccurred = new Date();
82 }
83
84 getAggregateId(): UniqueEntityID {
85 return this.cardId.getValue();
86 }
87}
88```
89
90```typescript
91// src/shared/domain/AggregateRoot.ts - NO CHANGES
92export abstract class AggregateRoot<T> extends Entity<T> {
93 private _domainEvents: IDomainEvent[] = [];
94
95 protected addDomainEvent(domainEvent: IDomainEvent): void {
96 this._domainEvents.push(domainEvent);
97 DomainEvents.markAggregateForDispatch(this);
98 this.logDomainEventAdded(domainEvent);
99 }
100
101 // ... rest unchanged
102}
103```
104
105```typescript
106// src/modules/cards/domain/Card.ts - Domain logic unchanged
107public addToLibrary(userId: CuratorId): Result<void, CardValidationError> {
108 // ... validation logic
109
110 this.props.libraryMemberships.push({
111 curatorId: userId,
112 addedAt: new Date(),
113 });
114
115 // Domain only adds events - doesn't know about publishing
116 this.addDomainEvent(new CardAddedToLibraryEvent(this.cardId, userId));
117
118 return ok(undefined);
119}
120```
121
122### 2. Application Layer (Interfaces and Orchestration)
123
124The application layer defines interfaces and orchestrates domain logic with event publishing:
125
126```typescript
127// src/shared/application/events/IEventPublisher.ts
128import { IDomainEvent } from '../../domain/events/IDomainEvent';
129import { Result } from '../../core/Result';
130
131export interface IEventPublisher {
132 publishEvents(events: IDomainEvent[]): Promise<Result<void>>;
133}
134```
135
136```typescript
137// src/shared/application/events/IEventSubscriber.ts
138import { IDomainEvent } from '../../domain/events/IDomainEvent';
139
140export interface IEventHandler<T extends IDomainEvent> {
141 handle(event: T): Promise<Result<void>>;
142}
143
144export interface IEventSubscriber {
145 subscribe<T extends IDomainEvent>(
146 eventType: string,
147 handler: IEventHandler<T>,
148 ): Promise<void>;
149
150 start(): Promise<void>;
151 stop(): Promise<void>;
152}
153```
154
155```typescript
156// src/shared/application/BaseUseCase.ts
157import { IEventPublisher } from './events/IEventPublisher';
158import { DomainEvents } from '../domain/events/DomainEvents';
159import { AggregateRoot } from '../domain/AggregateRoot';
160import { Result, ok, err } from '../core/Result';
161
162export abstract class BaseUseCase {
163 constructor(protected eventPublisher: IEventPublisher) {}
164
165 protected async publishEventsForAggregate(
166 aggregate: AggregateRoot<any>,
167 ): Promise<Result<void>> {
168 const events = DomainEvents.getEventsForAggregate(aggregate.id);
169
170 if (events.length === 0) {
171 return ok(undefined);
172 }
173
174 const publishResult = await this.eventPublisher.publishEvents(events);
175
176 if (publishResult.isOk()) {
177 DomainEvents.clearEventsForAggregate(aggregate.id);
178 }
179
180 return publishResult;
181 }
182}
183```
184
185### 3. Infrastructure Layer (Concrete Implementations)
186
187The infrastructure layer provides concrete implementations of the application interfaces:
188
189```typescript
190// src/shared/infrastructure/events/BullMQEventPublisher.ts
191import { Queue } from 'bullmq';
192import Redis from 'ioredis';
193import { IEventPublisher } from '../../application/events/IEventPublisher';
194import { IDomainEvent } from '../../domain/events/IDomainEvent';
195import { Result, ok, err } from '../../core/Result';
196
197export class BullMQEventPublisher implements IEventPublisher {
198 private queues: Map<string, Queue> = new Map();
199
200 constructor(private redisConnection: Redis) {}
201
202 async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
203 try {
204 for (const event of events) {
205 await this.publishSingleEvent(event);
206 }
207 return ok(undefined);
208 } catch (error) {
209 return err(error as Error);
210 }
211 }
212
213 private async publishSingleEvent(event: IDomainEvent): Promise<void> {
214 const queueConfig = this.getQueueConfig(event);
215
216 if (!this.queues.has(queueConfig.name)) {
217 this.queues.set(
218 queueConfig.name,
219 new Queue(queueConfig.name, {
220 connection: this.redisConnection,
221 defaultJobOptions: queueConfig.options,
222 }),
223 );
224 }
225
226 const queue = this.queues.get(queueConfig.name)!;
227 await queue.add(event.constructor.name, {
228 ...this.serializeEvent(event),
229 eventType: event.constructor.name,
230 aggregateId: event.getAggregateId().toString(),
231 dateTimeOccurred: event.dateTimeOccurred.toISOString(),
232 });
233 }
234
235 private getQueueConfig(event: IDomainEvent) {
236 // Route events to appropriate queues
237 if (event.constructor.name === 'CardAddedToLibraryEvent') {
238 return {
239 name: 'notifications',
240 options: {
241 priority: 1,
242 attempts: 5,
243 backoff: { type: 'exponential' as const, delay: 1000 },
244 removeOnComplete: 100,
245 removeOnFail: 50,
246 },
247 };
248 }
249
250 return {
251 name: 'events',
252 options: {
253 priority: 2,
254 attempts: 3,
255 backoff: { type: 'exponential' as const, delay: 2000 },
256 removeOnComplete: 50,
257 removeOnFail: 25,
258 },
259 };
260 }
261
262 private serializeEvent(event: IDomainEvent): any {
263 return {
264 ...event,
265 // Handle value objects serialization
266 cardId: (event as any).cardId?.getValue?.()?.toString(),
267 curatorId: (event as any).curatorId?.value,
268 };
269 }
270}
271```
272
273```typescript
274// src/shared/infrastructure/events/BullMQEventSubscriber.ts
275import { Worker, Job } from 'bullmq';
276import Redis from 'ioredis';
277import {
278 IEventSubscriber,
279 IEventHandler,
280} from '../../application/events/IEventSubscriber';
281import { IDomainEvent } from '../../domain/events/IDomainEvent';
282import { CardAddedToLibraryEvent } from '../../../modules/cards/domain/events/CardAddedToLibraryEvent';
283import { CardId } from '../../../modules/cards/domain/value-objects/CardId';
284import { CuratorId } from '../../../modules/cards/domain/value-objects/CuratorId';
285
286export class BullMQEventSubscriber implements IEventSubscriber {
287 private workers: Worker[] = [];
288 private handlers: Map<string, IEventHandler<any>> = new Map();
289
290 constructor(private redisConnection: Redis) {}
291
292 async subscribe<T extends IDomainEvent>(
293 eventType: string,
294 handler: IEventHandler<T>,
295 ): Promise<void> {
296 this.handlers.set(eventType, handler);
297 }
298
299 async start(): Promise<void> {
300 // Start workers for different queues
301 const queues = ['notifications', 'events'];
302
303 for (const queueName of queues) {
304 const worker = new Worker(
305 queueName,
306 async (job: Job) => {
307 await this.processJob(job);
308 },
309 {
310 connection: this.redisConnection,
311 concurrency: queueName === 'notifications' ? 5 : 15,
312 },
313 );
314
315 worker.on('completed', (job) => {
316 console.log(`Job ${job.id} completed successfully`);
317 });
318
319 worker.on('failed', (job, err) => {
320 console.error(`Job ${job?.id} failed:`, err);
321 });
322
323 this.workers.push(worker);
324 }
325 }
326
327 async stop(): Promise<void> {
328 await Promise.all(this.workers.map((worker) => worker.close()));
329 this.workers = [];
330 }
331
332 private async processJob(job: Job): Promise<void> {
333 const eventData = job.data;
334 const eventType = eventData.eventType;
335
336 const handler = this.handlers.get(eventType);
337 if (!handler) {
338 console.warn(`No handler registered for event type: ${eventType}`);
339 return;
340 }
341
342 const event = this.reconstructEvent(eventData);
343 const result = await handler.handle(event);
344
345 if (result.isErr()) {
346 throw result.error;
347 }
348 }
349
350 private reconstructEvent(eventData: any): IDomainEvent {
351 if (eventData.eventType === 'CardAddedToLibraryEvent') {
352 const cardId = CardId.create(eventData.cardId).unwrap();
353 const curatorId = CuratorId.create(eventData.curatorId).unwrap();
354
355 const event = new CardAddedToLibraryEvent(cardId, curatorId);
356 (event as any).dateTimeOccurred = new Date(eventData.dateTimeOccurred);
357
358 return event;
359 }
360
361 throw new Error(`Unknown event type: ${eventData.eventType}`);
362 }
363}
364```
365
366## Use Case Implementation
367
368Use cases orchestrate domain logic and event publishing through dependency injection:
369
370```typescript
371// src/modules/cards/application/use-cases/AddCardToLibraryUseCase.ts
372import { BaseUseCase } from '../../../../shared/application/BaseUseCase';
373import { UseCase } from '../../../../shared/core/UseCase';
374import { Result, ok, err } from '../../../../shared/core/Result';
375import { IEventPublisher } from '../../../../shared/application/events/IEventPublisher';
376import { ICardRepository } from '../../domain/ICardRepository';
377import { CardId } from '../../domain/value-objects/CardId';
378import { CuratorId } from '../../domain/value-objects/CuratorId';
379
380interface AddCardToLibraryRequest {
381 cardId: string;
382 userId: string;
383}
384
385export class AddCardToLibraryUseCase
386 extends BaseUseCase
387 implements UseCase<AddCardToLibraryRequest, Result<void>>
388{
389 constructor(
390 private cardRepository: ICardRepository,
391 eventPublisher: IEventPublisher, // Interface injected
392 ) {
393 super(eventPublisher);
394 }
395
396 async execute(request: AddCardToLibraryRequest): Promise<Result<void>> {
397 // 1. Get the card
398 const cardResult = await this.cardRepository.findById(
399 CardId.create(request.cardId).unwrap(),
400 );
401 if (cardResult.isErr()) {
402 return err(cardResult.error);
403 }
404
405 const card = cardResult.value;
406 if (!card) {
407 return err(new Error('Card not found'));
408 }
409
410 // 2. Execute domain logic (adds events to aggregate)
411 const curatorId = CuratorId.create(request.userId).unwrap();
412 const addResult = card.addToLibrary(curatorId);
413 if (addResult.isErr()) {
414 return err(addResult.error);
415 }
416
417 // 3. Save to repository
418 const saveResult = await this.cardRepository.save(card);
419 if (saveResult.isErr()) {
420 return err(saveResult.error);
421 }
422
423 // 4. Publish events after successful save
424 const publishResult = await this.publishEventsForAggregate(card);
425 if (publishResult.isErr()) {
426 console.error('Failed to publish events:', publishResult.error);
427 // Don't fail the operation if event publishing fails
428 }
429
430 return ok(undefined);
431 }
432}
433```
434
435## Event Handler Implementation
436
437Event handlers implement the application interface and contain business logic:
438
439```typescript
440// src/modules/notifications/application/eventHandlers/CardAddedToLibraryEventHandler.ts
441import { IEventHandler } from '../../../../shared/application/events/IEventSubscriber';
442import { CardAddedToLibraryEvent } from '../../../cards/domain/events/CardAddedToLibraryEvent';
443import { INotificationService } from '../ports/INotificationService';
444import { Result } from '../../../../shared/core/Result';
445
446export class CardAddedToLibraryEventHandler
447 implements IEventHandler<CardAddedToLibraryEvent>
448{
449 constructor(private notificationService: INotificationService) {}
450
451 async handle(event: CardAddedToLibraryEvent): Promise<Result<void>> {
452 return await this.notificationService.processCardAddedToLibrary(event);
453 }
454}
455```
456
457## Dependency Injection and Service Factory
458
459The service factory wires up concrete implementations:
460
461```typescript
462// src/shared/infrastructure/ServiceFactory.ts
463import { IEventPublisher } from '../application/events/IEventPublisher';
464import { IEventSubscriber } from '../application/events/IEventSubscriber';
465import { BullMQEventPublisher } from './events/BullMQEventPublisher';
466import { BullMQEventSubscriber } from './events/BullMQEventSubscriber';
467import { AddCardToLibraryUseCase } from '../../modules/cards/application/use-cases/AddCardToLibraryUseCase';
468import { CardAddedToLibraryEventHandler as NotificationHandler } from '../../modules/notifications/application/eventHandlers/CardAddedToLibraryEventHandler';
469import { CardAddedToLibraryEventHandler as FeedHandler } from '../../modules/feeds/application/eventHandlers/CardAddedToLibraryEventHandler';
470
471export class ServiceFactory {
472 static create(
473 configService: EnvironmentConfigService,
474 repositories: Repositories,
475 ): Services {
476 // Infrastructure - Redis connection
477 const redisConnection = createRedisConnection();
478
479 // Infrastructure - Event publisher implementation
480 const eventPublisher: IEventPublisher = new BullMQEventPublisher(
481 redisConnection,
482 );
483
484 // Infrastructure - Event subscriber implementation
485 const eventSubscriber: IEventSubscriber = new BullMQEventSubscriber(
486 redisConnection,
487 );
488
489 // Application - Use cases with injected interfaces
490 const addCardToLibraryUseCase = new AddCardToLibraryUseCase(
491 repositories.cardRepository,
492 eventPublisher, // Interface injected
493 );
494
495 // Application - Event handlers
496 const notificationHandler = new NotificationHandler(notificationService);
497 const feedHandler = new FeedHandler(feedService);
498
499 // Register event handlers with subscriber
500 eventSubscriber.subscribe('CardAddedToLibraryEvent', notificationHandler);
501 eventSubscriber.subscribe('CardAddedToLibraryEvent', feedHandler);
502
503 return {
504 // Use cases
505 addCardToLibraryUseCase,
506
507 // Event system
508 eventPublisher,
509 eventSubscriber,
510
511 // Event handlers
512 notificationHandler,
513 feedHandler,
514 };
515 }
516}
517```
518
519## Worker Process Setup
520
521Workers are separate processes that run the event subscriber:
522
523```typescript
524// src/workers/notification-worker.ts
525import { ServiceFactory } from '../shared/infrastructure/ServiceFactory';
526import { EnvironmentConfigService } from '../shared/infrastructure/config/EnvironmentConfigService';
527
528async function startNotificationWorker() {
529 const configService = new EnvironmentConfigService();
530 const repositories = createRepositories(configService);
531 const services = ServiceFactory.create(configService, repositories);
532
533 // Start the event subscriber
534 await services.eventSubscriber.start();
535
536 console.log('Notification worker started');
537
538 // Graceful shutdown
539 process.on('SIGTERM', async () => {
540 console.log('Shutting down notification worker...');
541 await services.eventSubscriber.stop();
542 process.exit(0);
543 });
544}
545
546startNotificationWorker().catch(console.error);
547```
548
549## Testing Strategy
550
551### 1. Domain Tests (Unchanged)
552
553Domain tests remain pure and don't need infrastructure:
554
555```typescript
556describe('Card', () => {
557 it('should raise CardAddedToLibraryEvent when added to library', () => {
558 const card = Card.create(cardProps).unwrap();
559 const userId = CuratorId.create('did:plc:user123').unwrap();
560
561 card.addToLibrary(userId);
562
563 expect(card.domainEvents).toHaveLength(1);
564 expect(card.domainEvents[0]).toBeInstanceOf(CardAddedToLibraryEvent);
565 });
566});
567```
568
569### 2. Use Case Tests (Mock Interfaces)
570
571Use case tests mock the application interfaces:
572
573```typescript
574describe('AddCardToLibraryUseCase', () => {
575 it('should publish events after successful save', async () => {
576 const mockEventPublisher: IEventPublisher = {
577 publishEvents: jest.fn().mockResolvedValue(ok(undefined)),
578 };
579
580 const mockCardRepository: ICardRepository = {
581 findById: jest.fn().mockResolvedValue(ok(mockCard)),
582 save: jest.fn().mockResolvedValue(ok(undefined)),
583 };
584
585 const useCase = new AddCardToLibraryUseCase(
586 mockCardRepository,
587 mockEventPublisher,
588 );
589
590 const result = await useCase.execute({
591 cardId: 'card-123',
592 userId: 'user-456',
593 });
594
595 expect(result.isOk()).toBe(true);
596 expect(mockEventPublisher.publishEvents).toHaveBeenCalledWith(
597 expect.arrayContaining([expect.any(CardAddedToLibraryEvent)]),
598 );
599 });
600});
601```
602
603### 3. Integration Tests (Real Infrastructure)
604
605Integration tests use real implementations:
606
607```typescript
608describe('BullMQ Event System Integration', () => {
609 let eventPublisher: BullMQEventPublisher;
610 let eventSubscriber: BullMQEventSubscriber;
611 let redis: Redis;
612
613 beforeEach(async () => {
614 redis = new Redis(process.env.TEST_REDIS_URL);
615 eventPublisher = new BullMQEventPublisher(redis);
616 eventSubscriber = new BullMQEventSubscriber(redis);
617 });
618
619 it('should publish and process events end-to-end', async () => {
620 const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
621 handle: jest.fn().mockResolvedValue(ok(undefined)),
622 };
623
624 await eventSubscriber.subscribe('CardAddedToLibraryEvent', mockHandler);
625 await eventSubscriber.start();
626
627 const event = new CardAddedToLibraryEvent(cardId, userId);
628 await eventPublisher.publishEvents([event]);
629
630 // Wait for processing
631 await new Promise((resolve) => setTimeout(resolve, 1000));
632
633 expect(mockHandler.handle).toHaveBeenCalledWith(
634 expect.objectContaining({
635 cardId: expect.any(CardId),
636 curatorId: expect.any(CuratorId),
637 }),
638 );
639
640 await eventSubscriber.stop();
641 });
642});
643```
644
645## Migration Strategy
646
647### Phase 1: Create Application Interfaces
648
6491. Define `IEventPublisher` and `IEventSubscriber` interfaces
6502. Create `BaseUseCase` with event publishing logic
6513. No changes to domain or existing infrastructure
652
653### Phase 2: Implement Infrastructure
654
6551. Create `BullMQEventPublisher` and `BullMQEventSubscriber`
6562. Update `ServiceFactory` to inject concrete implementations
6573. Deploy with Redis infrastructure
658
659### Phase 3: Update Use Cases
660
6611. Extend use cases from `BaseUseCase`
6622. Inject `IEventPublisher` through constructor
6633. Replace direct event handling with publishing
664
665### Phase 4: Deploy Workers
666
6671. Create worker processes using `IEventSubscriber`
6682. Register event handlers with subscriber
6693. Scale workers based on load
670
671## Key Benefits of This Approach
672
673### 1. **Clean Architecture Compliance**
674
675- Clear separation of concerns across layers
676- Dependency inversion principle followed
677- Interfaces defined in application layer
678
679### 2. **Testability**
680
681- Easy to mock interfaces for unit tests
682- Domain tests remain pure and fast
683- Integration tests can use real implementations
684
685### 3. **Flexibility**
686
687- Can switch from BullMQ to other queue systems
688- Can add multiple publishers (e.g., event store + queue)
689- Easy to add new event types and handlers
690
691### 4. **Maintainability**
692
693- Clear contracts between layers
694- Infrastructure changes don't affect application logic
695- Easy to understand and debug
696
697### 5. **Scalability**
698
699- Publishers and subscribers can scale independently
700- Different queue configurations per event type
701- Workers can be deployed across multiple regions
702
703## Alternative Implementations
704
705The interface-based approach allows for easy swapping of implementations:
706
707```typescript
708// Alternative: In-memory publisher for testing
709export class InMemoryEventPublisher implements IEventPublisher {
710 public publishedEvents: IDomainEvent[] = [];
711
712 async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
713 this.publishedEvents.push(...events);
714 return ok(undefined);
715 }
716}
717
718// Alternative: Event store publisher for audit
719export class EventStorePublisher implements IEventPublisher {
720 constructor(private eventStore: IEventStore) {}
721
722 async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
723 for (const event of events) {
724 await this.eventStore.append(event);
725 }
726 return ok(undefined);
727 }
728}
729
730// Composite publisher for multiple destinations
731export class CompositeEventPublisher implements IEventPublisher {
732 constructor(private publishers: IEventPublisher[]) {}
733
734 async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
735 for (const publisher of this.publishers) {
736 const result = await publisher.publishEvents(events);
737 if (result.isErr()) {
738 return result;
739 }
740 }
741 return ok(undefined);
742 }
743}
744```
745
746## Conclusion
747
748This clean architecture approach provides:
749
750- **Domain Purity**: Domain layer has zero infrastructure dependencies
751- **Clear Contracts**: Interfaces define exactly what each layer needs
752- **Flexibility**: Easy to swap implementations or add new ones
753- **Testability**: Mock interfaces for fast, reliable tests
754- **Scalability**: Infrastructure can scale independently
755
756The key insight is that **clean architecture principles apply to event systems too** - define interfaces in the application layer, implement them in infrastructure, and inject them through constructors. This creates a maintainable, testable, and flexible event-driven system.