a tool for shared writing and social publishing
at update/reader 196 lines 6.1 kB view raw
1import { supabaseServerClient } from "supabase/serverClient"; 2import { inngest } from "../client"; 3import { restoreOAuthSession } from "src/atproto-oauth"; 4import { 5 AtpBaseClient, 6 SiteStandardDocument, 7 ComAtprotoRepoStrongRef, 8} from "lexicons/api"; 9import { AtUri } from "@atproto/syntax"; 10import { Json } from "supabase/database.types"; 11 12async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> { 13 const result = await restoreOAuthSession(did); 14 if (!result.ok) { 15 throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 16 } 17 const credentialSession = result.value; 18 return new AtpBaseClient( 19 credentialSession.fetchHandler.bind(credentialSession), 20 ); 21} 22 23/** 24 * Fixes site.standard.document records that have the legacy `postRef` field set. 25 * Migrates the value to `bskyPostRef` (the correct field for site.standard.document) 26 * and removes the legacy `postRef` field. 27 * 28 * Can be triggered with specific document URIs, or will find all affected documents 29 * if no URIs are provided. 30 */ 31export const fix_standard_document_postref = inngest.createFunction( 32 { id: "fix_standard_document_postref" }, 33 { event: "documents/fix-postref" }, 34 async ({ event, step }) => { 35 const { documentUris: providedUris } = event.data as { 36 documentUris?: string[]; 37 }; 38 39 const stats = { 40 documentsFound: 0, 41 documentsFixed: 0, 42 documentsSkipped: 0, 43 errors: [] as string[], 44 }; 45 46 // Step 1: Find documents to fix (either provided or query for them) 47 const documentUris = await step.run("find-documents", async () => { 48 if (providedUris && providedUris.length > 0) { 49 return providedUris; 50 } 51 52 // Find all site.standard.document records with postRef set 53 const { data: documents, error } = await supabaseServerClient 54 .from("documents") 55 .select("uri") 56 .like("uri", "at://%/site.standard.document/%") 57 .not("data->postRef", "is", null); 58 59 if (error) { 60 throw new Error(`Failed to query documents: ${error.message}`); 61 } 62 63 return (documents || []).map((d) => d.uri); 64 }); 65 66 stats.documentsFound = documentUris.length; 67 68 if (documentUris.length === 0) { 69 return { 70 success: true, 71 message: "No documents found with postRef field", 72 stats, 73 }; 74 } 75 76 // Step 2: Group documents by DID for efficient OAuth session handling 77 const docsByDid = new Map<string, string[]>(); 78 for (const uri of documentUris) { 79 try { 80 const aturi = new AtUri(uri); 81 const did = aturi.hostname; 82 const existing = docsByDid.get(did) || []; 83 existing.push(uri); 84 docsByDid.set(did, existing); 85 } catch (e) { 86 stats.errors.push(`Invalid URI: ${uri}`); 87 } 88 } 89 90 // Step 3: Process each DID's documents 91 for (const [did, uris] of docsByDid) { 92 // Verify OAuth session for this user 93 const oauthValid = await step.run( 94 `verify-oauth-${did.slice(-8)}`, 95 async () => { 96 const result = await restoreOAuthSession(did); 97 return result.ok; 98 }, 99 ); 100 101 if (!oauthValid) { 102 stats.errors.push(`No valid OAuth session for ${did}`); 103 stats.documentsSkipped += uris.length; 104 continue; 105 } 106 107 // Fix each document 108 for (const docUri of uris) { 109 const result = await step.run( 110 `fix-doc-${docUri.slice(-12)}`, 111 async () => { 112 // Fetch the document 113 const { data: doc, error: fetchError } = await supabaseServerClient 114 .from("documents") 115 .select("uri, data") 116 .eq("uri", docUri) 117 .single(); 118 119 if (fetchError || !doc) { 120 return { 121 success: false as const, 122 error: `Document not found: ${fetchError?.message || "no data"}`, 123 }; 124 } 125 126 const data = doc.data as Record<string, unknown>; 127 const postRef = data.postRef as 128 | ComAtprotoRepoStrongRef.Main 129 | undefined; 130 131 if (!postRef) { 132 return { 133 success: false as const, 134 skipped: true, 135 error: "Document does not have postRef field", 136 }; 137 } 138 139 // Build updated record: move postRef to bskyPostRef 140 const { postRef: _, ...restData } = data; 141 let updatedRecord: SiteStandardDocument.Record = { 142 ...(restData as SiteStandardDocument.Record), 143 }; 144 145 updatedRecord.bskyPostRef = data.bskyPostRef 146 ? (data.bskyPostRef as ComAtprotoRepoStrongRef.Main) 147 : postRef; 148 149 // Write to PDS 150 const docAturi = new AtUri(docUri); 151 const agent = await createAuthenticatedAgent(did); 152 await agent.com.atproto.repo.putRecord({ 153 repo: did, 154 collection: "site.standard.document", 155 rkey: docAturi.rkey, 156 record: updatedRecord, 157 validate: false, 158 }); 159 160 // Update database 161 const { error: dbError } = await supabaseServerClient 162 .from("documents") 163 .update({ data: updatedRecord as Json }) 164 .eq("uri", docUri); 165 166 if (dbError) { 167 return { 168 success: false as const, 169 error: `Database update failed: ${dbError.message}`, 170 }; 171 } 172 173 return { 174 success: true as const, 175 postRef, 176 bskyPostRef: updatedRecord.bskyPostRef, 177 }; 178 }, 179 ); 180 181 if (result.success) { 182 stats.documentsFixed++; 183 } else if ("skipped" in result && result.skipped) { 184 stats.documentsSkipped++; 185 } else { 186 stats.errors.push(`${docUri}: ${result.error}`); 187 } 188 } 189 } 190 191 return { 192 success: stats.errors.length === 0, 193 stats, 194 }; 195 }, 196);