A social knowledge tool for researchers built on ATProto
1## Fly.io Deployment Considerations 2 3When deploying to Fly.io, you can extend the current in-memory event system to a distributed event system using several approaches: 4 5### Option 1: Redis Pub/Sub 6 7Redis provides a lightweight pub/sub mechanism perfect for distributed event handling: 8 9```typescript 10// Infrastructure layer - Redis Event Publisher 11export class RedisEventPublisher { 12 constructor(private redis: Redis) {} 13 14 async publish(event: IDomainEvent): Promise<void> { 15 const eventName = event.constructor.name; 16 const eventData = { 17 ...event, 18 aggregateId: event.getAggregateId().toString(), 19 }; 20 21 await this.redis.publish(`events:${eventName}`, JSON.stringify(eventData)); 22 } 23} 24 25// Infrastructure layer - Redis Event Subscriber 26export class RedisEventSubscriber { 27 constructor( 28 private redis: Redis, 29 private eventHandlerRegistry: EventHandlerRegistry, 30 ) {} 31 32 async subscribe(): Promise<void> { 33 const subscriber = this.redis.duplicate(); 34 35 // Subscribe to all event patterns 36 await subscriber.psubscribe('events:*'); 37 38 subscriber.on('pmessage', async (pattern, channel, message) => { 39 try { 40 const eventData = JSON.parse(message); 41 const eventName = channel.replace('events:', ''); 42 43 // Reconstruct and handle the event 44 await this.handleDistributedEvent(eventName, eventData); 45 } catch (error) { 46 console.error('Error handling distributed event:', error); 47 } 48 }); 49 } 50 51 private async handleDistributedEvent( 52 eventName: string, 53 eventData: any, 54 ): Promise<void> { 55 // Reconstruct the event object and dispatch to local handlers 56 // This requires event reconstruction logic based on event type 57 } 58} 59 60// Update DomainEvents to support distributed publishing 61export class DistributedDomainEvents extends DomainEvents { 62 private static eventPublisher?: RedisEventPublisher; 63 64 public static setEventPublisher(publisher: RedisEventPublisher): void { 65 this.eventPublisher = publisher; 66 } 67 68 private static async dispatch(event: IDomainEvent): Promise<void> { 69 // Local dispatch (existing logic) 70 const eventClassName: string = event.constructor.name; 71 if (Object.hasOwn(this.handlersMap, eventClassName)) { 72 const handlers = this.handlersMap[eventClassName]; 73 if (handlers) { 74 for (let handler of handlers) { 75 await handler(event); 76 } 77 } 78 } 79 80 // Distributed dispatch 81 if (this.eventPublisher) { 82 await this.eventPublisher.publish(event); 83 } 84 } 85} 86``` 87 88**Fly.io Redis Setup:** 89 90```toml 91# fly.toml 92[[services]] 93 internal_port = 6379 94 protocol = "tcp" 95 96[env] 97 REDIS_URL = "redis://localhost:6379" 98``` 99 100### Option 2: NATS Messaging 101 102NATS provides more advanced messaging patterns with better delivery guarantees: 103 104```typescript 105// Infrastructure layer - NATS Event Publisher 106export class NatsEventPublisher { 107 constructor(private nats: NatsConnection) {} 108 109 async publish(event: IDomainEvent): Promise<void> { 110 const subject = `events.${event.constructor.name}`; 111 const eventData = { 112 ...event, 113 aggregateId: event.getAggregateId().toString(), 114 timestamp: event.dateTimeOccurred.toISOString(), 115 }; 116 117 await this.nats.publish(subject, JSON.stringify(eventData)); 118 } 119} 120 121// Infrastructure layer - NATS Event Subscriber 122export class NatsEventSubscriber { 123 constructor( 124 private nats: NatsConnection, 125 private eventHandlerRegistry: EventHandlerRegistry, 126 ) {} 127 128 async subscribe(): Promise<void> { 129 // Subscribe to all event subjects 130 const subscription = this.nats.subscribe('events.*'); 131 132 for await (const message of subscription) { 133 try { 134 const eventData = JSON.parse(message.data.toString()); 135 const eventType = message.subject.replace('events.', ''); 136 137 await this.handleDistributedEvent(eventType, eventData); 138 } catch (error) { 139 console.error('Error handling NATS event:', error); 140 } 141 } 142 } 143 144 private async handleDistributedEvent( 145 eventType: string, 146 eventData: any, 147 ): Promise<void> { 148 // Event reconstruction and local dispatch logic 149 } 150} 151``` 152 153**Fly.io NATS Setup:** 154 155```dockerfile 156# Add to Dockerfile 157RUN curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.4/nats-server-v2.10.4-linux-amd64.zip -o nats-server.zip 158RUN unzip nats-server.zip && mv nats-server-v2.10.4-linux-amd64/nats-server /usr/local/bin/ 159``` 160 161### Option 3: Database-Based Event Store 162 163For simpler deployments, use your existing PostgreSQL database as an event store: 164 165```typescript 166// Domain layer - Event Store interfaces 167export interface EventStoreRecord { 168 id: string; 169 aggregateId: string; 170 eventType: string; 171 eventData: string; 172 version: number; 173 timestamp: Date; 174 processed: boolean; 175} 176 177export interface IEventStore { 178 saveEvent(event: IDomainEvent, aggregateId: string): Promise<Result<void>>; 179 getUnprocessedEvents(): Promise<Result<EventStoreRecord[]>>; 180 markEventAsProcessed(eventId: string): Promise<Result<void>>; 181} 182 183// Infrastructure layer - Drizzle Event Store 184export class DrizzleEventStore implements IEventStore { 185 constructor(private db: PostgresJsDatabase) {} 186 187 async saveEvent( 188 event: IDomainEvent, 189 aggregateId: string, 190 ): Promise<Result<void>> { 191 try { 192 await this.db.insert(eventStore).values({ 193 id: uuid(), 194 aggregateId, 195 eventType: event.constructor.name, 196 eventData: JSON.stringify(event), 197 version: 1, // Implement versioning logic 198 timestamp: event.dateTimeOccurred, 199 processed: false, 200 }); 201 202 return ok(); 203 } catch (error) { 204 return err(error); 205 } 206 } 207 208 async getUnprocessedEvents(): Promise<Result<EventStoreRecord[]>> { 209 try { 210 const events = await this.db 211 .select() 212 .from(eventStore) 213 .where(eq(eventStore.processed, false)) 214 .orderBy(eventStore.timestamp); 215 216 return ok(events); 217 } catch (error) { 218 return err(error); 219 } 220 } 221 222 async markEventAsProcessed(eventId: string): Promise<Result<void>> { 223 try { 224 await this.db 225 .update(eventStore) 226 .set({ processed: true }) 227 .where(eq(eventStore.id, eventId)); 228 229 return ok(); 230 } catch (error) { 231 return err(error); 232 } 233 } 234} 235 236// Infrastructure layer - Background Event Processor 237export class BackgroundEventProcessor { 238 constructor( 239 private eventStore: IEventStore, 240 private eventHandlerRegistry: EventHandlerRegistry, 241 ) {} 242 243 async start(): Promise<void> { 244 // Process events every 5 seconds 245 setInterval(async () => { 246 await this.processEvents(); 247 }, 5000); 248 } 249 250 private async processEvents(): Promise<void> { 251 try { 252 const eventsResult = await this.eventStore.getUnprocessedEvents(); 253 if (eventsResult.isErr()) { 254 console.error('Error fetching unprocessed events:', eventsResult.error); 255 return; 256 } 257 258 for (const eventRecord of eventsResult.value) { 259 try { 260 // Reconstruct event object 261 const eventData = JSON.parse(eventRecord.eventData); 262 263 // Dispatch to local handlers 264 await this.dispatchEvent(eventRecord.eventType, eventData); 265 266 // Mark as processed 267 await this.eventStore.markEventAsProcessed(eventRecord.id); 268 } catch (error) { 269 console.error(`Error processing event ${eventRecord.id}:`, error); 270 } 271 } 272 } catch (error) { 273 console.error('Error in background event processor:', error); 274 } 275 } 276 277 private async dispatchEvent( 278 eventType: string, 279 eventData: any, 280 ): Promise<void> { 281 // Event reconstruction and dispatch logic 282 } 283} 284``` 285 286**Database Schema:** 287 288```sql 289-- Add to your migrations 290CREATE TABLE event_store ( 291 id UUID PRIMARY KEY DEFAULT gen_random_uuid(), 292 aggregate_id VARCHAR(255) NOT NULL, 293 event_type VARCHAR(255) NOT NULL, 294 event_data JSONB NOT NULL, 295 version INTEGER NOT NULL DEFAULT 1, 296 timestamp TIMESTAMP WITH TIME ZONE NOT NULL, 297 processed BOOLEAN NOT NULL DEFAULT FALSE, 298 created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() 299); 300 301CREATE INDEX idx_event_store_unprocessed ON event_store (processed, timestamp) WHERE processed = FALSE; 302CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version); 303``` 304 305### Option 4: RabbitMQ with Reliable Delivery 306 307For mission-critical events requiring guaranteed delivery: 308 309```typescript 310// Infrastructure layer - RabbitMQ Event Publisher 311export class RabbitMQEventPublisher { 312 constructor(private connection: amqp.Connection) {} 313 314 async publish(event: IDomainEvent): Promise<void> { 315 const channel = await this.connection.createChannel(); 316 const exchange = 'domain_events'; 317 const routingKey = event.constructor.name; 318 319 await channel.assertExchange(exchange, 'topic', { durable: true }); 320 321 const eventData = Buffer.from( 322 JSON.stringify({ 323 ...event, 324 aggregateId: event.getAggregateId().toString(), 325 }), 326 ); 327 328 await channel.publish(exchange, routingKey, eventData, { 329 persistent: true, // Survive broker restarts 330 timestamp: Date.now(), 331 }); 332 333 await channel.close(); 334 } 335} 336 337// Infrastructure layer - RabbitMQ Event Consumer 338export class RabbitMQEventConsumer { 339 constructor( 340 private connection: amqp.Connection, 341 private eventHandlerRegistry: EventHandlerRegistry, 342 ) {} 343 344 async startConsuming(): Promise<void> { 345 const channel = await this.connection.createChannel(); 346 const exchange = 'domain_events'; 347 const queue = `events_${process.env.FLY_APP_NAME || 'app'}`; 348 349 await channel.assertExchange(exchange, 'topic', { durable: true }); 350 await channel.assertQueue(queue, { durable: true }); 351 352 // Bind to all event types 353 await channel.bindQueue(queue, exchange, '*Event'); 354 355 await channel.consume(queue, async (message) => { 356 if (message) { 357 try { 358 const eventData = JSON.parse(message.content.toString()); 359 const eventType = message.fields.routingKey; 360 361 await this.handleDistributedEvent(eventType, eventData); 362 363 // Acknowledge successful processing 364 channel.ack(message); 365 } catch (error) { 366 console.error('Error processing RabbitMQ event:', error); 367 // Reject and requeue for retry 368 channel.nack(message, false, true); 369 } 370 } 371 }); 372 } 373} 374``` 375 376### Integration with Existing System 377 378To integrate distributed events with your current system: 379 380```typescript 381// Update EventHandlerRegistry for distributed events 382export class DistributedEventHandlerRegistry extends EventHandlerRegistry { 383 constructor( 384 feedsCardAddedToLibraryHandler: FeedsCardAddedToLibraryEventHandler, 385 notificationsCardAddedToLibraryHandler: NotificationsCardAddedToLibraryEventHandler, 386 private eventPublisher?: RedisEventPublisher | NatsEventPublisher, 387 ) { 388 super( 389 feedsCardAddedToLibraryHandler, 390 notificationsCardAddedToLibraryHandler, 391 ); 392 } 393 394 registerAllHandlers(): void { 395 // Register local handlers (existing logic) 396 super.registerAllHandlers(); 397 398 // If we have a distributed publisher, also publish events 399 if (this.eventPublisher) { 400 DomainEvents.register(async (event: CardAddedToLibraryEvent) => { 401 await this.eventPublisher!.publish(event); 402 }, CardAddedToLibraryEvent.name); 403 } 404 } 405} 406``` 407 408### Fly.io Deployment Configuration 409 410**Multi-region with Redis:** 411 412```toml 413# fly.toml 414[env] 415 REDIS_URL = "redis://your-redis-app.internal:6379" 416 417[[services]] 418 internal_port = 8080 419 protocol = "tcp" 420 421[deploy] 422 strategy = "rolling" 423``` 424 425**Background worker process:** 426 427```toml 428# fly.toml for worker 429[processes] 430 web = "npm start" 431 worker = "npm run worker" 432 433[env] 434 PROCESS_TYPE = "worker" 435``` 436 437### Monitoring and Observability 438 439```typescript 440// Add metrics for distributed events 441export class EventMetrics { 442 private static eventCounts = new Map<string, number>(); 443 private static errorCounts = new Map<string, number>(); 444 445 static incrementEventCount(eventType: string): void { 446 const current = this.eventCounts.get(eventType) || 0; 447 this.eventCounts.set(eventType, current + 1); 448 } 449 450 static incrementErrorCount(eventType: string): void { 451 const current = this.errorCounts.get(eventType) || 0; 452 this.errorCounts.set(eventType, current + 1); 453 } 454 455 static getMetrics(): Record<string, any> { 456 return { 457 eventCounts: Object.fromEntries(this.eventCounts), 458 errorCounts: Object.fromEntries(this.errorCounts), 459 }; 460 } 461} 462``` 463 464## Future Considerations 465 466### Event Sourcing 467 468The current system could be extended to support event sourcing by: 469 470- Persisting events to an event store 471- Rebuilding aggregate state from events 472- Supporting event replay and temporal queries 473 474### Event Streaming 475 476For high-volume scenarios, consider: 477 478- Publishing events to message queues (Redis, RabbitMQ) 479- Event streaming platforms (Kafka) 480- Eventual consistency patterns 481 482### Cross-Bounded Context Events 483 484For events that cross module boundaries: 485 486- Define integration events at the application layer 487- Use event translation between bounded contexts 488- Consider event versioning strategies 489 490## Implementation Checklist 491 492When implementing a new domain event: 493 494### Domain Layer 495 496- [ ] Define event class implementing `IDomainEvent` 497- [ ] Add event raising logic to appropriate aggregate methods 498- [ ] Include necessary context data in event 499 500### Application Layer 501 502- [ ] Create event handler class 503- [ ] Implement handler logic coordinating services 504- [ ] Handle errors gracefully 505 506### Infrastructure Layer 507 508- [ ] Register handler in `EventHandlerRegistry` 509- [ ] Ensure event dispatch happens after persistence 510- [ ] Add factory registration 511 512### Testing 513 514- [ ] Test event is raised by aggregate 515- [ ] Test handler logic in isolation 516- [ ] Test end-to-end event flow 517- [ ] Clear events between tests 518 519## Recommended Fly.io Setup 520 521For most applications, we recommend starting with the **Database-Based Event Store** approach because: 522 5231. **Simplicity**: Uses your existing PostgreSQL database 5242. **Reliability**: ACID transactions ensure event consistency 5253. **No Additional Infrastructure**: No need for separate message brokers 5264. **Easy Debugging**: Events are visible in your database 5275. **Cost Effective**: No additional services to run 528 529As your application scales, you can migrate to Redis Pub/Sub or NATS for better performance and more advanced messaging patterns. 530 531**Migration Path:** 532 5331. Start with database-based events 5342. Add Redis Pub/Sub for real-time features 5353. Migrate to NATS or RabbitMQ for complex routing and guaranteed delivery 5364. Consider event sourcing for audit requirements 537 538This architecture provides a solid foundation for implementing domain events while maintaining clean separation of concerns and testability, with clear paths for scaling on Fly.io infrastructure.