a tool for shared writing and social publishing
1import { supabaseServerClient } from "supabase/serverClient";
2import { AtpAgent, AtUri } from "@atproto/api";
3import { createIdentity } from "actions/createIdentity";
4import { drizzle } from "drizzle-orm/node-postgres";
5import { inngest } from "../client";
6import { pool } from "supabase/pool";
7
8export const index_follows = inngest.createFunction(
9 {
10 id: "index_follows",
11 throttle: {
12 limit: 1,
13 period: "5m",
14 key: "event.data.did",
15 },
16 },
17 { event: "feeds/index-follows" },
18 async ({ event, step }) => {
19 let follows: string[] = [];
20 let cursor: null | string = null;
21 let hasMore = true;
22 let pageNumber = 0;
23 while (hasMore) {
24 let page: {
25 cursor?: string;
26 follows: string[];
27 } = await step.run(`get-follows-${pageNumber}`, async () => {
28 let agent = new AtpAgent({ service: "https://public.api.bsky.app" });
29 let follows = await agent.app.bsky.graph.getFollows({
30 actor: event.data.did,
31 limit: 100,
32 cursor: cursor || undefined,
33 });
34 if (!follows.success)
35 throw new Error(
36 "error during querying follows for: " + event.data.did,
37 );
38 return {
39 cursor: follows.data.cursor,
40 follows: follows.data.follows.map((f) => f.did),
41 };
42 });
43 pageNumber++;
44 follows.push(...page.follows);
45 cursor = page.cursor || null;
46 if (!cursor) hasMore = false;
47 }
48 let existingFollows: string[] = [];
49 const batchSize = 100;
50 let batchNumber = 0;
51
52 // Create all check batches in parallel
53 const checkBatches: Promise<any>[] = [
54 step.run("check-if-identity-exists", async () => {
55 let { data: exists } = await supabaseServerClient
56 .from("identities")
57 .select()
58 .eq("atp_did", event.data.did)
59 .single();
60 if (!exists) {
61 const client = await pool.connect();
62 let db = drizzle(client);
63 let identity = await createIdentity(db, { atp_did: event.data.did });
64 client.release();
65 return identity;
66 }
67 }),
68 ];
69 for (let i = 0; i < follows.length; i += batchSize) {
70 const batch = follows.slice(i, i + batchSize);
71 checkBatches.push(
72 step.run(`check-existing-follows-batch-${batchNumber}`, async () => {
73 const { data: existingIdentities } = await supabaseServerClient
74 .from("identities")
75 .select("atp_did")
76 .in("atp_did", batch);
77
78 return existingIdentities?.map((identity) => identity.atp_did!) || [];
79 }),
80 );
81 batchNumber++;
82 }
83
84 // Wait for all check batches to complete
85 const batchResults = await Promise.all(checkBatches);
86 existingFollows = batchResults.flat().filter(Boolean);
87
88 // Filter follows to only include those that exist in identities table
89 const insertBatchSize = 100;
90 let insertBatchNumber = 0;
91
92 // Create all insert batches in parallel
93 const insertBatches = [];
94 for (let i = 0; i < existingFollows.length; i += insertBatchSize) {
95 const batch = existingFollows.slice(i, i + insertBatchSize);
96 insertBatches.push(
97 step.run(`insert-follows-batch-${insertBatchNumber}`, async () => {
98 const insertData = batch.map((f) => ({
99 identity: event.data.did,
100 follows: f,
101 }));
102
103 return await supabaseServerClient
104 .from("bsky_follows")
105 .upsert(insertData);
106 }),
107 );
108 insertBatchNumber++;
109 }
110
111 // Wait for all insert batches to complete
112 await Promise.all(insertBatches);
113
114 // Delete follows that are no longer in the fetched list
115 // For large follow lists, we need to batch this operation
116 await step.run("delete-unfollowed", async () => {
117 // Get all current follows from the database
118 const { data: currentFollows } = await supabaseServerClient
119 .from("bsky_follows")
120 .select("follows")
121 .eq("identity", event.data.did);
122
123 if (!currentFollows || currentFollows.length === 0) {
124 return { deleted: 0 };
125 }
126
127 // Find follows that are in the database but not in the newly fetched list
128 const currentFollowDids = currentFollows.map((f) => f.follows);
129 const toDelete = currentFollowDids.filter(
130 (did) => !existingFollows.includes(did)
131 );
132
133 if (toDelete.length === 0) {
134 return { deleted: 0 };
135 }
136
137 // Delete in batches to avoid query size limits
138 const deleteBatchSize = 100;
139 const deletePromises = [];
140 for (let i = 0; i < toDelete.length; i += deleteBatchSize) {
141 const batch = toDelete.slice(i, i + deleteBatchSize);
142 deletePromises.push(
143 supabaseServerClient
144 .from("bsky_follows")
145 .delete()
146 .eq("identity", event.data.did)
147 .in("follows", batch)
148 );
149 }
150
151 await Promise.all(deletePromises);
152 return { deleted: toDelete.length };
153 });
154 return {
155 done: true,
156 };
157 },
158);