A tool for parsing traffic on the Jetstream and applying a moderation workstream driven by regexp-based rules.
limits.ts#
This module implements a sophisticated rate limiting and concurrency management system for all outgoing API calls. Its primary goal is to ensure the application respects the API rate limits imposed by the Bluesky PDS while maximizing throughput.
State Management#
- rateLimitState: A module-level object that holds the current understanding of the API rate limit. It includes:
  - limit: The total number of requests allowed in the current window.
  - remaining: The number of requests left in the current window.
  - reset: A Unix timestamp (in seconds) indicating when the window will reset.
  - policy: The rate limit policy string from the API header.
- It is initialized with conservative default values but is designed to be dynamically updated by the updateRateLimitState function.
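The state object described above could be typed roughly as follows. This is a minimal sketch, not the module's actual code: the default numbers, the DEFAULT_WINDOW_SECONDS constant, the createInitialState helper, and the choice to make policy optional are all assumptions made for illustration.

```typescript
// Shape of the tracked rate limit state, mirroring the four fields listed above.
interface RateLimitState {
  limit: number;     // total requests allowed in the current window
  remaining: number; // requests left in the current window
  reset: number;     // Unix timestamp (seconds) at which the window resets
  policy?: string;   // raw policy string from the API header (optional here by assumption)
}

// Hypothetical window length; the real default is not stated in this document.
const DEFAULT_WINDOW_SECONDS = 300;

// Hypothetical helper that builds conservative defaults relative to a given clock value.
function createInitialState(nowMs: number = Date.now()): RateLimitState {
  return {
    limit: 280,      // assumed conservative default
    remaining: 280,  // start with a full budget
    reset: Math.floor(nowMs / 1000) + DEFAULT_WINDOW_SECONDS,
  };
}

// Module-level state, later overwritten by values taken from response headers.
const rateLimitState: RateLimitState = createInitialState();
```

Starting from conservative defaults means the first few requests are throttled by a guess, which is then replaced as soon as a real response supplies header values.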
Constants#
- SAFETY_BUFFER: The number of requests to keep in reserve. The system will pause and wait for the rate limit window to reset only when the remaining count drops to this level.
- CONCURRENCY: The maximum number of API requests that can be in-flight simultaneously.
Metrics#
This module is heavily instrumented with Prometheus metrics to provide visibility into its performance:
- rateLimitWaitsTotal (Counter): Counts how many times the system had to pause due to hitting the SAFETY_BUFFER.
- rateLimitWaitDuration (Histogram): Measures the duration of each pause.
- rateLimitRemaining (Gauge): Tracks the current value of rateLimitState.remaining.
- rateLimitTotal (Gauge): Tracks the current value of rateLimitState.limit.
- concurrentRequestsGauge (Gauge): Shows the number of currently active, concurrent API requests.
Key Functions#
updateRateLimitState(state: Partial<RateLimitState>): void#
- Purpose: To update the module's internal rateLimitState.
- Usage: This function is called by the customFetch wrapper in agent.ts every time an API response with rate limit headers is received.
- Logic: It merges the new partial state with the existing state and updates the corresponding Prometheus gauges.
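The merge-then-publish logic can be sketched as below. To keep the example self-contained, a small FakeGauge class stands in for the prom-client gauges; that class, the initial state values, and the sample call at the end are assumptions, not the module's code.

```typescript
interface RateLimitState {
  limit: number;
  remaining: number;
  reset: number;
  policy?: string;
}

// Stand-in for a Prometheus gauge (hypothetical); the real module uses prom-client.
class FakeGauge {
  value = 0;
  set(v: number): void {
    this.value = v;
  }
}

const rateLimitRemaining = new FakeGauge();
const rateLimitTotal = new FakeGauge();

// Assumed starting values for illustration only.
let rateLimitState: RateLimitState = { limit: 280, remaining: 280, reset: 0 };

function updateRateLimitState(state: Partial<RateLimitState>): void {
  // Fields present in the partial update win; everything else is kept as-is.
  rateLimitState = { ...rateLimitState, ...state };
  // Publish the merged values so dashboards see the latest state.
  rateLimitRemaining.set(rateLimitState.remaining);
  rateLimitTotal.set(rateLimitState.limit);
}

// Example: a response reported 42 requests left and a new reset time.
updateRateLimitState({ remaining: 42, reset: 1_700_000_000 });
```

Accepting a Partial lets the caller pass only the headers that were actually present on a response without clobbering the remaining fields.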
awaitRateLimit(): Promise<void>#
- Purpose: To pause execution if the rate limit is critically low.
- Logic:
  - It checks if rateLimitState.remaining is less than or equal to the SAFETY_BUFFER.
  - If it is, it calculates the time remaining until the next reset.
  - It then waits for that duration using a setTimeout wrapped in a Promise.
  - It logs the wait and records metrics about the wait duration.
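The steps above might look like the following sketch. The SAFETY_BUFFER value, the initial state, and the computeWaitMs helper are hypothetical; the helper is split out only so the timing arithmetic can be checked without actually sleeping. Metric recording is left out, and console.warn stands in for the module's logger.

```typescript
// Hypothetical values; the document does not state the real constant or defaults.
const SAFETY_BUFFER = 5;
const rateLimitState = { limit: 280, remaining: 280, reset: 0 };

// Returns how long to pause in milliseconds; 0 means no pause is needed.
function computeWaitMs(remaining: number, resetSeconds: number, nowMs: number): number {
  if (remaining > SAFETY_BUFFER) return 0; // still above the reserve
  // reset is in seconds, the clock is in milliseconds; never return a negative wait.
  return Math.max(0, resetSeconds * 1000 - nowMs);
}

async function awaitRateLimit(): Promise<void> {
  const waitMs = computeWaitMs(rateLimitState.remaining, rateLimitState.reset, Date.now());
  if (waitMs === 0) return;
  console.warn(`Rate limit nearly exhausted; waiting ${waitMs} ms for the window to reset`);
  // setTimeout wrapped in a Promise so callers can simply await the pause.
  await new Promise<void>((resolve) => setTimeout(resolve, waitMs));
}
```

Clamping the wait at zero covers the case where the stored reset time is already in the past, for example when the state has not been refreshed since the previous window.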
limit<T>(fn: () => Promise<T>): Promise<T>#
- Purpose: This is the main exported function used to wrap all API calls. It manages both concurrency and rate limiting.
- Usage: Every function that makes an API call (e.g., in accountModeration.ts and moderation.ts) is wrapped in limit(async () => { ... }).
- Logic:
  - Concurrency: It uses the p-ratelimit library (concurrencyLimiter) to ensure that no more than CONCURRENCY instances of the wrapped function can run at the same time.
  - Rate Limiting: Once a "slot" in the concurrency limiter is acquired, it calls await awaitRateLimit() to potentially pause if the API rate limit is nearly exhausted.
  - Execution: After the potential pause, it executes the provided function fn.
  - Metrics: It increments and decrements the concurrentRequestsGauge to track the number of active requests.
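The ordering of those steps (acquire a slot, check the rate limit, run fn, release) can be illustrated with the sketch below. It deliberately swaps p-ratelimit for a small hand-rolled semaphore so the example has no dependencies; the CONCURRENCY value, the acquire and release helpers, the active counter (standing in for concurrentRequestsGauge), and the empty awaitRateLimit placeholder are all assumptions.

```typescript
// Hypothetical value; the document does not state the real constant.
const CONCURRENCY = 2;

// Number of calls currently holding a slot; stands in for concurrentRequestsGauge.
let active = 0;
// Resolvers for callers waiting for a slot, in arrival order.
const waiting: Array<() => void> = [];

async function acquire(): Promise<void> {
  if (active < CONCURRENCY) {
    active++;
    return;
  }
  // All slots taken: park until release() hands one over.
  await new Promise<void>((resolve) => waiting.push(resolve));
}

function release(): void {
  const next = waiting.shift();
  if (next) {
    next(); // transfer the slot directly, so `active` stays unchanged
  } else {
    active--;
  }
}

// Placeholder for the rate limit pause described in the previous section.
async function awaitRateLimit(): Promise<void> {}

async function limit<T>(fn: () => Promise<T>): Promise<T> {
  await acquire();            // 1. concurrency slot
  try {
    await awaitRateLimit();   // 2. pause if the API budget is nearly exhausted
    return await fn();        // 3. run the wrapped call
  } finally {
    release();                // 4. always free the slot, even if fn throws
  }
}
```

Checking the rate limit only after a slot is held means at most CONCURRENCY callers are ever parked on the reset timer; the rest queue for a slot instead.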
Dependencies#
- p-ratelimit: A library used to control the concurrency of promise-based functions.
- prom-client: For creating and managing Prometheus metrics.
- ./logger.js: For logging warnings and debug information.