A social knowledge tool for researchers built on ATProto
1import { IProcess } from '../../domain/IProcess';
2import { EnvironmentConfigService } from '../config/EnvironmentConfigService';
3import { QueueName } from '../events/QueueConfig';
4import { IEventSubscriber } from '../../application/events/IEventSubscriber';
5import {
6 RepositoryFactory,
7 Repositories,
8} from '../http/factories/RepositoryFactory';
9import { WorkerServices } from '../http/factories/ServiceFactory';
10
11export abstract class BaseWorkerProcess implements IProcess {
12 constructor(
13 protected configService: EnvironmentConfigService,
14 protected queueName: QueueName,
15 ) {}
16
17 async start(): Promise<void> {
18 console.log(`Starting ${this.queueName} worker...`);
19
20 const repositories = RepositoryFactory.create(this.configService);
21 const services = this.createServices(repositories);
22
23 await this.validateDependencies(services);
24
25 const eventSubscriber = services.createEventSubscriber(this.queueName);
26 await this.registerHandlers(eventSubscriber, services, repositories);
27 await eventSubscriber.start();
28
29 console.log(`${this.queueName} worker started`);
30
31 this.setupShutdownHandlers(eventSubscriber, services);
32 }
33
34 protected abstract createServices(repositories: Repositories): WorkerServices;
35 protected abstract validateDependencies(
36 services: WorkerServices,
37 ): Promise<void>;
38 protected abstract registerHandlers(
39 subscriber: IEventSubscriber,
40 services: WorkerServices,
41 repositories: Repositories,
42 ): Promise<void>;
43
44 private setupShutdownHandlers(
45 subscriber: IEventSubscriber,
46 services: WorkerServices,
47 ): void {
48 const shutdown = async () => {
49 console.log(`Shutting down ${this.queueName} worker...`);
50 await subscriber.stop();
51 if (services.redisConnection) {
52 await services.redisConnection.quit();
53 }
54 process.exit(0);
55 };
56 process.on('SIGTERM', shutdown);
57 process.on('SIGINT', shutdown);
58 }
59}