A tool that parses Jetstream traffic and applies a moderation workflow driven by regular-expression rules.
at main 11 kB view raw
import fs from "node:fs";
import type {
  CommitCreateEvent,
  CommitUpdateEvent,
  IdentityEvent,
} from "@skyware/jetstream";
import { Jetstream } from "@skyware/jetstream";
import { login } from "./agent.js";
import {
  CURSOR_UPDATE_INTERVAL,
  FIREHOSE_URL,
  METRICS_PORT,
  WANTED_COLLECTION,
} from "./config.js";
import { logger } from "./logger.js";
import { startMetricsServer } from "./metrics.js";
import { connectRedis, disconnectRedis } from "./redis.js";
import { checkAccountAge } from "./rules/account/age.js";
import { checkFacetSpam } from "./rules/facets/facets.js";
import { checkHandle } from "./rules/handles/checkHandles.js";
import { checkPosts } from "./rules/posts/checkPosts.js";
import { checkProfile } from "./rules/profiles/checkProfiles.js";
import { checkStarterPackThreshold } from "./starterPackThreshold.js";
import type { Post } from "./types.js";

/** File the Jetstream cursor is persisted to between runs. */
const CURSOR_FILE = "cursor.txt";

type PostCreateEvent = CommitCreateEvent<"app.bsky.feed.post">;
type ProfileEvent =
  | CommitCreateEvent<"app.bsky.actor.profile">
  | CommitUpdateEvent<"app.bsky.actor.profile">;

let cursorUpdateInterval: NodeJS.Timeout | undefined;
let shuttingDown = false;

/**
 * Format a Jetstream cursor (microseconds since the Unix epoch) as an ISO-8601 string.
 * Throws a RangeError if the value does not correspond to a valid date.
 */
function epochUsToDateTime(cursor: number): string {
  return new Date(cursor / 1000).toISOString();
}

/**
 * Derive a cursor from the current wall-clock time, log it, and persist it to cursor.txt.
 *
 * @returns The freshly created cursor (microseconds since the Unix epoch).
 */
function initialiseCursor(): number {
  const fresh = Math.floor(Date.now() * 1000);
  logger.info(
    { process: "MAIN", cursor: fresh, datetime: epochUsToDateTime(fresh) },
    "Cursor not found in cursor.txt, setting cursor",
  );
  fs.writeFileSync(CURSOR_FILE, fresh.toString(), "utf8");
  return fresh;
}

/**
 * Load the cursor persisted in cursor.txt.
 *
 * Falls back to "now" (see `initialiseCursor`) when the file is missing or does not hold a
 * positive integer. Any other read failure is logged and terminates the process with code 1.
 *
 * @returns The cursor to resume Jetstream from.
 */
function loadCursor(): number {
  logger.info({ process: "MAIN" }, "Trying to read cursor from cursor.txt");
  let raw: string;
  try {
    raw = fs.readFileSync(CURSOR_FILE, "utf8");
  } catch (error) {
    if (error instanceof Error && "code" in error && error.code === "ENOENT") {
      return initialiseCursor();
    }
    logger.error({ process: "MAIN", error }, "Failed to read cursor");
    return process.exit(1);
  }

  const stored = Number(raw.trim());
  if (Number.isSafeInteger(stored) && stored > 0) {
    logger.info(
      { process: "MAIN", cursor: stored, datetime: epochUsToDateTime(stored) },
      "Cursor found",
    );
    return stored;
  }

  // An empty or truncated file (e.g. a write interrupted by a crash) parses as 0 or NaN.
  // A cursor of 0 would be handed to Jetstream as-is, and NaN makes epochUsToDateTime throw,
  // so such contents are treated like a missing file instead.
  logger.error(
    { process: "MAIN", raw },
    "Invalid cursor in cursor.txt, ignoring it",
  );
  return initialiseCursor();
}

/**
 * Run a rule check without blocking the Jetstream listener.
 *
 * `task` is invoked synchronously. Both a synchronous throw and a rejected promise are logged
 * with `message` and `context` instead of escaping as an exception or unhandled rejection.
 */
function runDetached(
  message: string,
  context: Record<string, unknown>,
  task: () => unknown,
): void {
  void (async () => {
    try {
      await task();
    } catch (error) {
      logger.error({ process: "MAIN", ...context, error }, message);
    }
  })();
}

/** Extract the authority (DID) segment from an `at://did/collection/rkey` URI, if present. */
function didFromAtUri(uri: string): string | undefined {
  return uri.split("/")[2] || undefined;
}

