A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules
at main 7.2 kB view raw
1import { createClient } from "redis"; 2import { REDIS_URL } from "./config.js"; 3import { logger } from "./logger.js"; 4import type { WindowUnit } from "./types.js"; 5 6export const redisClient = createClient({ 7 url: REDIS_URL, 8}); 9 10redisClient.on("error", (err: Error) => { 11 logger.error({ err }, "Redis client error"); 12}); 13 14redisClient.on("connect", () => { 15 logger.info("Redis client connected"); 16}); 17 18redisClient.on("ready", () => { 19 logger.info("Redis client ready"); 20}); 21 22redisClient.on("reconnecting", () => { 23 logger.warn("Redis client reconnecting"); 24}); 25 26export async function connectRedis(): Promise<void> { 27 try { 28 await redisClient.connect(); 29 } catch (err) { 30 logger.error({ err }, "Failed to connect to Redis"); 31 throw err; 32 } 33} 34 35export async function disconnectRedis(): Promise<void> { 36 try { 37 await redisClient.quit(); 38 logger.info("Redis client disconnected"); 39 } catch (err) { 40 logger.error({ err }, "Error disconnecting Redis"); 41 } 42} 43 44function getPostLabelCacheKey(atURI: string, label: string): string { 45 return `post-label:${atURI}:${label}`; 46} 47 48function getAccountLabelCacheKey(did: string, label: string): string { 49 return `account-label:${did}:${label}`; 50} 51 52export async function tryClaimPostLabel( 53 atURI: string, 54 label: string, 55): Promise<boolean> { 56 try { 57 const key = getPostLabelCacheKey(atURI, label); 58 const result = await redisClient.set(key, "1", { 59 NX: true, 60 EX: 60 * 60 * 24 * 7, 61 }); 62 return result === "OK"; 63 } catch (err) { 64 logger.warn( 65 { err, atURI, label }, 66 "Error claiming post label in Redis, allowing through", 67 ); 68 return true; 69 } 70} 71 72export async function tryClaimAccountLabel( 73 did: string, 74 label: string, 75): Promise<boolean> { 76 try { 77 const key = getAccountLabelCacheKey(did, label); 78 const result = await redisClient.set(key, "1", { 79 NX: true, 80 EX: 60 * 60 * 24 * 7, 81 }); 82 return result === "OK"; 83 } catch (err) { 84 logger.warn( 85 { err, did, label }, 86 "Error claiming account label in Redis, allowing through", 87 ); 88 return true; 89 } 90} 91 92export async function deleteAccountLabelClaim( 93 did: string, 94 label: string, 95): Promise<void> { 96 try { 97 const key = getAccountLabelCacheKey(did, label); 98 await redisClient.del(key); 99 logger.debug( 100 { did, label }, 101 "Deleted account label claim from Redis cache", 102 ); 103 } catch (err) { 104 logger.warn( 105 { err, did, label }, 106 "Error deleting account label claim from Redis", 107 ); 108 } 109} 110 111export async function tryClaimAccountComment( 112 did: string, 113 atURI: string, 114): Promise<boolean> { 115 try { 116 const key = `account-comment:${did}:${atURI}`; 117 const result = await redisClient.set(key, "1", { 118 NX: true, 119 EX: 60 * 60 * 24 * 7, 120 }); 121 return result === "OK"; 122 } catch (err) { 123 logger.warn( 124 { err, did, atURI }, 125 "Error claiming account comment in Redis, allowing through", 126 ); 127 return true; 128 } 129} 130 131function windowToMicroseconds(window: number, unit: WindowUnit): number { 132 const multipliers: Record<WindowUnit, number> = { 133 minutes: 60 * 1000000, 134 hours: 60 * 60 * 1000000, 135 days: 24 * 60 * 60 * 1000000, 136 }; 137 return window * multipliers[unit]; 138} 139 140function windowToSeconds(window: number, unit: WindowUnit): number { 141 const multipliers: Record<WindowUnit, number> = { 142 minutes: 60, 143 hours: 60 * 60, 144 days: 24 * 60 * 60, 145 }; 146 return window * multipliers[unit]; 147} 148 149function getPostLabelTrackingKey( 150 did: string, 151 label: string, 152 window: number, 153 unit: WindowUnit, 154): string { 155 return `account-post-labels:${did}:${label}:${window.toString()}${unit}`; 156} 157 158function getStarterPackTrackingKey( 159 did: string, 160 window: number, 161 unit: WindowUnit, 162): string { 163 return `starterpack:threshold:${did}:${window.toString()}${unit}`; 164} 165 166export async function trackStarterPackForAccount( 167 did: string, 168 starterPackUri: string, 169 timestamp: number, 170 window: number, 171 windowUnit: WindowUnit, 172): Promise<void> { 173 try { 174 const key = getStarterPackTrackingKey(did, window, windowUnit); 175 const windowStartTime = timestamp - windowToMicroseconds(window, windowUnit); 176 177 await redisClient.zRemRangeByScore(key, "-inf", windowStartTime); 178 179 await redisClient.zAdd(key, { 180 score: timestamp, 181 value: starterPackUri, 182 }); 183 184 const ttlSeconds = windowToSeconds(window, windowUnit) + 60 * 60; 185 await redisClient.expire(key, ttlSeconds); 186 187 logger.debug( 188 { did, starterPackUri, timestamp, window, windowUnit }, 189 "Tracked starter pack for account", 190 ); 191 } catch (err) { 192 logger.error( 193 { err, did, starterPackUri, timestamp, window, windowUnit }, 194 "Error tracking starter pack in Redis", 195 ); 196 throw err; 197 } 198} 199 200export async function getStarterPackCountInWindow( 201 did: string, 202 window: number, 203 windowUnit: WindowUnit, 204 currentTime: number, 205): Promise<number> { 206 try { 207 const key = getStarterPackTrackingKey(did, window, windowUnit); 208 const windowStartTime = currentTime - windowToMicroseconds(window, windowUnit); 209 const count = await redisClient.zCount(key, windowStartTime, "+inf"); 210 211 logger.debug( 212 { did, window, windowUnit, count }, 213 "Retrieved starter pack count in window", 214 ); 215 216 return count; 217 } catch (err) { 218 logger.error( 219 { err, did, window, windowUnit }, 220 "Error getting starter pack count from Redis", 221 ); 222 throw err; 223 } 224} 225 226export async function trackPostLabelForAccount( 227 did: string, 228 label: string, 229 timestamp: number, 230 window: number, 231 windowUnit: WindowUnit, 232): Promise<void> { 233 try { 234 const key = getPostLabelTrackingKey(did, label, window, windowUnit); 235 const windowStartTime = timestamp - windowToMicroseconds(window, windowUnit); 236 237 await redisClient.zRemRangeByScore(key, "-inf", windowStartTime); 238 239 await redisClient.zAdd(key, { 240 score: timestamp, 241 value: timestamp.toString(), 242 }); 243 244 const ttlSeconds = windowToSeconds(window, windowUnit) + 60 * 60; 245 await redisClient.expire(key, ttlSeconds); 246 247 logger.debug( 248 { did, label, timestamp, window, windowUnit }, 249 "Tracked post label for account", 250 ); 251 } catch (err) { 252 logger.error( 253 { err, did, label, timestamp, window, windowUnit }, 254 "Error tracking post label in Redis", 255 ); 256 throw err; 257 } 258} 259 260export async function getPostLabelCountInWindow( 261 did: string, 262 labels: string[], 263 window: number, 264 windowUnit: WindowUnit, 265 currentTime: number, 266): Promise<number> { 267 try { 268 const windowStartTime = currentTime - windowToMicroseconds(window, windowUnit); 269 let totalCount = 0; 270 271 for (const label of labels) { 272 const key = getPostLabelTrackingKey(did, label, window, windowUnit); 273 const count = await redisClient.zCount(key, windowStartTime, "+inf"); 274 totalCount += count; 275 } 276 277 logger.debug( 278 { did, labels, window, windowUnit, totalCount }, 279 "Retrieved post label count in window", 280 ); 281 282 return totalCount; 283 } catch (err) { 284 logger.error( 285 { err, did, labels, window, windowUnit }, 286 "Error getting post label count from Redis", 287 ); 288 throw err; 289 } 290}