A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules
1import { Agent, setGlobalDispatcher } from "undici";
2import { AtpAgent } from "@atproto/api";
3import { BSKY_HANDLE, BSKY_PASSWORD, OZONE_PDS } from "./config.js";
4import { updateRateLimitState } from "./limits.js";
5import { logger } from "./logger.js";
6import { type SessionData, loadSession, saveSession } from "./session.js";
7
8setGlobalDispatcher(
9 new Agent({
10 connect: { timeout: 20_000 },
11 keepAliveTimeout: 10_000,
12 keepAliveMaxTimeout: 20_000,
13 }),
14);
15
16const customFetch: typeof fetch = async (input, init) => {
17 const response = await fetch(input, init);
18
19 // Extract rate limit headers from ATP responses
20 const limitHeader = response.headers.get("ratelimit-limit");
21 const remainingHeader = response.headers.get("ratelimit-remaining");
22 const resetHeader = response.headers.get("ratelimit-reset");
23 const policyHeader = response.headers.get("ratelimit-policy");
24
25 if (limitHeader && remainingHeader && resetHeader) {
26 updateRateLimitState({
27 limit: parseInt(limitHeader, 10),
28 remaining: parseInt(remainingHeader, 10),
29 reset: parseInt(resetHeader, 10),
30 policy: policyHeader ?? undefined,
31 });
32 }
33
34 return response;
35};
36
37export const agent = new AtpAgent({
38 service: `https://${OZONE_PDS}`,
39 fetch: customFetch,
40});
41
42const JWT_LIFETIME_MS = 2 * 60 * 60 * 1000; // 2 hours (typical ATP JWT lifetime)
43const REFRESH_AT_PERCENT = 0.8; // Refresh at 80% of lifetime
44let refreshTimer: NodeJS.Timeout | null = null;
45
46async function refreshSession(): Promise<void> {
47 try {
48 logger.info("Refreshing session tokens");
49 if (!agent.session) {
50 throw new Error("No active session to refresh");
51 }
52 await agent.resumeSession(agent.session);
53
54 saveSession(agent.session as SessionData);
55 scheduleSessionRefresh();
56 } catch (error: unknown) {
57 logger.error({ error }, "Failed to refresh session, will re-authenticate");
58 await performLogin();
59 }
60}
61
62function scheduleSessionRefresh(): void {
63 if (refreshTimer) {
64 clearTimeout(refreshTimer);
65 }
66
67 const refreshIn = JWT_LIFETIME_MS * REFRESH_AT_PERCENT;
68 logger.debug(
69 `Scheduling session refresh in ${(refreshIn / 1000 / 60).toFixed(1)} minutes`,
70 );
71
72 refreshTimer = setTimeout(() => {
73 refreshSession().catch((error: unknown) => {
74 logger.error({ error }, "Scheduled session refresh failed");
75 });
76 }, refreshIn);
77}
78
79async function performLogin(): Promise<boolean> {
80 try {
81 logger.info("Performing fresh login");
82 const response = await agent.login({
83 identifier: BSKY_HANDLE,
84 password: BSKY_PASSWORD,
85 });
86
87 if (response.success && agent.session) {
88 saveSession(agent.session as SessionData);
89 scheduleSessionRefresh();
90 logger.info("Login successful, session saved");
91 return true;
92 }
93
94 logger.error("Login failed: no session returned");
95 return false;
96 } catch (error) {
97 logger.error({ error }, "Login failed");
98 return false;
99 }
100}
101
102const MAX_LOGIN_RETRIES = 3;
103const RETRY_DELAY_MS = 2000;
104
105let loginPromise: Promise<void> | null = null;
106
107async function sleep(ms: number): Promise<void> {
108 return new Promise((resolve) => setTimeout(resolve, ms));
109}
110
111async function authenticate(): Promise<boolean> {
112 const savedSession = loadSession();
113
114 if (savedSession) {
115 try {
116 logger.info("Attempting to resume saved session");
117 await agent.resumeSession(savedSession);
118
119 // Verify session is still valid with a lightweight call
120 await agent.getProfile({ actor: savedSession.did });
121
122 logger.info("Session resumed successfully");
123 scheduleSessionRefresh();
124 return true;
125 } catch (error) {
126 logger.warn({ error }, "Saved session invalid, will re-authenticate");
127 }
128 }
129
130 return performLogin();
131}
132
133async function authenticateWithRetry(): Promise<void> {
134 // Reuse existing login attempt if one is in progress
135 if (loginPromise) {
136 return loginPromise;
137 }
138
139 loginPromise = (async () => {
140 for (let attempt = 1; attempt <= MAX_LOGIN_RETRIES; attempt++) {
141 logger.info(
142 { attempt, maxRetries: MAX_LOGIN_RETRIES },
143 "Attempting login",
144 );
145
146 const success = await authenticate();
147
148 if (success) {
149 logger.info("Authentication successful");
150 return;
151 }
152
153 if (attempt < MAX_LOGIN_RETRIES) {
154 logger.warn(
155 { attempt, maxRetries: MAX_LOGIN_RETRIES, retryInMs: RETRY_DELAY_MS },
156 "Login failed, retrying",
157 );
158 await sleep(RETRY_DELAY_MS);
159 }
160 }
161
162 logger.error(
163 { maxRetries: MAX_LOGIN_RETRIES },
164 "All login attempts failed, aborting",
165 );
166 process.exit(1);
167 })();
168
169 return loginPromise;
170}
171
172export const login = authenticateWithRetry;
173
174// Lazy getter for isLoggedIn - authentication only starts when first accessed
175let _isLoggedIn: Promise<boolean> | null = null;
176
177export function getIsLoggedIn(): Promise<boolean> {
178 if (!_isLoggedIn) {
179 _isLoggedIn = authenticateWithRetry().then(() => true);
180 }
181 return _isLoggedIn;
182}
183
184// For backward compatibility - callers can still use `await isLoggedIn`
185// but authentication is now lazy instead of eager
186export const isLoggedIn = {
187 then<T>(onFulfilled: (value: boolean) => T | PromiseLike<T>): Promise<T> {
188 return getIsLoggedIn().then(onFulfilled);
189 },
190};