A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp-based rules
at main 5.3 kB view raw
import { Agent, setGlobalDispatcher } from "undici";
import { AtpAgent } from "@atproto/api";
import { BSKY_HANDLE, BSKY_PASSWORD, OZONE_PDS } from "./config.js";
import { updateRateLimitState } from "./limits.js";
import { logger } from "./logger.js";
import { type SessionData, loadSession, saveSession } from "./session.js";

// Install a process-wide undici dispatcher with explicit connect and
// keep-alive timeouts.
setGlobalDispatcher(
  new Agent({
    connect: { timeout: 20_000 },
    keepAliveTimeout: 10_000,
    keepAliveMaxTimeout: 20_000,
  }),
);

/**
 * `fetch` wrapper handed to the AtpAgent. It performs the request unchanged
 * and, when the response carries `ratelimit-limit`, `ratelimit-remaining` and
 * `ratelimit-reset` headers, forwards their parsed integer values (plus the
 * optional `ratelimit-policy` string) to `updateRateLimitState`.
 *
 * The response is always returned untouched.
 */
const customFetch: typeof fetch = async (input, init) => {
  const response = await fetch(input, init);

  // Extract rate limit headers from ATP responses
  const limitHeader = response.headers.get("ratelimit-limit");
  const remainingHeader = response.headers.get("ratelimit-remaining");
  const resetHeader = response.headers.get("ratelimit-reset");
  const policyHeader = response.headers.get("ratelimit-policy");

  if (limitHeader && remainingHeader && resetHeader) {
    const limit = Number.parseInt(limitHeader, 10);
    const remaining = Number.parseInt(remainingHeader, 10);
    const reset = Number.parseInt(resetHeader, 10);

    // A malformed header parses to NaN; skip the update rather than
    // overwrite the tracked rate-limit state with unusable numbers.
    if (
      !Number.isNaN(limit) &&
      !Number.isNaN(remaining) &&
      !Number.isNaN(reset)
    ) {
      updateRateLimitState({
        limit,
        remaining,
        reset,
        policy: policyHeader ?? undefined,
      });
    }
  }

  return response;
};

/** Shared AtpAgent pointed at `https://${OZONE_PDS}`, using `customFetch`. */
export const agent = new AtpAgent({
  service: `https://${OZONE_PDS}`,
  fetch: customFetch,
});

const JWT_LIFETIME_MS = 2 * 60 * 60 * 1000; // 2 hours (typical ATP JWT lifetime)
const REFRESH_AT_PERCENT = 0.8; // Refresh at 80% of lifetime
let refreshTimer: NodeJS.Timeout | null = null;

/**
 * Resumes the agent's current session, persists it with `saveSession`, and
 * schedules the next refresh. Any failure (including there being no active
 * session) is logged and followed by a fresh `performLogin()`.
 *
 * NOTE(review): the boolean returned by `performLogin()` is not inspected
 * here. `performLogin` logs its own failure, but when it fails no further
 * refresh is scheduled from this path — confirm that is intended.
 */
async function refreshSession(): Promise<void> {
  try {
    logger.info("Refreshing session tokens");
    if (!agent.session) {
      throw new Error("No active session to refresh");
    }
    await agent.resumeSession(agent.session);

    saveSession(agent.session as SessionData);
    scheduleSessionRefresh();
  } catch (error: unknown) {
    logger.error({ error }, "Failed to refresh session, will re-authenticate");
    await performLogin();
  }
}

/**
 * (Re)arms the single refresh timer so that `refreshSession` runs after
 * `JWT_LIFETIME_MS * REFRESH_AT_PERCENT` milliseconds. Any previously armed
 * timer is cancelled first, so at most one refresh is pending.
 */
function scheduleSessionRefresh(): void {
  if (refreshTimer) {
    clearTimeout(refreshTimer);
  }

  const refreshIn = JWT_LIFETIME_MS * REFRESH_AT_PERCENT;
  logger.debug(
    `Scheduling session refresh in ${(refreshIn / 1000 / 60).toFixed(1)} minutes`,
  );

  refreshTimer = setTimeout(() => {
    // refreshSession handles its own errors; this catch only covers a
    // failure escaping from its fallback path.
    refreshSession().catch((error: unknown) => {
      logger.error({ error }, "Scheduled session refresh failed");
    });
  }, refreshIn);
}

/**
 * Logs in with `BSKY_HANDLE` / `BSKY_PASSWORD`. On success the session is
 * saved and a refresh is scheduled.
 *
 * @returns `true` when the login succeeded and the agent holds a session;
 *   `false` otherwise. Errors are logged, never thrown.
 */
