a tool for shared writing and social publishing
298
fork

Configure Feed

Select the types of activity you want to include in your feed.

at update/delete-leaflets 158 lines 5.1 kB view raw
import { supabaseServerClient } from "supabase/serverClient";
import { AtpAgent, AtUri } from "@atproto/api";
import { createIdentity } from "actions/createIdentity";
import { drizzle } from "drizzle-orm/node-postgres";
import { inngest } from "../client";
import { pool } from "supabase/pool";

/**
 * Inngest function that mirrors a Bluesky account's follow list into the
 * `bsky_follows` table.
 *
 * Triggered by the `feeds/index-follows` event and throttled to one run per
 * five minutes per `event.data.did`. The run proceeds in four phases:
 *
 * 1. Page through `app.bsky.graph.getFollows` for the DID (one step per page).
 * 2. In parallel: make sure an `identities` row exists for the DID, and look
 *    up which of the followed DIDs are present in `identities`.
 * 3. Upsert an `(identity, follows)` row for every followed DID that exists
 *    in `identities`.
 * 4. Delete `bsky_follows` rows for this identity whose target is no longer
 *    in that set.
 *
 * Any Supabase error in phases 2-4 is thrown so the step fails instead of
 * silently producing a partial result (a failed lookup would otherwise look
 * like "user unfollowed everyone in this batch" and trigger deletions).
 *
 * @returns `{ done: true }` once all phases have completed.
 * @throws Error when the follows query is unsuccessful or a Supabase request
 *   reports an error.
 */
export const index_follows = inngest.createFunction(
  {
    id: "index_follows",
    throttle: {
      limit: 1,
      period: "5m",
      key: "event.data.did",
    },
  },
  { event: "feeds/index-follows" },
  async ({ event, step }) => {
    const did = event.data.did;
    // Number of DIDs sent per Supabase request (lookup, upsert and delete).
    const batchSize = 100;

    // Phase 1: collect every followed DID, one step per API page.
    const follows: string[] = [];
    let cursor: string | undefined;
    let pageNumber = 0;
    do {
      const page: {
        cursor?: string;
        follows: string[];
      } = await step.run(`get-follows-${pageNumber}`, async () => {
        const agent = new AtpAgent({ service: "https://public.api.bsky.app" });
        const response = await agent.app.bsky.graph.getFollows({
          actor: did,
          limit: 100,
          cursor,
        });
        if (!response.success)
          throw new Error("error during querying follows for: " + did);
        return {
          cursor: response.data.cursor,
          follows: response.data.follows.map((f) => f.did),
        };
      });
      pageNumber++;
      follows.push(...page.follows);
      cursor = page.cursor || undefined;
    } while (cursor);

    // Phase 2a: ensure the identity row exists. Its promise is kept separate
    // from the lookup batches so the created identity object can never be
    // mistaken for a followed DID.
    const identityCheck = step.run("check-if-identity-exists", async () => {
      // `.single()` yields no data when there is no matching row; that case
      // is treated as "missing" rather than as a failure.
      const { data: exists } = await supabaseServerClient
        .from("identities")
        .select()
        .eq("atp_did", did)
        .single();
      if (exists) return null;

      const client = await pool.connect();
      try {
        return await createIdentity(drizzle(client), { atp_did: did });
      } finally {
        // Always hand the connection back, even if createIdentity throws.
        client.release();
      }
    });

    // Phase 2b: find which followed DIDs are present in `identities`.
    const existenceChecks: Promise<string[]>[] = [];
    for (let i = 0, n = 0; i < follows.length; i += batchSize, n++) {
      const batch = follows.slice(i, i + batchSize);
      existenceChecks.push(
        step.run(`check-existing-follows-batch-${n}`, async () => {
          const { data: existingIdentities, error } =
            await supabaseServerClient
              .from("identities")
              .select("atp_did")
              .in("atp_did", batch);
          if (error)
            throw new Error(
              "error checking existing identities for: " +
                did +
                ": " +
                error.message,
            );
          return (existingIdentities ?? [])
            .map((identity) => identity.atp_did)
            .filter((d): d is string => typeof d === "string");
        }),
      );
    }

    // Wait for the identity check and all lookup batches together.
    const [batchResults] = await Promise.all([
      Promise.all(existenceChecks),
      identityCheck,
    ]);
    // De-duplicate: a repeated conflict key inside one upsert statement makes
    // Postgres reject the whole statement.
    const existingFollows = Array.from(new Set(batchResults.flat()));

    // Phase 3: upsert follow rows in parallel batches.
    const insertBatches = [];
    for (let i = 0, n = 0; i < existingFollows.length; i += batchSize, n++) {
      const batch = existingFollows.slice(i, i + batchSize);
      insertBatches.push(
        step.run(`insert-follows-batch-${n}`, async () => {
          const { error } = await supabaseServerClient
            .from("bsky_follows")
            .upsert(batch.map((f) => ({ identity: did, follows: f })));
          if (error)
            throw new Error(
              "error upserting follows for: " + did + ": " + error.message,
            );
          return { inserted: batch.length };
        }),
      );
    }
    await Promise.all(insertBatches);

    // Phase 4: delete stored follows that are no longer in the fetched set.
    await step.run("delete-unfollowed", async () => {
      // NOTE(review): this select has no explicit range; if the project caps
      // rows per request, very large follow lists may only be partially
      // reconciled per run — confirm.
      const { data: currentFollows, error } = await supabaseServerClient
        .from("bsky_follows")
        .select("follows")
        .eq("identity", did);
      if (error)
        throw new Error(
          "error loading current follows for: " + did + ": " + error.message,
        );
      if (!currentFollows || currentFollows.length === 0) {
        return { deleted: 0 };
      }

      // Set lookup keeps the comparison linear in the number of rows.
      const keep = new Set<string>(existingFollows);
      const toDelete = currentFollows
        .map((row) => row.follows)
        .filter((followed) => !keep.has(followed));
      if (toDelete.length === 0) {
        return { deleted: 0 };
      }

      // Delete in batches to avoid query size limits.
      const deleteRequests = [];
      for (let i = 0; i < toDelete.length; i += batchSize) {
        deleteRequests.push(
          supabaseServerClient
            .from("bsky_follows")
            .delete()
            .eq("identity", did)
            .in("follows", toDelete.slice(i, i + batchSize)),
        );
      }
      const results = await Promise.all(deleteRequests);
      const failed = results.find((result) => result.error);
      if (failed?.error)
        throw new Error(
          "error deleting unfollowed rows for: " +
            did +
            ": " +
            failed.error.message,
        );
      return { deleted: toDelete.length };
    });

    return {
      done: true,
    };
  },
);