A social knowledge tool for researchers built on ATProto
at main 1.9 kB view raw
import { IProcess } from '../../domain/IProcess';
import { EnvironmentConfigService } from '../config/EnvironmentConfigService';
import { QueueName } from '../events/QueueConfig';
import { IEventSubscriber } from '../../application/events/IEventSubscriber';
import {
  RepositoryFactory,
  Repositories,
} from '../http/factories/RepositoryFactory';
import { WorkerServices } from '../http/factories/ServiceFactory';

/**
 * Template for a queue worker process.
 *
 * `start()` drives a fixed sequence: build repositories, build services,
 * validate dependencies, create an event subscriber for `queueName`, register
 * handlers, start the subscriber, and finally install signal handlers for
 * shutdown. Subclasses supply the three abstract steps.
 */
export abstract class BaseWorkerProcess implements IProcess {
  /**
   * @param configService Configuration passed to `RepositoryFactory.create`.
   * @param queueName Queue this worker subscribes to; also used in log output.
   */
  constructor(
    protected configService: EnvironmentConfigService,
    protected queueName: QueueName,
  ) {}

  /**
   * Boots the worker and installs SIGTERM/SIGINT shutdown handlers.
   *
   * Any rejection from dependency validation, handler registration, or
   * subscriber start-up propagates to the caller; the shutdown handlers are
   * only installed once the subscriber has started.
   */
  async start(): Promise<void> {
    console.log(`Starting ${this.queueName} worker...`);

    const repositories = RepositoryFactory.create(this.configService);
    const services = this.createServices(repositories);

    await this.validateDependencies(services);

    const eventSubscriber = services.createEventSubscriber(this.queueName);
    await this.registerHandlers(eventSubscriber, services, repositories);
    await eventSubscriber.start();

    console.log(`${this.queueName} worker started`);

    this.setupShutdownHandlers(eventSubscriber, services);
  }

  /** Builds the service container for this worker from the repositories. */
  protected abstract createServices(repositories: Repositories): WorkerServices;

  /** Checks the services before the subscriber is created; reject to abort start-up. */
  protected abstract validateDependencies(
    services: WorkerServices,
  ): Promise<void>;

  /** Attaches this worker's event handlers to the subscriber before it is started. */
  protected abstract registerHandlers(
    subscriber: IEventSubscriber,
    services: WorkerServices,
    repositories: Repositories,
  ): Promise<void>;

  /**
   * Registers SIGTERM and SIGINT handlers that stop the subscriber, close the
   * Redis connection when one is present, and then exit the process.
   *
   * Each teardown step is isolated so a failure in one does not skip the
   * other, and the process always exits: with code 0 on a clean shutdown and
   * code 1 if any step failed.
   */
  private setupShutdownHandlers(
    subscriber: IEventSubscriber,
    services: WorkerServices,
  ): void {
    // A second signal while teardown is in flight must not run it again.
    let shuttingDown = false;

    const shutdown = async (): Promise<void> => {
      if (shuttingDown) {
        return;
      }
      shuttingDown = true;

      console.log(`Shutting down ${this.queueName} worker...`);
      let exitCode = 0;

      try {
        await subscriber.stop();
      } catch (error) {
        exitCode = 1;
        console.error(
          `Failed to stop ${this.queueName} event subscriber:`,
          error,
        );
      }

      try {
        if (services.redisConnection) {
          await services.redisConnection.quit();
        }
      } catch (error) {
        exitCode = 1;
        console.error(
          `Failed to close Redis connection for ${this.queueName} worker:`,
          error,
        );
      }

      process.exit(exitCode);
    };

    // `shutdown` handles its own errors, so the promise is intentionally
    // discarded rather than left floating on the event emitter.
    const onSignal = (): void => {
      void shutdown();
    };

    process.on('SIGTERM', onSignal);
    process.on('SIGINT', onSignal);
  }
}