/** Return the string `$type` of an embed-like value, or undefined when there is none. */
function embedTypeOf(embed: unknown): string | undefined {
  if (typeof embed !== "object" || embed === null) {
    return undefined;
  }
  const { $type } = embed as { $type?: unknown };
  return typeof $type === "string" ? $type : undefined;
}

/**
 * URI of the record quoted by a record / recordWithMedia embed, or undefined.
 * Fields are read defensively because the record shape is not validated here.
 */
function quotedPostUri(embed: unknown): string | undefined {
  let uri: unknown;
  switch (embedTypeOf(embed)) {
    case "app.bsky.embed.record":
      uri = (embed as { record?: { uri?: unknown } }).record?.uri;
      break;
    case "app.bsky.embed.recordWithMedia":
      uri = (embed as { record?: { record?: { uri?: unknown } } }).record
        ?.record?.uri;
      break;
    default:
      return undefined;
  }
  return typeof uri === "string" && uri !== "" ? uri : undefined;
}

/**
 * URI of an external link card, either a standalone external embed or the media half of a
 * recordWithMedia embed, or undefined when the embed carries none.
 */
function externalLinkUri(embed: unknown): string | undefined {
  let uri: unknown;
  switch (embedTypeOf(embed)) {
    case "app.bsky.embed.external":
      uri = (embed as { external?: { uri?: unknown } }).external?.uri;
      break;
    case "app.bsky.embed.recordWithMedia": {
      const media = (embed as { media?: unknown }).media;
      if (embedTypeOf(media) === "app.bsky.embed.external") {
        uri = (media as { external?: { uri?: unknown } }).external?.uri;
      }
      break;
    }
    default:
      return undefined;
  }
  return typeof uri === "string" ? uri : undefined;
}

/** Build the single-element batch `checkPosts` expects for one piece of text from a post. */
function toPostBatch(
  event: PostCreateEvent,
  atURI: string,
  text: string,
): Post[] {
  return [
    {
      did: event.did,
      time: event.time_us,
      rkey: event.commit.rkey,
      atURI,
      text,
      cid: event.commit.cid,
    },
  ];
}

/**
 * Start every rule check that applies to a newly created post. Each check runs detached with
 * its own error logging.
 *
 * Checks, in order: account age for replies, account age for quote posts, facet spam, link
 * facets, post text, and external link embeds.
 */
function dispatchPostChecks(event: PostCreateEvent, atURI: string): void {
  const { record } = event.commit;
  const context = { did: event.did, atURI };
  const checkPostText = (text: string): void => {
    runDetached("Error checking post", context, () =>
      checkPosts(toPostBatch(event, atURI, text)),
    );
  };

  // Check account age for replies to monitored DIDs
  const replyToPostURI = record.reply?.parent?.uri;
  if (typeof replyToPostURI === "string") {
    const replyToDid = didFromAtUri(replyToPostURI);
    if (replyToDid) {
      runDetached("Error checking post", context, () =>
        checkAccountAge({
          actorDid: event.did,
          replyToDid,
          replyToPostURI,
          atURI,
          time: event.time_us,
        }),
      );
    }
  }

  // Check account age for quote posts
  const quotedPostURI = quotedPostUri(record.embed);
  if (quotedPostURI) {
    const quotedDid = didFromAtUri(quotedPostURI);
    if (quotedDid) {
      runDetached("Error checking post", context, () =>
        checkAccountAge({
          actorDid: event.did,
          quotedDid,
          quotedPostURI,
          atURI,
          time: event.time_us,
        }),
      );
    }
  }

  // Facets: facet spam (hidden mentions with duplicate byte positions), then each link target
  if (Object.prototype.hasOwnProperty.call(record, "facets")) {
    const facets = record.facets ?? null;
    runDetached("Error checking post", context, () =>
      checkFacetSpam(event.did, event.time_us, atURI, facets),
    );
    for (const facet of facets ?? []) {
      for (const feature of facet.features ?? []) {
        if (
          feature.$type === "app.bsky.richtext.facet#link" &&
          "uri" in feature &&
          typeof feature.uri === "string"
        ) {
          checkPostText(feature.uri);
        }
      }
    }
  }

  if (Object.prototype.hasOwnProperty.call(record, "text")) {
    checkPostText(record.text);
  }

  // External link cards, standalone or alongside a quoted record
  const externalUri = externalLinkUri(record.embed);
  if (externalUri !== undefined) {
    checkPostText(externalUri);
  }
}

