A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules
at main 3.4 kB view raw
1import { pRateLimit } from "p-ratelimit"; 2import { Counter, Gauge, Histogram } from "prom-client"; 3import { logger } from "./logger.js"; 4 5interface RateLimitState { 6 limit: number; 7 remaining: number; 8 reset: number; // Unix timestamp in seconds 9 policy?: string; 10} 11 12// Conservative defaults based on previous static configuration 13// Will be replaced with dynamic values from ATP response headers 14let rateLimitState: RateLimitState = { 15 limit: 280, 16 remaining: 280, 17 reset: Math.floor(Date.now() / 1000) + 30, 18}; 19 20const SAFETY_BUFFER = 5; // Keep this many requests in reserve (reduced from 20) 21const CONCURRENCY = 24; // Reduced from 48 to prevent rapid depletion 22 23// Metrics 24const rateLimitWaitsTotal = new Counter({ 25 name: "rate_limit_waits_total", 26 help: "Total number of times rate limit wait was triggered", 27}); 28 29const rateLimitWaitDuration = new Histogram({ 30 name: "rate_limit_wait_duration_seconds", 31 help: "Duration of rate limit waits in seconds", 32 buckets: [0.1, 0.5, 1, 5, 10, 30, 60], 33}); 34 35const rateLimitRemaining = new Gauge({ 36 name: "rate_limit_remaining", 37 help: "Current remaining rate limit", 38}); 39 40const rateLimitTotal = new Gauge({ 41 name: "rate_limit_total", 42 help: "Total rate limit from headers", 43}); 44 45const concurrentRequestsGauge = new Gauge({ 46 name: "concurrent_requests", 47 help: "Current number of concurrent requests", 48}); 49 50// Use p-ratelimit purely for concurrency management 51const concurrencyLimiter = pRateLimit({ 52 interval: 1000, 53 rate: 10000, // Very high rate, we manage rate limiting separately 54 concurrency: CONCURRENCY, 55 maxDelay: 0, 56}); 57 58export function getRateLimitState(): RateLimitState { 59 return { ...rateLimitState }; 60} 61 62export function updateRateLimitState(state: Partial<RateLimitState>): void { 63 rateLimitState = { ...rateLimitState, ...state }; 64 65 // Update Prometheus metrics 66 if (state.remaining !== undefined) { 67 rateLimitRemaining.set(state.remaining); 68 } 69 if (state.limit !== undefined) { 70 rateLimitTotal.set(state.limit); 71 } 72 73 logger.debug( 74 { 75 limit: rateLimitState.limit, 76 remaining: rateLimitState.remaining, 77 resetIn: rateLimitState.reset - Math.floor(Date.now() / 1000), 78 }, 79 "Rate limit state updated", 80 ); 81} 82 83async function awaitRateLimit(): Promise<void> { 84 const state = getRateLimitState(); 85 const now = Math.floor(Date.now() / 1000); 86 87 // Only wait if we're critically low 88 if (state.remaining <= SAFETY_BUFFER) { 89 rateLimitWaitsTotal.inc(); 90 91 const delaySeconds = Math.max(0, state.reset - now); 92 const delayMs = delaySeconds * 1000; 93 94 if (delayMs > 0) { 95 logger.warn( 96 `Rate limit critical (${state.remaining.toString()}/${state.limit.toString()} remaining). Waiting ${delaySeconds.toString()}s until reset...`, 97 ); 98 99 const waitStart = Date.now(); 100 await new Promise((resolve) => setTimeout(resolve, delayMs)); 101 const waitDuration = (Date.now() - waitStart) / 1000; 102 rateLimitWaitDuration.observe(waitDuration); 103 104 // Don't manually reset state - let the next API response update it 105 logger.info("Rate limit wait complete, resuming requests"); 106 } 107 } 108} 109 110export async function limit<T>(fn: () => Promise<T>): Promise<T> { 111 return concurrencyLimiter(async () => { 112 concurrentRequestsGauge.inc(); 113 try { 114 await awaitRateLimit(); 115 return await fn(); 116 } finally { 117 concurrentRequestsGauge.dec(); 118 } 119 }); 120}