A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules

feat: Add session management and rate limiting

- Stores and restores session for persistent login - Implements dynamic
rate limiting from ATP headers - Adds Prometheus metrics for rate limit
state and waits - Refactors authentication flow for session persistence

Skywatch 5a9384f5 1d327c74

Changed files
+285 -15
src
+1
compose.yaml
··· 54 54 # after a restart, preventing it from reprocessing old events or skipping new ones. 55 55 volumes: 56 56 - ./cursor.txt:/app/cursor.txt 57 + - ./.session:/app/.session 57 58 58 59 environment: 59 60 - NODE_ENV=production
+107 -7
src/agent.ts
··· 1 1 import { Agent, setGlobalDispatcher } from "undici"; 2 2 import { AtpAgent } from "@atproto/api"; 3 3 import { BSKY_HANDLE, BSKY_PASSWORD, OZONE_PDS } from "./config.js"; 4 + import { loadSession, saveSession, type SessionData } from "./session.js"; 5 + import { updateRateLimitState } from "./limits.js"; 6 + import { logger } from "./logger.js"; 4 7 5 8 setGlobalDispatcher(new Agent({ connect: { timeout: 20_000 } })); 6 9 10 + const customFetch: typeof fetch = async (input, init) => { 11 + const response = await fetch(input, init); 12 + 13 + // Extract rate limit headers from ATP responses 14 + const limitHeader = response.headers.get("ratelimit-limit"); 15 + const remainingHeader = response.headers.get("ratelimit-remaining"); 16 + const resetHeader = response.headers.get("ratelimit-reset"); 17 + const policyHeader = response.headers.get("ratelimit-policy"); 18 + 19 + if (limitHeader && remainingHeader && resetHeader) { 20 + updateRateLimitState({ 21 + limit: parseInt(limitHeader, 10), 22 + remaining: parseInt(remainingHeader, 10), 23 + reset: parseInt(resetHeader, 10), 24 + policy: policyHeader || undefined, 25 + }); 26 + } 27 + 28 + return response; 29 + }; 30 + 7 31 export const agent = new AtpAgent({ 8 32 service: `https://${OZONE_PDS}`, 33 + fetch: customFetch, 9 34 }); 10 - export const login = () => 11 - agent.login({ 12 - identifier: BSKY_HANDLE, 13 - password: BSKY_PASSWORD, 14 - }); 35 + 36 + const JWT_LIFETIME_MS = 2 * 60 * 60 * 1000; // 2 hours (typical ATP JWT lifetime) 37 + const REFRESH_AT_PERCENT = 0.8; // Refresh at 80% of lifetime 38 + let refreshTimer: NodeJS.Timeout | null = null; 15 39 16 - export const isLoggedIn = login() 17 - .then(() => true) 40 + async function refreshSession(): Promise<void> { 41 + try { 42 + logger.info("Refreshing session tokens"); 43 + await agent.resumeSession(agent.session!); 44 + 45 + if (agent.session) { 46 + saveSession(agent.session as SessionData); 47 + scheduleSessionRefresh(); 48 + } 49 + } catch (error) { 50 + logger.error({ error }, "Failed to refresh session, will re-authenticate"); 51 + await performLogin(); 52 + } 53 + } 54 + 55 + function scheduleSessionRefresh(): void { 56 + if (refreshTimer) { 57 + clearTimeout(refreshTimer); 58 + } 59 + 60 + const refreshIn = JWT_LIFETIME_MS * REFRESH_AT_PERCENT; 61 + logger.debug(`Scheduling session refresh in ${(refreshIn / 1000 / 60).toFixed(1)} minutes`); 62 + 63 + refreshTimer = setTimeout(() => { 64 + refreshSession().catch((error) => { 65 + logger.error({ error }, "Scheduled session refresh failed"); 66 + }); 67 + }, refreshIn); 68 + } 69 + 70 + async function performLogin(): Promise<boolean> { 71 + try { 72 + logger.info("Performing fresh login"); 73 + const response = await agent.login({ 74 + identifier: BSKY_HANDLE, 75 + password: BSKY_PASSWORD, 76 + }); 77 + 78 + if (response.success && agent.session) { 79 + saveSession(agent.session as SessionData); 80 + scheduleSessionRefresh(); 81 + logger.info("Login successful, session saved"); 82 + return true; 83 + } 84 + 85 + logger.error("Login failed: no session returned"); 86 + return false; 87 + } catch (error) { 88 + logger.error({ error }, "Login failed"); 89 + return false; 90 + } 91 + } 92 + 93 + async function authenticate(): Promise<boolean> { 94 + const savedSession = loadSession(); 95 + 96 + if (savedSession) { 97 + try { 98 + logger.info("Attempting to resume saved session"); 99 + await agent.resumeSession(savedSession); 100 + 101 + // Verify session is still valid with a lightweight call 102 + await agent.getProfile({ actor: savedSession.did }); 103 + 104 + logger.info("Session resumed successfully"); 105 + scheduleSessionRefresh(); 106 + return true; 107 + } catch (error) { 108 + logger.warn({ error }, "Saved session invalid, will re-authenticate"); 109 + } 110 + } 111 + 112 + return performLogin(); 113 + } 114 + 115 + export const login = authenticate; 116 + export const isLoggedIn = authenticate() 117 + .then((success) => success) 18 118 .catch(() => false);
+115 -8
src/limits.ts
··· 1 1 import { pRateLimit } from "p-ratelimit"; 2 + import { logger } from "./logger.js"; 3 + import { Counter, Gauge, Histogram } from "prom-client"; 2 4 3 - // TypeScript 5 + interface RateLimitState { 6 + limit: number; 7 + remaining: number; 8 + reset: number; // Unix timestamp in seconds 9 + policy?: string; 10 + } 11 + 12 + // Conservative defaults based on previous static configuration 13 + // Will be replaced with dynamic values from ATP response headers 14 + let rateLimitState: RateLimitState = { 15 + limit: 280, 16 + remaining: 280, 17 + reset: Math.floor(Date.now() / 1000) + 30, 18 + }; 4 19 5 - // create a rate limiter that allows up to 30 API calls per second, 6 - // with max concurrency of 10 20 + const SAFETY_BUFFER = 5; // Keep this many requests in reserve (reduced from 20) 21 + const CONCURRENCY = 24; // Reduced from 48 to prevent rapid depletion 7 22 8 - export const limit = pRateLimit({ 9 - interval: 30000, // 1000 ms == 1 second 10 - rate: 280, // 30 API calls per interval 11 - concurrency: 48, // no more than 10 running at once 12 - maxDelay: 0, // an API call delayed > 30 sec is rejected 23 + // Metrics 24 + const rateLimitWaitsTotal = new Counter({ 25 + name: "rate_limit_waits_total", 26 + help: "Total number of times rate limit wait was triggered", 13 27 }); 28 + 29 + const rateLimitWaitDuration = new Histogram({ 30 + name: "rate_limit_wait_duration_seconds", 31 + help: "Duration of rate limit waits in seconds", 32 + buckets: [0.1, 0.5, 1, 5, 10, 30, 60], 33 + }); 34 + 35 + const rateLimitRemaining = new Gauge({ 36 + name: "rate_limit_remaining", 37 + help: "Current remaining rate limit", 38 + }); 39 + 40 + const rateLimitTotal = new Gauge({ 41 + name: "rate_limit_total", 42 + help: "Total rate limit from headers", 43 + }); 44 + 45 + const concurrentRequestsGauge = new Gauge({ 46 + name: "concurrent_requests", 47 + help: "Current number of concurrent requests", 48 + }); 49 + 50 + // Use p-ratelimit purely for concurrency management 51 + const concurrencyLimiter = pRateLimit({ 52 + interval: 1000, 53 + rate: 10000, // Very high rate, we manage rate limiting separately 54 + concurrency: CONCURRENCY, 55 + maxDelay: 0, 56 + }); 57 + 58 + export function getRateLimitState(): RateLimitState { 59 + return { ...rateLimitState }; 60 + } 61 + 62 + export function updateRateLimitState(state: Partial<RateLimitState>): void { 63 + rateLimitState = { ...rateLimitState, ...state }; 64 + 65 + // Update Prometheus metrics 66 + if (state.remaining !== undefined) { 67 + rateLimitRemaining.set(state.remaining); 68 + } 69 + if (state.limit !== undefined) { 70 + rateLimitTotal.set(state.limit); 71 + } 72 + 73 + logger.debug( 74 + { 75 + limit: rateLimitState.limit, 76 + remaining: rateLimitState.remaining, 77 + resetIn: rateLimitState.reset - Math.floor(Date.now() / 1000), 78 + }, 79 + "Rate limit state updated" 80 + ); 81 + } 82 + 83 + async function awaitRateLimit(): Promise<void> { 84 + const state = getRateLimitState(); 85 + const now = Math.floor(Date.now() / 1000); 86 + 87 + // Only wait if we're critically low 88 + if (state.remaining <= SAFETY_BUFFER) { 89 + rateLimitWaitsTotal.inc(); 90 + 91 + const delaySeconds = Math.max(0, state.reset - now); 92 + const delayMs = delaySeconds * 1000; 93 + 94 + if (delayMs > 0) { 95 + logger.warn( 96 + `Rate limit critical (${state.remaining}/${state.limit} remaining). Waiting ${delaySeconds}s until reset...` 97 + ); 98 + 99 + const waitStart = Date.now(); 100 + await new Promise((resolve) => setTimeout(resolve, delayMs)); 101 + const waitDuration = (Date.now() - waitStart) / 1000; 102 + rateLimitWaitDuration.observe(waitDuration); 103 + 104 + // Don't manually reset state - let the next API response update it 105 + logger.info("Rate limit wait complete, resuming requests"); 106 + } 107 + } 108 + } 109 + 110 + export async function limit<T>(fn: () => Promise<T>): Promise<T> { 111 + return concurrencyLimiter(async () => { 112 + concurrentRequestsGauge.inc(); 113 + try { 114 + await awaitRateLimit(); 115 + return await fn(); 116 + } finally { 117 + concurrentRequestsGauge.dec(); 118 + } 119 + }); 120 + }
+62
src/session.ts
··· 1 + import { readFileSync, writeFileSync, unlinkSync, chmodSync, existsSync } from "node:fs"; 2 + import { join } from "node:path"; 3 + import { logger } from "./logger.js"; 4 + 5 + const SESSION_FILE_PATH = join(process.cwd(), ".session"); 6 + 7 + export interface SessionData { 8 + accessJwt: string; 9 + refreshJwt: string; 10 + did: string; 11 + handle: string; 12 + email?: string; 13 + emailConfirmed?: boolean; 14 + emailAuthFactor?: boolean; 15 + active: boolean; 16 + status?: string; 17 + } 18 + 19 + export function loadSession(): SessionData | null { 20 + try { 21 + if (!existsSync(SESSION_FILE_PATH)) { 22 + logger.debug("No session file found"); 23 + return null; 24 + } 25 + 26 + const data = readFileSync(SESSION_FILE_PATH, "utf-8"); 27 + const session = JSON.parse(data) as SessionData; 28 + 29 + if (!session.accessJwt || !session.refreshJwt || !session.did) { 30 + logger.warn("Session file is missing required fields, ignoring"); 31 + return null; 32 + } 33 + 34 + logger.info("Loaded existing session from file"); 35 + return session; 36 + } catch (error) { 37 + logger.error({ error }, "Failed to load session file, will authenticate fresh"); 38 + return null; 39 + } 40 + } 41 + 42 + export function saveSession(session: SessionData): void { 43 + try { 44 + const data = JSON.stringify(session, null, 2); 45 + writeFileSync(SESSION_FILE_PATH, data, "utf-8"); 46 + chmodSync(SESSION_FILE_PATH, 0o600); 47 + logger.info("Session saved to file"); 48 + } catch (error) { 49 + logger.error({ error }, "Failed to save session to file"); 50 + } 51 + } 52 + 53 + export function clearSession(): void { 54 + try { 55 + if (existsSync(SESSION_FILE_PATH)) { 56 + unlinkSync(SESSION_FILE_PATH); 57 + logger.info("Session file cleared"); 58 + } 59 + } catch (error) { 60 + logger.error({ error }, "Failed to clear session file"); 61 + } 62 + }