
Compare changes


+1242 -111
+7 -4
actions/publishToPublication.ts
··· 65 65 } from "src/utils/collectionHelpers"; 66 66 67 67 type PublishResult = 68 - | { success: true; rkey: string; record: PubLeafletDocument.Record } 68 + | { success: true; rkey: string; record: SiteStandardDocument.Record } 69 69 | { success: false; error: OAuthSessionError }; 70 70 71 71 export async function publishToPublication({ ··· 199 199 } 200 200 201 201 // Determine the collection to use - preserve existing schema if updating 202 - const existingCollection = existingDocUri ? new AtUri(existingDocUri).collection : undefined; 202 + const existingCollection = existingDocUri 203 + ? new AtUri(existingDocUri).collection 204 + : undefined; 203 205 const documentType = getDocumentType(existingCollection); 204 206 205 207 // Build the pages array (used by both formats) ··· 228 230 if (documentType === "site.standard.document") { 229 231 // site.standard.document format 230 232 // For standalone docs, use HTTPS URL; for publication docs, use the publication AT-URI 231 - const siteUri = publication_uri || `https://leaflet.pub/p/${credentialSession.did}`; 233 + const siteUri = 234 + publication_uri || `https://leaflet.pub/p/${credentialSession.did}`; 232 235 233 236 record = { 234 237 $type: "site.standard.document", 235 238 title: title || "Untitled", 236 239 site: siteUri, 237 - path: rkey, 240 + path: "/" + rkey, 238 241 publishedAt: 239 242 publishedAt || existingRecord.publishedAt || new Date().toISOString(), 240 243 ...(description && { description }),
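For context, a minimal sketch (not the actual `src/utils/collectionHelpers` implementation) of how the schema-preservation rule above might behave: an update keeps the collection of the record it replaces, while a new document defaults to the new schema.

```ts
// Hypothetical sketch of the collection-preservation rule; the real
// getDocumentType lives in src/utils/collectionHelpers.
type DocumentType = "pub.leaflet.document" | "site.standard.document";

function getDocumentType(existingCollection?: string): DocumentType {
  // Updates keep their original collection so existing AT-URIs stay valid;
  // brand-new documents default to the new site.standard schema.
  if (existingCollection === "pub.leaflet.document") return "pub.leaflet.document";
  return "site.standard.document";
}

getDocumentType("pub.leaflet.document"); // "pub.leaflet.document" (update keeps legacy schema)
getDocumentType(undefined);              // "site.standard.document" (new document)
```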
+1 -1
app/(home-pages)/discover/PubListing.tsx
··· 62 62 <p> 63 63 Updated{" "} 64 64 {timeAgo( 65 - props.documents_in_publications?.[0]?.documents?.indexed_at || 65 + props.documents_in_publications?.[0]?.documents?.sort_date || 66 66 "", 67 67 )} 68 68 </p>
+8 -8
app/(home-pages)/discover/getPublications.ts
··· 8 8 import { deduplicateByUri } from "src/utils/deduplicateRecords"; 9 9 10 10 export type Cursor = { 11 - indexed_at?: string; 11 + sort_date?: string; 12 12 count?: number; 13 13 uri: string; 14 14 }; ··· 32 32 .or( 33 33 "record->preferences->showInDiscover.is.null,record->preferences->>showInDiscover.eq.true", 34 34 ) 35 - .order("indexed_at", { 35 + .order("documents(sort_date)", { 36 36 referencedTable: "documents_in_publications", 37 37 ascending: false, 38 38 }) ··· 64 64 } else { 65 65 // recentlyUpdated 66 66 const aDate = new Date( 67 - a.documents_in_publications[0]?.indexed_at || 0, 67 + a.documents_in_publications[0]?.documents?.sort_date || 0, 68 68 ).getTime(); 69 69 const bDate = new Date( 70 - b.documents_in_publications[0]?.indexed_at || 0, 70 + b.documents_in_publications[0]?.documents?.sort_date || 0, 71 71 ).getTime(); 72 72 if (bDate !== aDate) { 73 73 return bDate - aDate; ··· 89 89 (pubCount === cursor.count && pub.uri < cursor.uri) 90 90 ); 91 91 } else { 92 - const pubDate = pub.documents_in_publications[0]?.indexed_at || ""; 92 + const pubDate = pub.documents_in_publications[0]?.documents?.sort_date || ""; 93 93 // Find first pub after cursor 94 94 return ( 95 - pubDate < (cursor.indexed_at || "") || 96 - (pubDate === cursor.indexed_at && pub.uri < cursor.uri) 95 + pubDate < (cursor.sort_date || "") || 96 + (pubDate === cursor.sort_date && pub.uri < cursor.uri) 97 97 ); 98 98 } 99 99 }); ··· 117 117 normalizedPage.length > 0 && startIndex + limit < allPubs.length 118 118 ? order === "recentlyUpdated" 119 119 ? { 120 - indexed_at: lastItem.documents_in_publications[0]?.indexed_at, 120 + sort_date: lastItem.documents_in_publications[0]?.documents?.sort_date, 121 121 uri: lastItem.uri, 122 122 } 123 123 : {
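The cursor comparisons above are keyset pagination over a `(sort_date, uri)` tuple. They rely on `sort_date` being an ISO 8601 string, so lexicographic comparison agrees with chronological order. The predicate in isolation:

```ts
// Keyset pagination predicate for a feed sorted (sort_date DESC, uri DESC).
// Assumes sort_date is an ISO 8601 string, where string order == time order.
type Cursor = { sort_date?: string; uri: string };

function isAfterCursor(
  pub: { sort_date: string; uri: string },
  cursor: Cursor,
): boolean {
  return (
    pub.sort_date < (cursor.sort_date || "") ||
    (pub.sort_date === cursor.sort_date && pub.uri < cursor.uri)
  );
}
```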
+5 -5
app/(home-pages)/p/[didOrHandle]/getProfilePosts.ts
··· 10 10 import { deduplicateByUriOrdered } from "src/utils/deduplicateRecords"; 11 11 12 12 export type Cursor = { 13 - indexed_at: string; 13 + sort_date: string; 14 14 uri: string; 15 15 }; 16 16 ··· 29 29 documents_in_publications(publications(*))`, 30 30 ) 31 31 .like("uri", `at://${did}/%`) 32 - .order("indexed_at", { ascending: false }) 32 + .order("sort_date", { ascending: false }) 33 33 .order("uri", { ascending: false }) 34 34 .limit(limit); 35 35 36 36 if (cursor) { 37 37 query = query.or( 38 - `indexed_at.lt.${cursor.indexed_at},and(indexed_at.eq.${cursor.indexed_at},uri.lt.${cursor.uri})`, 38 + `sort_date.lt.${cursor.sort_date},and(sort_date.eq.${cursor.sort_date},uri.lt.${cursor.uri})`, 39 39 ); 40 40 } 41 41 ··· 79 79 documents: { 80 80 data: normalizedData, 81 81 uri: doc.uri, 82 - indexed_at: doc.indexed_at, 82 + sort_date: doc.sort_date, 83 83 comments_on_documents: doc.comments_on_documents, 84 84 document_mentions_in_bsky: doc.document_mentions_in_bsky, 85 85 }, ··· 99 99 const nextCursor = 100 100 posts.length === limit 101 101 ? { 102 - indexed_at: posts[posts.length - 1].documents.indexed_at, 102 + sort_date: posts[posts.length - 1].documents.sort_date, 103 103 uri: posts[posts.length - 1].documents.uri, 104 104 } 105 105 : null;
+5 -5
app/(home-pages)/reader/getReaderFeed.ts
··· 38 38 "documents_in_publications.publications.publication_subscriptions.identity", 39 39 auth_res.atp_did, 40 40 ) 41 - .order("indexed_at", { ascending: false }) 41 + .order("sort_date", { ascending: false }) 42 42 .order("uri", { ascending: false }) 43 43 .limit(25); 44 44 if (cursor) { 45 45 query = query.or( 46 - `indexed_at.lt.${cursor.timestamp},and(indexed_at.eq.${cursor.timestamp},uri.lt.${cursor.uri})`, 46 + `sort_date.lt.${cursor.timestamp},and(sort_date.eq.${cursor.timestamp},uri.lt.${cursor.uri})`, 47 47 ); 48 48 } 49 49 let { data: rawFeed, error } = await query; ··· 78 78 document_mentions_in_bsky: post.document_mentions_in_bsky, 79 79 data: normalizedData, 80 80 uri: post.uri, 81 - indexed_at: post.indexed_at, 81 + sort_date: post.sort_date, 82 82 }, 83 83 }; 84 84 return p; ··· 88 88 const nextCursor = 89 89 posts.length > 0 90 90 ? { 91 - timestamp: posts[posts.length - 1].documents.indexed_at, 91 + timestamp: posts[posts.length - 1].documents.sort_date, 92 92 uri: posts[posts.length - 1].documents.uri, 93 93 } 94 94 : null; ··· 109 109 documents: { 110 110 data: NormalizedDocument | null; 111 111 uri: string; 112 - indexed_at: string; 112 + sort_date: string; 113 113 comments_on_documents: { count: number }[] | undefined; 114 114 document_mentions_in_bsky: { count: number }[] | undefined; 115 115 };
+2 -2
app/(home-pages)/reader/getSubscriptions.ts
··· 32 32 .select(`*, publications(*, documents_in_publications(*, documents(*)))`) 33 33 .order(`created_at`, { ascending: false }) 34 34 .order(`uri`, { ascending: false }) 35 - .order("indexed_at", { 35 + .order("documents(sort_date)", { 36 36 ascending: false, 37 37 referencedTable: "publications.documents_in_publications", 38 38 }) ··· 85 85 record: NormalizedPublication; 86 86 uri: string; 87 87 documents_in_publications: { 88 - documents: { data?: Json; indexed_at: string } | null; 88 + documents: { data?: Json; sort_date: string } | null; 89 89 }[]; 90 90 };
+2 -2
app/(home-pages)/tag/[tag]/getDocumentsByTag.ts
··· 24 24 documents_in_publications(publications(*))`, 25 25 ) 26 26 .contains("data->tags", `["${tag}"]`) 27 - .order("indexed_at", { ascending: false }) 27 + .order("sort_date", { ascending: false }) 28 28 .limit(50); 29 29 30 30 if (error) { ··· 69 69 document_mentions_in_bsky: doc.document_mentions_in_bsky, 70 70 data: normalizedData, 71 71 uri: doc.uri, 72 - indexed_at: doc.indexed_at, 72 + sort_date: doc.sort_date, 73 73 }, 74 74 }; 75 75 return post;
+4 -7
app/[leaflet_id]/publish/publishBskyPost.ts
··· 8 8 import sharp from "sharp"; 9 9 import { TID } from "@atproto/common"; 10 10 import { getIdentityData } from "actions/getIdentityData"; 11 - import { AtpBaseClient, PubLeafletDocument } from "lexicons/api"; 12 - import { 13 - restoreOAuthSession, 14 - OAuthSessionError, 15 - } from "src/atproto-oauth"; 11 + import { AtpBaseClient, SiteStandardDocument } from "lexicons/api"; 12 + import { restoreOAuthSession, OAuthSessionError } from "src/atproto-oauth"; 16 13 import { supabaseServerClient } from "supabase/serverClient"; 17 14 import { Json } from "supabase/database.types"; 18 15 import { ··· 30 27 url: string; 31 28 title: string; 32 29 description: string; 33 - document_record: PubLeafletDocument.Record; 30 + document_record: SiteStandardDocument.Record; 34 31 rkey: string; 35 32 facets: AppBskyRichtextFacet.Main[]; 36 33 }): Promise<PublishBskyResult> { ··· 115 112 }, 116 113 ); 117 114 let record = args.document_record; 118 - record.postRef = post; 115 + record.bskyPostRef = post; 119 116 120 117 let { data: result } = await agent.com.atproto.repo.putRecord({ 121 118 rkey: args.rkey,
+25
app/api/inngest/client.ts
··· 26 26 did: string; 27 27 }; 28 28 }; 29 + "user/cleanup-expired-oauth-sessions": { 30 + data: {}; 31 + }; 32 + "user/check-oauth-session": { 33 + data: { 34 + identityId: string; 35 + did: string; 36 + tokenCount: number; 37 + }; 38 + }; 39 + "documents/fix-publication-references": { 40 + data: { 41 + documentUris: string[]; 42 + }; 43 + }; 44 + "documents/fix-incorrect-site-values": { 45 + data: { 46 + did: string; 47 + }; 48 + }; 49 + "documents/fix-postref": { 50 + data: { 51 + documentUris?: string[]; 52 + }; 53 + }; 29 54 }; 30 55 31 56 // Create a client to send and receive events
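With the event map extended, sending any of the new events is type-checked against its declared data shape; for example (the DID here is hypothetical):

```ts
import { inngest } from "app/api/inngest/client";

// Queue a site-value audit for one identity; the data payload must match
// the "documents/fix-incorrect-site-values" entry in the event map above.
await inngest.send({
  name: "documents/fix-incorrect-site-values",
  data: { did: "did:plc:example" }, // hypothetical DID
});
```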
+123
app/api/inngest/functions/cleanup_expired_oauth_sessions.ts
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + 5 + // Main function that fetches identities and publishes events for each one 6 + export const cleanup_expired_oauth_sessions = inngest.createFunction( 7 + { id: "cleanup_expired_oauth_sessions" }, 8 + { event: "user/cleanup-expired-oauth-sessions" }, 9 + async ({ step }) => { 10 + // Get all identities with an atp_did (OAuth users) that have at least one auth token 11 + const identities = await step.run("fetch-oauth-identities", async () => { 12 + const { data, error } = await supabaseServerClient 13 + .from("identities") 14 + .select("id, atp_did, email_auth_tokens(count)") 15 + .not("atp_did", "is", null); 16 + 17 + if (error) { 18 + throw new Error(`Failed to fetch identities: ${error.message}`); 19 + } 20 + 21 + // Filter to only include identities with at least one auth token 22 + return (data || []) 23 + .filter((identity) => { 24 + const tokenCount = identity.email_auth_tokens?.[0]?.count ?? 0; 25 + return tokenCount > 0; 26 + }) 27 + .map((identity) => ({ 28 + id: identity.id, 29 + atp_did: identity.atp_did!, 30 + tokenCount: identity.email_auth_tokens?.[0]?.count ?? 0, 31 + })); 32 + }); 33 + 34 + console.log( 35 + `Found ${identities.length} OAuth identities with active sessions to check`, 36 + ); 37 + 38 + // Publish events for each identity in batches 39 + const BATCH_SIZE = 100; 40 + let totalSent = 0; 41 + 42 + for (let i = 0; i < identities.length; i += BATCH_SIZE) { 43 + const batch = identities.slice(i, i + BATCH_SIZE); 44 + 45 + await step.run(`send-events-batch-${i}`, async () => { 46 + const events = batch.map((identity) => ({ 47 + name: "user/check-oauth-session" as const, 48 + data: { 49 + identityId: identity.id, 50 + did: identity.atp_did, 51 + tokenCount: identity.tokenCount, 52 + }, 53 + })); 54 + 55 + await inngest.send(events); 56 + return events.length; 57 + }); 58 + 59 + totalSent += batch.length; 60 + } 61 + 62 + console.log(`Published ${totalSent} check-oauth-session events`); 63 + 64 + return { 65 + success: true, 66 + identitiesQueued: totalSent, 67 + }; 68 + }, 69 + ); 70 + 71 + // Function that checks a single identity's OAuth session and cleans up if expired 72 + export const check_oauth_session = inngest.createFunction( 73 + { id: "check_oauth_session" }, 74 + { event: "user/check-oauth-session" }, 75 + async ({ event, step }) => { 76 + const { identityId, did, tokenCount } = event.data; 77 + 78 + const result = await step.run("check-and-cleanup", async () => { 79 + console.log(`Checking OAuth session for DID: ${did} (${tokenCount} tokens)`); 80 + 81 + const sessionResult = await restoreOAuthSession(did); 82 + 83 + if (sessionResult.ok) { 84 + console.log(` Session valid for ${did}`); 85 + return { valid: true, tokensDeleted: 0 }; 86 + } 87 + 88 + // Session is expired/invalid - delete associated auth tokens 89 + console.log( 90 + ` Session expired for ${did}: ${sessionResult.error.message}`, 91 + ); 92 + 93 + const { error: deleteError } = await supabaseServerClient 94 + .from("email_auth_tokens") 95 + .delete() 96 + .eq("identity", identityId); 97 + 98 + if (deleteError) { 99 + console.error( 100 + ` Error deleting tokens for identity ${identityId}: ${deleteError.message}`, 101 + ); 102 + return { 103 + valid: false, 104 + tokensDeleted: 0, 105 + error: deleteError.message, 106 + }; 107 + } 108 + 109 + console.log(` Deleted ${tokenCount} auth tokens for identity ${identityId}`); 110 + 111 + return { 112 + valid: false, 113 + tokensDeleted: tokenCount, 114 + }; 115 + }); 116 + 117 + return { 118 + identityId, 119 + did, 120 + ...result, 121 + }; 122 + }, 123 + );
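The parent function's fan-out reduces to a simple batching loop; the same pattern in isolation:

```ts
// Slice a large list into fixed-size batches so no single inngest.send
// call carries thousands of events (BATCH_SIZE is 100 above).
function toBatches<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

toBatches([1, 2, 3, 4, 5], 2); // [[1, 2], [3, 4], [5]]
```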
+300
app/api/inngest/functions/fix_incorrect_site_values.ts
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + import { AtpBaseClient, SiteStandardDocument } from "lexicons/api"; 5 + import { AtUri } from "@atproto/syntax"; 6 + import { Json } from "supabase/database.types"; 7 + 8 + async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> { 9 + const result = await restoreOAuthSession(did); 10 + if (!result.ok) { 11 + throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 12 + } 13 + const credentialSession = result.value; 14 + return new AtpBaseClient( 15 + credentialSession.fetchHandler.bind(credentialSession), 16 + ); 17 + } 18 + 19 + /** 20 + * Build set of valid site values for a publication. 21 + * A site value is valid if it matches the publication or its legacy equivalent. 22 + */ 23 + function buildValidSiteValues(pubUri: string): Set<string> { 24 + const validValues = new Set<string>([pubUri]); 25 + 26 + try { 27 + const aturi = new AtUri(pubUri); 28 + 29 + if (pubUri.includes("/site.standard.publication/")) { 30 + // Also accept legacy pub.leaflet.publication 31 + validValues.add( 32 + `at://${aturi.hostname}/pub.leaflet.publication/${aturi.rkey}`, 33 + ); 34 + } else if (pubUri.includes("/pub.leaflet.publication/")) { 35 + // Also accept new site.standard.publication 36 + validValues.add( 37 + `at://${aturi.hostname}/site.standard.publication/${aturi.rkey}`, 38 + ); 39 + } 40 + } catch (e) { 41 + // Invalid URI, just use the original 42 + } 43 + 44 + return validValues; 45 + } 46 + 47 + /** 48 + * This function finds and fixes documents that have incorrect site values. 49 + * A document has an incorrect site value if its `site` field doesn't match 50 + * the publication it belongs to (via documents_in_publications). 51 + * 52 + * Takes a DID as input and processes publications owned by that identity. 53 + */ 54 + export const fix_incorrect_site_values = inngest.createFunction( 55 + { id: "fix_incorrect_site_values" }, 56 + { event: "documents/fix-incorrect-site-values" }, 57 + async ({ event, step }) => { 58 + const { did } = event.data; 59 + 60 + const stats = { 61 + publicationsChecked: 0, 62 + documentsChecked: 0, 63 + documentsWithIncorrectSite: 0, 64 + documentsFixed: 0, 65 + documentsMissingSite: 0, 66 + errors: [] as string[], 67 + }; 68 + 69 + // Step 1: Get all publications owned by this identity 70 + const publications = await step.run("fetch-publications", async () => { 71 + const { data, error } = await supabaseServerClient 72 + .from("publications") 73 + .select("uri") 74 + .eq("identity_did", did); 75 + 76 + if (error) { 77 + throw new Error(`Failed to fetch publications: ${error.message}`); 78 + } 79 + return data || []; 80 + }); 81 + 82 + stats.publicationsChecked = publications.length; 83 + 84 + if (publications.length === 0) { 85 + return { 86 + success: true, 87 + message: "No publications found for this identity", 88 + stats, 89 + }; 90 + } 91 + 92 + // Step 2: Get all documents_in_publications entries for these publications 93 + const publicationUris = publications.map((p) => p.uri); 94 + 95 + const joinEntries = await step.run( 96 + "fetch-documents-in-publications", 97 + async () => { 98 + const { data, error } = await supabaseServerClient 99 + .from("documents_in_publications") 100 + .select("document, publication") 101 + .in("publication", publicationUris); 102 + 103 + if (error) { 104 + throw new Error( 105 + `Failed to fetch documents_in_publications: ${error.message}`, 106 + ); 107 + } 108 + return data || []; 109 + }, 110 + ); 111 + 112 + if (joinEntries.length === 0) { 113 + return { 114 + success: true, 115 + message: "No documents found in publications", 116 + stats, 117 + }; 118 + } 119 + 120 + // Create a map of document URI -> expected publication URI 121 + const documentToPublication = new Map<string, string>(); 122 + for (const row of joinEntries) { 123 + documentToPublication.set(row.document, row.publication); 124 + } 125 + 126 + // Step 3: Fetch all document records 127 + const documentUris = Array.from(documentToPublication.keys()); 128 + 129 + const allDocuments = await step.run("fetch-documents", async () => { 130 + const { data, error } = await supabaseServerClient 131 + .from("documents") 132 + .select("uri, data") 133 + .in("uri", documentUris); 134 + 135 + if (error) { 136 + throw new Error(`Failed to fetch documents: ${error.message}`); 137 + } 138 + return data || []; 139 + }); 140 + 141 + stats.documentsChecked = allDocuments.length; 142 + 143 + // Step 4: Find documents with incorrect site values 144 + const documentsToFix: Array<{ 145 + uri: string; 146 + currentSite: string | null; 147 + correctSite: string; 148 + docData: SiteStandardDocument.Record; 149 + }> = []; 150 + 151 + for (const doc of allDocuments) { 152 + const expectedPubUri = documentToPublication.get(doc.uri); 153 + if (!expectedPubUri) continue; 154 + 155 + const data = doc.data as unknown as SiteStandardDocument.Record; 156 + const currentSite = data?.site; 157 + 158 + if (!currentSite) { 159 + stats.documentsMissingSite++; 160 + continue; 161 + } 162 + 163 + const validSiteValues = buildValidSiteValues(expectedPubUri); 164 + 165 + if (!validSiteValues.has(currentSite)) { 166 + // Document has incorrect site value - determine the correct one 167 + // Prefer the site.standard.publication format if the doc is site.standard.document 168 + let correctSite = expectedPubUri; 169 + 170 + if (doc.uri.includes("/site.standard.document/")) { 171 + // For site.standard.document, use site.standard.publication format 172 + try { 173 + const pubAturi = new AtUri(expectedPubUri); 174 + if (expectedPubUri.includes("/pub.leaflet.publication/")) { 175 + correctSite = `at://${pubAturi.hostname}/site.standard.publication/${pubAturi.rkey}`; 176 + } 177 + } catch (e) { 178 + // Use as-is 179 + } 180 + } 181 + 182 + documentsToFix.push({ 183 + uri: doc.uri, 184 + currentSite, 185 + correctSite, 186 + docData: data, 187 + }); 188 + } 189 + } 190 + 191 + stats.documentsWithIncorrectSite = documentsToFix.length; 192 + 193 + if (documentsToFix.length === 0) { 194 + return { 195 + success: true, 196 + message: "All documents have correct site values", 197 + stats, 198 + }; 199 + } 200 + 201 + // Step 5: Group documents by author DID for efficient OAuth session handling 202 + const docsByDid = new Map<string, typeof documentsToFix>(); 203 + for (const doc of documentsToFix) { 204 + try { 205 + const aturi = new AtUri(doc.uri); 206 + const authorDid = aturi.hostname; 207 + const existing = docsByDid.get(authorDid) || []; 208 + existing.push(doc); 209 + docsByDid.set(authorDid, existing); 210 + } catch (e) { 211 + stats.errors.push(`Invalid URI: ${doc.uri}`); 212 + } 213 + } 214 + 215 + // Step 6: Process each author's documents 216 + for (const [authorDid, docs] of docsByDid) { 217 + // Verify OAuth session for this author 218 + const oauthValid = await step.run( 219 + `verify-oauth-${authorDid.slice(-8)}`, 220 + async () => { 221 + const result = await restoreOAuthSession(authorDid); 222 + return result.ok; 223 + }, 224 + ); 225 + 226 + if (!oauthValid) { 227 + stats.errors.push(`No valid OAuth session for ${authorDid}`); 228 + continue; 229 + } 230 + 231 + // Fix each document for this author 232 + for (const docToFix of docs) { 233 + const result = await step.run( 234 + `fix-doc-${docToFix.uri.slice(-12)}`, 235 + async () => { 236 + try { 237 + const docAturi = new AtUri(docToFix.uri); 238 + 239 + // Build updated record 240 + const updatedRecord: SiteStandardDocument.Record = { 241 + ...docToFix.docData, 242 + site: docToFix.correctSite, 243 + }; 244 + 245 + // Update on PDS 246 + const agent = await createAuthenticatedAgent(authorDid); 247 + await agent.com.atproto.repo.putRecord({ 248 + repo: authorDid, 249 + collection: docAturi.collection, 250 + rkey: docAturi.rkey, 251 + record: updatedRecord, 252 + validate: false, 253 + }); 254 + 255 + // Update in database 256 + const { error: dbError } = await supabaseServerClient 257 + .from("documents") 258 + .update({ data: updatedRecord as Json }) 259 + .eq("uri", docToFix.uri); 260 + 261 + if (dbError) { 262 + return { 263 + success: false as const, 264 + error: `Database update failed: ${dbError.message}`, 265 + }; 266 + } 267 + 268 + return { 269 + success: true as const, 270 + oldSite: docToFix.currentSite, 271 + newSite: docToFix.correctSite, 272 + }; 273 + } catch (e) { 274 + return { 275 + success: false as const, 276 + error: e instanceof Error ? e.message : String(e), 277 + }; 278 + } 279 + }, 280 + ); 281 + 282 + if (result.success) { 283 + stats.documentsFixed++; 284 + } else { 285 + stats.errors.push(`${docToFix.uri}: ${result.error}`); 286 + } 287 + } 288 + } 289 + 290 + return { 291 + success: stats.errors.length === 0, 292 + stats, 293 + documentsToFix: documentsToFix.map((d) => ({ 294 + uri: d.uri, 295 + oldSite: d.currentSite, 296 + newSite: d.correctSite, 297 + })), 298 + }; 299 + }, 300 + );
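A usage sketch of `buildValidSiteValues` with a hypothetical DID and rkey, showing that the legacy and standard collections are accepted as equivalent in either direction:

```ts
buildValidSiteValues("at://did:plc:example/site.standard.publication/3kxyz");
// => Set {
//   "at://did:plc:example/site.standard.publication/3kxyz",
//   "at://did:plc:example/pub.leaflet.publication/3kxyz",
// }
```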
+196
app/api/inngest/functions/fix_standard_document_postref.ts
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + import { 5 + AtpBaseClient, 6 + SiteStandardDocument, 7 + ComAtprotoRepoStrongRef, 8 + } from "lexicons/api"; 9 + import { AtUri } from "@atproto/syntax"; 10 + import { Json } from "supabase/database.types"; 11 + 12 + async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> { 13 + const result = await restoreOAuthSession(did); 14 + if (!result.ok) { 15 + throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 16 + } 17 + const credentialSession = result.value; 18 + return new AtpBaseClient( 19 + credentialSession.fetchHandler.bind(credentialSession), 20 + ); 21 + } 22 + 23 + /** 24 + * Fixes site.standard.document records that have the legacy `postRef` field set. 25 + * Migrates the value to `bskyPostRef` (the correct field for site.standard.document) 26 + * and removes the legacy `postRef` field. 27 + * 28 + * Can be triggered with specific document URIs, or will find all affected documents 29 + * if no URIs are provided. 30 + */ 31 + export const fix_standard_document_postref = inngest.createFunction( 32 + { id: "fix_standard_document_postref" }, 33 + { event: "documents/fix-postref" }, 34 + async ({ event, step }) => { 35 + const { documentUris: providedUris } = event.data as { 36 + documentUris?: string[]; 37 + }; 38 + 39 + const stats = { 40 + documentsFound: 0, 41 + documentsFixed: 0, 42 + documentsSkipped: 0, 43 + errors: [] as string[], 44 + }; 45 + 46 + // Step 1: Find documents to fix (either provided or query for them) 47 + const documentUris = await step.run("find-documents", async () => { 48 + if (providedUris && providedUris.length > 0) { 49 + return providedUris; 50 + } 51 + 52 + // Find all site.standard.document records with postRef set 53 + const { data: documents, error } = await supabaseServerClient 54 + .from("documents") 55 + .select("uri") 56 + .like("uri", "at://%/site.standard.document/%") 57 + .not("data->postRef", "is", null); 58 + 59 + if (error) { 60 + throw new Error(`Failed to query documents: ${error.message}`); 61 + } 62 + 63 + return (documents || []).map((d) => d.uri); 64 + }); 65 + 66 + stats.documentsFound = documentUris.length; 67 + 68 + if (documentUris.length === 0) { 69 + return { 70 + success: true, 71 + message: "No documents found with postRef field", 72 + stats, 73 + }; 74 + } 75 + 76 + // Step 2: Group documents by DID for efficient OAuth session handling 77 + const docsByDid = new Map<string, string[]>(); 78 + for (const uri of documentUris) { 79 + try { 80 + const aturi = new AtUri(uri); 81 + const did = aturi.hostname; 82 + const existing = docsByDid.get(did) || []; 83 + existing.push(uri); 84 + docsByDid.set(did, existing); 85 + } catch (e) { 86 + stats.errors.push(`Invalid URI: ${uri}`); 87 + } 88 + } 89 + 90 + // Step 3: Process each DID's documents 91 + for (const [did, uris] of docsByDid) { 92 + // Verify OAuth session for this user 93 + const oauthValid = await step.run( 94 + `verify-oauth-${did.slice(-8)}`, 95 + async () => { 96 + const result = await restoreOAuthSession(did); 97 + return result.ok; 98 + }, 99 + ); 100 + 101 + if (!oauthValid) { 102 + stats.errors.push(`No valid OAuth session for ${did}`); 103 + stats.documentsSkipped += uris.length; 104 + continue; 105 + } 106 + 107 + // Fix each document 108 + for (const docUri of uris) { 109 + const result = await step.run( 110 + `fix-doc-${docUri.slice(-12)}`, 111 + async () => { 112 + // Fetch the document 113 + const { data: doc, error: fetchError } = await supabaseServerClient 114 + .from("documents") 115 + .select("uri, data") 116 + .eq("uri", docUri) 117 + .single(); 118 + 119 + if (fetchError || !doc) { 120 + return { 121 + success: false as const, 122 + error: `Document not found: ${fetchError?.message || "no data"}`, 123 + }; 124 + } 125 + 126 + const data = doc.data as Record<string, unknown>; 127 + const postRef = data.postRef as 128 + | ComAtprotoRepoStrongRef.Main 129 + | undefined; 130 + 131 + if (!postRef) { 132 + return { 133 + success: false as const, 134 + skipped: true, 135 + error: "Document does not have postRef field", 136 + }; 137 + } 138 + 139 + // Build updated record: move postRef to bskyPostRef 140 + const { postRef: _, ...restData } = data; 141 + let updatedRecord: SiteStandardDocument.Record = { 142 + ...(restData as SiteStandardDocument.Record), 143 + }; 144 + 145 + updatedRecord.bskyPostRef = data.bskyPostRef 146 + ? (data.bskyPostRef as ComAtprotoRepoStrongRef.Main) 147 + : postRef; 148 + 149 + // Write to PDS 150 + const docAturi = new AtUri(docUri); 151 + const agent = await createAuthenticatedAgent(did); 152 + await agent.com.atproto.repo.putRecord({ 153 + repo: did, 154 + collection: "site.standard.document", 155 + rkey: docAturi.rkey, 156 + record: updatedRecord, 157 + validate: false, 158 + }); 159 + 160 + // Update database 161 + const { error: dbError } = await supabaseServerClient 162 + .from("documents") 163 + .update({ data: updatedRecord as Json }) 164 + .eq("uri", docUri); 165 + 166 + if (dbError) { 167 + return { 168 + success: false as const, 169 + error: `Database update failed: ${dbError.message}`, 170 + }; 171 + } 172 + 173 + return { 174 + success: true as const, 175 + postRef, 176 + bskyPostRef: updatedRecord.bskyPostRef, 177 + }; 178 + }, 179 + ); 180 + 181 + if (result.success) { 182 + stats.documentsFixed++; 183 + } else if ("skipped" in result && result.skipped) { 184 + stats.documentsSkipped++; 185 + } else { 186 + stats.errors.push(`${docUri}: ${result.error}`); 187 + } 188 + } 189 + } 190 + 191 + return { 192 + success: stats.errors.length === 0, 193 + stats, 194 + }; 195 + }, 196 + );
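Stripped of the PDS and database plumbing, the record transform this function applies might be sketched as:

```ts
// Move the legacy postRef into bskyPostRef, preferring an existing
// bskyPostRef if both are somehow set (mirrors the logic above).
type StrongRef = { uri: string; cid: string };
type DocData = { postRef?: StrongRef; bskyPostRef?: StrongRef; [k: string]: unknown };

function migratePostRef(data: DocData): DocData {
  const { postRef, ...rest } = data;
  return { ...rest, bskyPostRef: data.bskyPostRef ?? postRef };
}
```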
+213
app/api/inngest/functions/fix_standard_document_publications.ts
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + import { AtpBaseClient, SiteStandardDocument } from "lexicons/api"; 5 + import { AtUri } from "@atproto/syntax"; 6 + import { Json } from "supabase/database.types"; 7 + 8 + async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> { 9 + const result = await restoreOAuthSession(did); 10 + if (!result.ok) { 11 + throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 12 + } 13 + const credentialSession = result.value; 14 + return new AtpBaseClient( 15 + credentialSession.fetchHandler.bind(credentialSession), 16 + ); 17 + } 18 + 19 + /** 20 + * Fixes site.standard.document records that have stale pub.leaflet.publication 21 + * references in their site field. Updates both the PDS record and database. 22 + */ 23 + export const fix_standard_document_publications = inngest.createFunction( 24 + { id: "fix_standard_document_publications" }, 25 + { event: "documents/fix-publication-references" }, 26 + async ({ event, step }) => { 27 + const { documentUris } = event.data as { documentUris: string[] }; 28 + 29 + const stats = { 30 + documentsFixed: 0, 31 + joinEntriesFixed: 0, 32 + errors: [] as string[], 33 + }; 34 + 35 + if (!documentUris || documentUris.length === 0) { 36 + return { success: true, stats, message: "No documents to fix" }; 37 + } 38 + 39 + // Group documents by DID (author) for efficient OAuth session handling 40 + const docsByDid = new Map<string, string[]>(); 41 + for (const uri of documentUris) { 42 + try { 43 + const aturi = new AtUri(uri); 44 + const did = aturi.hostname; 45 + const existing = docsByDid.get(did) || []; 46 + existing.push(uri); 47 + docsByDid.set(did, existing); 48 + } catch (e) { 49 + stats.errors.push(`Invalid URI: ${uri}`); 50 + } 51 + } 52 + 53 + // Process each DID's documents 54 + for (const [did, uris] of docsByDid) { 55 + // Verify OAuth session for this user 56 + const oauthValid = await step.run( 57 + `verify-oauth-${did.slice(-8)}`, 58 + async () => { 59 + const result = await restoreOAuthSession(did); 60 + return result.ok; 61 + }, 62 + ); 63 + 64 + if (!oauthValid) { 65 + stats.errors.push(`No valid OAuth session for ${did}`); 66 + continue; 67 + } 68 + 69 + // Fix each document 70 + for (const docUri of uris) { 71 + const result = await step.run( 72 + `fix-doc-${docUri.slice(-12)}`, 73 + async () => { 74 + // Fetch the document 75 + const { data: doc, error: fetchError } = await supabaseServerClient 76 + .from("documents") 77 + .select("uri, data") 78 + .eq("uri", docUri) 79 + .single(); 80 + 81 + if (fetchError || !doc) { 82 + return { 83 + success: false as const, 84 + error: `Document not found: ${fetchError?.message || "no data"}`, 85 + }; 86 + } 87 + 88 + const data = doc.data as SiteStandardDocument.Record; 89 + const oldSite = data?.site; 90 + 91 + if (!oldSite || !oldSite.includes("/pub.leaflet.publication/")) { 92 + return { 93 + success: false as const, 94 + error: "Document does not have a pub.leaflet.publication site reference", 95 + }; 96 + } 97 + 98 + // Convert to new publication URI 99 + const oldPubAturi = new AtUri(oldSite); 100 + const newSite = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`; 101 + 102 + // Update the record 103 + const updatedRecord: SiteStandardDocument.Record = { 104 + ...data, 105 + site: newSite, 106 + }; 107 + 108 + // Write to PDS 109 + const docAturi = new AtUri(docUri); 110 + const agent = await createAuthenticatedAgent(did); 111 + await agent.com.atproto.repo.putRecord({ 112 + repo: did, 113 + collection: "site.standard.document", 114 + rkey: docAturi.rkey, 115 + record: updatedRecord, 116 + validate: false, 117 + }); 118 + 119 + // Update database 120 + const { error: dbError } = await supabaseServerClient 121 + .from("documents") 122 + .update({ data: updatedRecord as Json }) 123 + .eq("uri", docUri); 124 + 125 + if (dbError) { 126 + return { 127 + success: false as const, 128 + error: `Database update failed: ${dbError.message}`, 129 + }; 130 + } 131 + 132 + return { 133 + success: true as const, 134 + oldSite, 135 + newSite, 136 + }; 137 + }, 138 + ); 139 + 140 + if (result.success) { 141 + stats.documentsFixed++; 142 + 143 + // Fix the documents_in_publications entry 144 + const joinResult = await step.run( 145 + `fix-join-${docUri.slice(-12)}`, 146 + async () => { 147 + // Find the publication URI that exists in the database 148 + const { data: doc } = await supabaseServerClient 149 + .from("documents") 150 + .select("data") 151 + .eq("uri", docUri) 152 + .single(); 153 + 154 + const newSite = (doc?.data as any)?.site; 155 + if (!newSite) { 156 + return { success: false as const, error: "Could not read updated document" }; 157 + } 158 + 159 + // Check which publication URI exists 160 + const newPubAturi = new AtUri(newSite); 161 + const oldPubUri = `at://${newPubAturi.hostname}/pub.leaflet.publication/${newPubAturi.rkey}`; 162 + 163 + const { data: pubs } = await supabaseServerClient 164 + .from("publications") 165 + .select("uri") 166 + .in("uri", [newSite, oldPubUri]); 167 + 168 + const existingPubUri = pubs?.find((p) => p.uri === newSite)?.uri || 169 + pubs?.find((p) => p.uri === oldPubUri)?.uri; 170 + 171 + if (!existingPubUri) { 172 + return { success: false as const, error: "No matching publication found" }; 173 + } 174 + 175 + // Delete any existing entries for this document 176 + await supabaseServerClient 177 + .from("documents_in_publications") 178 + .delete() 179 + .eq("document", docUri); 180 + 181 + // Insert the correct entry 182 + const { error: insertError } = await supabaseServerClient 183 + .from("documents_in_publications") 184 + .insert({ 185 + document: docUri, 186 + publication: existingPubUri, 187 + }); 188 + 189 + if (insertError) { 190 + return { success: false as const, error: insertError.message }; 191 + } 192 + 193 + return { success: true as const, publication: existingPubUri }; 194 + }, 195 + ); 196 + 197 + if (joinResult.success) { 198 + stats.joinEntriesFixed++; 199 + } else { 200 + stats.errors.push(`Join table fix failed for ${docUri}: ${"error" in joinResult ? joinResult.error : "unknown error"}`); 201 + } 202 + } else { 203 + stats.errors.push(`${docUri}: ${result.error}`); 204 + } 205 + } 206 + } 207 + 208 + return { 209 + success: stats.errors.length === 0, 210 + stats, 211 + }; 212 + }, 213 + );
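The URI rewrite at the heart of this fix, isolated (the `AtUri` usage matches the code above):

```ts
import { AtUri } from "@atproto/syntax";

// Swap the collection segment of a legacy publication AT-URI, keeping DID and rkey.
function toStandardPublicationUri(legacyUri: string): string {
  const aturi = new AtUri(legacyUri);
  return `at://${aturi.hostname}/site.standard.publication/${aturi.rkey}`;
}

toStandardPublicationUri("at://did:plc:example/pub.leaflet.publication/3kxyz");
// => "at://did:plc:example/site.standard.publication/3kxyz"
```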
+186 -52
app/api/inngest/functions/migrate_user_to_standard.ts
··· 38 38 const stats = { 39 39 publicationsMigrated: 0, 40 40 documentsMigrated: 0, 41 + standardDocumentsFixed: 0, 41 42 userSubscriptionsMigrated: 0, 42 43 referencesUpdated: 0, 43 44 errors: [] as string[], 44 45 }; 45 46 46 47 // Step 1: Verify OAuth session is valid 47 - await step.run("verify-oauth-session", async () => { 48 + const oauthValid = await step.run("verify-oauth-session", async () => { 48 49 const result = await restoreOAuthSession(did); 49 50 if (!result.ok) { 50 - throw new Error( 51 - `Failed to restore OAuth session: ${result.error.message}`, 52 - ); 51 + // Mark identity as needing migration so we can retry later 52 + await supabaseServerClient 53 + .from("identities") 54 + .update({ 55 + metadata: { needsStandardSiteMigration: true }, 56 + }) 57 + .eq("atp_did", did); 58 + 59 + return { success: false, error: result.error.message }; 53 60 } 54 61 return { success: true }; 55 62 }); 63 + 64 + if (!oauthValid.success) { 65 + return { 66 + success: false, 67 + error: `Failed to restore OAuth session`, 68 + stats, 69 + publicationUriMap: {}, 70 + documentUriMap: {}, 71 + userSubscriptionUriMap: {}, 72 + }; 73 + } 56 74 57 75 // Step 2: Get user's pub.leaflet.publication records 58 76 const oldPublications = await step.run( ··· 109 127 }) 110 128 .filter((x) => x !== null); 111 129 112 - // Run all PDS writes in parallel 113 - const pubPdsResults = await Promise.all( 114 - publicationsToMigrate.map(({ pub, rkey, newRecord }) => 115 - step.run(`pds-write-publication-${pub.uri}`, async () => { 130 + // Run PDS + DB writes together for each publication 131 + const pubResults = await Promise.all( 132 + publicationsToMigrate.map(({ pub, rkey, normalized, newRecord }) => 133 + step.run(`migrate-publication-${pub.uri}`, async () => { 134 + // PDS write 116 135 const agent = await createAuthenticatedAgent(did); 117 136 const putResult = await agent.com.atproto.repo.putRecord({ 118 137 repo: did, ··· 121 140 record: newRecord, 122 141 validate: false, 123 142 }); 124 - return { oldUri: pub.uri, newUri: putResult.data.uri }; 125 - }), 126 - ), 127 - ); 143 + const newUri = putResult.data.uri; 128 144 129 - // Run all DB writes in parallel 130 - const pubDbResults = await Promise.all( 131 - publicationsToMigrate.map(({ pub, normalized, newRecord }, index) => { 132 - const newUri = pubPdsResults[index].newUri; 133 - return step.run(`db-write-publication-${pub.uri}`, async () => { 145 + // DB write 134 146 const { error: dbError } = await supabaseServerClient 135 147 .from("publications") 136 148 .upsert({ ··· 149 161 }; 150 162 } 151 163 return { success: true as const, oldUri: pub.uri, newUri }; 152 - }); 153 - }), 164 + }), 165 + ), 154 166 ); 155 167 156 168 // Process results 157 - for (const result of pubDbResults) { 169 + for (const result of pubResults) { 158 170 if (result.success) { 159 171 publicationUriMap[result.oldUri] = result.newUri; 160 172 stats.publicationsMigrated++; ··· 239 251 $type: "site.standard.document", 240 252 title: normalized.title || "Untitled", 241 253 site: siteValue, 242 - path: rkey, 254 + path: "/" + rkey, 243 255 publishedAt: normalized.publishedAt || new Date().toISOString(), 244 256 description: normalized.description, 245 257 content: normalized.content, ··· 252 264 }) 253 265 .filter((x) => x !== null); 254 266 255 - // Run all PDS writes in parallel 256 - const docPdsResults = await Promise.all( 257 - documentsToMigrate.map(({ doc, rkey, newRecord }) => 258 - step.run(`pds-write-document-${doc.uri}`, async () => { 267 + // Run PDS + DB writes together for each document 268 + const docResults = await Promise.all( 269 + documentsToMigrate.map(({ doc, rkey, newRecord, oldPubUri }) => 270 + step.run(`migrate-document-${doc.uri}`, async () => { 271 + // PDS write 259 272 const agent = await createAuthenticatedAgent(did); 260 273 const putResult = await agent.com.atproto.repo.putRecord({ 261 274 repo: did, ··· 264 277 record: newRecord, 265 278 validate: false, 266 279 }); 267 - return { oldUri: doc.uri, newUri: putResult.data.uri }; 268 - }), 269 - ), 270 - ); 280 + const newUri = putResult.data.uri; 271 281 272 - // Run all DB writes in parallel 273 - const docDbResults = await Promise.all( 274 - documentsToMigrate.map(({ doc, newRecord, oldPubUri }, index) => { 275 - const newUri = docPdsResults[index].newUri; 276 - return step.run(`db-write-document-${doc.uri}`, async () => { 282 + // DB write 277 283 const { error: dbError } = await supabaseServerClient 278 284 .from("documents") 279 285 .upsert({ ··· 302 308 } 303 309 304 310 return { success: true as const, oldUri: doc.uri, newUri }; 305 - }); 306 - }), 311 + }), 312 + ), 307 313 ); 308 314 309 315 // Process results 310 - for (const result of docDbResults) { 316 + for (const result of docResults) { 311 317 if (result.success) { 312 318 documentUriMap[result.oldUri] = result.newUri; 313 319 stats.documentsMigrated++; ··· 318 324 } 319 325 } 320 326 327 + // Step 4b: Fix existing site.standard.document records that reference pub.leaflet.publication 328 + // This handles the case where site.standard.document records were created pointing to 329 + // pub.leaflet.publication URIs before the publication was migrated to site.standard.publication 330 + const existingStandardDocs = await step.run( 331 + "fetch-existing-standard-documents", 332 + async () => { 333 + const { data, error } = await supabaseServerClient 334 + .from("documents") 335 + .select("uri, data") 336 + .like("uri", `at://${did}/site.standard.document/%`); 337 + 338 + if (error) 339 + throw new Error( 340 + `Failed to fetch existing standard documents: ${error.message}`, 341 + ); 342 + return data || []; 343 + }, 344 + ); 345 + 346 + // Find documents that reference pub.leaflet.publication and need their site field updated 347 + const standardDocsToFix = existingStandardDocs 348 + .map((doc) => { 349 + const data = doc.data as SiteStandardDocument.Record; 350 + const site = data?.site; 351 + 352 + // Check if site field references a pub.leaflet.publication 353 + if (!site || !site.includes("/pub.leaflet.publication/")) { 354 + return null; 355 + } 356 + 357 + try { 358 + const oldPubAturi = new AtUri(site); 359 + const newPubUri = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`; 360 + 361 + // Only fix if we have the new publication in our map (meaning it was migrated) 362 + // or if the new publication exists (check against all migrated publications) 363 + if ( 364 + publicationUriMap[site] || 365 + Object.values(publicationUriMap).includes(newPubUri) 366 + ) { 367 + const docAturi = new AtUri(doc.uri); 368 + const updatedRecord: SiteStandardDocument.Record = { 369 + ...data, 370 + site: newPubUri, 371 + }; 372 + 373 + return { 374 + doc, 375 + rkey: docAturi.rkey, 376 + oldSite: site, 377 + newSite: newPubUri, 378 + updatedRecord, 379 + }; 380 + } 381 + } catch (e) { 382 + stats.errors.push(`Invalid site URI in document ${doc.uri}: ${site}`); 383 + } 384 + 385 + return null; 386 + }) 387 + .filter((x) => x !== null); 388 + 389 + // Update these documents on PDS and in database 390 + if (standardDocsToFix.length > 0) { 391 + const fixResults = await Promise.all( 392 + standardDocsToFix.map(({ doc, rkey, oldSite, newSite, updatedRecord }) => 393 + step.run(`fix-standard-document-${doc.uri}`, async () => { 394 + // PDS write to update the site field 395 + const agent = await createAuthenticatedAgent(did); 396 + await agent.com.atproto.repo.putRecord({ 397 + repo: did, 398 + collection: "site.standard.document", 399 + rkey, 400 + record: updatedRecord, 401 + validate: false, 402 + }); 403 + 404 + // DB write 405 + const { error: dbError } = await supabaseServerClient 406 + .from("documents") 407 + .update({ data: updatedRecord as Json }) 408 + .eq("uri", doc.uri); 409 + 410 + if (dbError) { 411 + return { 412 + success: false as const, 413 + uri: doc.uri, 414 + error: dbError.message, 415 + }; 416 + } 417 + 418 + // Update documents_in_publications to point to new publication URI 419 + await supabaseServerClient 420 + .from("documents_in_publications") 421 + .upsert({ 422 + publication: newSite, 423 + document: doc.uri, 424 + }); 425 + 426 + // Remove old publication reference if different 427 + if (oldSite !== newSite) { 428 + await supabaseServerClient 429 + .from("documents_in_publications") 430 + .delete() 431 + .eq("publication", oldSite) 432 + .eq("document", doc.uri); 433 + } 434 + 435 + return { success: true as const, uri: doc.uri }; 436 + }), 437 + ), 438 + ); 439 + 440 + for (const result of fixResults) { 441 + if (result.success) { 442 + stats.standardDocumentsFixed++; 443 + } else { 444 + stats.errors.push( 445 + `Fix standard document ${result.uri}: Database error: ${result.error}`, 446 + ); 447 + } 448 + } 449 + } 450 + 321 451 // Step 5: Update references in database tables (all in parallel) 322 452 await step.run("update-references", async () => { 323 453 const pubEntries = Object.entries(publicationUriMap); ··· 428 558 }) 429 559 .filter((x) => x !== null); 430 560 431 - // Run all PDS writes in parallel 432 - const subPdsResults = await Promise.all( 561 + // Run PDS + DB writes together for each subscription 562 + const subResults = await Promise.all( 433 563 subscriptionsToMigrate.map(({ sub, rkey, newRecord }) => 434 - step.run(`pds-write-subscription-${sub.uri}`, async () => { 564 + step.run(`migrate-subscription-${sub.uri}`, async () => { 565 + // PDS write 435 566 const agent = await createAuthenticatedAgent(did); 436 567 const putResult = await agent.com.atproto.repo.putRecord({ 437 568 repo: did, ··· 440 571 record: newRecord, 441 572 validate: false, 442 573 }); 443 - return { oldUri: sub.uri, newUri: putResult.data.uri }; 444 - }), 445 - ), 446 - ); 574 + const newUri = putResult.data.uri; 447 575 448 - // Run all DB writes in parallel 449 - const subDbResults = await Promise.all( 450 - subscriptionsToMigrate.map(({ sub, newRecord }, index) => { 451 - const newUri = subPdsResults[index].newUri; 452 - return step.run(`db-write-subscription-${sub.uri}`, async () => { 576 + // DB write 453 577 const { error: dbError } = await supabaseServerClient 454 578 .from("publication_subscriptions") 455 579 .update({ ··· 467 591 }; 468 592 } 469 593 return { success: true as const, oldUri: sub.uri, newUri }; 470 - }); 471 - }), 594 + }), 595 + ), 472 596 ); 473 597 474 598 // Process results 475 - for (const result of subDbResults) { 599 + for (const result of subResults) { 476 600 if (result.success) { 477 601 userSubscriptionUriMap[result.oldUri] = result.newUri; 478 602 stats.userSubscriptionsMigrated++; ··· 489 613 // 2. External references (e.g., from other AT Proto apps) to old URIs continue to work 490 614 // 3. The normalization layer handles both schemas transparently for reads 491 615 // Old records are also kept on the user's PDS so existing AT-URI references remain valid. 616 + 617 + // Clear the migration flag on success 618 + if (stats.errors.length === 0) { 619 + await step.run("clear-migration-flag", async () => { 620 + await supabaseServerClient 621 + .from("identities") 622 + .update({ metadata: null }) 623 + .eq("atp_did", did); 624 + }); 625 + } 492 626 493 627 return { 494 628 success: stats.errors.length === 0,
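The `needsStandardSiteMigration` flag introduced here gives failed migrations a retry path; a sketch of its lifecycle:

```ts
// Lifecycle of the retry flag (sketch; the metadata column is added in the
// migration below, and the oauth route re-sends the event on sign-in):
// 1. OAuth restore fails   -> metadata = { needsStandardSiteMigration: true }
// 2. User signs in again   -> flag is seen, "user/migrate-to-standard" re-sent
// 3. Migration succeeds    -> metadata = null
type IdentityMetadata = { needsStandardSiteMigration?: boolean } | null;

function needsMigration(metadata: IdentityMetadata): boolean {
  return metadata?.needsStandardSiteMigration === true;
}
```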
+12
app/api/inngest/route.tsx
··· 5 5 import { batched_update_profiles } from "./functions/batched_update_profiles"; 6 6 import { index_follows } from "./functions/index_follows"; 7 7 import { migrate_user_to_standard } from "./functions/migrate_user_to_standard"; 8 + import { fix_standard_document_publications } from "./functions/fix_standard_document_publications"; 9 + import { fix_incorrect_site_values } from "./functions/fix_incorrect_site_values"; 10 + import { fix_standard_document_postref } from "./functions/fix_standard_document_postref"; 11 + import { 12 + cleanup_expired_oauth_sessions, 13 + check_oauth_session, 14 + } from "./functions/cleanup_expired_oauth_sessions"; 8 15 9 16 export const { GET, POST, PUT } = serve({ 10 17 client: inngest, ··· 14 21 batched_update_profiles, 15 22 index_follows, 16 23 migrate_user_to_standard, 24 + fix_standard_document_publications, 25 + fix_incorrect_site_values, 26 + fix_standard_document_postref, 27 + cleanup_expired_oauth_sessions, 28 + check_oauth_session, 17 29 ], 18 30 });
+11
app/api/oauth/[route]/route.ts
··· 11 11 ActionAfterSignIn, 12 12 parseActionFromSearchParam, 13 13 } from "./afterSignInActions"; 14 + import { inngest } from "app/api/inngest/client"; 14 15 15 16 type OauthRequestClientState = { 16 17 redirect: string | null; ··· 84 85 .single(); 85 86 identity = data; 86 87 } 88 + 89 + // Trigger migration if identity needs it 90 + const metadata = identity?.metadata as Record<string, unknown> | null; 91 + if (metadata?.needsStandardSiteMigration) { 92 + await inngest.send({ 93 + name: "user/migrate-to-standard", 94 + data: { did: session.did }, 95 + }); 96 + } 97 + 87 98 let { data: token } = await supabaseServerClient 88 99 .from("email_auth_tokens") 89 100 .insert({
+1
app/api/rpc/[command]/get_publication_data.ts
··· 83 83 uri: dip.documents.uri, 84 84 record: normalized, 85 85 indexed_at: dip.documents.indexed_at, 86 + sort_date: dip.documents.sort_date, 86 87 data: dip.documents.data, 87 88 commentsCount: dip.documents.comments_on_documents[0]?.count || 0, 88 89 mentionsCount: dip.documents.document_mentions_in_bsky[0]?.count || 0,
+34
app/lish/[did]/[publication]/.well-known/site.standard.publication/route.ts
··· 1 + import { publicationNameOrUriFilter } from "src/utils/uriHelpers"; 2 + import { supabaseServerClient } from "supabase/serverClient"; 3 + 4 + export async function GET( 5 + req: Request, 6 + props: { 7 + params: Promise<{ publication: string; did: string }>; 8 + }, 9 + ) { 10 + let params = await props.params; 11 + let did = decodeURIComponent(params.did); 12 + let publication_name = decodeURIComponent(params.publication); 13 + let [{ data: publications }] = await Promise.all([ 14 + supabaseServerClient 15 + .from("publications") 16 + .select( 17 + `*, 18 + publication_subscriptions(*), 19 + documents_in_publications(documents( 20 + *, 21 + comments_on_documents(count), 22 + document_mentions_in_bsky(count) 23 + )) 24 + `, 25 + ) 26 + .eq("identity_did", did) 27 + .or(publicationNameOrUriFilter(did, publication_name)) 28 + .order("uri", { ascending: false }) 29 + .limit(1), 30 + ]); 31 + let publication = publications?.[0]; 32 + if (!did || !publication) return new Response(null, { status: 404 }); 33 + return new Response(publication.uri); 34 + }
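The new well-known route returns the publication's AT-URI as plain text, so a consumer could resolve it with a single fetch (host and publication name here are hypothetical):

```ts
const res = await fetch(
  "https://leaflet.pub/lish/did:plc:example/my-pub/.well-known/site.standard.publication",
);
if (res.ok) {
  const pubUri = await res.text();
  // e.g. "at://did:plc:example/site.standard.publication/3kxyz"
}
```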
+7 -4
app/lish/[did]/[publication]/[rkey]/page.tsx
··· 35 35 sizes: "32x32", 36 36 type: "image/png", 37 37 }, 38 - other: { 39 - rel: "alternate", 40 - url: document.uri, 41 - }, 38 + other: [ 39 + { 40 + rel: "alternate", 41 + url: document.uri, 42 + }, 43 + { rel: "site.standard.document", url: document.uri }, 44 + ], 42 45 }, 43 46 title: 44 47 docRecord.title +
+1
app/lish/[did]/[publication]/dashboard/PublishedPostsLists.tsx
··· 111 111 documents: { 112 112 uri: doc.uri, 113 113 indexed_at: doc.indexed_at, 114 + sort_date: doc.sort_date, 114 115 data: doc.data, 115 116 }, 116 117 },
+7 -3
app/lish/createPub/getPublicationURL.ts
··· 25 25 } 26 26 27 27 // Fall back to checking raw record for legacy base_path 28 - if (isLeafletPublication(pub.record) && pub.record.base_path && isProductionDomain()) { 28 + if ( 29 + isLeafletPublication(pub.record) && 30 + pub.record.base_path && 31 + isProductionDomain() 32 + ) { 29 33 return `https://${pub.record.base_path}`; 30 34 } 31 35 ··· 36 40 const normalized = normalizePublicationRecord(pub.record); 37 41 const aturi = new AtUri(pub.uri); 38 42 39 - // Use normalized name if available, fall back to rkey 40 - const name = normalized?.name || aturi.rkey; 43 + //use rkey, fallback to name 44 + const name = aturi.rkey || normalized?.name; 41 45 return `/lish/${aturi.host}/${encodeURIComponent(name || "")}`; 42 46 }
+1
app/lish/createPub/updatePublication.ts
··· 278 278 return withPublicationUpdate(uri, async ({ normalizedPub, existingBasePath, publicationType, agent }) => { 279 279 // Build theme object 280 280 const themeData = { 281 + $type: "pub.leaflet.publication#theme" as const, 281 282 backgroundImage: theme.backgroundImage 282 283 ? { 283 284 $type: "pub.leaflet.theme.backgroundImage",
+7 -4
app/p/[didOrHandle]/[rkey]/page.tsx
··· 38 38 39 39 return { 40 40 icons: { 41 - other: { 42 - rel: "alternate", 43 - url: document.uri, 44 - }, 41 + other: [ 42 + { 43 + rel: "alternate", 44 + url: document.uri, 45 + }, 46 + { rel: "site.standard.document", url: document.uri }, 47 + ], 45 48 }, 46 49 title: docRecord.title, 47 50 description: docRecord?.description || "",
+2 -1
components/Blocks/PublicationPollBlock.tsx
··· 27 27 setAreYouSure?: (value: boolean) => void; 28 28 }, 29 29 ) => { 30 - let { data: publicationData } = useLeafletPublicationData(); 30 + let { data: publicationData, normalizedDocument } = 31 + useLeafletPublicationData(); 31 32 let isSelected = useUIState((s) => 32 33 s.selectedBlocks.find((b) => b.value === props.entityID), 33 34 );
+1
drizzle/schema.ts
··· 140 140 email: text("email"), 141 141 atp_did: text("atp_did"), 142 142 interface_state: jsonb("interface_state"), 143 + metadata: jsonb("metadata"), 143 144 }, 144 145 (table) => { 145 146 return {
+3 -3
feeds/index.ts
··· 116 116 } 117 117 query = query 118 118 .or("data->postRef.not.is.null,data->bskyPostRef.not.is.null") 119 - .order("indexed_at", { ascending: false }) 119 + .order("sort_date", { ascending: false }) 120 120 .order("uri", { ascending: false }) 121 121 .limit(25); 122 122 if (parsedCursor) 123 123 query = query.or( 124 - `indexed_at.lt.${parsedCursor.date},and(indexed_at.eq.${parsedCursor.date},uri.lt.${parsedCursor.uri})`, 124 + `sort_date.lt.${parsedCursor.date},and(sort_date.eq.${parsedCursor.date},uri.lt.${parsedCursor.uri})`, 125 125 ); 126 126 127 127 let { data, error } = await query; ··· 131 131 posts = posts || []; 132 132 133 133 let lastPost = posts[posts.length - 1]; 134 - let newCursor = lastPost ? `${lastPost.indexed_at}::${lastPost.uri}` : null; 134 + let newCursor = lastPost ? `${lastPost.sort_date}::${lastPost.uri}` : null; 135 135 return c.json({ 136 136 cursor: newCursor || cursor, 137 137 feed: posts.flatMap((p) => {
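The feed cursor stays an opaque `sort_date::uri` string; a parser for it (presumably where `parsedCursor` comes from) might look like:

```ts
// Split "sort_date::uri" back into its parts; "::" never occurs inside an
// ISO 8601 timestamp or an at:// URI, which only use single colons.
function parseCursor(cursor: string): { date: string; uri: string } | null {
  const [date, uri] = cursor.split("::");
  return date && uri ? { date, uri } : null;
}

parseCursor("2026-01-20T12:00:00.000Z::at://did:plc:example/site.standard.document/3kxyz");
// => { date: "2026-01-20T12:00:00.000Z",
//      uri: "at://did:plc:example/site.standard.document/3kxyz" }
```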
+2 -2
lexicons/api/lexicons.ts
··· 2215 2215 type: 'ref', 2216 2216 }, 2217 2217 theme: { 2218 - type: 'ref', 2219 - ref: 'lex:pub.leaflet.publication#theme', 2218 + type: 'union', 2219 + refs: ['lex:pub.leaflet.publication#theme'], 2220 2220 }, 2221 2221 description: { 2222 2222 maxGraphemes: 300,
+1 -1
lexicons/api/types/site/standard/publication.ts
··· 15 15 export interface Record { 16 16 $type: 'site.standard.publication' 17 17 basicTheme?: SiteStandardThemeBasic.Main 18 - theme?: PubLeafletPublication.Theme 18 + theme?: $Typed<PubLeafletPublication.Theme> | { $type: string } 19 19 description?: string 20 20 icon?: BlobRef 21 21 name: string
+2 -2
lexicons/site/standard/publication.json
··· 9 9 "type": "ref" 10 10 }, 11 11 "theme": { 12 - "type": "ref", 13 - "ref": "pub.leaflet.publication#theme" 12 + "type": "union", 13 + "refs": ["pub.leaflet.publication#theme"] 14 14 }, 15 15 "description": { 16 16 "maxGraphemes": 300,
+40 -5
lexicons/src/normalize.ts
··· 14 14 */ 15 15 16 16 import type * as PubLeafletDocument from "../api/types/pub/leaflet/document"; 17 - import type * as PubLeafletPublication from "../api/types/pub/leaflet/publication"; 17 + import * as PubLeafletPublication from "../api/types/pub/leaflet/publication"; 18 18 import type * as PubLeafletContent from "../api/types/pub/leaflet/content"; 19 19 import type * as SiteStandardDocument from "../api/types/site/standard/document"; 20 20 import type * as SiteStandardPublication from "../api/types/site/standard/publication"; ··· 31 31 }; 32 32 33 33 // Normalized publication type - uses the generated site.standard.publication type 34 - export type NormalizedPublication = SiteStandardPublication.Record; 34 + // with the theme narrowed to only the valid pub.leaflet.publication#theme type 35 + // (isTheme validates that $type is present, so we use $Typed) 36 + // Note: We explicitly list fields rather than using Omit because the generated Record type 37 + // has an index signature [k: string]: unknown that interferes with property typing 38 + export type NormalizedPublication = { 39 + $type: "site.standard.publication"; 40 + name: string; 41 + url: string; 42 + description?: string; 43 + icon?: SiteStandardPublication.Record["icon"]; 44 + basicTheme?: SiteStandardThemeBasic.Main; 45 + theme?: $Typed<PubLeafletPublication.Theme>; 46 + preferences?: SiteStandardPublication.Preferences; 47 + }; 35 48 36 49 /** 37 50 * Checks if the record is a pub.leaflet.document ··· 210 223 ): NormalizedPublication | null { 211 224 if (!record || typeof record !== "object") return null; 212 225 213 - // Pass through site.standard records directly 226 + // Pass through site.standard records directly, but validate the theme 214 227 if (isStandardPublication(record)) { 215 - return record; 228 + // Validate theme - only keep if it's a valid pub.leaflet.publication#theme 229 + const theme = PubLeafletPublication.isTheme(record.theme) 230 + ? (record.theme as $Typed<PubLeafletPublication.Theme>) 231 + : undefined; 232 + return { 233 + ...record, 234 + theme, 235 + }; 216 236 } 217 237 218 238 if (isLeafletPublication(record)) { ··· 225 245 226 246 const basicTheme = leafletThemeToBasicTheme(record.theme); 227 247 248 + // Validate theme - only keep if it's a valid pub.leaflet.publication#theme with $type set 249 + // For legacy records without $type, add it during normalization 250 + let theme: $Typed<PubLeafletPublication.Theme> | undefined; 251 + if (record.theme) { 252 + if (PubLeafletPublication.isTheme(record.theme)) { 253 + theme = record.theme as $Typed<PubLeafletPublication.Theme>; 254 + } else { 255 + // Legacy theme without $type - add it 256 + theme = { 257 + ...record.theme, 258 + $type: "pub.leaflet.publication#theme", 259 + }; 260 + } 261 + } 262 + 228 263 // Convert preferences to site.standard format (strip/replace $type) 229 264 const preferences: SiteStandardPublication.Preferences | undefined = 230 265 record.preferences ··· 243 278 description: record.description, 244 279 icon: record.icon, 245 280 basicTheme, 246 - theme: record.theme, 281 + theme, 247 282 preferences, 248 283 }; 249 284 }
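The theme handling above reduces to one rule: keep a theme that already carries its `$type`, and backfill `$type` on legacy themes that lack it. A simplified sketch (the real check is `PubLeafletPublication.isTheme`, and the site.standard path drops invalid themes rather than backfilling):

```ts
// Simplified: legacy leaflet themes written without $type get it backfilled
// so they satisfy the new union type in site.standard.publication.
type Theme = { $type?: string; [k: string]: unknown };

function normalizeTheme(
  theme: Theme | undefined,
): (Theme & { $type: string }) | undefined {
  if (!theme) return undefined;
  return { ...theme, $type: theme.$type ?? "pub.leaflet.publication#theme" };
}
```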
+4
supabase/database.types.ts
··· 337 337 Row: { 338 338 data: Json 339 339 indexed_at: string 340 + sort_date: string 340 341 uri: string 341 342 } 342 343 Insert: { ··· 551 552 home_page: string 552 553 id: string 553 554 interface_state: Json | null 555 + metadata: Json | null 554 556 } 555 557 Insert: { 556 558 atp_did?: string | null ··· 559 561 home_page?: string 560 562 id?: string 561 563 interface_state?: Json | null 564 + metadata?: Json | null 562 565 } 563 566 Update: { 564 567 atp_did?: string | null ··· 567 570 home_page?: string 568 571 id?: string 569 572 interface_state?: Json | null 573 + metadata?: Json | null 570 574 } 571 575 Relationships: [ 572 576 {
+1
supabase/migrations/20260123000000_add_metadata_to_identities.sql
··· 1 + alter table "public"."identities" add column "metadata" jsonb;
+28
supabase/migrations/20260125000000_add_sort_date_column.sql
··· 1 + -- Add sort_date computed column to documents table 2 + -- This column stores the older of publishedAt (from JSON data) or indexed_at 3 + -- Used for sorting feeds chronologically by when content was actually published 4 + 5 + -- Create an immutable function to parse ISO 8601 timestamps from text 6 + -- This is needed because direct ::timestamp cast is not immutable (accepts 'now', 'today', etc.) 7 + -- The regex validates the format before casting to ensure immutability 8 + CREATE OR REPLACE FUNCTION parse_iso_timestamp(text) RETURNS timestamptz 9 + LANGUAGE sql IMMUTABLE STRICT AS $$ 10 + SELECT CASE 11 + -- Match ISO 8601 format: YYYY-MM-DDTHH:MM:SS with optional fractional seconds and Z/timezone 12 + WHEN $1 ~ '^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?$' THEN 13 + $1::timestamptz 14 + ELSE 15 + NULL 16 + END 17 + $$; 18 + 19 + ALTER TABLE documents 20 + ADD COLUMN sort_date timestamptz GENERATED ALWAYS AS ( 21 + LEAST( 22 + COALESCE(parse_iso_timestamp(data->>'publishedAt'), indexed_at), 23 + indexed_at 24 + ) 25 + ) STORED; 26 + 27 + -- Create index on sort_date for efficient ordering 28 + CREATE INDEX documents_sort_date_idx ON documents (sort_date DESC, uri DESC);
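In TypeScript terms, the generated column computes roughly the following (a rough mirror; note that `new Date` is more permissive than the migration's ISO 8601 regex):

```ts
// sort_date = LEAST(COALESCE(parse_iso_timestamp(publishedAt), indexed_at), indexed_at)
function sortDate(publishedAt: string | undefined, indexedAt: Date): Date {
  if (publishedAt) {
    const parsed = new Date(publishedAt);
    if (!Number.isNaN(parsed.getTime())) {
      // Backdated posts sort by publishedAt; a future-dated publishedAt
      // is clamped to indexed_at by the LEAST().
      return parsed < indexedAt ? parsed : indexedAt;
    }
  }
  return indexedAt; // COALESCE fallback: publishedAt missing or unparseable
}
```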