A tool that parses Jetstream traffic and applies a moderation workflow driven by regular-expression rules.
1import fs from "node:fs";
2import type {
3 CommitCreateEvent,
4 CommitUpdateEvent,
5 IdentityEvent,
6} from "@skyware/jetstream";
7import { Jetstream } from "@skyware/jetstream";
8import { login } from "./agent.js";
9import {
10 CURSOR_UPDATE_INTERVAL,
11 FIREHOSE_URL,
12 METRICS_PORT,
13 WANTED_COLLECTION,
14} from "./config.js";
15import { logger } from "./logger.js";
16import { startMetricsServer } from "./metrics.js";
17import { connectRedis, disconnectRedis } from "./redis.js";
18import { checkAccountAge } from "./rules/account/age.js";
19import { checkFacetSpam } from "./rules/facets/facets.js";
20import { checkHandle } from "./rules/handles/checkHandles.js";
21import { checkPosts } from "./rules/posts/checkPosts.js";
22import { checkProfile } from "./rules/profiles/checkProfiles.js";
23import { checkStarterPackThreshold } from "./starterPackThreshold.js";
24import type { Post } from "./types.js";
25
/**
 * Jetstream cursor used to resume the stream. It is passed through
 * `epochUsToDateTime` for logging, i.e. treated as epoch microseconds.
 * Starts at 0 and is replaced by the startup code that reads cursor.txt.
 */
let cursor = 0;

/**
 * Handle of the periodic "persist cursor" timer. It is `undefined` until the
 * first "open" event assigns it, and the "close" handler may run before that,
 * so the type admits `undefined` (which `clearInterval` accepts as a no-op).
 */
let cursorUpdateInterval: NodeJS.Timeout | undefined;
28
/**
 * Convert an epoch timestamp in microseconds to an ISO-8601 UTC string.
 *
 * @param cursor - Microseconds since the Unix epoch.
 * @returns The same instant formatted by `Date.prototype.toISOString`.
 * @throws RangeError if the value does not map to a valid `Date`.
 */
function epochUsToDateTime(cursor: number): string {
  const microsecondsPerMillisecond = 1000;
  const epochMilliseconds = cursor / microsecondsPerMillisecond;
  const instant = new Date(epochMilliseconds);
  return instant.toISOString();
}
32
// Restore the resume cursor from cursor.txt.
// - Missing file: start from "now" and create the file.
// - Empty or unparsable content (e.g. a write that was cut short): also start
//   from "now", with a warning, instead of reporting a bogus 1970 cursor or
//   failing indirectly inside the date formatter.
// - Any other read error: log it and exit with status 1.
logger.info({ process: "MAIN" }, "Trying to read cursor from cursor.txt");

let storedCursor: number | undefined;
try {
  storedCursor = Number(fs.readFileSync("cursor.txt", "utf8").trim());
} catch (error) {
  const fileIsMissing =
    error instanceof Error && "code" in error && error.code === "ENOENT";
  if (!fileIsMissing) {
    logger.error({ process: "MAIN", error }, "Failed to read cursor");
    process.exit(1);
  }
}

if (
  storedCursor !== undefined &&
  Number.isSafeInteger(storedCursor) &&
  storedCursor > 0
) {
  cursor = storedCursor;
  logger.info(
    { process: "MAIN", cursor, datetime: epochUsToDateTime(cursor) },
    "Cursor found",
  );
} else {
  // Date.now() is milliseconds; the cursor is kept in microseconds.
  cursor = Math.floor(Date.now() * 1000);
  if (storedCursor === undefined) {
    logger.info(
      { process: "MAIN", cursor, datetime: epochUsToDateTime(cursor) },
      "Cursor not found in cursor.txt, setting cursor",
    );
  } else {
    logger.warn(
      { process: "MAIN", cursor, datetime: epochUsToDateTime(cursor) },
      "cursor.txt is empty or invalid, resetting cursor",
    );
  }
  fs.writeFileSync("cursor.txt", cursor.toString(), "utf8");
}
53
// Jetstream client for the configured endpoint, limited to the wanted
// collections and resuming from the cursor restored above. The connection is
// not opened here; that happens when `jetstream.start()` is called.
const jetstream = new Jetstream({
  endpoint: FIREHOSE_URL,
  wantedCollections: WANTED_COLLECTION,
  cursor: cursor,
});
59
// On every (re)connect: log the connection state and (re)start the timer that
// periodically persists the current cursor to cursor.txt.
jetstream.on("open", () => {
  const connectedCursor = jetstream.cursor;
  if (connectedCursor) {
    logger.info(
      {
        process: "MAIN",
        url: FIREHOSE_URL,
        cursor: connectedCursor,
        datetime: epochUsToDateTime(connectedCursor),
      },
      "Connected to Jetstream with cursor",
    );
  } else {
    logger.info(
      { process: "MAIN", url: FIREHOSE_URL },
      "Connected to Jetstream, waiting for cursor",
    );
  }

  // Drop any timer left over from an earlier "open" so two writers can never
  // run at once. clearInterval ignores an undefined/stale handle.
  clearInterval(cursorUpdateInterval);
  cursorUpdateInterval = setInterval(() => {
    const currentCursor = jetstream.cursor;
    // Nothing to persist until the stream has produced a cursor.
    if (!currentCursor) return;

    logger.info(
      {
        process: "MAIN",
        cursor: currentCursor,
        datetime: epochUsToDateTime(currentCursor),
      },
      "Cursor updated",
    );
    fs.writeFile("cursor.txt", currentCursor.toString(), (err) => {
      if (err) {
        logger.error({ process: "MAIN", error: err }, "Failed to write cursor");
      }
    });
  }, CURSOR_UPDATE_INTERVAL);
});
97
// When the connection closes, stop the periodic cursor persistence and record
// the disconnect.
jetstream.on("close", () => {
  const persistTimer = cursorUpdateInterval;
  clearInterval(persistTimer);

  logger.info({ process: "MAIN" }, "Jetstream connection closed");
});
102
// Log Jetstream errors; only the error message is recorded.
jetstream.on("error", ({ message }) => {
  const details = { process: "MAIN", error: message };
  logger.error(details, "Jetstream error");
});
106
// Check newly created posts
108
/** True for any non-null object, so its properties can be probed safely. */
function isObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

