this repo has no description
at main 95 lines 3.0 kB view raw
import { CommitCreateEvent, Jetstream } from '@skyware/jetstream';
import fs from 'node:fs';

import { CURSOR_UPDATE_INTERVAL, DID, FIREHOSE_URL, METRICS_PORT, PORT, WANTED_COLLECTION } from './config.js';
import { label, labelerServer } from './label.js';
import logger from './logger.js';
import { startMetricsServer } from './metrics.js';

/** File in the working directory where the last seen Jetstream cursor is persisted. */
const CURSOR_FILE = 'cursor.txt';

/** How long `shutdown` waits for open handles to drain before forcing the process to exit. */
const SHUTDOWN_GRACE_MS = 5000;

let cursor = 0;
// Undefined until the first 'open' event; clearInterval(undefined) is a harmless no-op.
let cursorUpdateInterval: NodeJS.Timeout | undefined;
let shuttingDown = false;

/**
 * Formats a cursor as an ISO 8601 timestamp.
 *
 * @param cursor - Cursor value, treated as microseconds since the Unix epoch
 *   (it is divided by 1000 to obtain the milliseconds `Date` expects).
 * @returns The corresponding ISO 8601 string.
 * @throws RangeError if the value does not map to a valid `Date` (e.g. `NaN`).
 */
function epochUsToDateTime(cursor: number): string {
  return new Date(cursor / 1000).toISOString();
}

// Restore the cursor saved by a previous run. A missing file is expected on first start
// and is created with the default cursor; any other failure aborts startup.
try {
  logger.info('Trying to read cursor from cursor.txt...');
  const stored = fs.readFileSync(CURSOR_FILE, 'utf8');
  const parsed = Number(stored);
  if (!Number.isFinite(parsed)) {
    // Fail with an explicit message instead of relying on toISOString() throwing
    // "Invalid time value" further down.
    throw new Error(`Invalid cursor in cursor.txt: "${stored.trim()}"`);
  }
  cursor = parsed;
  logger.info(`Cursor found: ${cursor} (${epochUsToDateTime(cursor)})`);
} catch (error) {
  if (error instanceof Error && 'code' in error && error.code === 'ENOENT') {
    logger.info(`Cursor not found in cursor.txt, setting cursor to: ${cursor} (${epochUsToDateTime(cursor)})`);
    fs.writeFileSync(CURSOR_FILE, cursor.toString(), 'utf8');
  } else {
    logger.error(error);
    process.exit(1);
  }
}

const jetstream = new Jetstream({
  wantedCollections: [WANTED_COLLECTION],
  endpoint: FIREHOSE_URL,
  cursor: cursor,
});

jetstream.on('open', () => {
  const current = jetstream.cursor;
  // Guard instead of asserting non-null: formatting an undefined cursor would throw
  // inside the event handler.
  const when = current === undefined ? 'no cursor' : epochUsToDateTime(current);
  logger.info(`Connected to Jetstream at ${FIREHOSE_URL} with cursor ${current} (${when})`);

  // Drop any timer left over from an earlier 'open' so that at most one periodic
  // writer exists, even if 'open' fires again without an intervening 'close'.
  clearInterval(cursorUpdateInterval);
  cursorUpdateInterval = setInterval(() => {
    const latest = jetstream.cursor;
    if (latest) {
      logger.info(`Cursor updated to: ${latest} (${epochUsToDateTime(latest)})`);
      fs.writeFile(CURSOR_FILE, latest.toString(), (err) => {
        if (err) logger.error(err);
      });
    }
  }, CURSOR_UPDATE_INTERVAL);
});

jetstream.on('close', () => {
  clearInterval(cursorUpdateInterval);
  logger.info('Jetstream connection closed.');
});

jetstream.on('error', (error) => {
  logger.error(`Jetstream error: ${error.message}`);
});

// Label the author of every newly created record whose subject URI contains this
// labeler's DID; the last path segment of that URI is handed to `label`.
jetstream.onCreate(WANTED_COLLECTION, (event: CommitCreateEvent<typeof WANTED_COLLECTION>) => {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (event.commit?.record?.subject?.uri?.includes(DID)) {
    label(event.did, event.commit.record.subject.uri.split('/').pop()!).catch((error: unknown) => {
      logger.error(`Unexpected error labeling ${event.did}: ${error}`);
    });
  }
});

const metricsServer = startMetricsServer(METRICS_PORT);

labelerServer.start(PORT, (error, address) => {
  if (error) {
    logger.error('Error starting server: %s', error);
  } else {
    logger.info(`Labeler server listening on ${address}`);
  }
});

jetstream.start();

/**
 * Signal handler for SIGINT/SIGTERM: persists the latest cursor, then closes the
 * Jetstream connection, the labeler server and the metrics server.
 *
 * Repeated signals are ignored once shutdown has begun. The process is expected to
 * exit on its own when the closed resources release the event loop; an unref'd
 * timer forces the exit after SHUTDOWN_GRACE_MS if something still keeps it alive.
 * Any exception raised while shutting down is logged and ends the process with
 * exit code 1.
 */
function shutdown() {
  if (shuttingDown) return;
  shuttingDown = true;

  try {
    logger.info('Shutting down gracefully...');

    // Stop the periodic writer first so an async write cannot land after the final
    // synchronous one below.
    clearInterval(cursorUpdateInterval);

    const finalCursor = jetstream.cursor;
    if (finalCursor !== undefined) {
      fs.writeFileSync(CURSOR_FILE, finalCursor.toString(), 'utf8');
    }

    jetstream.close();
    labelerServer.stop();
    metricsServer.close();

    // unref() keeps this timer from holding the process open by itself; it only
    // fires when other handles failed to close in time.
    setTimeout(() => {
      logger.error('Graceful shutdown timed out, forcing exit.');
      process.exit(1);
    }, SHUTDOWN_GRACE_MS).unref();
  } catch (error) {
    logger.error(`Error shutting down gracefully: ${error}`);
    process.exit(1);
  }
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);