this repo has no description
1import { CommitCreateEvent, Jetstream } from '@skyware/jetstream';
2import fs from 'node:fs';
3
4import { CURSOR_UPDATE_INTERVAL, DID, FIREHOSE_URL, METRICS_PORT, PORT, WANTED_COLLECTION } from './config.js';
5import { label, labelerServer } from './label.js';
6import logger from './logger.js';
7import { startMetricsServer } from './metrics.js';
8
/**
 * Most recent Jetstream cursor known to this process. Starts at 0 and is
 * replaced by the value stored in cursor.txt when that file exists.
 */
let cursor = 0;

/**
 * Handle of the timer that periodically writes the cursor to cursor.txt.
 * It is only assigned when the Jetstream 'open' event fires, so it is
 * genuinely undefined before that; the type reflects this instead of
 * pretending a timer always exists. `clearInterval(undefined)` is a no-op.
 */
let cursorUpdateInterval: NodeJS.Timeout | undefined;
11
/**
 * Formats a cursor value as an ISO-8601 UTC timestamp.
 *
 * @param cursor - Time in microseconds since the Unix epoch.
 * @returns The instant rendered by `Date.prototype.toISOString`.
 * @throws RangeError when the value does not map to a valid date (e.g. `NaN`).
 */
function epochUsToDateTime(cursor: number): string {
  // Date is millisecond-based, so scale microseconds down first.
  const epochMs = cursor / 1000;
  const instant = new Date(epochMs);
  return instant.toISOString();
}
15
// Restore the cursor persisted by a previous run so the stream can resume
// from it. A missing file is normal on first start; anything else is fatal.
try {
  logger.info('Trying to read cursor from cursor.txt...');
  const rawCursor = fs.readFileSync('cursor.txt', 'utf8').trim();
  const parsedCursor = Number(rawCursor);
  // Number() turns garbage into NaN. Reject it here with a message that names
  // the real problem, instead of letting the date formatting below fail with
  // an unrelated "Invalid time value" RangeError.
  if (!Number.isSafeInteger(parsedCursor) || parsedCursor < 0) {
    throw new Error(`cursor.txt does not contain a valid cursor: ${JSON.stringify(rawCursor)}`);
  }
  cursor = parsedCursor;
  logger.info(`Cursor found: ${cursor} (${epochUsToDateTime(cursor)})`);
} catch (error) {
  if (error instanceof Error && 'code' in error && error.code === 'ENOENT') {
    // No cursor file yet: keep the default cursor and create the file so
    // later runs find one.
    logger.info(`Cursor not found in cursor.txt, setting cursor to: ${cursor} (${epochUsToDateTime(cursor)})`);
    fs.writeFileSync('cursor.txt', cursor.toString(), 'utf8');
  } else {
    // Unreadable or corrupt cursor file: refuse to start rather than guess.
    logger.error(error);
    process.exit(1);
  }
}
29
// Jetstream client limited to the one collection this service handles,
// starting from the cursor determined during start-up.
const jetstream = new Jetstream({
  endpoint: FIREHOSE_URL,
  wantedCollections: [WANTED_COLLECTION],
  cursor,
});
35
// On connect: log where the stream resumes from and start periodically
// persisting the cursor to cursor.txt.
jetstream.on('open', () => {
  const resumeCursor = jetstream.cursor;
  // The client's cursor is optional. Formatting a missing cursor would throw
  // a RangeError from toISOString() inside this handler, so log it separately.
  logger.info(
    resumeCursor == null
      ? `Connected to Jetstream at ${FIREHOSE_URL} with no cursor`
      : `Connected to Jetstream at ${FIREHOSE_URL} with cursor ${resumeCursor} (${epochUsToDateTime(resumeCursor)})`,
  );

  // If 'open' fires more than once (e.g. after a reconnect), make sure a
  // previously started timer is not left running alongside the new one.
  clearInterval(cursorUpdateInterval);
  cursorUpdateInterval = setInterval(() => {
    if (jetstream.cursor) {
      logger.info(`Cursor updated to: ${jetstream.cursor} (${epochUsToDateTime(jetstream.cursor)})`);
      // Asynchronous write; a failure is logged and retried on the next tick.
      fs.writeFile('cursor.txt', jetstream.cursor.toString(), (err) => {
        if (err) logger.error(err);
      });
    }
  }, CURSOR_UPDATE_INTERVAL);
});
49
/** Stops the periodic cursor writer and records that the connection ended. */
function handleJetstreamClose(): void {
  clearInterval(cursorUpdateInterval);
  logger.info('Jetstream connection closed.');
}

jetstream.on('close', handleJetstreamClose);
54
// Connection-level errors are only logged; no recovery is attempted here.
jetstream.on('error', ({ message }) => {
  logger.error(`Jetstream error: ${message}`);
});
58
// React to newly created records in the wanted collection whose subject URI
// contains this service's DID.
jetstream.onCreate(WANTED_COLLECTION, (event: CommitCreateEvent<typeof WANTED_COLLECTION>) => {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const targetsOurDid = event.commit?.record?.subject?.uri?.includes(DID);
  if (!targetsOurDid) {
    return;
  }

  // The last path segment of the subject URI is what label() receives.
  const lastSegment = event.commit.record.subject.uri.split('/').pop()!;
  const labeling = label(event.did, lastSegment);
  labeling.catch((error: unknown) => {
    logger.error(`Unexpected error labeling ${event.did}: ${error}`);
  });
});
67
// Metrics endpoint; the returned handle is kept so shutdown() can close it.
const metricsServer = startMetricsServer(METRICS_PORT);

// Bring up the labeler server and report whether it started.
labelerServer.start(PORT, (error, address) => {
  if (!error) {
    logger.info(`Labeler server listening on ${address}`);
    return;
  }
  logger.error('Error starting server: %s', error);
});

// Everything is wired up: open the Jetstream connection.
jetstream.start();
79
/**
 * Signal handler: persists the latest cursor, then closes the Jetstream
 * connection, the labeler server and the metrics server.
 *
 * Any exception during these steps is logged and the process exits with
 * status 1.
 *
 * NOTE: graceful shutdown is known not to work properly and the cause has
 * not been researched yet. The guards below remove two concrete failure
 * modes (a missing cursor aborting the whole shutdown, and the periodic
 * cursor writer staying active); whether they are the full explanation is
 * unconfirmed.
 */
function shutdown() {
  try {
    logger.info('Shutting down gracefully...');

    // Stop the periodic writer so nothing schedules another cursor write
    // after the final one below. Safe when no timer was ever started.
    clearInterval(cursorUpdateInterval);

    // The client may not have a cursor (e.g. nothing received yet). Calling
    // toString() on it would throw, skip every close() below and exit with 1.
    const finalCursor = jetstream.cursor;
    if (finalCursor != null) {
      fs.writeFileSync('cursor.txt', finalCursor.toString(), 'utf8');
    }

    jetstream.close();
    labelerServer.stop();
    metricsServer.close();
  } catch (error) {
    logger.error(`Error shutting down gracefully: ${error}`);
    process.exit(1);
  }
}
93
// Interrupt and termination signals both trigger the same shutdown routine.
for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.on(signal, shutdown);
}