/**
 * Return the URI of the post quoted by an embed, if there is one.
 *
 * Handles `app.bsky.embed.record` (`embed.record.uri`) and
 * `app.bsky.embed.recordWithMedia` (`embed.record.record.uri`). Any other
 * shape, including malformed input, yields `undefined` instead of throwing.
 */
function getQuotedPostUri(embed: unknown): string | undefined {
  if (!isObject(embed)) return undefined;

  let quoted: unknown;
  if (embed["$type"] === "app.bsky.embed.record") {
    quoted = embed["record"];
  } else if (embed["$type"] === "app.bsky.embed.recordWithMedia") {
    const wrapper = embed["record"];
    quoted = isObject(wrapper) ? wrapper["record"] : undefined;
  }
  if (!isObject(quoted)) return undefined;

  const uri = quoted["uri"];
  return typeof uri === "string" && uri !== "" ? uri : undefined;
}

/**
 * Return the external link URI carried by an embed, if there is one.
 *
 * Handles `app.bsky.embed.external` directly and as the `media` of an
 * `app.bsky.embed.recordWithMedia`. Any other shape, including malformed
 * input, yields `undefined` instead of throwing.
 */
function getExternalLinkUri(embed: unknown): string | undefined {
  if (!isObject(embed)) return undefined;

  const holder: unknown =
    embed["$type"] === "app.bsky.embed.recordWithMedia"
      ? embed["media"]
      : embed;
  if (!isObject(holder) || holder["$type"] !== "app.bsky.embed.external") {
    return undefined;
  }

  const external = holder["external"];
  if (!isObject(external)) return undefined;

  const uri = external["uri"];
  return typeof uri === "string" ? uri : undefined;
}

