this repo has no description
1import { AppBskyFeedLike } from "@atproto/api";
2import { Firehose } from "@skyware/firehose";
3import { label } from "./label.js";
4import { DID } from "./constants.js";
5import fs from "node:fs";
6
/**
 * Reads the cursor persisted by a previous run from `cursor.txt`.
 *
 * @returns The trimmed file content, or an empty string when the file does
 *   not exist yet (e.g. on the very first run).
 * @throws Any read error other than a missing file.
 */
const readSavedCursor = (): string => {
  try {
    // Trim so a trailing newline in the file does not end up in the cursor.
    return fs.readFileSync("cursor.txt", "utf8").trim();
  } catch (err: unknown) {
    if ((err as NodeJS.ErrnoException).code === "ENOENT") return "";
    throw err;
  }
};

/**
 * Connects to the firehose, resuming from the cursor stored in `cursor.txt`
 * when one is available, and calls `label` for every like whose subject URI
 * contains `DID`.
 *
 * While the connection is open, the sequence number of the most recent commit
 * is logged and written back to `cursor.txt` once per minute.
 */
const subscribe = async (): Promise<void> => {
  // Sequence number of the last commit seen; 0 means "no commit seen yet".
  let cursorFirehose = 0;
  let intervalID: NodeJS.Timeout | undefined;
  const cursorFile = readSavedCursor();

  const firehose = new Firehose({ cursor: cursorFile });
  if (cursorFile) console.log(`Initiate firehose at cursor ${cursorFile}`);

  firehose.on("error", ({ cursor, error }) => {
    console.log(`Firehose errored on cursor: ${cursor}`, error);
  });

  firehose.on("open", () => {
    // Guard against stacking timers if "open" fires again without a "close".
    clearInterval(intervalID);
    intervalID = setInterval(() => {
      const timestamp = new Date().toISOString();
      console.log(`${timestamp} cursor: ${cursorFirehose}`);
      // Do not overwrite a previously saved cursor with the initial 0 before
      // any commit has been received.
      if (cursorFirehose === 0) return;
      fs.writeFile("cursor.txt", cursorFirehose.toString(), (err) => {
        if (err) console.error(err);
      });
    }, 60000);
  });

  firehose.on("close", () => {
    clearInterval(intervalID);
    intervalID = undefined;
  });

  firehose.on("commit", (commit) => {
    cursorFirehose = commit.seq;
    for (const op of commit.ops) {
      if (op.action !== "delete" && AppBskyFeedLike.isRecord(op.record)) {
        const subjectUri = op.record.subject.uri;
        if (!subjectUri.includes(DID)) continue;

        // The last path segment of the liked record's URI is handed to label().
        const lastSegment = subjectUri.split("/").pop();
        if (!lastSegment) continue;

        // Fire-and-forget: labelling failures are logged and must not block
        // or break processing of the remaining ops.
        void label(commit.repo, lastSegment).catch((err: unknown) =>
          console.error(err),
        );
      }
    }
  });

  firehose.start();
};
49
// Entry point: start the subscription. A rejection from subscribe() (for
// example, the cursor file cannot be read) is logged and reported through a
// failing exit code instead of being left as an unhandled promise rejection.
subscribe().catch((err: unknown) => {
  console.error(err);
  process.exitCode = 1;
});