A tool for parsing traffic on the Jetstream and applying a moderation workflow based on regexp rules.
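For illustration only, a rule in that style might pair a compiled regular expression with the label to emit when a Jetstream post matches it; the names below (ModerationRule, applyRules) are hypothetical and not taken from this repository. The module that follows covers a different concern: keeping outbound ATP requests inside the server's rate limits.

// Hypothetical sketch of a regexp-based rule, for illustration only.
interface ModerationRule {
  pattern: RegExp; // evaluated against the text of each Jetstream post
  label: string; // label to emit when the pattern matches
}

const exampleRules: ModerationRule[] = [
  { pattern: /\bfree\s+crypto\b/i, label: "spam" },
];

// Return the labels of every rule whose pattern matches the given text.
function applyRules(text: string, rules: ModerationRule[]): string[] {
  return rules.filter((rule) => rule.pattern.test(text)).map((rule) => rule.label);
}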
import { pRateLimit } from "p-ratelimit";
import { Counter, Gauge, Histogram } from "prom-client";
import { logger } from "./logger.js";

interface RateLimitState {
  limit: number;
  remaining: number;
  reset: number; // Unix timestamp in seconds
  policy?: string;
}

// Conservative defaults based on previous static configuration
// Will be replaced with dynamic values from ATP response headers
let rateLimitState: RateLimitState = {
  limit: 280,
  remaining: 280,
  reset: Math.floor(Date.now() / 1000) + 30,
};

const SAFETY_BUFFER = 5; // Keep this many requests in reserve (reduced from 20)
const CONCURRENCY = 24; // Reduced from 48 to prevent rapid depletion

// Metrics
const rateLimitWaitsTotal = new Counter({
  name: "rate_limit_waits_total",
  help: "Total number of times rate limit wait was triggered",
});

const rateLimitWaitDuration = new Histogram({
  name: "rate_limit_wait_duration_seconds",
  help: "Duration of rate limit waits in seconds",
  buckets: [0.1, 0.5, 1, 5, 10, 30, 60],
});

const rateLimitRemaining = new Gauge({
  name: "rate_limit_remaining",
  help: "Current remaining rate limit",
});

const rateLimitTotal = new Gauge({
  name: "rate_limit_total",
  help: "Total rate limit from headers",
});

const concurrentRequestsGauge = new Gauge({
  name: "concurrent_requests",
  help: "Current number of concurrent requests",
});

// Use p-ratelimit purely for concurrency management
const concurrencyLimiter = pRateLimit({
  interval: 1000,
  rate: 10000, // Very high rate, we manage rate limiting separately
  concurrency: CONCURRENCY,
  maxDelay: 0,
});

export function getRateLimitState(): RateLimitState {
  return { ...rateLimitState };
}

export function updateRateLimitState(state: Partial<RateLimitState>): void {
  rateLimitState = { ...rateLimitState, ...state };

  // Update Prometheus metrics
  if (state.remaining !== undefined) {
    rateLimitRemaining.set(state.remaining);
  }
  if (state.limit !== undefined) {
    rateLimitTotal.set(state.limit);
  }

  logger.debug(
    {
      limit: rateLimitState.limit,
      remaining: rateLimitState.remaining,
      resetIn: rateLimitState.reset - Math.floor(Date.now() / 1000),
    },
    "Rate limit state updated",
  );
}
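
// Illustrative sketch, not part of the original module: one way the "dynamic
// values from ATP response headers" mentioned above could be fed in. It assumes
// the standard ratelimit-* response headers returned by Bluesky's ATP services;
// the helper name updateRateLimitStateFromHeaders is made up for this example.
export function updateRateLimitStateFromHeaders(headers: Headers): void {
  const limit = headers.get("ratelimit-limit");
  const remaining = headers.get("ratelimit-remaining");
  const reset = headers.get("ratelimit-reset");
  const policy = headers.get("ratelimit-policy") ?? undefined;

  // Only update when all three numeric headers are present and parse cleanly
  if (limit !== null && remaining !== null && reset !== null) {
    const parsed = {
      limit: Number(limit),
      remaining: Number(remaining),
      reset: Number(reset),
      policy,
    };
    if (
      Number.isFinite(parsed.limit) &&
      Number.isFinite(parsed.remaining) &&
      Number.isFinite(parsed.reset)
    ) {
      updateRateLimitState(parsed);
    }
  }
}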

async function awaitRateLimit(): Promise<void> {
  const state = getRateLimitState();
  const now = Math.floor(Date.now() / 1000);

  // Only wait if we're critically low
  if (state.remaining <= SAFETY_BUFFER) {
    rateLimitWaitsTotal.inc();

    const delaySeconds = Math.max(0, state.reset - now);
    const delayMs = delaySeconds * 1000;

    if (delayMs > 0) {
      logger.warn(
        `Rate limit critical (${state.remaining.toString()}/${state.limit.toString()} remaining). Waiting ${delaySeconds.toString()}s until reset...`,
      );

      const waitStart = Date.now();
      await new Promise((resolve) => setTimeout(resolve, delayMs));
      const waitDuration = (Date.now() - waitStart) / 1000;
      rateLimitWaitDuration.observe(waitDuration);

      // Don't manually reset state - let the next API response update it
      logger.info("Rate limit wait complete, resuming requests");
    }
  }
}

export async function limit<T>(fn: () => Promise<T>): Promise<T> {
  return concurrencyLimiter(async () => {
    concurrentRequestsGauge.inc();
    try {
      await awaitRateLimit();
      return await fn();
    } finally {
      concurrentRequestsGauge.dec();
    }
  });
}
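
// Usage sketch under assumptions, not code from this repo: callers would wrap
// each outbound ATP request in `limit` so the concurrency cap and the pre-flight
// rate limit check both apply, then feed the response headers back in so the
// next request sees fresh numbers. `limitedFetch` is a hypothetical helper and
// relies on the sketch above; Node 18+ global fetch/Headers are assumed.
export async function limitedFetch(url: string, init?: RequestInit): Promise<Response> {
  return limit(async () => {
    const res = await fetch(url, init);
    updateRateLimitStateFromHeaders(res.headers);
    return res;
  });
}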