// For every new post, run the account-age, facet-spam and content checks.
// The checks are started together and settled at the end, so a failing check
// is logged instead of becoming an unhandled promise rejection.
jetstream.onCreate(
  "app.bsky.feed.post",
  (event: CommitCreateEvent<"app.bsky.feed.post">) => {
    const { record } = event.commit;
    const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`;

    const hasField = (field: string): boolean =>
      Object.prototype.hasOwnProperty.call(record, field);

    // Every content check receives the same post identity and differs only in
    // the text being scanned (post body or a link URI).
    const toPosts = (text: string): Post[] => [
      {
        did: event.did,
        time: event.time_us,
        rkey: event.commit.rkey,
        atURI,
        text,
        cid: event.commit.cid,
      },
    ];

    const embed: unknown = hasField("embed") ? record.embed : undefined;
    const tasks: Promise<void>[] = [];

    // Check account age for replies to monitored DIDs
    if (record.reply) {
      const parentUri = record.reply.parent.uri;
      const replyToDid = parentUri.split("/")[2]; // Extract DID from at://did/...

      tasks.push(
        checkAccountAge({
          actorDid: event.did,
          replyToDid,
          replyToPostURI: parentUri,
          atURI,
          time: event.time_us,
        }),
      );
    }

    // Check account age for quote posts
    const quotedPostURI = getQuotedPostUri(embed);
    if (quotedPostURI) {
      const quotedDid = quotedPostURI.split("/")[2]; // Extract DID from at://did/...
      if (quotedDid) {
        tasks.push(
          checkAccountAge({
            actorDid: event.did,
            quotedDid,
            quotedPostURI,
            atURI,
            time: event.time_us,
          }),
        );
      }
    }

    if (hasField("facets")) {
      // Check for facet spam (hidden mentions with duplicate byte positions)
      const facets = record.facets ?? null;
      tasks.push(checkFacetSpam(event.did, event.time_us, atURI, facets));

      // Run the content rules against every link target found in the facets.
      const linkFeatures = (facets ?? []).flatMap((facet) =>
        facet.features.filter(
          (feature) => feature.$type === "app.bsky.richtext.facet#link",
        ),
      );
      for (const feature of linkFeatures) {
        if ("uri" in feature && typeof feature.uri === "string") {
          tasks.push(checkPosts(toPosts(feature.uri)));
        }
      }
    }

    if (hasField("text")) {
      tasks.push(checkPosts(toPosts(record.text)));
    }

    // External link cards, either standalone or as the media of a quote post.
    const externalUri = getExternalLinkUri(embed);
    if (externalUri !== undefined) {
      tasks.push(checkPosts(toPosts(externalUri)));
    }

    void Promise.allSettled(tasks).then((results) => {
      for (const result of results) {
        if (result.status === "rejected") {
          logger.error(
            { process: "MAIN", atURI, error: result.reason as unknown },
            "Error checking post",
          );
        }
      }
    });
  },
);
275
276// Check for profile updates
// Run the profile rules whenever a profile record is updated and it carries a
// display name or a description.
jetstream.onUpdate(
  "app.bsky.actor.profile",
  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  async (event: CommitUpdateEvent<"app.bsky.actor.profile">) => {
    try {
      const { displayName, description } = event.commit.record;
      if (displayName || description) {
        // Awaited so that a failure inside checkProfile reaches the catch
        // below instead of surfacing as an unhandled rejection.
        // NOTE(review): either field may be undefined here despite the
        // casts; confirm checkProfile tolerates that.
        await checkProfile(
          event.did,
          event.time_us,
          displayName as string,
          description as string,
        );
      }
    } catch (error) {
      logger.error({ process: "MAIN", error }, "Error checking profile");
    }
  },
);
295
// Check newly created profiles
297
// Run the profile rules whenever a profile record is created and it carries a
// display name or a description.
jetstream.onCreate(
  "app.bsky.actor.profile",
  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  async (event: CommitCreateEvent<"app.bsky.actor.profile">) => {
    try {
      const { displayName, description } = event.commit.record;
      if (displayName || description) {
        // Awaited so that a failure inside checkProfile reaches the catch
        // below instead of surfacing as an unhandled rejection.
        // NOTE(review): either field may be undefined here despite the
        // casts; confirm checkProfile tolerates that.
        await checkProfile(
          event.did,
          event.time_us,
          displayName as string,
          description as string,
        );
      }
    } catch (error) {
      logger.error({ process: "MAIN", error }, "Error checking profile");
    }
  },
);
316
317// Check for handle updates
// Run the handle rules for identity events that carry a handle.
jetstream.on(
  "identity",
  // eslint-disable-next-line @typescript-eslint/require-await, @typescript-eslint/no-misused-promises
  async (event: IdentityEvent) => {
    const { did, handle } = event.identity;
    if (!handle) {
      return;
    }
    // checkHandle is sync but calls async functions with void
    checkHandle(did, handle, event.time_us);
  },
);
328
329// Check for starter pack creation
// Evaluate the starter pack threshold rule for every new starter pack. The
// check is started without waiting for its result.
jetstream.onCreate(
  "app.bsky.graph.starterpack",
  ({
    did,
    commit,
    time_us: createdAtUs,
  }: CommitCreateEvent<"app.bsky.graph.starterpack">) => {
    const packUri = `at://${did}/app.bsky.graph.starterpack/${commit.rkey}`;
    void checkStarterPackThreshold(did, packUri, createdAtUs);
  },
);
337
// Startup sequence. Order matters: the metrics server comes up first, then
// Redis and the Bluesky login are awaited, and only after both succeed is the
// Jetstream connection opened.
const metricsServer = startMetricsServer(METRICS_PORT);

const startupLogContext = { process: "MAIN" };

logger.info(startupLogContext, "Connecting to Redis");
await connectRedis();

logger.info(startupLogContext, "Authenticating with Bluesky");
await login();
logger.info(
  startupLogContext,
  "Authentication complete, starting Jetstream",
);

jetstream.start();
348
/** Set once teardown has begun so that repeated signals do not re-run it. */
let shutdownStarted = false;

/**
 * Gracefully stop the service: persist the latest cursor, close the Jetstream
 * connection and the metrics server, then disconnect from Redis.
 *
 * Safe to call more than once (e.g. SIGINT followed by SIGTERM); only the
 * first call does any work. If a teardown step throws, the error is logged
 * and the process exits with status 1.
 */
async function shutdown(): Promise<void> {
  if (shutdownStarted) return;
  shutdownStarted = true;

  try {
    logger.info({ process: "MAIN" }, "Shutting down gracefully");
    // Written synchronously so the cursor is on disk before the connection
    // is torn down.
    if (jetstream.cursor !== undefined) {
      fs.writeFileSync("cursor.txt", jetstream.cursor.toString(), "utf8");
    }
    jetstream.close();
    metricsServer.close();
    await disconnectRedis();
  } catch (error) {
    logger.error({ process: "MAIN", error }, "Error shutting down gracefully");
    process.exit(1);
  }
}
363
// Trigger a graceful shutdown on the usual termination signals.
for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, () => {
    void shutdown();
  });
}