a tool for shared writing and social publishing

Compare changes


+111 -1242
+4 -7
actions/publishToPublication.ts
···
    } from "src/utils/collectionHelpers";

  type PublishResult =
-   | { success: true; rkey: string; record: SiteStandardDocument.Record }
+   | { success: true; rkey: string; record: PubLeafletDocument.Record }
    | { success: false; error: OAuthSessionError };

  export async function publishToPublication({
···
  }

  // Determine the collection to use - preserve existing schema if updating
- const existingCollection = existingDocUri
-   ? new AtUri(existingDocUri).collection
-   : undefined;
+ const existingCollection = existingDocUri ? new AtUri(existingDocUri).collection : undefined;
  const documentType = getDocumentType(existingCollection);

  // Build the pages array (used by both formats)
···
  if (documentType === "site.standard.document") {
    // site.standard.document format
    // For standalone docs, use HTTPS URL; for publication docs, use the publication AT-URI
-   const siteUri =
-     publication_uri || `https://leaflet.pub/p/${credentialSession.did}`;
+   const siteUri = publication_uri || `https://leaflet.pub/p/${credentialSession.did}`;

    record = {
      $type: "site.standard.document",
      title: title || "Untitled",
      site: siteUri,
-     path: "/" + rkey,
+     path: rkey,
      publishedAt:
        publishedAt || existingRecord.publishedAt || new Date().toISOString(),
      ...(description && { description }),
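The update path above keeps a document in whatever collection it was originally written to, so saving an existing post never migrates it between schemas as a side effect. The `getDocumentType` helper is imported from `src/utils/collectionHelpers` but not shown in this diff; a minimal sketch of what such a helper could look like, where the default collection for new documents is an assumption:

// Hypothetical sketch of getDocumentType; the real helper lives in
// src/utils/collectionHelpers and may differ.
type DocumentType = "site.standard.document" | "pub.leaflet.document";

function getDocumentType(existingCollection?: string): DocumentType {
  // Preserve the schema of an existing record so an update never moves a
  // document into a different collection.
  if (existingCollection === "site.standard.document") return "site.standard.document";
  if (existingCollection === "pub.leaflet.document") return "pub.leaflet.document";
  // Default for brand-new documents is an assumption here.
  return "site.standard.document";
}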
+1 -1
app/(home-pages)/discover/PubListing.tsx
···
    <p>
      Updated{" "}
      {timeAgo(
-       props.documents_in_publications?.[0]?.documents?.sort_date ||
+       props.documents_in_publications?.[0]?.documents?.indexed_at ||
          "",
      )}
    </p>
+8 -8
app/(home-pages)/discover/getPublications.ts
···
  import { deduplicateByUri } from "src/utils/deduplicateRecords";

  export type Cursor = {
-   sort_date?: string;
+   indexed_at?: string;
    count?: number;
    uri: string;
  };
···
  .or(
    "record->preferences->showInDiscover.is.null,record->preferences->>showInDiscover.eq.true",
  )
- .order("documents(sort_date)", {
+ .order("indexed_at", {
    referencedTable: "documents_in_publications",
    ascending: false,
  })
···
  } else {
    // recentlyUpdated
    const aDate = new Date(
-     a.documents_in_publications[0]?.documents?.sort_date || 0,
+     a.documents_in_publications[0]?.indexed_at || 0,
    ).getTime();
    const bDate = new Date(
-     b.documents_in_publications[0]?.documents?.sort_date || 0,
+     b.documents_in_publications[0]?.indexed_at || 0,
    ).getTime();
    if (bDate !== aDate) {
      return bDate - aDate;
···
      (pubCount === cursor.count && pub.uri < cursor.uri)
    );
  } else {
-   const pubDate = pub.documents_in_publications[0]?.documents?.sort_date || "";
+   const pubDate = pub.documents_in_publications[0]?.indexed_at || "";
    // Find first pub after cursor
    return (
-     pubDate < (cursor.sort_date || "") ||
-     (pubDate === cursor.sort_date && pub.uri < cursor.uri)
+     pubDate < (cursor.indexed_at || "") ||
+     (pubDate === cursor.indexed_at && pub.uri < cursor.uri)
    );
  }
  });
···
  normalizedPage.length > 0 && startIndex + limit < allPubs.length
    ? order === "recentlyUpdated"
      ? {
-         sort_date: lastItem.documents_in_publications[0]?.documents?.sort_date,
+         indexed_at: lastItem.documents_in_publications[0]?.indexed_at,
          uri: lastItem.uri,
        }
      : {
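This feed paginates with a keyset cursor rather than an offset: each page ends with the last row's `(indexed_at, uri)` pair, and the next request scans for the first row strictly after that pair. A standalone sketch of the comparison, assuming rows are already sorted descending:

// Minimal sketch of the keyset comparison used for the recentlyUpdated order.
// Ties on the timestamp fall back to the uri so pagination stays stable even
// when two publications share an indexed_at value.
type Cursor = { indexed_at?: string; uri: string };
type Row = { indexed_at: string; uri: string };

function isAfterCursor(row: Row, cursor: Cursor): boolean {
  const cursorDate = cursor.indexed_at || "";
  // Descending order: "after" means an older timestamp, or an equal
  // timestamp with a lexicographically smaller uri.
  return (
    row.indexed_at < cursorDate ||
    (row.indexed_at === cursorDate && row.uri < cursor.uri)
  );
}

// Usage with hypothetical data, sorted descending by (indexed_at, uri):
const rows: Row[] = [
  { indexed_at: "2026-01-02T00:00:00Z", uri: "at://did:b/pub/2" },
  { indexed_at: "2026-01-01T00:00:00Z", uri: "at://did:a/pub/1" },
];
const cursor: Cursor = { indexed_at: "2026-01-02T00:00:00Z", uri: "at://did:b/pub/2" };
const startIndex = rows.findIndex((row) => isAfterCursor(row, cursor)); // 1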
+5 -5
app/(home-pages)/p/[didOrHandle]/getProfilePosts.ts
···
  import { deduplicateByUriOrdered } from "src/utils/deduplicateRecords";

  export type Cursor = {
-   sort_date: string;
+   indexed_at: string;
    uri: string;
  };
···
      documents_in_publications(publications(*))`,
    )
    .like("uri", `at://${did}/%`)
-   .order("sort_date", { ascending: false })
+   .order("indexed_at", { ascending: false })
    .order("uri", { ascending: false })
    .limit(limit);

  if (cursor) {
    query = query.or(
-     `sort_date.lt.${cursor.sort_date},and(sort_date.eq.${cursor.sort_date},uri.lt.${cursor.uri})`,
+     `indexed_at.lt.${cursor.indexed_at},and(indexed_at.eq.${cursor.indexed_at},uri.lt.${cursor.uri})`,
    );
  }
···
  documents: {
    data: normalizedData,
    uri: doc.uri,
-   sort_date: doc.sort_date,
+   indexed_at: doc.indexed_at,
    comments_on_documents: doc.comments_on_documents,
    document_mentions_in_bsky: doc.document_mentions_in_bsky,
  },
···
  const nextCursor =
    posts.length === limit
      ? {
-         sort_date: posts[posts.length - 1].documents.sort_date,
+         indexed_at: posts[posts.length - 1].documents.indexed_at,
          uri: posts[posts.length - 1].documents.uri,
        }
      : null;
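Here the same tuple comparison is pushed down into PostgREST via the `.or()` string: fetch rows whose `indexed_at` is strictly older than the cursor, or equal with a smaller `uri`. A small builder for that filter string, as a sketch with a hypothetical name:

// Hypothetical helper producing the PostgREST keyset filter used above.
// For cursor { indexed_at: "2026-01-01T00:00:00Z", uri: "at://did:a/doc/1" }
// it yields:
//   indexed_at.lt.2026-01-01T00:00:00Z,and(indexed_at.eq.2026-01-01T00:00:00Z,uri.lt.at://did:a/doc/1)
// which PostgREST reads as (indexed_at < d) OR (indexed_at = d AND uri < u).
function keysetFilter(cursor: { indexed_at: string; uri: string }): string {
  return `indexed_at.lt.${cursor.indexed_at},and(indexed_at.eq.${cursor.indexed_at},uri.lt.${cursor.uri})`;
}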
+5 -5
app/(home-pages)/reader/getReaderFeed.ts
···
    "documents_in_publications.publications.publication_subscriptions.identity",
    auth_res.atp_did,
  )
- .order("sort_date", { ascending: false })
+ .order("indexed_at", { ascending: false })
  .order("uri", { ascending: false })
  .limit(25);
  if (cursor) {
    query = query.or(
-     `sort_date.lt.${cursor.timestamp},and(sort_date.eq.${cursor.timestamp},uri.lt.${cursor.uri})`,
+     `indexed_at.lt.${cursor.timestamp},and(indexed_at.eq.${cursor.timestamp},uri.lt.${cursor.uri})`,
    );
  }
  let { data: rawFeed, error } = await query;
···
    document_mentions_in_bsky: post.document_mentions_in_bsky,
    data: normalizedData,
    uri: post.uri,
-   sort_date: post.sort_date,
+   indexed_at: post.indexed_at,
  },
  };
  return p;
···
  const nextCursor =
    posts.length > 0
      ? {
-         timestamp: posts[posts.length - 1].documents.sort_date,
+         timestamp: posts[posts.length - 1].documents.indexed_at,
          uri: posts[posts.length - 1].documents.uri,
        }
      : null;
···
  documents: {
    data: NormalizedDocument | null;
    uri: string;
-   sort_date: string;
+   indexed_at: string;
    comments_on_documents: { count: number }[] | undefined;
    document_mentions_in_bsky: { count: number }[] | undefined;
  };
+2 -2
app/(home-pages)/reader/getSubscriptions.ts
···
  .select(`*, publications(*, documents_in_publications(*, documents(*)))`)
  .order(`created_at`, { ascending: false })
  .order(`uri`, { ascending: false })
- .order("documents(sort_date)", {
+ .order("indexed_at", {
    ascending: false,
    referencedTable: "publications.documents_in_publications",
  })
···
  record: NormalizedPublication;
  uri: string;
  documents_in_publications: {
-   documents: { data?: Json; sort_date: string } | null;
+   documents: { data?: Json; indexed_at: string } | null;
  }[];
  };
+2 -2
app/(home-pages)/tag/[tag]/getDocumentsByTag.ts
···
    documents_in_publications(publications(*))`,
  )
  .contains("data->tags", `["${tag}"]`)
- .order("sort_date", { ascending: false })
+ .order("indexed_at", { ascending: false })
  .limit(50);

  if (error) {
···
    document_mentions_in_bsky: doc.document_mentions_in_bsky,
    data: normalizedData,
    uri: doc.uri,
-   sort_date: doc.sort_date,
+   indexed_at: doc.indexed_at,
  },
  };
  return post;
+7 -4
app/[leaflet_id]/publish/publishBskyPost.ts
···
  import sharp from "sharp";
  import { TID } from "@atproto/common";
  import { getIdentityData } from "actions/getIdentityData";
- import { AtpBaseClient, SiteStandardDocument } from "lexicons/api";
- import { restoreOAuthSession, OAuthSessionError } from "src/atproto-oauth";
+ import { AtpBaseClient, PubLeafletDocument } from "lexicons/api";
+ import {
+   restoreOAuthSession,
+   OAuthSessionError,
+ } from "src/atproto-oauth";
  import { supabaseServerClient } from "supabase/serverClient";
  import { Json } from "supabase/database.types";
  import {
···
  url: string;
  title: string;
  description: string;
- document_record: SiteStandardDocument.Record;
+ document_record: PubLeafletDocument.Record;
  rkey: string;
  facets: AppBskyRichtextFacet.Main[];
  }): Promise<PublishBskyResult> {
···
    },
  );
  let record = args.document_record;
- record.bskyPostRef = post;
+ record.postRef = post;

  let { data: result } = await agent.com.atproto.repo.putRecord({
    rkey: args.rkey,
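Besides swapping the record type back to `pub.leaflet.document`, this change stores the Bluesky post's strong ref under `postRef` rather than `bskyPostRef`. A reduced sketch of that write-back step, with illustrative values:

// A strong ref pins both the post's AT-URI and its CID, so the document's
// link to its announcement post stays stable even if the repo changes.
type StrongRef = { uri: string; cid: string };

function attachPostRef<T extends { postRef?: StrongRef }>(
  record: T,
  post: StrongRef,
): T {
  // The caller then putRecord()s the updated document back under the same
  // rkey, as in the diff above.
  return { ...record, postRef: post };
}

const updated = attachPostRef(
  { title: "Hello" } as { title: string; postRef?: StrongRef },
  { uri: "at://did:plc:example/app.bsky.feed.post/3jexample", cid: "bafyexamplecid" },
);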
-25
app/api/inngest/client.ts
···
      did: string;
    };
  };
- "user/cleanup-expired-oauth-sessions": {
-   data: {};
- };
- "user/check-oauth-session": {
-   data: {
-     identityId: string;
-     did: string;
-     tokenCount: number;
-   };
- };
- "documents/fix-publication-references": {
-   data: {
-     documentUris: string[];
-   };
- };
- "documents/fix-incorrect-site-values": {
-   data: {
-     did: string;
-   };
- };
- "documents/fix-postref": {
-   data: {
-     documentUris?: string[];
-   };
- };
  };

  // Create a client to send and receive events
-123
app/api/inngest/functions/cleanup_expired_oauth_sessions.ts
- import { supabaseServerClient } from "supabase/serverClient";
- import { inngest } from "../client";
- import { restoreOAuthSession } from "src/atproto-oauth";
-
- // Main function that fetches identities and publishes events for each one
- export const cleanup_expired_oauth_sessions = inngest.createFunction(
-   { id: "cleanup_expired_oauth_sessions" },
-   { event: "user/cleanup-expired-oauth-sessions" },
-   async ({ step }) => {
-     // Get all identities with an atp_did (OAuth users) that have at least one auth token
-     const identities = await step.run("fetch-oauth-identities", async () => {
-       const { data, error } = await supabaseServerClient
-         .from("identities")
-         .select("id, atp_did, email_auth_tokens(count)")
-         .not("atp_did", "is", null);
-
-       if (error) {
-         throw new Error(`Failed to fetch identities: ${error.message}`);
-       }
-
-       // Filter to only include identities with at least one auth token
-       return (data || [])
-         .filter((identity) => {
-           const tokenCount = identity.email_auth_tokens?.[0]?.count ?? 0;
-           return tokenCount > 0;
-         })
-         .map((identity) => ({
-           id: identity.id,
-           atp_did: identity.atp_did!,
-           tokenCount: identity.email_auth_tokens?.[0]?.count ?? 0,
-         }));
-     });
-
-     console.log(
-       `Found ${identities.length} OAuth identities with active sessions to check`,
-     );
-
-     // Publish events for each identity in batches
-     const BATCH_SIZE = 100;
-     let totalSent = 0;
-
-     for (let i = 0; i < identities.length; i += BATCH_SIZE) {
-       const batch = identities.slice(i, i + BATCH_SIZE);
-
-       await step.run(`send-events-batch-${i}`, async () => {
-         const events = batch.map((identity) => ({
-           name: "user/check-oauth-session" as const,
-           data: {
-             identityId: identity.id,
-             did: identity.atp_did,
-             tokenCount: identity.tokenCount,
-           },
-         }));
-
-         await inngest.send(events);
-         return events.length;
-       });
-
-       totalSent += batch.length;
-     }
-
-     console.log(`Published ${totalSent} check-oauth-session events`);
-
-     return {
-       success: true,
-       identitiesQueued: totalSent,
-     };
-   },
- );
-
- // Function that checks a single identity's OAuth session and cleans up if expired
- export const check_oauth_session = inngest.createFunction(
-   { id: "check_oauth_session" },
-   { event: "user/check-oauth-session" },
-   async ({ event, step }) => {
-     const { identityId, did, tokenCount } = event.data;
-
-     const result = await step.run("check-and-cleanup", async () => {
-       console.log(`Checking OAuth session for DID: ${did} (${tokenCount} tokens)`);
-
-       const sessionResult = await restoreOAuthSession(did);
-
-       if (sessionResult.ok) {
-         console.log(`  Session valid for ${did}`);
-         return { valid: true, tokensDeleted: 0 };
-       }
-
-       // Session is expired/invalid - delete associated auth tokens
-       console.log(
-         `  Session expired for ${did}: ${sessionResult.error.message}`,
-       );
-
-       const { error: deleteError } = await supabaseServerClient
-         .from("email_auth_tokens")
-         .delete()
-         .eq("identity", identityId);
-
-       if (deleteError) {
-         console.error(
-           `  Error deleting tokens for identity ${identityId}: ${deleteError.message}`,
-         );
-         return {
-           valid: false,
-           tokensDeleted: 0,
-           error: deleteError.message,
-         };
-       }
-
-       console.log(`  Deleted ${tokenCount} auth tokens for identity ${identityId}`);
-
-       return {
-         valid: false,
-         tokensDeleted: tokenCount,
-       };
-     });
-
-     return {
-       identityId,
-       did,
-       ...result,
-     };
-   },
- );
-300
app/api/inngest/functions/fix_incorrect_site_values.ts
- import { supabaseServerClient } from "supabase/serverClient";
- import { inngest } from "../client";
- import { restoreOAuthSession } from "src/atproto-oauth";
- import { AtpBaseClient, SiteStandardDocument } from "lexicons/api";
- import { AtUri } from "@atproto/syntax";
- import { Json } from "supabase/database.types";
-
- async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
-   const result = await restoreOAuthSession(did);
-   if (!result.ok) {
-     throw new Error(`Failed to restore OAuth session: ${result.error.message}`);
-   }
-   const credentialSession = result.value;
-   return new AtpBaseClient(
-     credentialSession.fetchHandler.bind(credentialSession),
-   );
- }
-
- /**
-  * Build set of valid site values for a publication.
-  * A site value is valid if it matches the publication or its legacy equivalent.
-  */
- function buildValidSiteValues(pubUri: string): Set<string> {
-   const validValues = new Set<string>([pubUri]);
-
-   try {
-     const aturi = new AtUri(pubUri);
-
-     if (pubUri.includes("/site.standard.publication/")) {
-       // Also accept legacy pub.leaflet.publication
-       validValues.add(
-         `at://${aturi.hostname}/pub.leaflet.publication/${aturi.rkey}`,
-       );
-     } else if (pubUri.includes("/pub.leaflet.publication/")) {
-       // Also accept new site.standard.publication
-       validValues.add(
-         `at://${aturi.hostname}/site.standard.publication/${aturi.rkey}`,
-       );
-     }
-   } catch (e) {
-     // Invalid URI, just use the original
-   }
-
-   return validValues;
- }
-
- /**
-  * This function finds and fixes documents that have incorrect site values.
-  * A document has an incorrect site value if its `site` field doesn't match
-  * the publication it belongs to (via documents_in_publications).
-  *
-  * Takes a DID as input and processes publications owned by that identity.
-  */
- export const fix_incorrect_site_values = inngest.createFunction(
-   { id: "fix_incorrect_site_values" },
-   { event: "documents/fix-incorrect-site-values" },
-   async ({ event, step }) => {
-     const { did } = event.data;
-
-     const stats = {
-       publicationsChecked: 0,
-       documentsChecked: 0,
-       documentsWithIncorrectSite: 0,
-       documentsFixed: 0,
-       documentsMissingSite: 0,
-       errors: [] as string[],
-     };
-
-     // Step 1: Get all publications owned by this identity
-     const publications = await step.run("fetch-publications", async () => {
-       const { data, error } = await supabaseServerClient
-         .from("publications")
-         .select("uri")
-         .eq("identity_did", did);
-
-       if (error) {
-         throw new Error(`Failed to fetch publications: ${error.message}`);
-       }
-       return data || [];
-     });
-
-     stats.publicationsChecked = publications.length;
-
-     if (publications.length === 0) {
-       return {
-         success: true,
-         message: "No publications found for this identity",
-         stats,
-       };
-     }
-
-     // Step 2: Get all documents_in_publications entries for these publications
-     const publicationUris = publications.map((p) => p.uri);
-
-     const joinEntries = await step.run(
-       "fetch-documents-in-publications",
-       async () => {
-         const { data, error } = await supabaseServerClient
-           .from("documents_in_publications")
-           .select("document, publication")
-           .in("publication", publicationUris);
-
-         if (error) {
-           throw new Error(
-             `Failed to fetch documents_in_publications: ${error.message}`,
-           );
-         }
-         return data || [];
-       },
-     );
-
-     if (joinEntries.length === 0) {
-       return {
-         success: true,
-         message: "No documents found in publications",
-         stats,
-       };
-     }
-
-     // Create a map of document URI -> expected publication URI
-     const documentToPublication = new Map<string, string>();
-     for (const row of joinEntries) {
-       documentToPublication.set(row.document, row.publication);
-     }
-
-     // Step 3: Fetch all document records
-     const documentUris = Array.from(documentToPublication.keys());
-
-     const allDocuments = await step.run("fetch-documents", async () => {
-       const { data, error } = await supabaseServerClient
-         .from("documents")
-         .select("uri, data")
-         .in("uri", documentUris);
-
-       if (error) {
-         throw new Error(`Failed to fetch documents: ${error.message}`);
-       }
-       return data || [];
-     });
-
-     stats.documentsChecked = allDocuments.length;
-
-     // Step 4: Find documents with incorrect site values
-     const documentsToFix: Array<{
-       uri: string;
-       currentSite: string | null;
-       correctSite: string;
-       docData: SiteStandardDocument.Record;
-     }> = [];
-
-     for (const doc of allDocuments) {
-       const expectedPubUri = documentToPublication.get(doc.uri);
-       if (!expectedPubUri) continue;
-
-       const data = doc.data as unknown as SiteStandardDocument.Record;
-       const currentSite = data?.site;
-
-       if (!currentSite) {
-         stats.documentsMissingSite++;
-         continue;
-       }
-
-       const validSiteValues = buildValidSiteValues(expectedPubUri);
-
-       if (!validSiteValues.has(currentSite)) {
-         // Document has incorrect site value - determine the correct one
-         // Prefer the site.standard.publication format if the doc is site.standard.document
-         let correctSite = expectedPubUri;
-
-         if (doc.uri.includes("/site.standard.document/")) {
-           // For site.standard.document, use site.standard.publication format
-           try {
-             const pubAturi = new AtUri(expectedPubUri);
-             if (expectedPubUri.includes("/pub.leaflet.publication/")) {
-               correctSite = `at://${pubAturi.hostname}/site.standard.publication/${pubAturi.rkey}`;
-             }
-           } catch (e) {
-             // Use as-is
-           }
-         }
-
-         documentsToFix.push({
-           uri: doc.uri,
-           currentSite,
-           correctSite,
-           docData: data,
-         });
-       }
-     }
-
-     stats.documentsWithIncorrectSite = documentsToFix.length;
-
-     if (documentsToFix.length === 0) {
-       return {
-         success: true,
-         message: "All documents have correct site values",
-         stats,
-       };
-     }
-
-     // Step 5: Group documents by author DID for efficient OAuth session handling
-     const docsByDid = new Map<string, typeof documentsToFix>();
-     for (const doc of documentsToFix) {
-       try {
-         const aturi = new AtUri(doc.uri);
-         const authorDid = aturi.hostname;
-         const existing = docsByDid.get(authorDid) || [];
-         existing.push(doc);
-         docsByDid.set(authorDid, existing);
-       } catch (e) {
-         stats.errors.push(`Invalid URI: ${doc.uri}`);
-       }
-     }
-
-     // Step 6: Process each author's documents
-     for (const [authorDid, docs] of docsByDid) {
-       // Verify OAuth session for this author
-       const oauthValid = await step.run(
-         `verify-oauth-${authorDid.slice(-8)}`,
-         async () => {
-           const result = await restoreOAuthSession(authorDid);
-           return result.ok;
-         },
-       );
-
-       if (!oauthValid) {
-         stats.errors.push(`No valid OAuth session for ${authorDid}`);
-         continue;
-       }
-
-       // Fix each document for this author
-       for (const docToFix of docs) {
-         const result = await step.run(
-           `fix-doc-${docToFix.uri.slice(-12)}`,
-           async () => {
-             try {
-               const docAturi = new AtUri(docToFix.uri);
-
-               // Build updated record
-               const updatedRecord: SiteStandardDocument.Record = {
-                 ...docToFix.docData,
-                 site: docToFix.correctSite,
-               };
-
-               // Update on PDS
-               const agent = await createAuthenticatedAgent(authorDid);
-               await agent.com.atproto.repo.putRecord({
-                 repo: authorDid,
-                 collection: docAturi.collection,
-                 rkey: docAturi.rkey,
-                 record: updatedRecord,
-                 validate: false,
-               });
-
-               // Update in database
-               const { error: dbError } = await supabaseServerClient
-                 .from("documents")
-                 .update({ data: updatedRecord as Json })
-                 .eq("uri", docToFix.uri);
-
-               if (dbError) {
-                 return {
-                   success: false as const,
-                   error: `Database update failed: ${dbError.message}`,
-                 };
-               }
-
-               return {
-                 success: true as const,
-                 oldSite: docToFix.currentSite,
-                 newSite: docToFix.correctSite,
-               };
-             } catch (e) {
-               return {
-                 success: false as const,
-                 error: e instanceof Error ? e.message : String(e),
-               };
-             }
-           },
-         );
-
-         if (result.success) {
-           stats.documentsFixed++;
-         } else {
-           stats.errors.push(`${docToFix.uri}: ${result.error}`);
-         }
-       }
-     }
-
-     return {
-       success: stats.errors.length === 0,
-       stats,
-       documentsToFix: documentsToFix.map((d) => ({
-         uri: d.uri,
-         oldSite: d.currentSite,
-         newSite: d.correctSite,
-       })),
-     };
-   },
- );
-196
app/api/inngest/functions/fix_standard_document_postref.ts
- import { supabaseServerClient } from "supabase/serverClient";
- import { inngest } from "../client";
- import { restoreOAuthSession } from "src/atproto-oauth";
- import {
-   AtpBaseClient,
-   SiteStandardDocument,
-   ComAtprotoRepoStrongRef,
- } from "lexicons/api";
- import { AtUri } from "@atproto/syntax";
- import { Json } from "supabase/database.types";
-
- async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
-   const result = await restoreOAuthSession(did);
-   if (!result.ok) {
-     throw new Error(`Failed to restore OAuth session: ${result.error.message}`);
-   }
-   const credentialSession = result.value;
-   return new AtpBaseClient(
-     credentialSession.fetchHandler.bind(credentialSession),
-   );
- }
-
- /**
-  * Fixes site.standard.document records that have the legacy `postRef` field set.
-  * Migrates the value to `bskyPostRef` (the correct field for site.standard.document)
-  * and removes the legacy `postRef` field.
-  *
-  * Can be triggered with specific document URIs, or will find all affected documents
-  * if no URIs are provided.
-  */
- export const fix_standard_document_postref = inngest.createFunction(
-   { id: "fix_standard_document_postref" },
-   { event: "documents/fix-postref" },
-   async ({ event, step }) => {
-     const { documentUris: providedUris } = event.data as {
-       documentUris?: string[];
-     };
-
-     const stats = {
-       documentsFound: 0,
-       documentsFixed: 0,
-       documentsSkipped: 0,
-       errors: [] as string[],
-     };
-
-     // Step 1: Find documents to fix (either provided or query for them)
-     const documentUris = await step.run("find-documents", async () => {
-       if (providedUris && providedUris.length > 0) {
-         return providedUris;
-       }
-
-       // Find all site.standard.document records with postRef set
-       const { data: documents, error } = await supabaseServerClient
-         .from("documents")
-         .select("uri")
-         .like("uri", "at://%/site.standard.document/%")
-         .not("data->postRef", "is", null);
-
-       if (error) {
-         throw new Error(`Failed to query documents: ${error.message}`);
-       }
-
-       return (documents || []).map((d) => d.uri);
-     });
-
-     stats.documentsFound = documentUris.length;
-
-     if (documentUris.length === 0) {
-       return {
-         success: true,
-         message: "No documents found with postRef field",
-         stats,
-       };
-     }
-
-     // Step 2: Group documents by DID for efficient OAuth session handling
-     const docsByDid = new Map<string, string[]>();
-     for (const uri of documentUris) {
-       try {
-         const aturi = new AtUri(uri);
-         const did = aturi.hostname;
-         const existing = docsByDid.get(did) || [];
-         existing.push(uri);
-         docsByDid.set(did, existing);
-       } catch (e) {
-         stats.errors.push(`Invalid URI: ${uri}`);
-       }
-     }
-
-     // Step 3: Process each DID's documents
-     for (const [did, uris] of docsByDid) {
-       // Verify OAuth session for this user
-       const oauthValid = await step.run(
-         `verify-oauth-${did.slice(-8)}`,
-         async () => {
-           const result = await restoreOAuthSession(did);
-           return result.ok;
-         },
-       );
-
-       if (!oauthValid) {
-         stats.errors.push(`No valid OAuth session for ${did}`);
-         stats.documentsSkipped += uris.length;
-         continue;
-       }
-
-       // Fix each document
-       for (const docUri of uris) {
-         const result = await step.run(
-           `fix-doc-${docUri.slice(-12)}`,
-           async () => {
-             // Fetch the document
-             const { data: doc, error: fetchError } = await supabaseServerClient
-               .from("documents")
-               .select("uri, data")
-               .eq("uri", docUri)
-               .single();
-
-             if (fetchError || !doc) {
-               return {
-                 success: false as const,
-                 error: `Document not found: ${fetchError?.message || "no data"}`,
-               };
-             }
-
-             const data = doc.data as Record<string, unknown>;
-             const postRef = data.postRef as
-               | ComAtprotoRepoStrongRef.Main
-               | undefined;
-
-             if (!postRef) {
-               return {
-                 success: false as const,
-                 skipped: true,
-                 error: "Document does not have postRef field",
-               };
-             }
-
-             // Build updated record: move postRef to bskyPostRef
-             const { postRef: _, ...restData } = data;
-             let updatedRecord: SiteStandardDocument.Record = {
-               ...(restData as SiteStandardDocument.Record),
-             };
-
-             updatedRecord.bskyPostRef = data.bskyPostRef
-               ? (data.bskyPostRef as ComAtprotoRepoStrongRef.Main)
-               : postRef;
-
-             // Write to PDS
-             const docAturi = new AtUri(docUri);
-             const agent = await createAuthenticatedAgent(did);
-             await agent.com.atproto.repo.putRecord({
-               repo: did,
-               collection: "site.standard.document",
-               rkey: docAturi.rkey,
-               record: updatedRecord,
-               validate: false,
-             });
-
-             // Update database
-             const { error: dbError } = await supabaseServerClient
-               .from("documents")
-               .update({ data: updatedRecord as Json })
-               .eq("uri", docUri);
-
-             if (dbError) {
-               return {
-                 success: false as const,
-                 error: `Database update failed: ${dbError.message}`,
-               };
-             }
-
-             return {
-               success: true as const,
-               postRef,
-               bskyPostRef: updatedRecord.bskyPostRef,
-             };
-           },
-         );
-
-         if (result.success) {
-           stats.documentsFixed++;
-         } else if ("skipped" in result && result.skipped) {
-           stats.documentsSkipped++;
-         } else {
-           stats.errors.push(`${docUri}: ${result.error}`);
-         }
-       }
-     }
-
-     return {
-       success: stats.errors.length === 0,
-       stats,
-     };
-   },
- );
-213
app/api/inngest/functions/fix_standard_document_publications.ts
- import { supabaseServerClient } from "supabase/serverClient";
- import { inngest } from "../client";
- import { restoreOAuthSession } from "src/atproto-oauth";
- import { AtpBaseClient, SiteStandardDocument } from "lexicons/api";
- import { AtUri } from "@atproto/syntax";
- import { Json } from "supabase/database.types";
-
- async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> {
-   const result = await restoreOAuthSession(did);
-   if (!result.ok) {
-     throw new Error(`Failed to restore OAuth session: ${result.error.message}`);
-   }
-   const credentialSession = result.value;
-   return new AtpBaseClient(
-     credentialSession.fetchHandler.bind(credentialSession),
-   );
- }
-
- /**
-  * Fixes site.standard.document records that have stale pub.leaflet.publication
-  * references in their site field. Updates both the PDS record and database.
-  */
- export const fix_standard_document_publications = inngest.createFunction(
-   { id: "fix_standard_document_publications" },
-   { event: "documents/fix-publication-references" },
-   async ({ event, step }) => {
-     const { documentUris } = event.data as { documentUris: string[] };
-
-     const stats = {
-       documentsFixed: 0,
-       joinEntriesFixed: 0,
-       errors: [] as string[],
-     };
-
-     if (!documentUris || documentUris.length === 0) {
-       return { success: true, stats, message: "No documents to fix" };
-     }
-
-     // Group documents by DID (author) for efficient OAuth session handling
-     const docsByDid = new Map<string, string[]>();
-     for (const uri of documentUris) {
-       try {
-         const aturi = new AtUri(uri);
-         const did = aturi.hostname;
-         const existing = docsByDid.get(did) || [];
-         existing.push(uri);
-         docsByDid.set(did, existing);
-       } catch (e) {
-         stats.errors.push(`Invalid URI: ${uri}`);
-       }
-     }
-
-     // Process each DID's documents
-     for (const [did, uris] of docsByDid) {
-       // Verify OAuth session for this user
-       const oauthValid = await step.run(
-         `verify-oauth-${did.slice(-8)}`,
-         async () => {
-           const result = await restoreOAuthSession(did);
-           return result.ok;
-         },
-       );
-
-       if (!oauthValid) {
-         stats.errors.push(`No valid OAuth session for ${did}`);
-         continue;
-       }
-
-       // Fix each document
-       for (const docUri of uris) {
-         const result = await step.run(
-           `fix-doc-${docUri.slice(-12)}`,
-           async () => {
-             // Fetch the document
-             const { data: doc, error: fetchError } = await supabaseServerClient
-               .from("documents")
-               .select("uri, data")
-               .eq("uri", docUri)
-               .single();
-
-             if (fetchError || !doc) {
-               return {
-                 success: false as const,
-                 error: `Document not found: ${fetchError?.message || "no data"}`,
-               };
-             }
-
-             const data = doc.data as SiteStandardDocument.Record;
-             const oldSite = data?.site;
-
-             if (!oldSite || !oldSite.includes("/pub.leaflet.publication/")) {
-               return {
-                 success: false as const,
-                 error: "Document does not have a pub.leaflet.publication site reference",
-               };
-             }
-
-             // Convert to new publication URI
-             const oldPubAturi = new AtUri(oldSite);
-             const newSite = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`;
-
-             // Update the record
-             const updatedRecord: SiteStandardDocument.Record = {
-               ...data,
-               site: newSite,
-             };
-
-             // Write to PDS
-             const docAturi = new AtUri(docUri);
-             const agent = await createAuthenticatedAgent(did);
-             await agent.com.atproto.repo.putRecord({
-               repo: did,
-               collection: "site.standard.document",
-               rkey: docAturi.rkey,
-               record: updatedRecord,
-               validate: false,
-             });
-
-             // Update database
-             const { error: dbError } = await supabaseServerClient
-               .from("documents")
-               .update({ data: updatedRecord as Json })
-               .eq("uri", docUri);
-
-             if (dbError) {
-               return {
-                 success: false as const,
-                 error: `Database update failed: ${dbError.message}`,
-               };
-             }
-
-             return {
-               success: true as const,
-               oldSite,
-               newSite,
-             };
-           },
-         );
-
-         if (result.success) {
-           stats.documentsFixed++;
-
-           // Fix the documents_in_publications entry
-           const joinResult = await step.run(
-             `fix-join-${docUri.slice(-12)}`,
-             async () => {
-               // Find the publication URI that exists in the database
-               const { data: doc } = await supabaseServerClient
-                 .from("documents")
-                 .select("data")
-                 .eq("uri", docUri)
-                 .single();
-
-               const newSite = (doc?.data as any)?.site;
-               if (!newSite) {
-                 return { success: false as const, error: "Could not read updated document" };
-               }
-
-               // Check which publication URI exists
-               const newPubAturi = new AtUri(newSite);
-               const oldPubUri = `at://${newPubAturi.hostname}/pub.leaflet.publication/${newPubAturi.rkey}`;
-
-               const { data: pubs } = await supabaseServerClient
-                 .from("publications")
-                 .select("uri")
-                 .in("uri", [newSite, oldPubUri]);
-
-               const existingPubUri = pubs?.find((p) => p.uri === newSite)?.uri ||
-                 pubs?.find((p) => p.uri === oldPubUri)?.uri;
-
-               if (!existingPubUri) {
-                 return { success: false as const, error: "No matching publication found" };
-               }
-
-               // Delete any existing entries for this document
-               await supabaseServerClient
-                 .from("documents_in_publications")
-                 .delete()
-                 .eq("document", docUri);
-
-               // Insert the correct entry
-               const { error: insertError } = await supabaseServerClient
-                 .from("documents_in_publications")
-                 .insert({
-                   document: docUri,
-                   publication: existingPubUri,
-                 });
-
-               if (insertError) {
-                 return { success: false as const, error: insertError.message };
-               }
-
-               return { success: true as const, publication: existingPubUri };
-             },
-           );
-
-           if (joinResult.success) {
-             stats.joinEntriesFixed++;
-           } else {
-             stats.errors.push(`Join table fix failed for ${docUri}: ${"error" in joinResult ? joinResult.error : "unknown error"}`);
-           }
-         } else {
-           stats.errors.push(`${docUri}: ${result.error}`);
-         }
-       }
-     }
-
-     return {
-       success: stats.errors.length === 0,
-       stats,
-     };
-   },
- );
+52 -186
app/api/inngest/functions/migrate_user_to_standard.ts
···
  const stats = {
    publicationsMigrated: 0,
    documentsMigrated: 0,
-   standardDocumentsFixed: 0,
    userSubscriptionsMigrated: 0,
    referencesUpdated: 0,
    errors: [] as string[],
  };

  // Step 1: Verify OAuth session is valid
- const oauthValid = await step.run("verify-oauth-session", async () => {
+ await step.run("verify-oauth-session", async () => {
    const result = await restoreOAuthSession(did);
    if (!result.ok) {
-     // Mark identity as needing migration so we can retry later
-     await supabaseServerClient
-       .from("identities")
-       .update({
-         metadata: { needsStandardSiteMigration: true },
-       })
-       .eq("atp_did", did);
-
-     return { success: false, error: result.error.message };
+     throw new Error(
+       `Failed to restore OAuth session: ${result.error.message}`,
+     );
    }
    return { success: true };
  });
-
- if (!oauthValid.success) {
-   return {
-     success: false,
-     error: `Failed to restore OAuth session`,
-     stats,
-     publicationUriMap: {},
-     documentUriMap: {},
-     userSubscriptionUriMap: {},
-   };
- }

  // Step 2: Get user's pub.leaflet.publication records
  const oldPublications = await step.run(
···
    })
    .filter((x) => x !== null);

- // Run PDS + DB writes together for each publication
- const pubResults = await Promise.all(
-   publicationsToMigrate.map(({ pub, rkey, normalized, newRecord }) =>
-     step.run(`migrate-publication-${pub.uri}`, async () => {
-       // PDS write
+ // Run all PDS writes in parallel
+ const pubPdsResults = await Promise.all(
+   publicationsToMigrate.map(({ pub, rkey, newRecord }) =>
+     step.run(`pds-write-publication-${pub.uri}`, async () => {
        const agent = await createAuthenticatedAgent(did);
        const putResult = await agent.com.atproto.repo.putRecord({
          repo: did,
···
          record: newRecord,
          validate: false,
        });
-       const newUri = putResult.data.uri;
+       return { oldUri: pub.uri, newUri: putResult.data.uri };
+     }),
+   ),
+ );

-       // DB write
+ // Run all DB writes in parallel
+ const pubDbResults = await Promise.all(
+   publicationsToMigrate.map(({ pub, normalized, newRecord }, index) => {
+     const newUri = pubPdsResults[index].newUri;
+     return step.run(`db-write-publication-${pub.uri}`, async () => {
        const { error: dbError } = await supabaseServerClient
          .from("publications")
          .upsert({
···
          };
        }
        return { success: true as const, oldUri: pub.uri, newUri };
-     }),
-   ),
+     });
+   }),
  );

  // Process results
- for (const result of pubResults) {
+ for (const result of pubDbResults) {
    if (result.success) {
      publicationUriMap[result.oldUri] = result.newUri;
      stats.publicationsMigrated++;
···
        $type: "site.standard.document",
        title: normalized.title || "Untitled",
        site: siteValue,
-       path: "/" + rkey,
+       path: rkey,
        publishedAt: normalized.publishedAt || new Date().toISOString(),
        description: normalized.description,
        content: normalized.content,
···
    })
    .filter((x) => x !== null);

- // Run PDS + DB writes together for each document
- const docResults = await Promise.all(
-   documentsToMigrate.map(({ doc, rkey, newRecord, oldPubUri }) =>
-     step.run(`migrate-document-${doc.uri}`, async () => {
-       // PDS write
+ // Run all PDS writes in parallel
+ const docPdsResults = await Promise.all(
+   documentsToMigrate.map(({ doc, rkey, newRecord }) =>
+     step.run(`pds-write-document-${doc.uri}`, async () => {
        const agent = await createAuthenticatedAgent(did);
        const putResult = await agent.com.atproto.repo.putRecord({
          repo: did,
···
          record: newRecord,
          validate: false,
        });
-       const newUri = putResult.data.uri;
+       return { oldUri: doc.uri, newUri: putResult.data.uri };
+     }),
+   ),
+ );

-       // DB write
+ // Run all DB writes in parallel
+ const docDbResults = await Promise.all(
+   documentsToMigrate.map(({ doc, newRecord, oldPubUri }, index) => {
+     const newUri = docPdsResults[index].newUri;
+     return step.run(`db-write-document-${doc.uri}`, async () => {
        const { error: dbError } = await supabaseServerClient
          .from("documents")
          .upsert({
···
        }

        return { success: true as const, oldUri: doc.uri, newUri };
-     }),
-   ),
+     });
+   }),
  );

  // Process results
- for (const result of docResults) {
+ for (const result of docDbResults) {
    if (result.success) {
      documentUriMap[result.oldUri] = result.newUri;
      stats.documentsMigrated++;
···
    }
  }

- // Step 4b: Fix existing site.standard.document records that reference pub.leaflet.publication
- // This handles the case where site.standard.document records were created pointing to
- // pub.leaflet.publication URIs before the publication was migrated to site.standard.publication
- const existingStandardDocs = await step.run(
-   "fetch-existing-standard-documents",
-   async () => {
-     const { data, error } = await supabaseServerClient
-       .from("documents")
-       .select("uri, data")
-       .like("uri", `at://${did}/site.standard.document/%`);
-
-     if (error)
-       throw new Error(
-         `Failed to fetch existing standard documents: ${error.message}`,
-       );
-     return data || [];
-   },
- );
-
- // Find documents that reference pub.leaflet.publication and need their site field updated
- const standardDocsToFix = existingStandardDocs
-   .map((doc) => {
-     const data = doc.data as SiteStandardDocument.Record;
-     const site = data?.site;
-
-     // Check if site field references a pub.leaflet.publication
-     if (!site || !site.includes("/pub.leaflet.publication/")) {
-       return null;
-     }
-
-     try {
-       const oldPubAturi = new AtUri(site);
-       const newPubUri = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`;
-
-       // Only fix if we have the new publication in our map (meaning it was migrated)
-       // or if the new publication exists (check against all migrated publications)
-       if (
-         publicationUriMap[site] ||
-         Object.values(publicationUriMap).includes(newPubUri)
-       ) {
-         const docAturi = new AtUri(doc.uri);
-         const updatedRecord: SiteStandardDocument.Record = {
-           ...data,
-           site: newPubUri,
-         };
-
-         return {
-           doc,
-           rkey: docAturi.rkey,
-           oldSite: site,
-           newSite: newPubUri,
-           updatedRecord,
-         };
-       }
-     } catch (e) {
-       stats.errors.push(`Invalid site URI in document ${doc.uri}: ${site}`);
-     }
-
-     return null;
-   })
-   .filter((x) => x !== null);
-
- // Update these documents on PDS and in database
- if (standardDocsToFix.length > 0) {
-   const fixResults = await Promise.all(
-     standardDocsToFix.map(({ doc, rkey, oldSite, newSite, updatedRecord }) =>
-       step.run(`fix-standard-document-${doc.uri}`, async () => {
-         // PDS write to update the site field
-         const agent = await createAuthenticatedAgent(did);
-         await agent.com.atproto.repo.putRecord({
-           repo: did,
-           collection: "site.standard.document",
-           rkey,
-           record: updatedRecord,
-           validate: false,
-         });
-
-         // DB write
-         const { error: dbError } = await supabaseServerClient
-           .from("documents")
-           .update({ data: updatedRecord as Json })
-           .eq("uri", doc.uri);
-
-         if (dbError) {
-           return {
-             success: false as const,
-             uri: doc.uri,
-             error: dbError.message,
-           };
-         }
-
-         // Update documents_in_publications to point to new publication URI
-         await supabaseServerClient
-           .from("documents_in_publications")
-           .upsert({
-             publication: newSite,
-             document: doc.uri,
-           });
-
-         // Remove old publication reference if different
-         if (oldSite !== newSite) {
-           await supabaseServerClient
-             .from("documents_in_publications")
-             .delete()
-             .eq("publication", oldSite)
-             .eq("document", doc.uri);
-         }
-
-         return { success: true as const, uri: doc.uri };
-       }),
-     ),
-   );
-
-   for (const result of fixResults) {
-     if (result.success) {
-       stats.standardDocumentsFixed++;
-     } else {
-       stats.errors.push(
-         `Fix standard document ${result.uri}: Database error: ${result.error}`,
-       );
-     }
-   }
- }
-
  // Step 5: Update references in database tables (all in parallel)
  await step.run("update-references", async () => {
    const pubEntries = Object.entries(publicationUriMap);
···
    })
    .filter((x) => x !== null);

- // Run PDS + DB writes together for each subscription
- const subResults = await Promise.all(
+ // Run all PDS writes in parallel
+ const subPdsResults = await Promise.all(
    subscriptionsToMigrate.map(({ sub, rkey, newRecord }) =>
-     step.run(`migrate-subscription-${sub.uri}`, async () => {
-       // PDS write
+     step.run(`pds-write-subscription-${sub.uri}`, async () => {
        const agent = await createAuthenticatedAgent(did);
        const putResult = await agent.com.atproto.repo.putRecord({
          repo: did,
···
          record: newRecord,
          validate: false,
        });
-       const newUri = putResult.data.uri;
+       return { oldUri: sub.uri, newUri: putResult.data.uri };
+     }),
+   ),
+ );

-       // DB write
+ // Run all DB writes in parallel
+ const subDbResults = await Promise.all(
+   subscriptionsToMigrate.map(({ sub, newRecord }, index) => {
+     const newUri = subPdsResults[index].newUri;
+     return step.run(`db-write-subscription-${sub.uri}`, async () => {
        const { error: dbError } = await supabaseServerClient
          .from("publication_subscriptions")
          .update({
···
          };
        }
        return { success: true as const, oldUri: sub.uri, newUri };
-     }),
-   ),
+     });
+   }),
  );

  // Process results
- for (const result of subResults) {
+ for (const result of subDbResults) {
    if (result.success) {
      userSubscriptionUriMap[result.oldUri] = result.newUri;
      stats.userSubscriptionsMigrated++;
···
  // 2. External references (e.g., from other AT Proto apps) to old URIs continue to work
  // 3. The normalization layer handles both schemas transparently for reads
  // Old records are also kept on the user's PDS so existing AT-URI references remain valid.
-
- // Clear the migration flag on success
- if (stats.errors.length === 0) {
-   await step.run("clear-migration-flag", async () => {
-     await supabaseServerClient
-       .from("identities")
-       .update({ metadata: null })
-       .eq("atp_did", did);
-   });
- }

  return {
    success: stats.errors.length === 0,
-12
app/api/inngest/route.tsx
···
  import { batched_update_profiles } from "./functions/batched_update_profiles";
  import { index_follows } from "./functions/index_follows";
  import { migrate_user_to_standard } from "./functions/migrate_user_to_standard";
- import { fix_standard_document_publications } from "./functions/fix_standard_document_publications";
- import { fix_incorrect_site_values } from "./functions/fix_incorrect_site_values";
- import { fix_standard_document_postref } from "./functions/fix_standard_document_postref";
- import {
-   cleanup_expired_oauth_sessions,
-   check_oauth_session,
- } from "./functions/cleanup_expired_oauth_sessions";

  export const { GET, POST, PUT } = serve({
    client: inngest,
···
    batched_update_profiles,
    index_follows,
    migrate_user_to_standard,
-   fix_standard_document_publications,
-   fix_incorrect_site_values,
-   fix_standard_document_postref,
-   cleanup_expired_oauth_sessions,
-   check_oauth_session,
  ],
  });
-11
app/api/oauth/[route]/route.ts
···
    ActionAfterSignIn,
    parseActionFromSearchParam,
  } from "./afterSignInActions";
- import { inngest } from "app/api/inngest/client";

  type OauthRequestClientState = {
    redirect: string | null;
···
      .single();
    identity = data;
  }
-
- // Trigger migration if identity needs it
- const metadata = identity?.metadata as Record<string, unknown> | null;
- if (metadata?.needsStandardSiteMigration) {
-   await inngest.send({
-     name: "user/migrate-to-standard",
-     data: { did: session.did },
-   });
- }
-
  let { data: token } = await supabaseServerClient
    .from("email_auth_tokens")
    .insert({
-1
app/api/rpc/[command]/get_publication_data.ts
···
    uri: dip.documents.uri,
    record: normalized,
    indexed_at: dip.documents.indexed_at,
-   sort_date: dip.documents.sort_date,
    data: dip.documents.data,
    commentsCount: dip.documents.comments_on_documents[0]?.count || 0,
    mentionsCount: dip.documents.document_mentions_in_bsky[0]?.count || 0,
-34
app/lish/[did]/[publication]/.well-known/site.standard.publication/route.ts
- import { publicationNameOrUriFilter } from "src/utils/uriHelpers";
- import { supabaseServerClient } from "supabase/serverClient";
-
- export async function GET(
-   req: Request,
-   props: {
-     params: Promise<{ publication: string; did: string }>;
-   },
- ) {
-   let params = await props.params;
-   let did = decodeURIComponent(params.did);
-   let publication_name = decodeURIComponent(params.publication);
-   let [{ data: publications }] = await Promise.all([
-     supabaseServerClient
-       .from("publications")
-       .select(
-         `*,
-         publication_subscriptions(*),
-         documents_in_publications(documents(
-           *,
-           comments_on_documents(count),
-           document_mentions_in_bsky(count)
-         ))
-       `,
-       )
-       .eq("identity_did", did)
-       .or(publicationNameOrUriFilter(did, publication_name))
-       .order("uri", { ascending: false })
-       .limit(1),
-   ]);
-   let publication = publications?.[0];
-   if (!did || !publication) return new Response(null, { status: 404 });
-   return new Response(publication.uri);
- }
+4 -7
app/lish/[did]/[publication]/[rkey]/page.tsx
···
    sizes: "32x32",
    type: "image/png",
  },
- other: [
-   {
-     rel: "alternate",
-     url: document.uri,
-   },
-   { rel: "site.standard.document", url: document.uri },
- ],
+ other: {
+   rel: "alternate",
+   url: document.uri,
+ },
  },
  title:
    docRecord.title +
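Next.js accepts either a single descriptor or an array for `icons.other`, so with the extra `site.standard.document` link dropped, the single-object form is enough. A sketch of the resulting metadata shape, with an illustrative URL:

import type { Metadata } from "next";

// Sketch of the simplified metadata above; the uri value is illustrative.
const metadata: Metadata = {
  icons: {
    other: {
      rel: "alternate",
      url: "at://did:plc:example/pub.leaflet.document/3jexample",
    },
  },
};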
-1
app/lish/[did]/[publication]/dashboard/PublishedPostsLists.tsx
···
  documents: {
    uri: doc.uri,
    indexed_at: doc.indexed_at,
-   sort_date: doc.sort_date,
    data: doc.data,
  },
  },
+3 -7
app/lish/createPub/getPublicationURL.ts
···
  }

  // Fall back to checking raw record for legacy base_path
- if (
-   isLeafletPublication(pub.record) &&
-   pub.record.base_path &&
-   isProductionDomain()
- ) {
+ if (isLeafletPublication(pub.record) && pub.record.base_path && isProductionDomain()) {
    return `https://${pub.record.base_path}`;
  }
···
  const normalized = normalizePublicationRecord(pub.record);
  const aturi = new AtUri(pub.uri);

- //use rkey, fallback to name
- const name = aturi.rkey || normalized?.name;
+ // Use normalized name if available, fall back to rkey
+ const name = normalized?.name || aturi.rkey;
  return `/lish/${aturi.host}/${encodeURIComponent(name || "")}`;
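The last change flips the fallback order: a publication's normalized, human-readable name now wins over its record key, and the opaque rkey is only used when no name is available. A quick illustration with a hypothetical publication:

import { AtUri } from "@atproto/syntax";

// Hypothetical publication whose rkey is an opaque TID but whose normalized
// record carries a readable name.
const aturi = new AtUri("at://alice.example.com/site.standard.publication/3jui7kd54zh2y");
const normalized = { name: "garden-notes" }; // stand-in for normalizePublicationRecord output

const name = normalized?.name || aturi.rkey; // "garden-notes", not "3jui7kd54zh2y"
const url = `/lish/${aturi.host}/${encodeURIComponent(name || "")}`;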
-1
app/lish/createPub/updatePublication.ts
···
  return withPublicationUpdate(uri, async ({ normalizedPub, existingBasePath, publicationType, agent }) => {
    // Build theme object
    const themeData = {
-     $type: "pub.leaflet.publication#theme" as const,
      backgroundImage: theme.backgroundImage
        ? {
            $type: "pub.leaflet.theme.backgroundImage",
+4 -7
app/p/[didOrHandle]/[rkey]/page.tsx
···

  return {
    icons: {
-     other: [
-       {
-         rel: "alternate",
-         url: document.uri,
-       },
-       { rel: "site.standard.document", url: document.uri },
-     ],
+     other: {
+       rel: "alternate",
+       url: document.uri,
+     },
    },
    title: docRecord.title,
    description: docRecord?.description || "",
+1 -2
components/Blocks/PublicationPollBlock.tsx
···
    setAreYouSure?: (value: boolean) => void;
  },
  ) => {
- let { data: publicationData, normalizedDocument } =
-   useLeafletPublicationData();
+ let { data: publicationData } = useLeafletPublicationData();
  let isSelected = useUIState((s) =>
    s.selectedBlocks.find((b) => b.value === props.entityID),
  );
-1
drizzle/schema.ts
···
    email: text("email"),
    atp_did: text("atp_did"),
    interface_state: jsonb("interface_state"),
-   metadata: jsonb("metadata"),
  },
  (table) => {
    return {
+3 -3
feeds/index.ts
···
  }
  query = query
    .or("data->postRef.not.is.null,data->bskyPostRef.not.is.null")
-   .order("sort_date", { ascending: false })
+   .order("indexed_at", { ascending: false })
    .order("uri", { ascending: false })
    .limit(25);
  if (parsedCursor)
    query = query.or(
-     `sort_date.lt.${parsedCursor.date},and(sort_date.eq.${parsedCursor.date},uri.lt.${parsedCursor.uri})`,
+     `indexed_at.lt.${parsedCursor.date},and(indexed_at.eq.${parsedCursor.date},uri.lt.${parsedCursor.uri})`,
    );

  let { data, error } = await query;
···
  posts = posts || [];

  let lastPost = posts[posts.length - 1];
- let newCursor = lastPost ? `${lastPost.sort_date}::${lastPost.uri}` : null;
+ let newCursor = lastPost ? `${lastPost.indexed_at}::${lastPost.uri}` : null;
  return c.json({
    cursor: newCursor || cursor,
    feed: posts.flatMap((p) => {
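The feed cursor here is a plain string: the last row's timestamp and uri joined with "::". A sketch of the round-trip implied above; the parsing half is an assumption about how `parsedCursor` is produced:

// Encode: matches the template literal in the diff above.
function encodeCursor(indexed_at: string, uri: string): string {
  return `${indexed_at}::${uri}`;
}

// Decode (assumed counterpart): split on the first "::", which cannot occur
// inside an ISO timestamp and does not collide with the "//" in AT-URIs.
function parseCursor(cursor: string): { date: string; uri: string } | null {
  const sep = cursor.indexOf("::");
  if (sep === -1) return null;
  return { date: cursor.slice(0, sep), uri: cursor.slice(sep + 2) };
}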
+2 -2
lexicons/api/lexicons.ts
···
    type: 'ref',
  },
  theme: {
-   type: 'union',
-   refs: ['lex:pub.leaflet.publication#theme'],
+   type: 'ref',
+   ref: 'lex:pub.leaflet.publication#theme',
  },
  description: {
    maxGraphemes: 300,
+1 -1
lexicons/api/types/site/standard/publication.ts
···
  export interface Record {
    $type: 'site.standard.publication'
    basicTheme?: SiteStandardThemeBasic.Main
-   theme?: $Typed<PubLeafletPublication.Theme> | { $type: string }
+   theme?: PubLeafletPublication.Theme
    description?: string
    icon?: BlobRef
    name: string
+2 -2
lexicons/site/standard/publication.json
··· 9 9 "type": "ref" 10 10 }, 11 11 "theme": { 12 - "type": "union", 13 - "refs": ["pub.leaflet.publication#theme"] 12 + "type": "ref", 13 + "ref": "pub.leaflet.publication#theme" 14 14 }, 15 15 "description": { 16 16 "maxGraphemes": 300,
+5 -40
lexicons/src/normalize.ts
···
   */

  import type * as PubLeafletDocument from "../api/types/pub/leaflet/document";
- import * as PubLeafletPublication from "../api/types/pub/leaflet/publication";
+ import type * as PubLeafletPublication from "../api/types/pub/leaflet/publication";
  import type * as PubLeafletContent from "../api/types/pub/leaflet/content";
  import type * as SiteStandardDocument from "../api/types/site/standard/document";
  import type * as SiteStandardPublication from "../api/types/site/standard/publication";
···
  };

  // Normalized publication type - uses the generated site.standard.publication type
- // with the theme narrowed to only the valid pub.leaflet.publication#theme type
- // (isTheme validates that $type is present, so we use $Typed)
- // Note: We explicitly list fields rather than using Omit because the generated Record type
- // has an index signature [k: string]: unknown that interferes with property typing
- export type NormalizedPublication = {
-   $type: "site.standard.publication";
-   name: string;
-   url: string;
-   description?: string;
-   icon?: SiteStandardPublication.Record["icon"];
-   basicTheme?: SiteStandardThemeBasic.Main;
-   theme?: $Typed<PubLeafletPublication.Theme>;
-   preferences?: SiteStandardPublication.Preferences;
- };
+ export type NormalizedPublication = SiteStandardPublication.Record;

  /**
   * Checks if the record is a pub.leaflet.document
···
  ): NormalizedPublication | null {
    if (!record || typeof record !== "object") return null;

-   // Pass through site.standard records directly, but validate the theme
+   // Pass through site.standard records directly
    if (isStandardPublication(record)) {
-     // Validate theme - only keep if it's a valid pub.leaflet.publication#theme
-     const theme = PubLeafletPublication.isTheme(record.theme)
-       ? (record.theme as $Typed<PubLeafletPublication.Theme>)
-       : undefined;
-     return {
-       ...record,
-       theme,
-     };
+     return record;
    }

    if (isLeafletPublication(record)) {
···
    const basicTheme = leafletThemeToBasicTheme(record.theme);

-   // Validate theme - only keep if it's a valid pub.leaflet.publication#theme with $type set
-   // For legacy records without $type, add it during normalization
-   let theme: $Typed<PubLeafletPublication.Theme> | undefined;
-   if (record.theme) {
-     if (PubLeafletPublication.isTheme(record.theme)) {
-       theme = record.theme as $Typed<PubLeafletPublication.Theme>;
-     } else {
-       // Legacy theme without $type - add it
-       theme = {
-         ...record.theme,
-         $type: "pub.leaflet.publication#theme",
-       };
-     }
-   }
-
    // Convert preferences to site.standard format (strip/replace $type)
    const preferences: SiteStandardPublication.Preferences | undefined =
      record.preferences
···
    description: record.description,
    icon: record.icon,
    basicTheme,
-   theme,
+   theme: record.theme,
    preferences,
  };
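With `NormalizedPublication` collapsing to the generated record type, normalization becomes a pass-through for `site.standard.publication` records; only legacy `pub.leaflet.publication` records still get rewritten. A type-level sketch of the simplification, with abbreviated stand-ins for the generated types:

// Abbreviated stand-ins for the generated lexicon types.
type Theme = { backgroundColor?: unknown };
type SiteStandardPublicationRecord = {
  $type: "site.standard.publication";
  name: string;
  url: string;
  theme?: Theme;
};

// Before this change, NormalizedPublication re-listed every field so theme
// could be narrowed to $Typed<Theme>; now it is simply the record type.
type NormalizedPublication = SiteStandardPublicationRecord;

function normalize(record: SiteStandardPublicationRecord): NormalizedPublication {
  // site.standard records are already in normalized form: pass through,
  // with no theme validation or $type re-tagging required.
  return record;
}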
-4
supabase/database.types.ts
···
  Row: {
    data: Json
    indexed_at: string
-   sort_date: string
    uri: string
  }
  Insert: {
···
    home_page: string
    id: string
    interface_state: Json | null
-   metadata: Json | null
  }
  Insert: {
    atp_did?: string | null
···
    home_page?: string
    id?: string
    interface_state?: Json | null
-   metadata?: Json | null
  }
  Update: {
    atp_did?: string | null
···
    home_page?: string
    id?: string
    interface_state?: Json | null
-   metadata?: Json | null
  }
  Relationships: [
    {
-1
supabase/migrations/20260123000000_add_metadata_to_identities.sql
- alter table "public"."identities" add column "metadata" jsonb;
-28
supabase/migrations/20260125000000_add_sort_date_column.sql
··· 1 - -- Add sort_date computed column to documents table 2 - -- This column stores the older of publishedAt (from JSON data) or indexed_at 3 - -- Used for sorting feeds chronologically by when content was actually published 4 - 5 - -- Create an immutable function to parse ISO 8601 timestamps from text 6 - -- This is needed because direct ::timestamp cast is not immutable (accepts 'now', 'today', etc.) 7 - -- The regex validates the format before casting to ensure immutability 8 - CREATE OR REPLACE FUNCTION parse_iso_timestamp(text) RETURNS timestamptz 9 - LANGUAGE sql IMMUTABLE STRICT AS $$ 10 - SELECT CASE 11 - -- Match ISO 8601 format: YYYY-MM-DDTHH:MM:SS with optional fractional seconds and Z/timezone 12 - WHEN $1 ~ '^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:?\d{2})?$' THEN 13 - $1::timestamptz 14 - ELSE 15 - NULL 16 - END 17 - $$; 18 - 19 - ALTER TABLE documents 20 - ADD COLUMN sort_date timestamptz GENERATED ALWAYS AS ( 21 - LEAST( 22 - COALESCE(parse_iso_timestamp(data->>'publishedAt'), indexed_at), 23 - indexed_at 24 - ) 25 - ) STORED; 26 - 27 - -- Create index on sort_date for efficient ordering 28 - CREATE INDEX documents_sort_date_idx ON documents (sort_date DESC, uri DESC);