# Domain Event Abstractions in a Clean Architecture

This document explains how to implement a layered domain event system that follows clean architecture principles, with a clear separation between the domain, application, and infrastructure layers.

## Overview

Our domain event system follows a simple, direct approach where:

1. **Domain layer** raises events through aggregates
2. **Application layer** defines interfaces for event publishing
3. **Infrastructure layer** provides concrete implementations
4. **Use cases** explicitly publish events after successful operations

## Core Principle: Dependency Inversion

The key principle is **dependency inversion**: the inner, higher-level layers define interfaces, and the outer, lower-level layers implement them:

- **Domain Layer**: Pure business logic, no dependencies
- **Application Layer**: Defines event publishing interfaces
- **Infrastructure Layer**: Implements interfaces with concrete technologies
- **Use Cases**: Orchestrate domain logic and event publishing

## Architecture Layers

```
┌─────────────────────────────────────────────────────────────┐
│                        Domain Layer                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌──────────────┐ │
│  │   Aggregates    │  │  Domain Events  │  │ Event Types  │ │
│  │                 │  │                 │  │              │ │
│  │ Card.addToLib() │  │ IDomainEvent    │  │ CardAdded... │ │
│  │ ↓ addDomainEvent│  │                 │  │              │ │
│  └─────────────────┘  └─────────────────┘  └──────────────┘ │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                      Application Layer                      │
│  ┌─────────────────┐  ┌─────────────────┐  ┌──────────────┐ │
│  │    Use Cases    │  │   Interfaces    │  │Event Handlers│ │
│  │                 │  │                 │  │              │ │
│  │ AddCardToLib... │  │ IEventPublisher │  │ Notification │ │
│  │ ↓ publishEvents │  │ IEventSubscriber│  │ Feed Handler │ │
│  └─────────────────┘  └─────────────────┘  └──────────────┘ │
└─────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│                    Infrastructure Layer                     │
│  ┌─────────────────┐  ┌─────────────────┐  ┌──────────────┐ │
│  │ BullMQPublisher │  │ BullMQSubscriber│  │ DomainEvents │ │
│  │ (implements     │  │ (implements     │  │ (simplified) │ │
│  │ IEventPublisher)│  │ IEventSubscriber│  │              │ │
│  └─────────────────┘  └─────────────────┘  └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
```

## Layer Responsibilities

### 1. Domain Layer (Pure Business Logic)

The domain layer stays pure: it has no dependency on queues, Redis, or any other publishing technology. Aggregates only record events; the one piece of event plumbing they touch is the in-process `DomainEvents` registry:

```typescript
// src/shared/domain/events/IDomainEvent.ts - NO CHANGES
export interface IDomainEvent {
  dateTimeOccurred: Date;
  getAggregateId(): UniqueEntityID;
}
```

```typescript
// src/modules/cards/domain/events/CardAddedToLibraryEvent.ts - NO CHANGES
export class CardAddedToLibraryEvent implements IDomainEvent {
  public readonly dateTimeOccurred: Date;

  constructor(
    public readonly cardId: CardId,
    public readonly curatorId: CuratorId,
  ) {
    this.dateTimeOccurred = new Date();
  }

  getAggregateId(): UniqueEntityID {
    return this.cardId.getValue();
  }
}
```

```typescript
// src/shared/domain/AggregateRoot.ts - NO CHANGES
export abstract class AggregateRoot<T> extends Entity<T> {
  private _domainEvents: IDomainEvent[] = [];

  protected addDomainEvent(domainEvent: IDomainEvent): void {
    this._domainEvents.push(domainEvent);
    DomainEvents.markAggregateForDispatch(this);
    this.logDomainEventAdded(domainEvent);
  }

  // ... rest unchanged
}
```

```typescript
// src/modules/cards/domain/Card.ts - Domain logic unchanged
public addToLibrary(userId: CuratorId): Result<void, CardValidationError> {
  // ... validation logic

  this.props.libraryMemberships.push({
    curatorId: userId,
    addedAt: new Date(),
  });

  // Domain only adds events - doesn't know about publishing
  this.addDomainEvent(new CardAddedToLibraryEvent(this.cardId, userId));

  return ok(undefined);
}
```

### 2. Application Layer (Interfaces and Orchestration)

The application layer defines the publishing and subscribing interfaces and orchestrates domain logic with event publishing:

```typescript
// src/shared/application/events/IEventPublisher.ts
import { IDomainEvent } from '../../domain/events/IDomainEvent';
import { Result } from '../../core/Result';

export interface IEventPublisher {
  publishEvents(events: IDomainEvent[]): Promise<Result<void>>;
}
```

```typescript
// src/shared/application/events/IEventSubscriber.ts
import { IDomainEvent } from '../../domain/events/IDomainEvent';
import { Result } from '../../core/Result';

export interface IEventHandler<T extends IDomainEvent> {
  handle(event: T): Promise<Result<void>>;
}

export interface IEventSubscriber {
  subscribe<T extends IDomainEvent>(
    eventType: string,
    handler: IEventHandler<T>,
  ): Promise<void>;

  start(): Promise<void>;
  stop(): Promise<void>;
}
```

```typescript
// src/shared/application/BaseUseCase.ts
import { IEventPublisher } from './events/IEventPublisher';
import { DomainEvents } from '../domain/events/DomainEvents';
import { AggregateRoot } from '../domain/AggregateRoot';
import { Result, ok } from '../core/Result';

export abstract class BaseUseCase {
  constructor(protected eventPublisher: IEventPublisher) {}

  protected async publishEventsForAggregate(
    aggregate: AggregateRoot<any>,
  ): Promise<Result<void>> {
    const events = DomainEvents.getEventsForAggregate(aggregate.id);

    if (events.length === 0) {
      return ok(undefined);
    }

    const publishResult = await this.eventPublisher.publishEvents(events);

    // Only clear the pending events once they have been published
    if (publishResult.isOk()) {
      DomainEvents.clearEventsForAggregate(aggregate.id);
    }

    return publishResult;
  }
}
```

### 3. Infrastructure Layer (Concrete Implementations)

The infrastructure layer provides concrete implementations of the application interfaces:

```typescript
// src/shared/infrastructure/events/BullMQEventPublisher.ts
import { Queue } from 'bullmq';
import Redis from 'ioredis';
import { IEventPublisher } from '../../application/events/IEventPublisher';
import { IDomainEvent } from '../../domain/events/IDomainEvent';
import { Result, ok, err } from '../../core/Result';

export class BullMQEventPublisher implements IEventPublisher {
  private queues: Map<string, Queue> = new Map();

  constructor(private redisConnection: Redis) {}

  async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
    try {
      for (const event of events) {
        await this.publishSingleEvent(event);
      }
      return ok(undefined);
    } catch (error) {
      return err(error as Error);
    }
  }

  private async publishSingleEvent(event: IDomainEvent): Promise<void> {
    const queueConfig = this.getQueueConfig(event);

    // Lazily create one Queue per queue name and reuse it
    if (!this.queues.has(queueConfig.name)) {
      this.queues.set(
        queueConfig.name,
        new Queue(queueConfig.name, {
          connection: this.redisConnection,
          defaultJobOptions: queueConfig.options,
        }),
      );
    }

    const queue = this.queues.get(queueConfig.name)!;
    await queue.add(event.constructor.name, {
      ...this.serializeEvent(event),
      eventType: event.constructor.name,
      aggregateId: event.getAggregateId().toString(),
      dateTimeOccurred: event.dateTimeOccurred.toISOString(),
    });
  }

  private getQueueConfig(event: IDomainEvent) {
    // Route events to appropriate queues
    if (event.constructor.name === 'CardAddedToLibraryEvent') {
      return {
        name: 'notifications',
        options: {
          priority: 1,
          attempts: 5,
          backoff: { type: 'exponential' as const, delay: 1000 },
          removeOnComplete: 100,
          removeOnFail: 50,
        },
      };
    }

    return {
      name: 'events',
      options: {
        priority: 2,
        attempts: 3,
        backoff: { type: 'exponential' as const, delay: 2000 },
        removeOnComplete: 50,
        removeOnFail: 25,
      },
    };
  }

  private serializeEvent(event: IDomainEvent): any {
    return {
      ...event,
      // Handle value objects serialization
      cardId: (event as any).cardId?.getValue?.()?.toString(),
      curatorId: (event as any).curatorId?.value,
    };
  }
}
```

```typescript
// src/shared/infrastructure/events/BullMQEventSubscriber.ts
import { Worker, Job } from 'bullmq';
import Redis from 'ioredis';
import {
  IEventSubscriber,
  IEventHandler,
} from '../../application/events/IEventSubscriber';
import { IDomainEvent } from '../../domain/events/IDomainEvent';
import { CardAddedToLibraryEvent } from '../../../modules/cards/domain/events/CardAddedToLibraryEvent';
import { CardId } from '../../../modules/cards/domain/value-objects/CardId';
import { CuratorId } from '../../../modules/cards/domain/value-objects/CuratorId';

export class BullMQEventSubscriber implements IEventSubscriber {
  private workers: Worker[] = [];
  // Several handlers (e.g. notifications and feeds) may share one event type,
  // so each event type maps to a list of handlers
  private handlers: Map<string, IEventHandler<any>[]> = new Map();

  constructor(private redisConnection: Redis) {}

  async subscribe<T extends IDomainEvent>(
    eventType: string,
    handler: IEventHandler<T>,
  ): Promise<void> {
    const existing = this.handlers.get(eventType) ?? [];
    this.handlers.set(eventType, [...existing, handler]);
  }

  async start(): Promise<void> {
    // Start workers for different queues
    const queues = ['notifications', 'events'];

    for (const queueName of queues) {
      const worker = new Worker(
        queueName,
        async (job: Job) => {
          await this.processJob(job);
        },
        {
          connection: this.redisConnection,
          concurrency: queueName === 'notifications' ? 5 : 15,
        },
      );

      worker.on('completed', (job) => {
        console.log(`Job ${job.id} completed successfully`);
      });

      worker.on('failed', (job, err) => {
        console.error(`Job ${job?.id} failed:`, err);
      });

      this.workers.push(worker);
    }
  }

  async stop(): Promise<void> {
    await Promise.all(this.workers.map((worker) => worker.close()));
    this.workers = [];
  }

  private async processJob(job: Job): Promise<void> {
    const eventData = job.data;
    const eventType = eventData.eventType;

    const handlers = this.handlers.get(eventType) ?? [];
    if (handlers.length === 0) {
      console.warn(`No handler registered for event type: ${eventType}`);
      return;
    }

    const event = this.reconstructEvent(eventData);

    // A retry re-runs every handler, so handlers should be idempotent
    for (const handler of handlers) {
      const result = await handler.handle(event);

      if (result.isErr()) {
        // Throwing marks the job as failed so BullMQ can retry it
        throw result.error;
      }
    }
  }

  private reconstructEvent(eventData: any): IDomainEvent {
    if (eventData.eventType === 'CardAddedToLibraryEvent') {
      const cardId = CardId.create(eventData.cardId).unwrap();
      const curatorId = CuratorId.create(eventData.curatorId).unwrap();

      const event = new CardAddedToLibraryEvent(cardId, curatorId);
      // Restore the original timestamp instead of the reconstruction time
      (event as any).dateTimeOccurred = new Date(eventData.dateTimeOccurred);

      return event;
    }

    throw new Error(`Unknown event type: ${eventData.eventType}`);
  }
}
```

## Use Case Implementation

Use cases orchestrate domain logic and event publishing through dependency injection:

```typescript
// src/modules/cards/application/use-cases/AddCardToLibraryUseCase.ts
import { BaseUseCase } from '../../../../shared/application/BaseUseCase';
import { UseCase } from '../../../../shared/core/UseCase';
import { Result, ok, err } from '../../../../shared/core/Result';
import { IEventPublisher } from '../../../../shared/application/events/IEventPublisher';
import { ICardRepository } from '../../domain/ICardRepository';
import { CardId } from '../../domain/value-objects/CardId';
import { CuratorId } from '../../domain/value-objects/CuratorId';

interface AddCardToLibraryRequest {
  cardId: string;
  userId: string;
}

export class AddCardToLibraryUseCase
  extends BaseUseCase
  implements UseCase<AddCardToLibraryRequest, Result<void>>
{
  constructor(
    private cardRepository: ICardRepository,
    eventPublisher: IEventPublisher, // Interface injected
  ) {
    super(eventPublisher);
  }

  async execute(request: AddCardToLibraryRequest): Promise<Result<void>> {
    // 1. Get the card
    const cardResult = await this.cardRepository.findById(
      CardId.create(request.cardId).unwrap(),
    );
    if (cardResult.isErr()) {
      return err(cardResult.error);
    }

    const card = cardResult.value;
    if (!card) {
      return err(new Error('Card not found'));
    }

    // 2. Execute domain logic (adds events to aggregate)
    const curatorId = CuratorId.create(request.userId).unwrap();
    const addResult = card.addToLibrary(curatorId);
    if (addResult.isErr()) {
      return err(addResult.error);
    }

    // 3. Save to repository
    const saveResult = await this.cardRepository.save(card);
    if (saveResult.isErr()) {
      return err(saveResult.error);
    }

    // 4. Publish events after successful save
    const publishResult = await this.publishEventsForAggregate(card);
    if (publishResult.isErr()) {
      console.error('Failed to publish events:', publishResult.error);
      // Don't fail the operation if event publishing fails
    }

    return ok(undefined);
  }
}
```

## Event Handler Implementation

Event handlers implement the application-layer `IEventHandler` interface and delegate the business logic to a service:

```typescript
// src/modules/notifications/application/eventHandlers/CardAddedToLibraryEventHandler.ts
import { IEventHandler } from '../../../../shared/application/events/IEventSubscriber';
import { CardAddedToLibraryEvent } from '../../../cards/domain/events/CardAddedToLibraryEvent';
import { INotificationService } from '../ports/INotificationService';
import { Result } from '../../../../shared/core/Result';

export class CardAddedToLibraryEventHandler
  implements IEventHandler<CardAddedToLibraryEvent>
{
  constructor(private notificationService: INotificationService) {}

  async handle(event: CardAddedToLibraryEvent): Promise<Result<void>> {
    return await this.notificationService.processCardAddedToLibrary(event);
  }
}
```

## Dependency Injection and Service Factory

The service factory wires up concrete implementations:

```typescript
// src/shared/infrastructure/ServiceFactory.ts
import { IEventPublisher } from '../application/events/IEventPublisher';
import { IEventSubscriber } from '../application/events/IEventSubscriber';
import { BullMQEventPublisher } from './events/BullMQEventPublisher';
import { BullMQEventSubscriber } from './events/BullMQEventSubscriber';
import { AddCardToLibraryUseCase } from '../../modules/cards/application/use-cases/AddCardToLibraryUseCase';
import { CardAddedToLibraryEventHandler as NotificationHandler } from '../../modules/notifications/application/eventHandlers/CardAddedToLibraryEventHandler';
import { CardAddedToLibraryEventHandler as FeedHandler } from '../../modules/feeds/application/eventHandlers/CardAddedToLibraryEventHandler';

export class ServiceFactory {
  static create(
    configService: EnvironmentConfigService,
    repositories: Repositories,
  ): Services {
    // Infrastructure - Redis connection
    const redisConnection = createRedisConnection();

    // Infrastructure - Event publisher implementation
    const eventPublisher: IEventPublisher = new BullMQEventPublisher(
      redisConnection,
    );

    // Infrastructure - Event subscriber implementation
    const eventSubscriber: IEventSubscriber = new BullMQEventSubscriber(
      redisConnection,
    );

    // Application - Use cases with injected interfaces
    const addCardToLibraryUseCase = new AddCardToLibraryUseCase(
      repositories.cardRepository,
      eventPublisher, // Interface injected
    );

    // Application - Event handlers
    const notificationHandler = new NotificationHandler(notificationService);
    const feedHandler = new FeedHandler(feedService);

    // Register event handlers with subscriber
    eventSubscriber.subscribe('CardAddedToLibraryEvent', notificationHandler);
    eventSubscriber.subscribe('CardAddedToLibraryEvent', feedHandler);

    return {
      // Use cases
      addCardToLibraryUseCase,

      // Event system
      eventPublisher,
      eventSubscriber,

      // Event handlers
      notificationHandler,
      feedHandler,
    };
  }
}
```

## Worker Process Setup

Workers are separate processes that run the event subscriber:

```typescript
// src/workers/notification-worker.ts
import { ServiceFactory } from '../shared/infrastructure/ServiceFactory';
import { EnvironmentConfigService } from '../shared/infrastructure/config/EnvironmentConfigService';

async function startNotificationWorker() {
  const configService = new EnvironmentConfigService();
  const repositories = createRepositories(configService);
  const services = ServiceFactory.create(configService, repositories);

  // Start the event subscriber
  await services.eventSubscriber.start();

  console.log('Notification worker started');

  // Graceful shutdown
  process.on('SIGTERM', async () => {
    console.log('Shutting down notification worker...');
    await services.eventSubscriber.stop();
    process.exit(0);
  });
}

startNotificationWorker().catch(console.error);
```

## Testing Strategy

### 1. Domain Tests (Unchanged)

Domain tests remain pure and don't need infrastructure:

```typescript
describe('Card', () => {
  it('should raise CardAddedToLibraryEvent when added to library', () => {
    const card = Card.create(cardProps).unwrap();
    const userId = CuratorId.create('did:plc:user123').unwrap();

    card.addToLibrary(userId);

    expect(card.domainEvents).toHaveLength(1);
    expect(card.domainEvents[0]).toBeInstanceOf(CardAddedToLibraryEvent);
  });
});
```

### 2. Use Case Tests (Mock Interfaces)

Use case tests mock the application interfaces:

```typescript
describe('AddCardToLibraryUseCase', () => {
  it('should publish events after successful save', async () => {
    const mockEventPublisher: IEventPublisher = {
      publishEvents: jest.fn().mockResolvedValue(ok(undefined)),
    };

    const mockCardRepository: ICardRepository = {
      findById: jest.fn().mockResolvedValue(ok(mockCard)),
      save: jest.fn().mockResolvedValue(ok(undefined)),
    };

    const useCase = new AddCardToLibraryUseCase(
      mockCardRepository,
      mockEventPublisher,
    );

    const result = await useCase.execute({
      cardId: 'card-123',
      userId: 'user-456',
    });

    expect(result.isOk()).toBe(true);
    expect(mockEventPublisher.publishEvents).toHaveBeenCalledWith(
      expect.arrayContaining([expect.any(CardAddedToLibraryEvent)]),
    );
  });
});
```

### 3. Integration Tests (Real Infrastructure)

Integration tests use real implementations:

```typescript
describe('BullMQ Event System Integration', () => {
  let eventPublisher: BullMQEventPublisher;
  let eventSubscriber: BullMQEventSubscriber;
  let redis: Redis;

  beforeEach(async () => {
    // BullMQ workers require maxRetriesPerRequest to be null on the connection
    redis = new Redis(process.env.TEST_REDIS_URL!, {
      maxRetriesPerRequest: null,
    });
    eventPublisher = new BullMQEventPublisher(redis);
    eventSubscriber = new BullMQEventSubscriber(redis);
  });

  it('should publish and process events end-to-end', async () => {
    const mockHandler: IEventHandler<CardAddedToLibraryEvent> = {
      handle: jest.fn().mockResolvedValue(ok(undefined)),
    };

    await eventSubscriber.subscribe('CardAddedToLibraryEvent', mockHandler);
    await eventSubscriber.start();

    const event = new CardAddedToLibraryEvent(cardId, userId);
    await eventPublisher.publishEvents([event]);

    // Wait for processing
    await new Promise((resolve) => setTimeout(resolve, 1000));

    expect(mockHandler.handle).toHaveBeenCalledWith(
      expect.objectContaining({
        cardId: expect.any(CardId),
        curatorId: expect.any(CuratorId),
      }),
    );

    await eventSubscriber.stop();
  });
});
```

## Migration Strategy

### Phase 1: Create Application Interfaces

1. Define `IEventPublisher` and `IEventSubscriber` interfaces
2. Create `BaseUseCase` with event publishing logic
3. No changes to domain or existing infrastructure

### Phase 2: Implement Infrastructure

1. Create `BullMQEventPublisher` and `BullMQEventSubscriber`
2. Update `ServiceFactory` to inject concrete implementations
3. Deploy with Redis infrastructure

### Phase 3: Update Use Cases

1. Extend use cases from `BaseUseCase`
2. Inject `IEventPublisher` through constructor
3. Replace direct event handling with publishing

### Phase 4: Deploy Workers

1. Create worker processes using `IEventSubscriber`
2. Register event handlers with subscriber
3. Scale workers based on load

## Key Benefits of This Approach

### 1. **Clean Architecture Compliance**

- Clear separation of concerns across layers
- Dependency inversion principle followed
- Interfaces defined in application layer

### 2. **Testability**

- Easy to mock interfaces for unit tests
- Domain tests remain pure and fast
- Integration tests can use real implementations

### 3. **Flexibility**

- Can switch from BullMQ to other queue systems
- Can add multiple publishers (e.g., event store + queue)
- Easy to add new event types and handlers

### 4. **Maintainability**

- Clear contracts between layers
- Infrastructure changes don't affect application logic
- Easy to understand and debug

### 5. **Scalability**

- Publishers and subscribers can scale independently
- Different queue configurations per event type
- Workers can be deployed across multiple regions

## Alternative Implementations

Because use cases depend only on `IEventPublisher`, implementations can be swapped without touching application code:

```typescript
// Alternative: In-memory publisher for testing
export class InMemoryEventPublisher implements IEventPublisher {
  public publishedEvents: IDomainEvent[] = [];

  async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
    this.publishedEvents.push(...events);
    return ok(undefined);
  }
}

// Alternative: Event store publisher for audit
export class EventStorePublisher implements IEventPublisher {
  constructor(private eventStore: IEventStore) {}

  async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
    for (const event of events) {
      await this.eventStore.append(event);
    }
    return ok(undefined);
  }
}

// Composite publisher for multiple destinations
export class CompositeEventPublisher implements IEventPublisher {
  constructor(private publishers: IEventPublisher[]) {}

  async publishEvents(events: IDomainEvent[]): Promise<Result<void>> {
    // Stops at the first failing publisher and returns its error
    for (const publisher of this.publishers) {
      const result = await publisher.publishEvents(events);
      if (result.isErr()) {
        return result;
      }
    }
    return ok(undefined);
  }
}
```

## Conclusion

This clean architecture approach provides:

- **Domain Purity**: Domain layer has zero infrastructure dependencies
- **Clear Contracts**: Interfaces define exactly what each layer needs
- **Flexibility**: Easy to swap implementations or add new ones
- **Testability**: Mock interfaces for fast, reliable tests
- **Scalability**: Infrastructure can scale independently

The key insight is that **clean architecture principles apply to event systems too**: define interfaces in the application layer, implement them in infrastructure, and inject them through constructors. This creates a maintainable, testable, and flexible event-driven system.
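
As a compact recap, the three layers can be sketched in a single dependency-free file. This is only an illustration under simplifying assumptions: every name in it (`SketchEvent`, `SketchCard`, `SketchPublisher`, `AddToLibrarySketch`, `RecordingPublisher`, `runSketch`) is hypothetical and stands in for the project's real classes, the aggregate keeps its own pending-event list instead of using a `DomainEvents` registry, and persistence and `Result` handling are left out.

```typescript
// Illustrative sketch only: all names here are hypothetical stand-ins,
// not the project's actual classes.

// --- Domain layer: the aggregate records events and nothing more ---
interface SketchEvent {
  readonly name: string;
  readonly aggregateId: string;
  readonly curatorId: string;
  readonly occurredAt: Date;
}

class SketchCard {
  readonly id: string;
  private pending: SketchEvent[] = [];

  constructor(id: string) {
    this.id = id;
  }

  addToLibrary(curatorId: string): void {
    // The aggregate knows nothing about queues or publishers
    this.pending.push({
      name: 'CardAddedToLibrary',
      aggregateId: this.id,
      curatorId,
      occurredAt: new Date(),
    });
  }

  // Hands over the pending events and clears the internal list
  pullEvents(): SketchEvent[] {
    const events = this.pending;
    this.pending = [];
    return events;
  }
}

// --- Application layer: defines the port and orchestrates ---
interface SketchPublisher {
  publish(events: SketchEvent[]): Promise<void>;
}

class AddToLibrarySketch {
  private readonly publisher: SketchPublisher;

  constructor(publisher: SketchPublisher) {
    this.publisher = publisher;
  }

  async execute(card: SketchCard, curatorId: string): Promise<void> {
    card.addToLibrary(curatorId);
    // A real use case would save the aggregate here, before publishing
    await this.publisher.publish(card.pullEvents());
  }
}

// --- Infrastructure layer: one possible adapter, purely in memory ---
class RecordingPublisher implements SketchPublisher {
  readonly published: SketchEvent[] = [];

  async publish(events: SketchEvent[]): Promise<void> {
    this.published.push(...events);
  }
}

// --- Composition root: the only place that names a concrete adapter ---
async function runSketch(): Promise<string[]> {
  const publisher = new RecordingPublisher();
  const useCase = new AddToLibrarySketch(publisher);
  await useCase.execute(new SketchCard('card-1'), 'did:plc:user123');
  return publisher.published.map((e) => `${e.name}:${e.aggregateId}`);
}
```

Swapping `RecordingPublisher` for a queue-backed adapter would change only `runSketch`; the aggregate and the use case stay untouched, which is the property the layering is meant to protect.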