ATlast — you'll never need to find your favorites on another platform again. Find your favs in the ATmosphere.
atproto

parallelize batch follows

byarielm.fyi da36ed7c aca6bc26

verified
Changed files
+49 -35
netlify
+49 -35
netlify/functions/batch-follow-users.ts
··· 31 31 followLexicon, 32 32 ); 33 33 34 - const results = []; 35 - let consecutiveErrors = 0; 36 - const MAX_CONSECUTIVE_ERRORS = 3; 37 34 const matchRepo = new MatchRepository(); 35 + const CONCURRENCY = 5; // Process 5 follows in parallel 38 36 39 - for (const did of dids) { 37 + // Helper function to follow a single user 38 + const followUser = async (did: string) => { 40 39 if (alreadyFollowing.has(did)) { 41 - results.push({ 42 - did, 43 - success: true, 44 - alreadyFollowing: true, 45 - error: null, 46 - }); 47 - 48 40 try { 49 41 await matchRepo.updateFollowStatus(did, followLexicon, true); 50 42 } catch (dbError) { 51 43 console.error("Failed to update follow status in DB:", dbError); 52 44 } 53 45 54 - continue; 46 + return { 47 + did, 48 + success: true, 49 + alreadyFollowing: true, 50 + error: null, 51 + }; 55 52 } 56 53 57 54 try { ··· 65 62 }, 66 63 }); 67 64 68 - results.push({ 69 - did, 70 - success: true, 71 - alreadyFollowing: false, 72 - error: null, 73 - }); 74 - 75 65 try { 76 66 await matchRepo.updateFollowStatus(did, followLexicon, true); 77 67 } catch (dbError) { 78 68 console.error("Failed to update follow status in DB:", dbError); 79 69 } 80 70 81 - consecutiveErrors = 0; 82 - } catch (error) { 83 - consecutiveErrors++; 84 - 85 - results.push({ 71 + return { 86 72 did, 87 - success: false, 73 + success: true, 88 74 alreadyFollowing: false, 89 - error: error instanceof Error ? error.message : "Follow failed", 90 - }); 91 - 75 + error: null, 76 + }; 77 + } catch (error) { 78 + // Rate limit handling with backoff 92 79 if ( 93 80 error instanceof Error && 94 81 (error.message.includes("rate limit") || error.message.includes("429")) 95 82 ) { 96 - const backoffDelay = Math.min( 97 - 200 * Math.pow(2, consecutiveErrors), 98 - 2000, 99 - ); 100 - console.log(`Rate limit hit. Backing off for ${backoffDelay}ms...`); 83 + const backoffDelay = 1000; // 1 second backoff for rate limits 84 + console.log(`Rate limit hit for ${did}. Backing off for ${backoffDelay}ms...`); 101 85 await new Promise((resolve) => setTimeout(resolve, backoffDelay)); 102 - } else if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { 103 - await new Promise((resolve) => setTimeout(resolve, 500)); 86 + } 87 + 88 + return { 89 + did, 90 + success: false, 91 + alreadyFollowing: false, 92 + error: error instanceof Error ? error.message : "Follow failed", 93 + }; 94 + } 95 + }; 96 + 97 + // Process follows in chunks with controlled concurrency 98 + const results = []; 99 + for (let i = 0; i < dids.length; i += CONCURRENCY) { 100 + const chunk = dids.slice(i, i + CONCURRENCY); 101 + const chunkResults = await Promise.allSettled( 102 + chunk.map(did => followUser(did)) 103 + ); 104 + 105 + // Extract results from Promise.allSettled 106 + for (const result of chunkResults) { 107 + if (result.status === 'fulfilled') { 108 + results.push(result.value); 109 + } else { 110 + // This shouldn't happen as we handle errors in followUser 111 + console.error('Unexpected promise rejection:', result.reason); 112 + results.push({ 113 + did: 'unknown', 114 + success: false, 115 + alreadyFollowing: false, 116 + error: 'Unexpected error', 117 + }); 104 118 } 105 119 } 106 120 }