async function performLogin(): Promise<boolean> {
  try {
    logger.info("Performing fresh login");
    const response = await agent.login({
      identifier: BSKY_HANDLE,
      password: BSKY_PASSWORD,
    });

    if (response.success && agent.session) {
      saveSession(agent.session as SessionData);
      scheduleSessionRefresh();
      logger.info("Login successful, session saved");
      return true;
    }

    logger.error("Login failed: no session returned");
    return false;
  } catch (error: unknown) {
    logger.error({ error }, "Login failed");
    return false;
  }
}

const MAX_LOGIN_RETRIES = 3;
const RETRY_DELAY_MS = 2000;

let loginPromise: Promise<void> | null = null;

/** Resolves after `ms` milliseconds. */
async function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Tries to resume the session returned by `loadSession()`; if there is none,
 * or resuming/verifying it throws, falls back to `performLogin()`.
 *
 * @returns `true` when a session was resumed or a fresh login succeeded.
 */
async function authenticate(): Promise<boolean> {
  // Called outside the try block: an exception from loadSession propagates
  // to the caller as a rejection.
  const savedSession = loadSession();

  if (savedSession) {
    try {
      logger.info("Attempting to resume saved session");
      await agent.resumeSession(savedSession);

      // Verify session is still valid with a lightweight call
      await agent.getProfile({ actor: savedSession.did });

      logger.info("Session resumed successfully");
      scheduleSessionRefresh();
      return true;
    } catch (error: unknown) {
      logger.warn({ error }, "Saved session invalid, will re-authenticate");
    }
  }

  return performLogin();
}

/**
 * Runs `authenticate()` up to `MAX_LOGIN_RETRIES` times, waiting
 * `RETRY_DELAY_MS` between attempts. Concurrent callers share one attempt
 * through `loginPromise`. If every attempt fails the process exits with
 * status 1.
 *
 * NOTE(review): `loginPromise` is never reset, so once an attempt has
 * settled every later call receives that same settled promise and no new
 * authentication is started — confirm callers do not rely on `login()` to
 * force a re-login.
 */
async function authenticateWithRetry(): Promise<void> {
  // Reuse existing login attempt if one is in progress
  if (loginPromise) {
    return loginPromise;
  }

  loginPromise = (async () => {
    for (let attempt = 1; attempt <= MAX_LOGIN_RETRIES; attempt++) {
      logger.info(
        { attempt, maxRetries: MAX_LOGIN_RETRIES },
        "Attempting login",
      );

      const success = await authenticate();

      if (success) {
        logger.info("Authentication successful");
        return;
      }

      if (attempt < MAX_LOGIN_RETRIES) {
        logger.warn(
          { attempt, maxRetries: MAX_LOGIN_RETRIES, retryInMs: RETRY_DELAY_MS },
          "Login failed, retrying",
        );
        await sleep(RETRY_DELAY_MS);
      }
    }

    logger.error(
      { maxRetries: MAX_LOGIN_RETRIES },
      "All login attempts failed, aborting",
    );
    process.exit(1);
  })();

  return loginPromise;
}

/** Public entry point for authentication; alias of `authenticateWithRetry`. */
export const login = authenticateWithRetry;

// Lazy getter for isLoggedIn - authentication only starts when first accessed
let _isLoggedIn: Promise<boolean> | null = null;

/**
 * Starts authentication on first call and returns a memoized promise that
 * resolves to `true` once `authenticateWithRetry()` resolves.
 */
export function getIsLoggedIn(): Promise<boolean> {
  if (!_isLoggedIn) {
    _isLoggedIn = authenticateWithRetry().then(() => true);
  }
  return _isLoggedIn;
}

// For backward compatibility - callers can still use `await isLoggedIn`
// but authentication is now lazy instead of eager
/**
 * Thenable wrapper around `getIsLoggedIn()`.
 *
 * `await` invokes `then(resolve, reject)` on a thenable, so both callbacks
 * are forwarded to the underlying promise. Without forwarding the rejection
 * handler, a failed authentication would leave `await isLoggedIn` pending
 * forever and surface only as an unhandled rejection.
 */
export const isLoggedIn = {
  then<TResult1 = boolean, TResult2 = never>(
    onFulfilled?:
      | ((value: boolean) => TResult1 | PromiseLike<TResult1>)
      | null,
    onRejected?:
      | ((reason: unknown) => TResult2 | PromiseLike<TResult2>)
      | null,
  ): Promise<TResult1 | TResult2> {
    return getIsLoggedIn().then(onFulfilled, onRejected);
  },
};