A social knowledge tool for researchers built on ATProto
## Fly.io Deployment Considerations

When deploying to Fly.io, you can extend the current in-memory event system into a distributed one in several ways:

### Option 1: Redis Pub/Sub

Redis provides a lightweight pub/sub mechanism that works well for distributed event handling when occasional event loss is acceptable: delivery is fire-and-forget, so a subscriber that is disconnected when an event is published never receives it.

```typescript
// Assumes the `ioredis` client. IDomainEvent, DomainEvents and
// EventHandlerRegistry come from the existing event system.
import Redis from 'ioredis';

// Infrastructure layer - Redis Event Publisher
export class RedisEventPublisher {
  constructor(private redis: Redis) {}

  async publish(event: IDomainEvent): Promise<void> {
    const eventName = event.constructor.name;
    const eventData = {
      ...event,
      aggregateId: event.getAggregateId().toString(),
    };

    await this.redis.publish(`events:${eventName}`, JSON.stringify(eventData));
  }
}

// Infrastructure layer - Redis Event Subscriber
export class RedisEventSubscriber {
  constructor(
    private redis: Redis,
    private eventHandlerRegistry: EventHandlerRegistry,
  ) {}

  async subscribe(): Promise<void> {
    // A connection in subscriber mode cannot issue other commands,
    // so use a dedicated duplicate connection
    const subscriber = this.redis.duplicate();

    // Attach the listener before subscribing so no message is missed
    subscriber.on('pmessage', async (_pattern, channel, message) => {
      try {
        const eventData = JSON.parse(message);
        const eventName = channel.replace('events:', '');

        // Reconstruct and handle the event
        await this.handleDistributedEvent(eventName, eventData);
      } catch (error) {
        console.error('Error handling distributed event:', error);
      }
    });

    // Subscribe to all event patterns
    await subscriber.psubscribe('events:*');
  }

  private async handleDistributedEvent(
    eventName: string,
    eventData: unknown,
  ): Promise<void> {
    // Reconstruct the event object and dispatch to local handlers
    // This requires event reconstruction logic based on event type
  }
}

// Update DomainEvents to support distributed publishing.
// Assumes `handlersMap` and `dispatch` are declared `protected` on DomainEvents;
// `private` members cannot be read or overridden from a subclass.
export class DistributedDomainEvents extends DomainEvents {
  private static eventPublisher?: RedisEventPublisher;

  public static setEventPublisher(publisher: RedisEventPublisher): void {
    this.eventPublisher = publisher;
  }

  protected static async dispatch(event: IDomainEvent): Promise<void> {
    // Local dispatch (existing logic)
    const eventClassName: string = event.constructor.name;
    if (Object.hasOwn(this.handlersMap, eventClassName)) {
      const handlers = this.handlersMap[eventClassName];
      if (handlers) {
        for (const handler of handlers) {
          await handler(event);
        }
      }
    }

    // Distributed dispatch
    if (this.eventPublisher) {
      await this.eventPublisher.publish(event);
    }
  }
}
```
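
The `handleDistributedEvent` stub above leaves event reconstruction open. One possible shape, sketched here under assumptions, is a registry that maps the event class name to a factory which rebuilds a typed event from the parsed JSON. `EventRebuilder`, `rebuilders` and `rebuildEvent` are hypothetical names, and `CardAddedEvent` is a simplified stand-in rather than the project's `CardAddedToLibraryEvent`.

```typescript
// Hypothetical, simplified event used only for this sketch
class CardAddedEvent {
  constructor(
    public readonly cardId: string,
    public readonly dateTimeOccurred: Date,
  ) {}
}

// A rebuilder turns untyped JSON data back into a typed event instance
type EventRebuilder = (data: Record<string, unknown>) => object;

const rebuilders = new Map<string, EventRebuilder>([
  [
    'CardAddedEvent',
    (data) =>
      new CardAddedEvent(
        String(data.cardId),
        // JSON carries dates as ISO strings, so revive them explicitly
        new Date(String(data.dateTimeOccurred)),
      ),
  ],
]);

// Returns undefined for unknown event types so callers can log and skip them
function rebuildEvent(eventName: string, json: string): object | undefined {
  const rebuild = rebuilders.get(eventName);
  if (!rebuild) return undefined;
  return rebuild(JSON.parse(json) as Record<string, unknown>);
}

// Round trip: serialize as a publisher would, then rebuild on the subscriber side
const wire = JSON.stringify(new CardAddedEvent('card-1', new Date(0)));
const rebuilt = rebuildEvent('CardAddedEvent', wire);
```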
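
**Fly.io Redis Setup:**

```toml
# fly.toml
# Note: localhost only reaches a Redis running inside the same Machine.
# For events to cross instances, point REDIS_URL at a Redis shared by all of them.
[[services]]
  internal_port = 6379
  protocol = "tcp"

[env]
  REDIS_URL = "redis://localhost:6379"
```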

### Option 2: NATS Messaging

NATS supports richer messaging patterns such as subject hierarchies, wildcards, and queue groups. Core NATS pub/sub is at-most-once, like Redis Pub/Sub; stronger delivery guarantees require JetStream, which the example below does not use:

```typescript
// Assumes the `nats` (nats.js v2) client
import { NatsConnection, StringCodec } from 'nats';

// NATS payloads are bytes; StringCodec converts between strings and Uint8Array
const codec = StringCodec();

// Infrastructure layer - NATS Event Publisher
export class NatsEventPublisher {
  constructor(private nats: NatsConnection) {}

  async publish(event: IDomainEvent): Promise<void> {
    const subject = `events.${event.constructor.name}`;
    const eventData = {
      ...event,
      aggregateId: event.getAggregateId().toString(),
      timestamp: event.dateTimeOccurred.toISOString(),
    };

    // publish() is synchronous: it buffers the message for sending
    this.nats.publish(subject, codec.encode(JSON.stringify(eventData)));
  }
}

// Infrastructure layer - NATS Event Subscriber
export class NatsEventSubscriber {
  constructor(
    private nats: NatsConnection,
    private eventHandlerRegistry: EventHandlerRegistry,
  ) {}

  async subscribe(): Promise<void> {
    // Subscribe to all event subjects
    const subscription = this.nats.subscribe('events.*');

    // This loop runs until the subscription closes, so subscribe()
    // does not resolve before then
    for await (const message of subscription) {
      try {
        const eventData = JSON.parse(codec.decode(message.data));
        const eventType = message.subject.replace('events.', '');

        await this.handleDistributedEvent(eventType, eventData);
      } catch (error) {
        console.error('Error handling NATS event:', error);
      }
    }
  }

  private async handleDistributedEvent(
    eventType: string,
    eventData: unknown,
  ): Promise<void> {
    // Event reconstruction and local dispatch logic
  }
}
```
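
NATS message payloads are byte arrays, so the JSON has to be encoded on publish and decoded on receipt. The sketch below uses only the standard `TextEncoder` and `TextDecoder` to show the round trip, and why calling `toString()` on the raw bytes does not return the original text. `encodeEvent` and `decodeEvent` are hypothetical helper names.

```typescript
const encoder = new TextEncoder();
const decoder = new TextDecoder();

// Serialize event data to UTF-8 bytes, as a publisher would
function encodeEvent(eventData: object): Uint8Array {
  return encoder.encode(JSON.stringify(eventData));
}

// Decode UTF-8 bytes back into parsed JSON, as a subscriber would
function decodeEvent(payload: Uint8Array): unknown {
  return JSON.parse(decoder.decode(payload));
}

const payload = encodeEvent({ aggregateId: 'card-1' });
const roundTripped = decodeEvent(payload) as { aggregateId: string };

// Uint8Array.prototype.toString joins the byte values with commas,
// so this is a list of numbers rather than JSON text
const naive = payload.toString();
```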

**Fly.io NATS Setup:**

```dockerfile
# Add to Dockerfile (requires curl and unzip in the base image)
RUN curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.4/nats-server-v2.10.4-linux-amd64.zip -o nats-server.zip
RUN unzip nats-server.zip && mv nats-server-v2.10.4-linux-amd64/nats-server /usr/local/bin/
```

### Option 3: Database-Based Event Store

For simpler deployments, use your existing PostgreSQL database as an event store:

```typescript
// Assumes drizzle-orm with the postgres-js driver and the `uuid` package.
// `eventStore` is the Drizzle table for the event_store schema; Result, ok
// and err come from the project's existing Result helpers.
import { eq } from 'drizzle-orm';
import { PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { v4 as uuid } from 'uuid';

// Domain layer - Event Store interfaces
export interface EventStoreRecord {
  id: string;
  aggregateId: string;
  eventType: string;
  eventData: string;
  version: number;
  timestamp: Date;
  processed: boolean;
}

export interface IEventStore {
  saveEvent(event: IDomainEvent, aggregateId: string): Promise<Result<void>>;
  getUnprocessedEvents(): Promise<Result<EventStoreRecord[]>>;
  markEventAsProcessed(eventId: string): Promise<Result<void>>;
}

// Infrastructure layer - Drizzle Event Store
export class DrizzleEventStore implements IEventStore {
  constructor(private db: PostgresJsDatabase) {}

  async saveEvent(
    event: IDomainEvent,
    aggregateId: string,
  ): Promise<Result<void>> {
    try {
      await this.db.insert(eventStore).values({
        id: uuid(),
        aggregateId,
        eventType: event.constructor.name,
        // Stored as serialized JSON. If the Drizzle column is mapped as jsonb,
        // pass the object instead and skip JSON.parse when reading.
        eventData: JSON.stringify(event),
        version: 1, // Implement versioning logic
        timestamp: event.dateTimeOccurred,
        processed: false,
      });

      return ok();
    } catch (error) {
      return err(error);
    }
  }

  async getUnprocessedEvents(): Promise<Result<EventStoreRecord[]>> {
    try {
      const events = await this.db
        .select()
        .from(eventStore)
        .where(eq(eventStore.processed, false))
        .orderBy(eventStore.timestamp);

      return ok(events);
    } catch (error) {
      return err(error);
    }
  }

  async markEventAsProcessed(eventId: string): Promise<Result<void>> {
    try {
      await this.db
        .update(eventStore)
        .set({ processed: true })
        .where(eq(eventStore.id, eventId));

      return ok();
    } catch (error) {
      return err(error);
    }
  }
}

// Infrastructure layer - Background Event Processor
export class BackgroundEventProcessor {
  // Prevents overlapping runs when one pass takes longer than the interval
  private isProcessing = false;

  constructor(
    private eventStore: IEventStore,
    private eventHandlerRegistry: EventHandlerRegistry,
  ) {}

  async start(): Promise<void> {
    // Process events every 5 seconds, skipping a tick while a run is in progress.
    // Note: several instances polling the same table will each see the same rows.
    setInterval(async () => {
      if (this.isProcessing) return;
      this.isProcessing = true;
      try {
        await this.processEvents();
      } finally {
        this.isProcessing = false;
      }
    }, 5000);
  }

  private async processEvents(): Promise<void> {
    try {
      const eventsResult = await this.eventStore.getUnprocessedEvents();
      if (eventsResult.isErr()) {
        console.error('Error fetching unprocessed events:', eventsResult.error);
        return;
      }

      for (const eventRecord of eventsResult.value) {
        try {
          // Reconstruct event object
          const eventData = JSON.parse(eventRecord.eventData);

          // Dispatch to local handlers
          await this.dispatchEvent(eventRecord.eventType, eventData);

          // Mark as processed; the store reports failure via Result, not by throwing
          const markResult = await this.eventStore.markEventAsProcessed(
            eventRecord.id,
          );
          if (markResult.isErr()) {
            console.error(
              `Error marking event ${eventRecord.id} as processed:`,
              markResult.error,
            );
          }
        } catch (error) {
          console.error(`Error processing event ${eventRecord.id}:`, error);
        }
      }
    } catch (error) {
      console.error('Error in background event processor:', error);
    }
  }

  private async dispatchEvent(
    eventType: string,
    eventData: unknown,
  ): Promise<void> {
    // Event reconstruction and dispatch logic
  }
}
```
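
The `version: 1` placeholder above could be replaced by a per-aggregate counter. The following in-memory sketch shows one possible rule, assuming each aggregate's events are numbered from 1 and an append is rejected when the caller's expected version is stale. `InMemoryVersionedStore` and `append` are hypothetical names; a database-backed store could enforce the same rule with a unique constraint on `(aggregate_id, version)`.

```typescript
interface VersionedEvent {
  aggregateId: string;
  eventType: string;
  version: number;
}

class InMemoryVersionedStore {
  private readonly events: VersionedEvent[] = [];

  // Highest version stored for the aggregate, or 0 if it has no events yet
  currentVersion(aggregateId: string): number {
    return this.events
      .filter((e) => e.aggregateId === aggregateId)
      .reduce((max, e) => Math.max(max, e.version), 0);
  }

  // Appends the event as version expectedVersion + 1, or throws if another
  // writer has already moved the aggregate past expectedVersion
  append(
    aggregateId: string,
    eventType: string,
    expectedVersion: number,
  ): VersionedEvent {
    const current = this.currentVersion(aggregateId);
    if (current !== expectedVersion) {
      throw new Error(
        `Version conflict: expected ${expectedVersion}, found ${current}`,
      );
    }
    const record: VersionedEvent = {
      aggregateId,
      eventType,
      version: current + 1,
    };
    this.events.push(record);
    return record;
  }
}

const store = new InMemoryVersionedStore();
const first = store.append('card-1', 'CardCreated', 0);
const second = store.append('card-1', 'CardAddedToLibrary', 1);
```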

**Database Schema:**

```sql
-- Add to your migrations
CREATE TABLE event_store (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(255) NOT NULL,
  event_data JSONB NOT NULL,
  version INTEGER NOT NULL DEFAULT 1,
  timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
  processed BOOLEAN NOT NULL DEFAULT FALSE,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_event_store_unprocessed ON event_store (processed, timestamp) WHERE processed = FALSE;
CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, version);
```

### Option 4: RabbitMQ with Reliable Delivery

For mission-critical events requiring guaranteed delivery:

```typescript
// Assumes the `amqplib` client
import * as amqp from 'amqplib';

// Infrastructure layer - RabbitMQ Event Publisher
export class RabbitMQEventPublisher {
  constructor(private connection: amqp.Connection) {}

  async publish(event: IDomainEvent): Promise<void> {
    // A confirm channel lets us wait for the broker to acknowledge the publish
    const channel = await this.connection.createConfirmChannel();
    const exchange = 'domain_events';
    const routingKey = event.constructor.name;

    await channel.assertExchange(exchange, 'topic', { durable: true });

    const eventData = Buffer.from(
      JSON.stringify({
        ...event,
        aggregateId: event.getAggregateId().toString(),
      }),
    );

    channel.publish(exchange, routingKey, eventData, {
      persistent: true, // Survive broker restarts
      timestamp: Date.now(),
    });

    // Resolves once the broker has confirmed every outstanding message
    await channel.waitForConfirms();
    await channel.close();
  }
}

// Infrastructure layer - RabbitMQ Event Consumer
export class RabbitMQEventConsumer {
  constructor(
    private connection: amqp.Connection,
    private eventHandlerRegistry: EventHandlerRegistry,
  ) {}

  async startConsuming(): Promise<void> {
    const channel = await this.connection.createChannel();
    const exchange = 'domain_events';
    const queue = `events_${process.env.FLY_APP_NAME || 'app'}`;

    await channel.assertExchange(exchange, 'topic', { durable: true });
    await channel.assertQueue(queue, { durable: true });

    // Bind to all event types. In a topic exchange '*' and '#' are wildcards
    // only as whole dot-separated words, so '*Event' would not match anything.
    await channel.bindQueue(queue, exchange, '#');

    await channel.consume(queue, async (message) => {
      if (message) {
        try {
          const eventData = JSON.parse(message.content.toString());
          const eventType = message.fields.routingKey;

          await this.handleDistributedEvent(eventType, eventData);

          // Acknowledge successful processing
          channel.ack(message);
        } catch (error) {
          console.error('Error processing RabbitMQ event:', error);
          // Reject and requeue for retry. A message that always fails will
          // loop, so consider a retry limit or a dead-letter exchange.
          channel.nack(message, false, true);
        }
      }
    });
  }

  private async handleDistributedEvent(
    eventType: string,
    eventData: unknown,
  ): Promise<void> {
    // Event reconstruction and local dispatch logic
  }
}
```
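
Topic exchanges compare binding patterns and routing keys word by word, with words separated by dots: `*` stands for exactly one word and `#` for zero or more. A pattern such as `*Event` is therefore a single literal word, not a suffix match. The function below is a small standalone model of those rules for experimenting with patterns; `topicMatches` is a hypothetical helper, not part of amqplib or RabbitMQ.

```typescript
// Models AMQP topic-exchange matching on dot-separated words
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (pi: number, ki: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' may consume zero words, or one word and then try again
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1));
    }
    if (ki === k.length) return false;
    // '*' matches any single word; anything else must match literally
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1);
    return false;
  };

  return match(0, 0);
}

const matchesAll = topicMatches('#', 'CardAddedToLibraryEvent');
const matchesSuffixAttempt = topicMatches('*Event', 'CardAddedToLibraryEvent');
const matchesNested = topicMatches('events.#', 'events.card.added');
```

Because the publisher above uses the bare class name as the routing key, every key is a single word, so `#` or `*` both bind all events; dotted keys such as `cards.CardAddedToLibraryEvent` would allow per-module bindings like `cards.*`.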

### Integration with Existing System

To integrate distributed events with your current system:

```typescript
// Update EventHandlerRegistry for distributed events
export class DistributedEventHandlerRegistry extends EventHandlerRegistry {
  constructor(
    feedsCardAddedToLibraryHandler: FeedsCardAddedToLibraryEventHandler,
    notificationsCardAddedToLibraryHandler: NotificationsCardAddedToLibraryEventHandler,
    private eventPublisher?: RedisEventPublisher | NatsEventPublisher,
  ) {
    super(
      feedsCardAddedToLibraryHandler,
      notificationsCardAddedToLibraryHandler,
    );
  }

  registerAllHandlers(): void {
    // Register local handlers (existing logic)
    super.registerAllHandlers();

    // If we have a distributed publisher, also publish events
    const publisher = this.eventPublisher;
    if (publisher) {
      DomainEvents.register(async (event: CardAddedToLibraryEvent) => {
        await publisher.publish(event);
      }, CardAddedToLibraryEvent.name);
    }
  }
}
```

### Fly.io Deployment Configuration

**Multi-region with Redis:**

```toml
# fly.toml
[env]
  REDIS_URL = "redis://your-redis-app.internal:6379"

[[services]]
  internal_port = 8080
  protocol = "tcp"

[deploy]
  strategy = "rolling"
```

**Background worker process:**

```toml
# fly.toml for worker
[processes]
  web = "npm start"
  worker = "npm run worker"

# Values under [env] apply to every process group, so PROCESS_TYPE = "worker"
# here would also reach the web process. Fly.io sets FLY_PROCESS_GROUP
# ("web" or "worker") on each Machine, which the app can read instead.
```
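
With two process groups sharing one image, the application has to decide at startup which role it plays. A minimal sketch, assuming the role is read from `FLY_PROCESS_GROUP` (set by Fly.io on each Machine) with `PROCESS_TYPE` treated as an optional manual override for local runs. `resolveRole` and `bootstrap` are hypothetical names, and the callbacks stand in for the real startup code.

```typescript
type ProcessRole = 'web' | 'worker';
type Env = Record<string, string | undefined>;

// Anything other than an explicit "worker" falls back to the web role
function resolveRole(env: Env): ProcessRole {
  const raw = env.FLY_PROCESS_GROUP ?? env.PROCESS_TYPE ?? 'web';
  return raw === 'worker' ? 'worker' : 'web';
}

// Starts exactly one of the two roles and reports which one was chosen
function bootstrap(
  env: Env,
  startWeb: () => void,
  startWorker: () => void,
): ProcessRole {
  const role = resolveRole(env);
  if (role === 'worker') {
    startWorker();
  } else {
    startWeb();
  }
  return role;
}

// Example run with a stubbed environment
const started: string[] = [];
const chosen = bootstrap(
  { FLY_PROCESS_GROUP: 'worker' },
  () => started.push('web'),
  () => started.push('worker'),
);
```

In the real entry point this would be called as `bootstrap(process.env, ...)`, with the worker callback starting the background event processor and the web callback starting the HTTP server.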

### Monitoring and Observability

```typescript
// Add metrics for distributed events
export class EventMetrics {
  private static eventCounts = new Map<string, number>();
  private static errorCounts = new Map<string, number>();

  static incrementEventCount(eventType: string): void {
    const current = this.eventCounts.get(eventType) ?? 0;
    this.eventCounts.set(eventType, current + 1);
  }

  static incrementErrorCount(eventType: string): void {
    const current = this.errorCounts.get(eventType) ?? 0;
    this.errorCounts.set(eventType, current + 1);
  }

  static getMetrics(): Record<string, Record<string, number>> {
    return {
      eventCounts: Object.fromEntries(this.eventCounts),
      errorCounts: Object.fromEntries(this.errorCounts),
    };
  }
}
```
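
One way to feed counters like these is to wrap each handler so that every invocation and every failure is recorded. The sketch below is self-contained and uses its own minimal counters in place of `EventMetrics`; `withMetrics`, `Handler` and `bump` are hypothetical names.

```typescript
type Handler<E> = (event: E) => Promise<void>;

const eventCounts = new Map<string, number>();
const errorCounts = new Map<string, number>();

const bump = (counts: Map<string, number>, key: string): void => {
  counts.set(key, (counts.get(key) ?? 0) + 1);
};

// Wraps a handler: counts the call up front, counts the failure if it throws,
// and rethrows so the caller's own error handling still runs
function withMetrics<E>(eventType: string, handler: Handler<E>): Handler<E> {
  return async (event: E) => {
    bump(eventCounts, eventType);
    try {
      await handler(event);
    } catch (error) {
      bump(errorCounts, eventType);
      throw error;
    }
  };
}

// Example: a no-op handler wrapped and invoked once
const counted = withMetrics(
  'CardAddedToLibraryEvent',
  async (_event: { cardId: string }) => {},
);
void counted({ cardId: 'card-1' });
```

The wrapped handler has the same signature as the original, so it can be registered wherever the unwrapped handler would be.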

## Future Considerations

### Event Sourcing

The current system could be extended to support event sourcing by:

- Persisting events to an event store
- Rebuilding aggregate state from events
- Supporting event replay and temporal queries
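
Rebuilding state and replaying history, as listed above, both reduce to folding a pure "apply" function over the stored events. A minimal sketch, assuming a simplified library aggregate; `LibraryEvent`, `LibraryState`, `apply` and `replay` are hypothetical names, not types from the existing codebase.

```typescript
// Hypothetical events for a simplified library aggregate
type LibraryEvent =
  | { type: 'CardAdded'; cardId: string }
  | { type: 'CardRemoved'; cardId: string };

interface LibraryState {
  cardIds: string[];
  version: number;
}

const initialState: LibraryState = { cardIds: [], version: 0 };

// Pure function: applies one event to the previous state
function apply(state: LibraryState, event: LibraryEvent): LibraryState {
  switch (event.type) {
    case 'CardAdded':
      return {
        cardIds: [...state.cardIds, event.cardId],
        version: state.version + 1,
      };
    case 'CardRemoved':
      return {
        cardIds: state.cardIds.filter((id) => id !== event.cardId),
        version: state.version + 1,
      };
  }
}

// Replaying the full history rebuilds the current state; replaying a prefix
// answers a temporal query such as "what did the library hold after N events?"
function replay(events: LibraryEvent[], upTo = events.length): LibraryState {
  return events.slice(0, upTo).reduce(apply, initialState);
}

const history: LibraryEvent[] = [
  { type: 'CardAdded', cardId: 'a' },
  { type: 'CardAdded', cardId: 'b' },
  { type: 'CardRemoved', cardId: 'a' },
];
const current = replay(history);
const afterTwo = replay(history, 2);
```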

### Event Streaming

For high-volume scenarios, consider:

- Publishing events to message queues (Redis, RabbitMQ)
- Event streaming platforms (Kafka)
- Eventual consistency patterns

### Cross-Bounded Context Events

For events that cross module boundaries:

- Define integration events at the application layer
- Use event translation between bounded contexts
- Consider event versioning strategies
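
The three points above can be combined in a small translation function: the domain event stays internal, and other contexts receive a flat integration event with an explicit name and schema version. This is a sketch under assumptions; the field names, the `cards.card-added-to-library` event name and `toIntegrationEvent` are all hypothetical.

```typescript
// Hypothetical internal domain event, using value objects
interface CardAddedToLibraryDomainEvent {
  cardId: { value: string };
  curatorId: { value: string };
  dateTimeOccurred: Date;
}

// Hypothetical integration event: plain data only, with a stable name and
// a schema version that consumers can branch on
interface CardAddedIntegrationEventV1 {
  name: 'cards.card-added-to-library';
  schemaVersion: 1;
  cardId: string;
  curatorId: string;
  occurredAt: string;
}

// Translation lives at the application layer, so other bounded contexts
// never depend on the internal domain types
function toIntegrationEvent(
  event: CardAddedToLibraryDomainEvent,
): CardAddedIntegrationEventV1 {
  return {
    name: 'cards.card-added-to-library',
    schemaVersion: 1,
    cardId: event.cardId.value,
    curatorId: event.curatorId.value,
    occurredAt: event.dateTimeOccurred.toISOString(),
  };
}

const integrationEvent = toIntegrationEvent({
  cardId: { value: 'card-1' },
  curatorId: { value: 'did:example:alice' },
  dateTimeOccurred: new Date(0),
});
```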

## Implementation Checklist

When implementing a new domain event:

### Domain Layer

- [ ] Define event class implementing `IDomainEvent`
- [ ] Add event raising logic to appropriate aggregate methods
- [ ] Include necessary context data in event

### Application Layer

- [ ] Create event handler class
- [ ] Implement handler logic coordinating services
- [ ] Handle errors gracefully

### Infrastructure Layer

- [ ] Register handler in `EventHandlerRegistry`
- [ ] Ensure event dispatch happens after persistence
- [ ] Add factory registration

### Testing

- [ ] Test event is raised by aggregate
- [ ] Test handler logic in isolation
- [ ] Test end-to-end event flow
- [ ] Clear events between tests

## Recommended Fly.io Setup

For most applications, we recommend starting with the **Database-Based Event Store** approach because:

1. **Simplicity**: Uses your existing PostgreSQL database
2. **Reliability**: Events can be written in the same ACID transaction as the aggregate change, so neither is saved without the other
3. **No Additional Infrastructure**: No need for separate message brokers
4. **Easy Debugging**: Events are visible in your database
5. **Cost Effective**: No additional services to run

As your application scales, you can migrate to Redis Pub/Sub or NATS for lower latency and more advanced messaging patterns.

**Migration Path:**

1. Start with database-based events
2. Add Redis Pub/Sub for real-time features
3. Migrate to NATS (with JetStream) or RabbitMQ for complex routing and guaranteed delivery
4. Consider event sourcing for audit requirements

This architecture provides a solid foundation for implementing domain events while maintaining clean separation of concerns and testability, with clear paths for scaling on Fly.io infrastructure.