A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules

feat: Add retry logic to authentication

The authentication flow was updated to include retry logic. This ensures
that the application attempts to log in multiple times if the initial
attempts fail. This also addresses an issue where the app was exiting
before the Jetstream client was started.

Skywatch a084c6a7 9fa51480

+2 -1
.claude/settings.local.json
··· 12 "mcp__git-mcp-server__git_set_working_dir", 13 "Bash(npm run test:run:*)", 14 "Bash(bunx eslint:*)", 15 - "Bash(bun test:run:*)" 16 ], 17 "deny": [], 18 "ask": []
··· 12 "mcp__git-mcp-server__git_set_working_dir", 13 "Bash(npm run test:run:*)", 14 "Bash(bunx eslint:*)", 15 + "Bash(bun test:run:*)", 16 + "Bash(bun run type-check:*)" 17 ], 18 "deny": [], 19 "ask": []
+1
.gitignore
··· 5 labels.db* 6 .DS_Store 7 coverage/
··· 5 labels.db* 6 .DS_Store 7 coverage/ 8 + .session
-10
.session
··· 1 - { 2 - "accessJwt": "eyJ0eXAiOiJhdCtqd3QiLCJhbGciOiJFUzI1NksifQ.eyJzY29wZSI6ImNvbS5hdHByb3RvLmFwcFBhc3MiLCJzdWIiOiJkaWQ6cGxjOmpzamxoajM1NzRvZHRmcWF6cXhuNG9uZCIsImlhdCI6MTc2MTA3OTUzMSwiZXhwIjoxNzYxMDg2NzMxLCJhdWQiOiJkaWQ6d2ViOm95c3RlcmxpbmcudXMtd2VzdC5ob3N0LmJza3kubmV0d29yayJ9.2EPsA8yDLvngSPzOu-DHy-2SQCjgzk4wFxgsOL7BXq1gwmRkoJy_Poykjb8m9JeYt9_s08-VCM_h1C43FOVosg", 3 - "refreshJwt": "eyJ0eXAiOiJyZWZyZXNoK2p3dCIsImFsZyI6IkVTMjU2SyJ9.eyJzY29wZSI6ImNvbS5hdHByb3RvLnJlZnJlc2giLCJzdWIiOiJkaWQ6cGxjOmpzamxoajM1NzRvZHRmcWF6cXhuNG9uZCIsImF1ZCI6ImRpZDp3ZWI6YnNreS5zb2NpYWwiLCJqdGkiOiJpOWVKcTVHa0VQeS9ISVV0YWtUb0dqMW55Mllzb25PK0VGMHUySGRoNFNFIiwiaWF0IjoxNzYxMDc5NTMxLCJleHAiOjE3Njg4NTU1MzF9.5YawggT0amOGgZryO5h2kJ11ePimtc0YMqs8W-ZzxPkU8aymD0m29w4_wZXeyoK4vclU-YvlUc9iDr5SgrqJ2w", 4 - "handle": "automod.skywatch.blue", 5 - "did": "did:plc:jsjlhj3574odtfqazqxn4ond", 6 - "email": "bsky.duration409@passmail.net", 7 - "emailConfirmed": true, 8 - "emailAuthFactor": true, 9 - "active": true 10 - }
···
+50 -4
src/agent.ts
··· 99 } 100 } 101 102 async function authenticate(): Promise<boolean> { 103 const savedSession = loadSession(); 104 ··· 121 return performLogin(); 122 } 123 124 - export const login = authenticate; 125 - export const isLoggedIn = authenticate() 126 - .then((success) => success) 127 - .catch(() => false);
··· 99 } 100 } 101 102 + const MAX_LOGIN_RETRIES = 3; 103 + const RETRY_DELAY_MS = 2000; 104 + 105 + let loginPromise: Promise<void> | null = null; 106 + 107 + async function sleep(ms: number): Promise<void> { 108 + return new Promise((resolve) => setTimeout(resolve, ms)); 109 + } 110 + 111 async function authenticate(): Promise<boolean> { 112 const savedSession = loadSession(); 113 ··· 130 return performLogin(); 131 } 132 133 + async function authenticateWithRetry(): Promise<void> { 134 + // Reuse existing login attempt if one is in progress 135 + if (loginPromise) { 136 + return loginPromise; 137 + } 138 + 139 + loginPromise = (async () => { 140 + for (let attempt = 1; attempt <= MAX_LOGIN_RETRIES; attempt++) { 141 + logger.info( 142 + { attempt, maxRetries: MAX_LOGIN_RETRIES }, 143 + "Attempting login", 144 + ); 145 + 146 + const success = await authenticate(); 147 + 148 + if (success) { 149 + logger.info("Authentication successful"); 150 + return; 151 + } 152 + 153 + if (attempt < MAX_LOGIN_RETRIES) { 154 + logger.warn( 155 + { attempt, maxRetries: MAX_LOGIN_RETRIES, retryInMs: RETRY_DELAY_MS }, 156 + "Login failed, retrying", 157 + ); 158 + await sleep(RETRY_DELAY_MS); 159 + } 160 + } 161 + 162 + logger.error( 163 + { maxRetries: MAX_LOGIN_RETRIES }, 164 + "All login attempts failed, aborting", 165 + ); 166 + process.exit(1); 167 + })(); 168 + 169 + return loginPromise; 170 + } 171 + 172 + export const login = authenticateWithRetry; 173 + export const isLoggedIn = authenticateWithRetry().then(() => true);
+5
src/main.ts
··· 5 IdentityEvent, 6 } from "@skyware/jetstream"; 7 import { Jetstream } from "@skyware/jetstream"; 8 import { 9 CURSOR_UPDATE_INTERVAL, 10 FIREHOSE_URL, ··· 343 344 logger.info({ process: "MAIN" }, "Connecting to Redis"); 345 await connectRedis(); 346 347 jetstream.start(); 348
··· 5 IdentityEvent, 6 } from "@skyware/jetstream"; 7 import { Jetstream } from "@skyware/jetstream"; 8 + import { login } from "./agent.js"; 9 import { 10 CURSOR_UPDATE_INTERVAL, 11 FIREHOSE_URL, ··· 344 345 logger.info({ process: "MAIN" }, "Connecting to Redis"); 346 await connectRedis(); 347 + 348 + logger.info({ process: "MAIN" }, "Authenticating with Bluesky"); 349 + await login(); 350 + logger.info({ process: "MAIN" }, "Authentication complete, starting Jetstream"); 351 352 jetstream.start(); 353
+12 -1
src/tests/agent.test.ts
··· 13 OZONE_PDS: "pds.test.com", 14 })); 15 16 // Mock the AtpAgent 17 - const mockLogin = vi.fn(() => Promise.resolve()); 18 const mockConstructor = vi.fn(); 19 vi.doMock("@atproto/api", () => ({ 20 AtpAgent: class { 21 login = mockLogin; 22 service: URL; 23 constructor(options: { service: string }) { 24 mockConstructor(options); 25 this.service = new URL(options.service);
··· 13 OZONE_PDS: "pds.test.com", 14 })); 15 16 + // Mock session 17 + const mockSession = { 18 + did: "did:plc:test123", 19 + handle: "test.bsky.social", 20 + accessJwt: "test-access-jwt", 21 + refreshJwt: "test-refresh-jwt", 22 + }; 23 + 24 // Mock the AtpAgent 25 + const mockLogin = vi.fn(() => 26 + Promise.resolve({ success: true, data: mockSession }), 27 + ); 28 const mockConstructor = vi.fn(); 29 vi.doMock("@atproto/api", () => ({ 30 AtpAgent: class { 31 login = mockLogin; 32 service: URL; 33 + session = mockSession; 34 constructor(options: { service: string }) { 35 mockConstructor(options); 36 this.service = new URL(options.service);