a tool for shared writing and social publishing

Merge branch 'main' of https://github.com/hyperlink-academy/minilink into update/thread-viewer

+661 -73
+6 -3
actions/publishToPublication.ts
··· 199 } 200 201 // Determine the collection to use - preserve existing schema if updating 202 - const existingCollection = existingDocUri ? new AtUri(existingDocUri).collection : undefined; 203 const documentType = getDocumentType(existingCollection); 204 205 // Build the pages array (used by both formats) ··· 228 if (documentType === "site.standard.document") { 229 // site.standard.document format 230 // For standalone docs, use HTTPS URL; for publication docs, use the publication AT-URI 231 - const siteUri = publication_uri || `https://leaflet.pub/p/${credentialSession.did}`; 232 233 record = { 234 $type: "site.standard.document", 235 title: title || "Untitled", 236 site: siteUri, 237 - path: rkey, 238 publishedAt: 239 publishedAt || existingRecord.publishedAt || new Date().toISOString(), 240 ...(description && { description }),
··· 199 } 200 201 // Determine the collection to use - preserve existing schema if updating 202 + const existingCollection = existingDocUri 203 + ? new AtUri(existingDocUri).collection 204 + : undefined; 205 const documentType = getDocumentType(existingCollection); 206 207 // Build the pages array (used by both formats) ··· 230 if (documentType === "site.standard.document") { 231 // site.standard.document format 232 // For standalone docs, use HTTPS URL; for publication docs, use the publication AT-URI 233 + const siteUri = 234 + publication_uri || `https://leaflet.pub/p/${credentialSession.did}`; 235 236 record = { 237 $type: "site.standard.document", 238 title: title || "Untitled", 239 site: siteUri, 240 + path: "/" + rkey, 241 publishedAt: 242 publishedAt || existingRecord.publishedAt || new Date().toISOString(), 243 ...(description && { description }),
+15
app/api/inngest/client.ts
··· 26 did: string; 27 }; 28 }; 29 }; 30 31 // Create a client to send and receive events
··· 26 did: string; 27 }; 28 }; 29 + "user/cleanup-expired-oauth-sessions": { 30 + data: {}; 31 + }; 32 + "user/check-oauth-session": { 33 + data: { 34 + identityId: string; 35 + did: string; 36 + tokenCount: number; 37 + }; 38 + }; 39 + "documents/fix-publication-references": { 40 + data: { 41 + documentUris: string[]; 42 + }; 43 + }; 44 }; 45 46 // Create a client to send and receive events
+123
app/api/inngest/functions/cleanup_expired_oauth_sessions.ts
···
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + 5 + // Main function that fetches identities and publishes events for each one 6 + export const cleanup_expired_oauth_sessions = inngest.createFunction( 7 + { id: "cleanup_expired_oauth_sessions" }, 8 + { event: "user/cleanup-expired-oauth-sessions" }, 9 + async ({ step }) => { 10 + // Get all identities with an atp_did (OAuth users) that have at least one auth token 11 + const identities = await step.run("fetch-oauth-identities", async () => { 12 + const { data, error } = await supabaseServerClient 13 + .from("identities") 14 + .select("id, atp_did, email_auth_tokens(count)") 15 + .not("atp_did", "is", null); 16 + 17 + if (error) { 18 + throw new Error(`Failed to fetch identities: ${error.message}`); 19 + } 20 + 21 + // Filter to only include identities with at least one auth token 22 + return (data || []) 23 + .filter((identity) => { 24 + const tokenCount = identity.email_auth_tokens?.[0]?.count ?? 0; 25 + return tokenCount > 0; 26 + }) 27 + .map((identity) => ({ 28 + id: identity.id, 29 + atp_did: identity.atp_did!, 30 + tokenCount: identity.email_auth_tokens?.[0]?.count ?? 0, 31 + })); 32 + }); 33 + 34 + console.log( 35 + `Found ${identities.length} OAuth identities with active sessions to check`, 36 + ); 37 + 38 + // Publish events for each identity in batches 39 + const BATCH_SIZE = 100; 40 + let totalSent = 0; 41 + 42 + for (let i = 0; i < identities.length; i += BATCH_SIZE) { 43 + const batch = identities.slice(i, i + BATCH_SIZE); 44 + 45 + await step.run(`send-events-batch-${i}`, async () => { 46 + const events = batch.map((identity) => ({ 47 + name: "user/check-oauth-session" as const, 48 + data: { 49 + identityId: identity.id, 50 + did: identity.atp_did, 51 + tokenCount: identity.tokenCount, 52 + }, 53 + })); 54 + 55 + await inngest.send(events); 56 + return events.length; 57 + }); 58 + 59 + totalSent += batch.length; 60 + } 61 + 62 + console.log(`Published ${totalSent} check-oauth-session events`); 63 + 64 + return { 65 + success: true, 66 + identitiesQueued: totalSent, 67 + }; 68 + }, 69 + ); 70 + 71 + // Function that checks a single identity's OAuth session and cleans up if expired 72 + export const check_oauth_session = inngest.createFunction( 73 + { id: "check_oauth_session" }, 74 + { event: "user/check-oauth-session" }, 75 + async ({ event, step }) => { 76 + const { identityId, did, tokenCount } = event.data; 77 + 78 + const result = await step.run("check-and-cleanup", async () => { 79 + console.log(`Checking OAuth session for DID: ${did} (${tokenCount} tokens)`); 80 + 81 + const sessionResult = await restoreOAuthSession(did); 82 + 83 + if (sessionResult.ok) { 84 + console.log(` Session valid for ${did}`); 85 + return { valid: true, tokensDeleted: 0 }; 86 + } 87 + 88 + // Session is expired/invalid - delete associated auth tokens 89 + console.log( 90 + ` Session expired for ${did}: ${sessionResult.error.message}`, 91 + ); 92 + 93 + const { error: deleteError } = await supabaseServerClient 94 + .from("email_auth_tokens") 95 + .delete() 96 + .eq("identity", identityId); 97 + 98 + if (deleteError) { 99 + console.error( 100 + ` Error deleting tokens for identity ${identityId}: ${deleteError.message}`, 101 + ); 102 + return { 103 + valid: false, 104 + tokensDeleted: 0, 105 + error: deleteError.message, 106 + }; 107 + } 108 + 109 + console.log(` Deleted ${tokenCount} auth tokens for identity ${identityId}`); 110 + 111 + return { 112 + valid: false, 113 + tokensDeleted: tokenCount, 114 + }; 115 + }); 116 + 117 + return { 118 + identityId, 119 + did, 120 + ...result, 121 + }; 122 + }, 123 + );
+213
app/api/inngest/functions/fix_standard_document_publications.ts
···
··· 1 + import { supabaseServerClient } from "supabase/serverClient"; 2 + import { inngest } from "../client"; 3 + import { restoreOAuthSession } from "src/atproto-oauth"; 4 + import { AtpBaseClient, SiteStandardDocument } from "lexicons/api"; 5 + import { AtUri } from "@atproto/syntax"; 6 + import { Json } from "supabase/database.types"; 7 + 8 + async function createAuthenticatedAgent(did: string): Promise<AtpBaseClient> { 9 + const result = await restoreOAuthSession(did); 10 + if (!result.ok) { 11 + throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 12 + } 13 + const credentialSession = result.value; 14 + return new AtpBaseClient( 15 + credentialSession.fetchHandler.bind(credentialSession), 16 + ); 17 + } 18 + 19 + /** 20 + * Fixes site.standard.document records that have stale pub.leaflet.publication 21 + * references in their site field. Updates both the PDS record and database. 22 + */ 23 + export const fix_standard_document_publications = inngest.createFunction( 24 + { id: "fix_standard_document_publications" }, 25 + { event: "documents/fix-publication-references" }, 26 + async ({ event, step }) => { 27 + const { documentUris } = event.data as { documentUris: string[] }; 28 + 29 + const stats = { 30 + documentsFixed: 0, 31 + joinEntriesFixed: 0, 32 + errors: [] as string[], 33 + }; 34 + 35 + if (!documentUris || documentUris.length === 0) { 36 + return { success: true, stats, message: "No documents to fix" }; 37 + } 38 + 39 + // Group documents by DID (author) for efficient OAuth session handling 40 + const docsByDid = new Map<string, string[]>(); 41 + for (const uri of documentUris) { 42 + try { 43 + const aturi = new AtUri(uri); 44 + const did = aturi.hostname; 45 + const existing = docsByDid.get(did) || []; 46 + existing.push(uri); 47 + docsByDid.set(did, existing); 48 + } catch (e) { 49 + stats.errors.push(`Invalid URI: ${uri}`); 50 + } 51 + } 52 + 53 + // Process each DID's documents 54 + for (const [did, uris] of docsByDid) { 55 + // Verify OAuth session for this user 56 + const oauthValid = await step.run( 57 + `verify-oauth-${did.slice(-8)}`, 58 + async () => { 59 + const result = await restoreOAuthSession(did); 60 + return result.ok; 61 + }, 62 + ); 63 + 64 + if (!oauthValid) { 65 + stats.errors.push(`No valid OAuth session for ${did}`); 66 + continue; 67 + } 68 + 69 + // Fix each document 70 + for (const docUri of uris) { 71 + const result = await step.run( 72 + `fix-doc-${docUri.slice(-12)}`, 73 + async () => { 74 + // Fetch the document 75 + const { data: doc, error: fetchError } = await supabaseServerClient 76 + .from("documents") 77 + .select("uri, data") 78 + .eq("uri", docUri) 79 + .single(); 80 + 81 + if (fetchError || !doc) { 82 + return { 83 + success: false as const, 84 + error: `Document not found: ${fetchError?.message || "no data"}`, 85 + }; 86 + } 87 + 88 + const data = doc.data as SiteStandardDocument.Record; 89 + const oldSite = data?.site; 90 + 91 + if (!oldSite || !oldSite.includes("/pub.leaflet.publication/")) { 92 + return { 93 + success: false as const, 94 + error: "Document does not have a pub.leaflet.publication site reference", 95 + }; 96 + } 97 + 98 + // Convert to new publication URI 99 + const oldPubAturi = new AtUri(oldSite); 100 + const newSite = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`; 101 + 102 + // Update the record 103 + const updatedRecord: SiteStandardDocument.Record = { 104 + ...data, 105 + site: newSite, 106 + }; 107 + 108 + // Write to PDS 109 + const docAturi = new AtUri(docUri); 110 + const agent = await createAuthenticatedAgent(did); 111 + await agent.com.atproto.repo.putRecord({ 112 + repo: did, 113 + collection: "site.standard.document", 114 + rkey: docAturi.rkey, 115 + record: updatedRecord, 116 + validate: false, 117 + }); 118 + 119 + // Update database 120 + const { error: dbError } = await supabaseServerClient 121 + .from("documents") 122 + .update({ data: updatedRecord as Json }) 123 + .eq("uri", docUri); 124 + 125 + if (dbError) { 126 + return { 127 + success: false as const, 128 + error: `Database update failed: ${dbError.message}`, 129 + }; 130 + } 131 + 132 + return { 133 + success: true as const, 134 + oldSite, 135 + newSite, 136 + }; 137 + }, 138 + ); 139 + 140 + if (result.success) { 141 + stats.documentsFixed++; 142 + 143 + // Fix the documents_in_publications entry 144 + const joinResult = await step.run( 145 + `fix-join-${docUri.slice(-12)}`, 146 + async () => { 147 + // Find the publication URI that exists in the database 148 + const { data: doc } = await supabaseServerClient 149 + .from("documents") 150 + .select("data") 151 + .eq("uri", docUri) 152 + .single(); 153 + 154 + const newSite = (doc?.data as any)?.site; 155 + if (!newSite) { 156 + return { success: false as const, error: "Could not read updated document" }; 157 + } 158 + 159 + // Check which publication URI exists 160 + const newPubAturi = new AtUri(newSite); 161 + const oldPubUri = `at://${newPubAturi.hostname}/pub.leaflet.publication/${newPubAturi.rkey}`; 162 + 163 + const { data: pubs } = await supabaseServerClient 164 + .from("publications") 165 + .select("uri") 166 + .in("uri", [newSite, oldPubUri]); 167 + 168 + const existingPubUri = pubs?.find((p) => p.uri === newSite)?.uri || 169 + pubs?.find((p) => p.uri === oldPubUri)?.uri; 170 + 171 + if (!existingPubUri) { 172 + return { success: false as const, error: "No matching publication found" }; 173 + } 174 + 175 + // Delete any existing entries for this document 176 + await supabaseServerClient 177 + .from("documents_in_publications") 178 + .delete() 179 + .eq("document", docUri); 180 + 181 + // Insert the correct entry 182 + const { error: insertError } = await supabaseServerClient 183 + .from("documents_in_publications") 184 + .insert({ 185 + document: docUri, 186 + publication: existingPubUri, 187 + }); 188 + 189 + if (insertError) { 190 + return { success: false as const, error: insertError.message }; 191 + } 192 + 193 + return { success: true as const, publication: existingPubUri }; 194 + }, 195 + ); 196 + 197 + if (joinResult.success) { 198 + stats.joinEntriesFixed++; 199 + } else { 200 + stats.errors.push(`Join table fix failed for ${docUri}: ${"error" in joinResult ? joinResult.error : "unknown error"}`); 201 + } 202 + } else { 203 + stats.errors.push(`${docUri}: ${result.error}`); 204 + } 205 + } 206 + } 207 + 208 + return { 209 + success: stats.errors.length === 0, 210 + stats, 211 + }; 212 + }, 213 + );
+186 -52
app/api/inngest/functions/migrate_user_to_standard.ts
··· 38 const stats = { 39 publicationsMigrated: 0, 40 documentsMigrated: 0, 41 userSubscriptionsMigrated: 0, 42 referencesUpdated: 0, 43 errors: [] as string[], 44 }; 45 46 // Step 1: Verify OAuth session is valid 47 - await step.run("verify-oauth-session", async () => { 48 const result = await restoreOAuthSession(did); 49 if (!result.ok) { 50 - throw new Error( 51 - `Failed to restore OAuth session: ${result.error.message}`, 52 - ); 53 } 54 return { success: true }; 55 }); 56 57 // Step 2: Get user's pub.leaflet.publication records 58 const oldPublications = await step.run( ··· 109 }) 110 .filter((x) => x !== null); 111 112 - // Run all PDS writes in parallel 113 - const pubPdsResults = await Promise.all( 114 - publicationsToMigrate.map(({ pub, rkey, newRecord }) => 115 - step.run(`pds-write-publication-${pub.uri}`, async () => { 116 const agent = await createAuthenticatedAgent(did); 117 const putResult = await agent.com.atproto.repo.putRecord({ 118 repo: did, ··· 121 record: newRecord, 122 validate: false, 123 }); 124 - return { oldUri: pub.uri, newUri: putResult.data.uri }; 125 - }), 126 - ), 127 - ); 128 129 - // Run all DB writes in parallel 130 - const pubDbResults = await Promise.all( 131 - publicationsToMigrate.map(({ pub, normalized, newRecord }, index) => { 132 - const newUri = pubPdsResults[index].newUri; 133 - return step.run(`db-write-publication-${pub.uri}`, async () => { 134 const { error: dbError } = await supabaseServerClient 135 .from("publications") 136 .upsert({ ··· 149 }; 150 } 151 return { success: true as const, oldUri: pub.uri, newUri }; 152 - }); 153 - }), 154 ); 155 156 // Process results 157 - for (const result of pubDbResults) { 158 if (result.success) { 159 publicationUriMap[result.oldUri] = result.newUri; 160 stats.publicationsMigrated++; ··· 239 $type: "site.standard.document", 240 title: normalized.title || "Untitled", 241 site: siteValue, 242 - path: rkey, 243 publishedAt: normalized.publishedAt || new Date().toISOString(), 244 description: normalized.description, 245 content: normalized.content, ··· 252 }) 253 .filter((x) => x !== null); 254 255 - // Run all PDS writes in parallel 256 - const docPdsResults = await Promise.all( 257 - documentsToMigrate.map(({ doc, rkey, newRecord }) => 258 - step.run(`pds-write-document-${doc.uri}`, async () => { 259 const agent = await createAuthenticatedAgent(did); 260 const putResult = await agent.com.atproto.repo.putRecord({ 261 repo: did, ··· 264 record: newRecord, 265 validate: false, 266 }); 267 - return { oldUri: doc.uri, newUri: putResult.data.uri }; 268 - }), 269 - ), 270 - ); 271 272 - // Run all DB writes in parallel 273 - const docDbResults = await Promise.all( 274 - documentsToMigrate.map(({ doc, newRecord, oldPubUri }, index) => { 275 - const newUri = docPdsResults[index].newUri; 276 - return step.run(`db-write-document-${doc.uri}`, async () => { 277 const { error: dbError } = await supabaseServerClient 278 .from("documents") 279 .upsert({ ··· 302 } 303 304 return { success: true as const, oldUri: doc.uri, newUri }; 305 - }); 306 - }), 307 ); 308 309 // Process results 310 - for (const result of docDbResults) { 311 if (result.success) { 312 documentUriMap[result.oldUri] = result.newUri; 313 stats.documentsMigrated++; ··· 318 } 319 } 320 321 // Step 5: Update references in database tables (all in parallel) 322 await step.run("update-references", async () => { 323 const pubEntries = Object.entries(publicationUriMap); ··· 428 }) 429 .filter((x) => x !== null); 430 431 - // Run all PDS writes in parallel 432 - const subPdsResults = await Promise.all( 433 subscriptionsToMigrate.map(({ sub, rkey, newRecord }) => 434 - step.run(`pds-write-subscription-${sub.uri}`, async () => { 435 const agent = await createAuthenticatedAgent(did); 436 const putResult = await agent.com.atproto.repo.putRecord({ 437 repo: did, ··· 440 record: newRecord, 441 validate: false, 442 }); 443 - return { oldUri: sub.uri, newUri: putResult.data.uri }; 444 - }), 445 - ), 446 - ); 447 448 - // Run all DB writes in parallel 449 - const subDbResults = await Promise.all( 450 - subscriptionsToMigrate.map(({ sub, newRecord }, index) => { 451 - const newUri = subPdsResults[index].newUri; 452 - return step.run(`db-write-subscription-${sub.uri}`, async () => { 453 const { error: dbError } = await supabaseServerClient 454 .from("publication_subscriptions") 455 .update({ ··· 467 }; 468 } 469 return { success: true as const, oldUri: sub.uri, newUri }; 470 - }); 471 - }), 472 ); 473 474 // Process results 475 - for (const result of subDbResults) { 476 if (result.success) { 477 userSubscriptionUriMap[result.oldUri] = result.newUri; 478 stats.userSubscriptionsMigrated++; ··· 489 // 2. External references (e.g., from other AT Proto apps) to old URIs continue to work 490 // 3. The normalization layer handles both schemas transparently for reads 491 // Old records are also kept on the user's PDS so existing AT-URI references remain valid. 492 493 return { 494 success: stats.errors.length === 0,
··· 38 const stats = { 39 publicationsMigrated: 0, 40 documentsMigrated: 0, 41 + standardDocumentsFixed: 0, 42 userSubscriptionsMigrated: 0, 43 referencesUpdated: 0, 44 errors: [] as string[], 45 }; 46 47 // Step 1: Verify OAuth session is valid 48 + const oauthValid = await step.run("verify-oauth-session", async () => { 49 const result = await restoreOAuthSession(did); 50 if (!result.ok) { 51 + // Mark identity as needing migration so we can retry later 52 + await supabaseServerClient 53 + .from("identities") 54 + .update({ 55 + metadata: { needsStandardSiteMigration: true }, 56 + }) 57 + .eq("atp_did", did); 58 + 59 + return { success: false, error: result.error.message }; 60 } 61 return { success: true }; 62 }); 63 + 64 + if (!oauthValid.success) { 65 + return { 66 + success: false, 67 + error: `Failed to restore OAuth session`, 68 + stats, 69 + publicationUriMap: {}, 70 + documentUriMap: {}, 71 + userSubscriptionUriMap: {}, 72 + }; 73 + } 74 75 // Step 2: Get user's pub.leaflet.publication records 76 const oldPublications = await step.run( ··· 127 }) 128 .filter((x) => x !== null); 129 130 + // Run PDS + DB writes together for each publication 131 + const pubResults = await Promise.all( 132 + publicationsToMigrate.map(({ pub, rkey, normalized, newRecord }) => 133 + step.run(`migrate-publication-${pub.uri}`, async () => { 134 + // PDS write 135 const agent = await createAuthenticatedAgent(did); 136 const putResult = await agent.com.atproto.repo.putRecord({ 137 repo: did, ··· 140 record: newRecord, 141 validate: false, 142 }); 143 + const newUri = putResult.data.uri; 144 145 + // DB write 146 const { error: dbError } = await supabaseServerClient 147 .from("publications") 148 .upsert({ ··· 161 }; 162 } 163 return { success: true as const, oldUri: pub.uri, newUri }; 164 + }), 165 + ), 166 ); 167 168 // Process results 169 + for (const result of pubResults) { 170 if (result.success) { 171 publicationUriMap[result.oldUri] = result.newUri; 172 stats.publicationsMigrated++; ··· 251 $type: "site.standard.document", 252 title: normalized.title || "Untitled", 253 site: siteValue, 254 + path: "/" + rkey, 255 publishedAt: normalized.publishedAt || new Date().toISOString(), 256 description: normalized.description, 257 content: normalized.content, ··· 264 }) 265 .filter((x) => x !== null); 266 267 + // Run PDS + DB writes together for each document 268 + const docResults = await Promise.all( 269 + documentsToMigrate.map(({ doc, rkey, newRecord, oldPubUri }) => 270 + step.run(`migrate-document-${doc.uri}`, async () => { 271 + // PDS write 272 const agent = await createAuthenticatedAgent(did); 273 const putResult = await agent.com.atproto.repo.putRecord({ 274 repo: did, ··· 277 record: newRecord, 278 validate: false, 279 }); 280 + const newUri = putResult.data.uri; 281 282 + // DB write 283 const { error: dbError } = await supabaseServerClient 284 .from("documents") 285 .upsert({ ··· 308 } 309 310 return { success: true as const, oldUri: doc.uri, newUri }; 311 + }), 312 + ), 313 ); 314 315 // Process results 316 + for (const result of docResults) { 317 if (result.success) { 318 documentUriMap[result.oldUri] = result.newUri; 319 stats.documentsMigrated++; ··· 324 } 325 } 326 327 + // Step 4b: Fix existing site.standard.document records that reference pub.leaflet.publication 328 + // This handles the case where site.standard.document records were created pointing to 329 + // pub.leaflet.publication URIs before the publication was migrated to site.standard.publication 330 + const existingStandardDocs = await step.run( 331 + "fetch-existing-standard-documents", 332 + async () => { 333 + const { data, error } = await supabaseServerClient 334 + .from("documents") 335 + .select("uri, data") 336 + .like("uri", `at://${did}/site.standard.document/%`); 337 + 338 + if (error) 339 + throw new Error( 340 + `Failed to fetch existing standard documents: ${error.message}`, 341 + ); 342 + return data || []; 343 + }, 344 + ); 345 + 346 + // Find documents that reference pub.leaflet.publication and need their site field updated 347 + const standardDocsToFix = existingStandardDocs 348 + .map((doc) => { 349 + const data = doc.data as SiteStandardDocument.Record; 350 + const site = data?.site; 351 + 352 + // Check if site field references a pub.leaflet.publication 353 + if (!site || !site.includes("/pub.leaflet.publication/")) { 354 + return null; 355 + } 356 + 357 + try { 358 + const oldPubAturi = new AtUri(site); 359 + const newPubUri = `at://${oldPubAturi.hostname}/site.standard.publication/${oldPubAturi.rkey}`; 360 + 361 + // Only fix if we have the new publication in our map (meaning it was migrated) 362 + // or if the new publication exists (check against all migrated publications) 363 + if ( 364 + publicationUriMap[site] || 365 + Object.values(publicationUriMap).includes(newPubUri) 366 + ) { 367 + const docAturi = new AtUri(doc.uri); 368 + const updatedRecord: SiteStandardDocument.Record = { 369 + ...data, 370 + site: newPubUri, 371 + }; 372 + 373 + return { 374 + doc, 375 + rkey: docAturi.rkey, 376 + oldSite: site, 377 + newSite: newPubUri, 378 + updatedRecord, 379 + }; 380 + } 381 + } catch (e) { 382 + stats.errors.push(`Invalid site URI in document ${doc.uri}: ${site}`); 383 + } 384 + 385 + return null; 386 + }) 387 + .filter((x) => x !== null); 388 + 389 + // Update these documents on PDS and in database 390 + if (standardDocsToFix.length > 0) { 391 + const fixResults = await Promise.all( 392 + standardDocsToFix.map(({ doc, rkey, oldSite, newSite, updatedRecord }) => 393 + step.run(`fix-standard-document-${doc.uri}`, async () => { 394 + // PDS write to update the site field 395 + const agent = await createAuthenticatedAgent(did); 396 + await agent.com.atproto.repo.putRecord({ 397 + repo: did, 398 + collection: "site.standard.document", 399 + rkey, 400 + record: updatedRecord, 401 + validate: false, 402 + }); 403 + 404 + // DB write 405 + const { error: dbError } = await supabaseServerClient 406 + .from("documents") 407 + .update({ data: updatedRecord as Json }) 408 + .eq("uri", doc.uri); 409 + 410 + if (dbError) { 411 + return { 412 + success: false as const, 413 + uri: doc.uri, 414 + error: dbError.message, 415 + }; 416 + } 417 + 418 + // Update documents_in_publications to point to new publication URI 419 + await supabaseServerClient 420 + .from("documents_in_publications") 421 + .upsert({ 422 + publication: newSite, 423 + document: doc.uri, 424 + }); 425 + 426 + // Remove old publication reference if different 427 + if (oldSite !== newSite) { 428 + await supabaseServerClient 429 + .from("documents_in_publications") 430 + .delete() 431 + .eq("publication", oldSite) 432 + .eq("document", doc.uri); 433 + } 434 + 435 + return { success: true as const, uri: doc.uri }; 436 + }), 437 + ), 438 + ); 439 + 440 + for (const result of fixResults) { 441 + if (result.success) { 442 + stats.standardDocumentsFixed++; 443 + } else { 444 + stats.errors.push( 445 + `Fix standard document ${result.uri}: Database error: ${result.error}`, 446 + ); 447 + } 448 + } 449 + } 450 + 451 // Step 5: Update references in database tables (all in parallel) 452 await step.run("update-references", async () => { 453 const pubEntries = Object.entries(publicationUriMap); ··· 558 }) 559 .filter((x) => x !== null); 560 561 + // Run PDS + DB writes together for each subscription 562 + const subResults = await Promise.all( 563 subscriptionsToMigrate.map(({ sub, rkey, newRecord }) => 564 + step.run(`migrate-subscription-${sub.uri}`, async () => { 565 + // PDS write 566 const agent = await createAuthenticatedAgent(did); 567 const putResult = await agent.com.atproto.repo.putRecord({ 568 repo: did, ··· 571 record: newRecord, 572 validate: false, 573 }); 574 + const newUri = putResult.data.uri; 575 576 + // DB write 577 const { error: dbError } = await supabaseServerClient 578 .from("publication_subscriptions") 579 .update({ ··· 591 }; 592 } 593 return { success: true as const, oldUri: sub.uri, newUri }; 594 + }), 595 + ), 596 ); 597 598 // Process results 599 + for (const result of subResults) { 600 if (result.success) { 601 userSubscriptionUriMap[result.oldUri] = result.newUri; 602 stats.userSubscriptionsMigrated++; ··· 613 // 2. External references (e.g., from other AT Proto apps) to old URIs continue to work 614 // 3. The normalization layer handles both schemas transparently for reads 615 // Old records are also kept on the user's PDS so existing AT-URI references remain valid. 616 + 617 + // Clear the migration flag on success 618 + if (stats.errors.length === 0) { 619 + await step.run("clear-migration-flag", async () => { 620 + await supabaseServerClient 621 + .from("identities") 622 + .update({ metadata: null }) 623 + .eq("atp_did", did); 624 + }); 625 + } 626 627 return { 628 success: stats.errors.length === 0,
+8
app/api/inngest/route.tsx
··· 5 import { batched_update_profiles } from "./functions/batched_update_profiles"; 6 import { index_follows } from "./functions/index_follows"; 7 import { migrate_user_to_standard } from "./functions/migrate_user_to_standard"; 8 9 export const { GET, POST, PUT } = serve({ 10 client: inngest, ··· 14 batched_update_profiles, 15 index_follows, 16 migrate_user_to_standard, 17 ], 18 });
··· 5 import { batched_update_profiles } from "./functions/batched_update_profiles"; 6 import { index_follows } from "./functions/index_follows"; 7 import { migrate_user_to_standard } from "./functions/migrate_user_to_standard"; 8 + import { fix_standard_document_publications } from "./functions/fix_standard_document_publications"; 9 + import { 10 + cleanup_expired_oauth_sessions, 11 + check_oauth_session, 12 + } from "./functions/cleanup_expired_oauth_sessions"; 13 14 export const { GET, POST, PUT } = serve({ 15 client: inngest, ··· 19 batched_update_profiles, 20 index_follows, 21 migrate_user_to_standard, 22 + fix_standard_document_publications, 23 + cleanup_expired_oauth_sessions, 24 + check_oauth_session, 25 ], 26 });
+11
app/api/oauth/[route]/route.ts
··· 11 ActionAfterSignIn, 12 parseActionFromSearchParam, 13 } from "./afterSignInActions"; 14 15 type OauthRequestClientState = { 16 redirect: string | null; ··· 84 .single(); 85 identity = data; 86 } 87 let { data: token } = await supabaseServerClient 88 .from("email_auth_tokens") 89 .insert({
··· 11 ActionAfterSignIn, 12 parseActionFromSearchParam, 13 } from "./afterSignInActions"; 14 + import { inngest } from "app/api/inngest/client"; 15 16 type OauthRequestClientState = { 17 redirect: string | null; ··· 85 .single(); 86 identity = data; 87 } 88 + 89 + // Trigger migration if identity needs it 90 + const metadata = identity?.metadata as Record<string, unknown> | null; 91 + if (metadata?.needsStandardSiteMigration) { 92 + await inngest.send({ 93 + name: "user/migrate-to-standard", 94 + data: { did: session.did }, 95 + }); 96 + } 97 + 98 let { data: token } = await supabaseServerClient 99 .from("email_auth_tokens") 100 .insert({
+34
app/lish/[did]/[publication]/.well-known/site.standard.publication/route.ts
···
··· 1 + import { publicationNameOrUriFilter } from "src/utils/uriHelpers"; 2 + import { supabaseServerClient } from "supabase/serverClient"; 3 + 4 + export async function GET( 5 + req: Request, 6 + props: { 7 + params: Promise<{ publication: string; did: string }>; 8 + }, 9 + ) { 10 + let params = await props.params; 11 + let did = decodeURIComponent(params.did); 12 + let publication_name = decodeURIComponent(params.publication); 13 + let [{ data: publications }] = await Promise.all([ 14 + supabaseServerClient 15 + .from("publications") 16 + .select( 17 + `*, 18 + publication_subscriptions(*), 19 + documents_in_publications(documents( 20 + *, 21 + comments_on_documents(count), 22 + document_mentions_in_bsky(count) 23 + )) 24 + `, 25 + ) 26 + .eq("identity_did", did) 27 + .or(publicationNameOrUriFilter(did, publication_name)) 28 + .order("uri", { ascending: false }) 29 + .limit(1), 30 + ]); 31 + let publication = publications?.[0]; 32 + if (!did || !publication) return new Response(null, { status: 404 }); 33 + return new Response(publication.uri); 34 + }
+7 -4
app/lish/[did]/[publication]/[rkey]/page.tsx
··· 35 sizes: "32x32", 36 type: "image/png", 37 }, 38 - other: { 39 - rel: "alternate", 40 - url: document.uri, 41 - }, 42 }, 43 title: 44 docRecord.title +
··· 35 sizes: "32x32", 36 type: "image/png", 37 }, 38 + other: [ 39 + { 40 + rel: "alternate", 41 + url: document.uri, 42 + }, 43 + { rel: "site.standard.document", url: document.uri }, 44 + ], 45 }, 46 title: 47 docRecord.title +
+1
app/lish/createPub/updatePublication.ts
··· 278 return withPublicationUpdate(uri, async ({ normalizedPub, existingBasePath, publicationType, agent }) => { 279 // Build theme object 280 const themeData = { 281 backgroundImage: theme.backgroundImage 282 ? { 283 $type: "pub.leaflet.theme.backgroundImage",
··· 278 return withPublicationUpdate(uri, async ({ normalizedPub, existingBasePath, publicationType, agent }) => { 279 // Build theme object 280 const themeData = { 281 + $type: "pub.leaflet.publication#theme" as const, 282 backgroundImage: theme.backgroundImage 283 ? { 284 $type: "pub.leaflet.theme.backgroundImage",
+7 -4
app/p/[didOrHandle]/[rkey]/page.tsx
··· 38 39 return { 40 icons: { 41 - other: { 42 - rel: "alternate", 43 - url: document.uri, 44 - }, 45 }, 46 title: docRecord.title, 47 description: docRecord?.description || "",
··· 38 39 return { 40 icons: { 41 + other: [ 42 + { 43 + rel: "alternate", 44 + url: document.uri, 45 + }, 46 + { rel: "site.standard.document", url: document.uri }, 47 + ], 48 }, 49 title: docRecord.title, 50 description: docRecord?.description || "",
+1
drizzle/schema.ts
··· 140 email: text("email"), 141 atp_did: text("atp_did"), 142 interface_state: jsonb("interface_state"), 143 }, 144 (table) => { 145 return {
··· 140 email: text("email"), 141 atp_did: text("atp_did"), 142 interface_state: jsonb("interface_state"), 143 + metadata: jsonb("metadata"), 144 }, 145 (table) => { 146 return {
+2 -2
lexicons/api/lexicons.ts
··· 2215 type: 'ref', 2216 }, 2217 theme: { 2218 - type: 'ref', 2219 - ref: 'lex:pub.leaflet.publication#theme', 2220 }, 2221 description: { 2222 maxGraphemes: 300,
··· 2215 type: 'ref', 2216 }, 2217 theme: { 2218 + type: 'union', 2219 + refs: ['lex:pub.leaflet.publication#theme'], 2220 }, 2221 description: { 2222 maxGraphemes: 300,
+1 -1
lexicons/api/types/site/standard/publication.ts
··· 15 export interface Record { 16 $type: 'site.standard.publication' 17 basicTheme?: SiteStandardThemeBasic.Main 18 - theme?: PubLeafletPublication.Theme 19 description?: string 20 icon?: BlobRef 21 name: string
··· 15 export interface Record { 16 $type: 'site.standard.publication' 17 basicTheme?: SiteStandardThemeBasic.Main 18 + theme?: $Typed<PubLeafletPublication.Theme> | { $type: string } 19 description?: string 20 icon?: BlobRef 21 name: string
+2 -2
lexicons/site/standard/publication.json
··· 9 "type": "ref" 10 }, 11 "theme": { 12 - "type": "ref", 13 - "ref": "pub.leaflet.publication#theme" 14 }, 15 "description": { 16 "maxGraphemes": 300,
··· 9 "type": "ref" 10 }, 11 "theme": { 12 + "type": "union", 13 + "refs": ["pub.leaflet.publication#theme"] 14 }, 15 "description": { 16 "maxGraphemes": 300,
+40 -5
lexicons/src/normalize.ts
··· 14 */ 15 16 import type * as PubLeafletDocument from "../api/types/pub/leaflet/document"; 17 - import type * as PubLeafletPublication from "../api/types/pub/leaflet/publication"; 18 import type * as PubLeafletContent from "../api/types/pub/leaflet/content"; 19 import type * as SiteStandardDocument from "../api/types/site/standard/document"; 20 import type * as SiteStandardPublication from "../api/types/site/standard/publication"; ··· 31 }; 32 33 // Normalized publication type - uses the generated site.standard.publication type 34 - export type NormalizedPublication = SiteStandardPublication.Record; 35 36 /** 37 * Checks if the record is a pub.leaflet.document ··· 210 ): NormalizedPublication | null { 211 if (!record || typeof record !== "object") return null; 212 213 - // Pass through site.standard records directly 214 if (isStandardPublication(record)) { 215 - return record; 216 } 217 218 if (isLeafletPublication(record)) { ··· 225 226 const basicTheme = leafletThemeToBasicTheme(record.theme); 227 228 // Convert preferences to site.standard format (strip/replace $type) 229 const preferences: SiteStandardPublication.Preferences | undefined = 230 record.preferences ··· 243 description: record.description, 244 icon: record.icon, 245 basicTheme, 246 - theme: record.theme, 247 preferences, 248 }; 249 }
··· 14 */ 15 16 import type * as PubLeafletDocument from "../api/types/pub/leaflet/document"; 17 + import * as PubLeafletPublication from "../api/types/pub/leaflet/publication"; 18 import type * as PubLeafletContent from "../api/types/pub/leaflet/content"; 19 import type * as SiteStandardDocument from "../api/types/site/standard/document"; 20 import type * as SiteStandardPublication from "../api/types/site/standard/publication"; ··· 31 }; 32 33 // Normalized publication type - uses the generated site.standard.publication type 34 + // with the theme narrowed to only the valid pub.leaflet.publication#theme type 35 + // (isTheme validates that $type is present, so we use $Typed) 36 + // Note: We explicitly list fields rather than using Omit because the generated Record type 37 + // has an index signature [k: string]: unknown that interferes with property typing 38 + export type NormalizedPublication = { 39 + $type: "site.standard.publication"; 40 + name: string; 41 + url: string; 42 + description?: string; 43 + icon?: SiteStandardPublication.Record["icon"]; 44 + basicTheme?: SiteStandardThemeBasic.Main; 45 + theme?: $Typed<PubLeafletPublication.Theme>; 46 + preferences?: SiteStandardPublication.Preferences; 47 + }; 48 49 /** 50 * Checks if the record is a pub.leaflet.document ··· 223 ): NormalizedPublication | null { 224 if (!record || typeof record !== "object") return null; 225 226 + // Pass through site.standard records directly, but validate the theme 227 if (isStandardPublication(record)) { 228 + // Validate theme - only keep if it's a valid pub.leaflet.publication#theme 229 + const theme = PubLeafletPublication.isTheme(record.theme) 230 + ? (record.theme as $Typed<PubLeafletPublication.Theme>) 231 + : undefined; 232 + return { 233 + ...record, 234 + theme, 235 + }; 236 } 237 238 if (isLeafletPublication(record)) { ··· 245 246 const basicTheme = leafletThemeToBasicTheme(record.theme); 247 248 + // Validate theme - only keep if it's a valid pub.leaflet.publication#theme with $type set 249 + // For legacy records without $type, add it during normalization 250 + let theme: $Typed<PubLeafletPublication.Theme> | undefined; 251 + if (record.theme) { 252 + if (PubLeafletPublication.isTheme(record.theme)) { 253 + theme = record.theme as $Typed<PubLeafletPublication.Theme>; 254 + } else { 255 + // Legacy theme without $type - add it 256 + theme = { 257 + ...record.theme, 258 + $type: "pub.leaflet.publication#theme", 259 + }; 260 + } 261 + } 262 + 263 // Convert preferences to site.standard format (strip/replace $type) 264 const preferences: SiteStandardPublication.Preferences | undefined = 265 record.preferences ··· 278 description: record.description, 279 icon: record.icon, 280 basicTheme, 281 + theme, 282 preferences, 283 }; 284 }
+3
supabase/database.types.ts
··· 551 home_page: string 552 id: string 553 interface_state: Json | null 554 } 555 Insert: { 556 atp_did?: string | null ··· 559 home_page?: string 560 id?: string 561 interface_state?: Json | null 562 } 563 Update: { 564 atp_did?: string | null ··· 567 home_page?: string 568 id?: string 569 interface_state?: Json | null 570 } 571 Relationships: [ 572 {
··· 551 home_page: string 552 id: string 553 interface_state: Json | null 554 + metadata: Json | null 555 } 556 Insert: { 557 atp_did?: string | null ··· 560 home_page?: string 561 id?: string 562 interface_state?: Json | null 563 + metadata?: Json | null 564 } 565 Update: { 566 atp_did?: string | null ··· 569 home_page?: string 570 id?: string 571 interface_state?: Json | null 572 + metadata?: Json | null 573 } 574 Relationships: [ 575 {
+1
supabase/migrations/20260123000000_add_metadata_to_identities.sql
···
··· 1 + alter table "public"."identities" add column "metadata" jsonb;