a tool for shared writing and social publishing
1import { supabaseServerClient } from "supabase/serverClient";
2import { inngest } from "../client";
3import { restoreOAuthSession } from "src/atproto-oauth";
4import {
5 AtpBaseClient,
6 SiteStandardDocument,
7 ComAtprotoRepoStrongRef,
8} from "lexicons/api";
9import { AtUri } from "@atproto/syntax";
10import { Json } from "supabase/database.types";
11
/**
 * Builds an AT Protocol client whose requests go through the OAuth session
 * restored for `did`.
 *
 * @param did - DID handed to `restoreOAuthSession`.
 * @returns An `AtpBaseClient` constructed with the restored session's
 *   `fetchHandler`, bound to that session.
 * @throws Error when `restoreOAuthSession` reports a failure; the message
 *   carries the underlying error's message.
 */
async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
  const restored = await restoreOAuthSession(did);

  if (restored.ok) {
    const session = restored.value;
    // Bind so the handler keeps `session` as its receiver once the client
    // invokes it as a plain function.
    const boundFetch = session.fetchHandler.bind(session);
    return new AtpBaseClient(boundFetch);
  }

  throw new Error(`Failed to restore OAuth session: ${restored.error.message}`);
}
22
/**
 * Fixes site.standard.document records that have the legacy `postRef` field set.
 * Migrates the value to `bskyPostRef` (the correct field for site.standard.document)
 * and removes the legacy `postRef` field.
 *
 * Can be triggered with specific document URIs (`event.data.documentUris`), or
 * will find all affected documents if no URIs are provided.
 *
 * Behavior worth knowing:
 * - URIs that cannot be parsed, or that do not point at the
 *   `site.standard.document` collection, are recorded in `stats.errors` and are
 *   never written. Without this check a caller-supplied URI from another
 *   collection would be written into `site.standard.document` under its rkey.
 * - If a document already has a truthy `bskyPostRef`, that value is kept and
 *   the legacy `postRef` is simply dropped.
 * - Documents whose stored data has no `postRef` (including rows whose data is
 *   null) are counted in `stats.documentsSkipped`.
 * - A failure to restore the OAuth session inside a fix step, or a failing
 *   `putRecord` call, throws out of that step; it is not caught here. A failing
 *   database update is instead reported in `stats.errors`.
 *
 * @returns `{ success, stats }`, where `success` is true only when
 *   `stats.errors` is empty. When nothing is found, a `message` is included.
 */
export const fix_standard_document_postref = inngest.createFunction(
  { id: "fix_standard_document_postref" },
  { event: "documents/fix-postref" },
  async ({ event, step }) => {
    // The only collection this migration is allowed to write to.
    const STANDARD_DOCUMENT_COLLECTION = "site.standard.document";

    const { documentUris: providedUris } = event.data as {
      documentUris?: string[];
    };

    const stats = {
      documentsFound: 0,
      documentsFixed: 0,
      documentsSkipped: 0,
      errors: [] as string[],
    };

    // Step 1: Find documents to fix (either provided or query for them)
    const documentUris = await step.run("find-documents", async () => {
      if (providedUris && providedUris.length > 0) {
        return providedUris;
      }

      // Find all site.standard.document records with postRef set
      const { data: documents, error } = await supabaseServerClient
        .from("documents")
        .select("uri")
        .like("uri", "at://%/site.standard.document/%")
        .not("data->postRef", "is", null);

      if (error) {
        throw new Error(`Failed to query documents: ${error.message}`);
      }

      return (documents || []).map((d) => d.uri);
    });

    stats.documentsFound = documentUris.length;

    if (documentUris.length === 0) {
      return {
        success: true,
        message: "No documents found with postRef field",
        stats,
      };
    }

    // Step 2: Group documents by DID for efficient OAuth session handling
    const docsByDid = new Map<string, string[]>();
    for (const uri of documentUris) {
      let aturi: AtUri;
      try {
        aturi = new AtUri(uri);
      } catch {
        stats.errors.push(`Invalid URI: ${uri}`);
        continue;
      }

      // Caller-supplied URIs are not filtered by the query above, so reject
      // anything outside the target collection before it can reach putRecord.
      if (aturi.collection !== STANDARD_DOCUMENT_COLLECTION) {
        stats.errors.push(`Not a site.standard.document URI: ${uri}`);
        continue;
      }

      const did = aturi.hostname;
      const existing = docsByDid.get(did) || [];
      existing.push(uri);
      docsByDid.set(did, existing);
    }

    // Step 3: Process each DID's documents
    for (const [did, uris] of docsByDid) {
      // Verify OAuth session for this user
      const oauthValid = await step.run(
        `verify-oauth-${did.slice(-8)}`,
        async () => {
          const result = await restoreOAuthSession(did);
          return result.ok;
        },
      );

      if (!oauthValid) {
        stats.errors.push(`No valid OAuth session for ${did}`);
        stats.documentsSkipped += uris.length;
        continue;
      }

      // Fix each document
      for (const docUri of uris) {
        const result = await step.run(
          `fix-doc-${docUri.slice(-12)}`,
          async () => {
            // Fetch the document
            const { data: doc, error: fetchError } = await supabaseServerClient
              .from("documents")
              .select("uri, data")
              .eq("uri", docUri)
              .single();

            if (fetchError || !doc) {
              return {
                success: false as const,
                error: `Document not found: ${fetchError?.message || "no data"}`,
              };
            }

            // A null data column is treated as an empty record so it falls
            // into the "no postRef" skip below instead of throwing.
            const data = (doc.data ?? {}) as Record<string, unknown>;
            const postRef = data.postRef as
              | ComAtprotoRepoStrongRef.Main
              | undefined;

            if (!postRef) {
              return {
                success: false as const,
                skipped: true,
                error: "Document does not have postRef field",
              };
            }

            // Build updated record: drop postRef and set bskyPostRef, keeping
            // an already-present bskyPostRef in preference to the legacy value.
            const { postRef: _legacyPostRef, ...restData } = data;
            const updatedRecord: SiteStandardDocument.Record = {
              ...(restData as SiteStandardDocument.Record),
              bskyPostRef: data.bskyPostRef
                ? (data.bskyPostRef as ComAtprotoRepoStrongRef.Main)
                : postRef,
            };

            // Write to PDS
            const docAturi = new AtUri(docUri);
            const agent = await createAuthenticatedAgent(did);
            await agent.com.atproto.repo.putRecord({
              repo: did,
              collection: STANDARD_DOCUMENT_COLLECTION,
              rkey: docAturi.rkey,
              record: updatedRecord,
              validate: false,
            });

            // Update database
            const { error: dbError } = await supabaseServerClient
              .from("documents")
              .update({ data: updatedRecord as Json })
              .eq("uri", docUri);

            if (dbError) {
              return {
                success: false as const,
                error: `Database update failed: ${dbError.message}`,
              };
            }

            return {
              success: true as const,
              postRef,
              bskyPostRef: updatedRecord.bskyPostRef,
            };
          },
        );

        if (result.success) {
          stats.documentsFixed++;
        } else if ("skipped" in result && result.skipped) {
          stats.documentsSkipped++;
        } else {
          stats.errors.push(`${docUri}: ${result.error}`);
        }
      }
    }

    return {
      success: stats.errors.length === 0,
      stats,
    };
  },
);