/** Run the profile rules when a profile record carries a display name or a description. */
function handleProfileEvent(event: ProfileEvent): void {
  const { displayName, description } = event.commit.record;
  if (!displayName && !description) {
    return;
  }
  runDetached("Error checking profile", { did: event.did }, () =>
    checkProfile(
      event.did,
      event.time_us,
      displayName as string,
      description as string,
    ),
  );
}

const cursor = loadCursor();

const jetstream = new Jetstream({
  wantedCollections: WANTED_COLLECTION,
  endpoint: FIREHOSE_URL,
  cursor,
});

jetstream.on("open", () => {
  if (jetstream.cursor) {
    logger.info(
      {
        process: "MAIN",
        url: FIREHOSE_URL,
        cursor: jetstream.cursor,
        datetime: epochUsToDateTime(jetstream.cursor),
      },
      "Connected to Jetstream with cursor",
    );
  } else {
    logger.info(
      { process: "MAIN", url: FIREHOSE_URL },
      "Connected to Jetstream, waiting for cursor",
    );
  }
  // Never stack timers if "open" fires again without an intervening "close".
  clearInterval(cursorUpdateInterval);
  cursorUpdateInterval = setInterval(() => {
    if (jetstream.cursor) {
      logger.info(
        {
          process: "MAIN",
          cursor: jetstream.cursor,
          datetime: epochUsToDateTime(jetstream.cursor),
        },
        "Cursor updated",
      );
      fs.writeFile(CURSOR_FILE, jetstream.cursor.toString(), (err) => {
        if (err)
          logger.error(
            { process: "MAIN", error: err },
            "Failed to write cursor",
          );
      });
    }
  }, CURSOR_UPDATE_INTERVAL);
});

jetstream.on("close", () => {
  clearInterval(cursorUpdateInterval);
  logger.info({ process: "MAIN" }, "Jetstream connection closed");
});

jetstream.on("error", (error) => {
  logger.error({ process: "MAIN", error: error.message }, "Jetstream error");
});

// Check for post updates
jetstream.onCreate("app.bsky.feed.post", (event: PostCreateEvent) => {
  const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`;
  try {
    dispatchPostChecks(event, atURI);
  } catch (error) {
    // The record shape is not validated here; a malformed record must not escape the listener.
    logger.error(
      { process: "MAIN", error, atURI },
      "Error dispatching post checks",
    );
  }
});

// Check for profile updates
jetstream.onUpdate("app.bsky.actor.profile", handleProfileEvent);

// Check for profile creation
jetstream.onCreate("app.bsky.actor.profile", handleProfileEvent);

// Check for handle updates
jetstream.on("identity", (event: IdentityEvent) => {
  const { did, handle } = event.identity;
  if (handle) {
    // checkHandle is sync but calls async functions with void
    runDetached("Error checking handle", { did }, () =>
      checkHandle(did, handle, event.time_us),
    );
  }
});

// Check for starter pack creation
jetstream.onCreate(
  "app.bsky.graph.starterpack",
  (event: CommitCreateEvent<"app.bsky.graph.starterpack">) => {
    const starterPackUri = `at://${event.did}/app.bsky.graph.starterpack/${event.commit.rkey}`;
    runDetached(
      "Error checking starter pack",
      { did: event.did, starterPackUri },
      () => checkStarterPackThreshold(event.did, starterPackUri, event.time_us),
    );
  },
);

const metricsServer = startMetricsServer(METRICS_PORT);

logger.info({ process: "MAIN" }, "Connecting to Redis");
await connectRedis();

logger.info({ process: "MAIN" }, "Authenticating with Bluesky");
await login();
logger.info({ process: "MAIN" }, "Authentication complete, starting Jetstream");

jetstream.start();

/**
 * Persist the latest cursor, then close the Jetstream, metrics and Redis connections.
 *
 * Only the first call does any work, so repeated SIGINT/SIGTERM signals do not close the
 * resources twice. A failure is logged and exits the process with code 1.
 */
async function shutdown(): Promise<void> {
  if (shuttingDown) {
    return;
  }
  shuttingDown = true;
  try {
    logger.info({ process: "MAIN" }, "Shutting down gracefully");
    if (jetstream.cursor !== undefined) {
      fs.writeFileSync(CURSOR_FILE, jetstream.cursor.toString(), "utf8");
    }
    jetstream.close();
    metricsServer.close();
    await disconnectRedis();
  } catch (error) {
    logger.error({ process: "MAIN", error }, "Error shutting down gracefully");
    process.exit(1);
  }
}

process.on("SIGINT", () => void shutdown());
process.on("SIGTERM", () => void shutdown());