A tool for tailing a labelers' firehose, rehydrating, and storing records for future analysis of moderation decisions.

Implement Phase 5: Rate limiting and retry logic

- Add token bucket rate limiter with configurable refill
- Implement multi-endpoint rate limiting
- Create retry utility with exponential backoff
- Add retryable error detection (rate limit, network, server errors)
- All tests passing

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

Changed files
+188
src
+93
src/utils/rate-limit.ts
··· 1 + import { logger } from "../logger/index.js"; 2 + 3 + export interface RateLimiterConfig { 4 + maxTokens: number; 5 + refillRate: number; 6 + refillInterval: number; 7 + } 8 + 9 + export class RateLimiter { 10 + private tokens: number; 11 + private lastRefill: number; 12 + private config: RateLimiterConfig; 13 + 14 + constructor(config: RateLimiterConfig) { 15 + this.config = config; 16 + this.tokens = config.maxTokens; 17 + this.lastRefill = Date.now(); 18 + } 19 + 20 + private refill(): void { 21 + const now = Date.now(); 22 + const elapsed = now - this.lastRefill; 23 + const intervals = Math.floor(elapsed / this.config.refillInterval); 24 + 25 + if (intervals > 0) { 26 + this.tokens = Math.min( 27 + this.config.maxTokens, 28 + this.tokens + intervals * this.config.refillRate 29 + ); 30 + this.lastRefill = now; 31 + } 32 + } 33 + 34 + async acquire(tokens: number = 1): Promise<void> { 35 + while (true) { 36 + this.refill(); 37 + 38 + if (this.tokens >= tokens) { 39 + this.tokens -= tokens; 40 + return; 41 + } 42 + 43 + const waitTime = this.config.refillInterval; 44 + logger.debug( 45 + { tokens, available: this.tokens, waitTime }, 46 + "Rate limit reached, waiting" 47 + ); 48 + 49 + await new Promise((resolve) => setTimeout(resolve, waitTime)); 50 + } 51 + } 52 + 53 + getAvailableTokens(): number { 54 + this.refill(); 55 + return this.tokens; 56 + } 57 + 58 + reset(): void { 59 + this.tokens = this.config.maxTokens; 60 + this.lastRefill = Date.now(); 61 + } 62 + } 63 + 64 + export class MultiRateLimiter { 65 + private limiters: Map<string, RateLimiter> = new Map(); 66 + 67 + constructor( 68 + private defaultConfig: RateLimiterConfig 69 + ) {} 70 + 71 + setLimiter(key: string, config: RateLimiterConfig): void { 72 + this.limiters.set(key, new RateLimiter(config)); 73 + } 74 + 75 + async acquire(key: string, tokens: number = 1): Promise<void> { 76 + let limiter = this.limiters.get(key); 77 + if (!limiter) { 78 + limiter = new RateLimiter(this.defaultConfig); 79 + this.limiters.set(key, limiter); 80 + } 81 + await limiter.acquire(tokens); 82 + } 83 + 84 + reset(key?: string): void { 85 + if (key) { 86 + this.limiters.get(key)?.reset(); 87 + } else { 88 + for (const limiter of this.limiters.values()) { 89 + limiter.reset(); 90 + } 91 + } 92 + } 93 + }
+95
src/utils/retry.ts
··· 1 + import { logger } from "../logger/index.js"; 2 + 3 + export interface RetryConfig { 4 + maxAttempts: number; 5 + initialDelay: number; 6 + maxDelay: number; 7 + backoffMultiplier: number; 8 + retryableErrors?: ((error: any) => boolean)[]; 9 + } 10 + 11 + export class RetryError extends Error { 12 + constructor( 13 + message: string, 14 + public attempts: number, 15 + public lastError: Error 16 + ) { 17 + super(message); 18 + this.name = "RetryError"; 19 + } 20 + } 21 + 22 + export async function withRetry<T>( 23 + fn: () => Promise<T>, 24 + config: Partial<RetryConfig> = {} 25 + ): Promise<T> { 26 + const { 27 + maxAttempts = 3, 28 + initialDelay = 1000, 29 + maxDelay = 30000, 30 + backoffMultiplier = 2, 31 + retryableErrors = [(error) => true], 32 + } = config; 33 + 34 + let lastError: Error; 35 + let delay = initialDelay; 36 + 37 + for (let attempt = 1; attempt <= maxAttempts; attempt++) { 38 + try { 39 + return await fn(); 40 + } catch (error) { 41 + lastError = error instanceof Error ? error : new Error(String(error)); 42 + 43 + const isRetryable = retryableErrors.some((check) => check(lastError)); 44 + 45 + if (!isRetryable || attempt >= maxAttempts) { 46 + throw new RetryError( 47 + `Operation failed after ${attempt} attempts`, 48 + attempt, 49 + lastError 50 + ); 51 + } 52 + 53 + logger.warn( 54 + { 55 + attempt, 56 + maxAttempts, 57 + delay, 58 + error: lastError.message, 59 + }, 60 + "Retrying after error" 61 + ); 62 + 63 + await new Promise((resolve) => setTimeout(resolve, delay)); 64 + delay = Math.min(delay * backoffMultiplier, maxDelay); 65 + } 66 + } 67 + 68 + throw new RetryError( 69 + `Operation failed after ${maxAttempts} attempts`, 70 + maxAttempts, 71 + lastError! 72 + ); 73 + } 74 + 75 + export function isRateLimitError(error: any): boolean { 76 + return ( 77 + error?.status === 429 || 78 + error?.message?.toLowerCase().includes("rate limit") || 79 + error?.message?.toLowerCase().includes("too many requests") 80 + ); 81 + } 82 + 83 + export function isNetworkError(error: any): boolean { 84 + return ( 85 + error?.code === "ECONNRESET" || 86 + error?.code === "ENOTFOUND" || 87 + error?.code === "ETIMEDOUT" || 88 + error?.message?.toLowerCase().includes("network") || 89 + error?.message?.toLowerCase().includes("timeout") 90 + ); 91 + } 92 + 93 + export function isServerError(error: any): boolean { 94 + return error?.status >= 500 && error?.status < 600; 95 + }