// A full-stack app for indexing standard.site documents.
// (Viewed on branch `main`: 307 lines, 8.3 kB.)
import { resolvePds } from "./resolver";
import { parseAtUri } from "./at-uri";
import { buildBlobUrl, extractBlobCid } from "./blob";
import { verifyDocumentRecord } from "./verification";

/** Hours after resolution at which a resolved document's `stale_at` falls due. */
const STALE_OFFSET_HOURS = 24;

/** Matches a leading `http://` or `https://` scheme, case-insensitively. */
const HTTP_SCHEME_RE = /^https?:\/\//i;

/** Raw document record as returned by a PDS. */
interface DocumentRecord {
  site?: string;
  path?: string;
  title?: string;
  description?: string;
  coverImage?: unknown;
  content?: unknown;
  textContent?: string;
  bskyPostRef?: { uri: string; cid: string };
  tags?: string[];
  publishedAt?: string;
  updatedAt?: string;
}

/** Raw publication record as returned by a PDS. */
interface PublicationRecord {
  url?: string;
  name?: string;
  description?: string;
  icon?: unknown;
}

/** Publication data after validation and icon blob resolution. */
interface ResolvedPublication {
  url: string;
  name: string;
  description: string | null;
  iconCid: string | null;
  iconUrl: string | null;
}

/** Publication columns and view URL derived from a document's `site` field. */
interface ResolvedSite {
  pubUrl: string | null;
  pubName: string | null;
  pubDescription: string | null;
  pubIconCid: string | null;
  pubIconUrl: string | null;
  viewUrl: string | null;
}

/** Document columns extracted from a raw record, normalized for binding (empty → null, objects → JSON). */
interface DocumentFields {
  title: string | null;
  description: string | null;
  path: string | null;
  site: string | null;
  content: string | null;
  textContent: string | null;
  coverImageCid: string | null;
  coverImageUrl: string | null;
  bskyPostRef: string | null;
  tags: string | null;
  publishedAt: string | null;
  updatedAt: string | null;
}

/** One full row of `resolved_documents`, minus the timestamps SQLite computes itself. */
interface ResolvedDocumentRow extends DocumentFields, ResolvedSite {
  uri: string;
  did: string;
  rkey: string;
  pdsEndpoint: string;
  verified: 0 | 1;
}

/** Builds the `com.atproto.repo.getRecord` XRPC URL for one record on a PDS. */
function buildGetRecordUrl(
  pds: string,
  did: string,
  collection: string,
  rkey: string,
): string {
  return (
    `${pds}/xrpc/com.atproto.repo.getRecord` +
    `?repo=${encodeURIComponent(did)}` +
    `&collection=${encodeURIComponent(collection)}` +
    `&rkey=${encodeURIComponent(rkey)}`
  );
}

/**
 * Combines a site or publication base URL with a document path.
 *
 * The path is appended to the base rather than resolved against it, so a base
 * that carries its own path prefix keeps it:
 * `example.com/blog` + `/post` → `https://example.com/blog/post`.
 * (`new URL("/post", "https://example.com/blog")` would drop `/blog`.)
 *
 * @param base - Site/publication URL; `https://` is prepended when it has no http(s) scheme.
 * @param path - Document path; a leading `/` is added when missing.
 * @returns The absolute URL, or `null` when the inputs cannot form a valid URL.
 */
function buildViewUrl(base: string, path: string): string | null {
  // A regex, not startsWith("http"): bare hosts such as "httpie.io" must still get a scheme.
  const withScheme = HTTP_SCHEME_RE.test(base) ? base : `https://${base}`;
  try {
    const baseUrl = new URL(withScheme);
    const prefix = baseUrl.pathname.replace(/\/+$/, "");
    const suffix = path.startsWith("/") ? path : `/${path}`;
    // Concatenate onto the origin, then re-parse to validate and normalize.
    return new URL(`${baseUrl.origin}${prefix}${suffix}`).toString();
  } catch {
    // Malformed site/publication URL: report "no view URL" instead of throwing,
    // so a permanently bad record doesn't fail (and get retried) forever.
    return null;
  }
}

/**
 * Fetches and validates a publication record referenced by an at:// URI.
 *
 * Best-effort: returns `null` if the URI doesn't parse, the PDS can't be
 * resolved, the fetch fails or is non-2xx, or the record lacks `url`/`name`.
 */
async function fetchPublication(
  db: D1Database,
  siteUri: string,
): Promise<ResolvedPublication | null> {
  const parsed = parseAtUri(siteUri);
  if (!parsed) return null;

  try {
    const pds = await resolvePds(db, parsed.did);
    if (!pds) return null;

    const response = await fetch(
      buildGetRecordUrl(pds, parsed.did, parsed.collection, parsed.rkey),
    );
    if (!response.ok) return null;

    const data = (await response.json()) as { value?: PublicationRecord };
    const pub = data.value;
    if (!pub?.url || !pub?.name) return null;

    const iconCid = extractBlobCid(pub.icon);
    return {
      url: pub.url,
      name: pub.name,
      description: pub.description || null,
      iconCid,
      iconUrl: iconCid ? buildBlobUrl(pds, parsed.did, iconCid) : null,
    };
  } catch (error) {
    // Still best-effort, but leave a trace so persistent failures are visible.
    console.warn(`Failed to fetch publication ${siteUri}:`, error);
    return null;
  }
}

/**
 * Resolves the view URL for a document.
 * If site is an at:// URI, fetches the publication to get the base URL.
 * Otherwise site is treated as the base URL itself (https:// assumed if no scheme).
 *
 * @returns The absolute view URL, or `null` if the publication can't be
 *   fetched or the URLs are malformed.
 */
export async function resolveViewUrl(
  db: D1Database,
  siteUri: string,
  path: string,
): Promise<string | null> {
  if (siteUri.startsWith("at://")) {
    const pub = await fetchPublication(db, siteUri);
    return pub ? buildViewUrl(pub.url, path) : null;
  }
  // Direct URL
  return buildViewUrl(siteUri, path);
}

/** Normalizes a raw document record into bindable column values. */
function extractDocumentFields(
  value: DocumentRecord,
  pds: string,
  did: string,
): DocumentFields {
  const coverImageCid = extractBlobCid(value.coverImage);
  return {
    title: value.title || null,
    description: value.description || null,
    path: value.path || null,
    site: value.site || null,
    content: value.content ? JSON.stringify(value.content) : null,
    textContent: value.textContent || null,
    coverImageCid,
    coverImageUrl: coverImageCid ? buildBlobUrl(pds, did, coverImageCid) : null,
    bskyPostRef: value.bskyPostRef ? JSON.stringify(value.bskyPostRef) : null,
    tags: value.tags ? JSON.stringify(value.tags) : null,
    publishedAt: value.publishedAt || null,
    updatedAt: value.updatedAt || null,
  };
}

/**
 * Derives publication columns and the view URL from a document's `site`.
 * An at:// site is fetched as a publication record; any other value is a
 * direct URL ("loose document") stored as `pubUrl`.
 */
async function resolveSite(
  db: D1Database,
  site: string | null,
  path: string | null,
): Promise<ResolvedSite> {
  const unresolved: ResolvedSite = {
    pubUrl: null,
    pubName: null,
    pubDescription: null,
    pubIconCid: null,
    pubIconUrl: null,
    viewUrl: null,
  };
  if (!site) return unresolved;

  if (site.startsWith("at://")) {
    const pub = await fetchPublication(db, site);
    if (!pub) return unresolved;
    return {
      pubUrl: pub.url,
      pubName: pub.name,
      pubDescription: pub.description,
      pubIconCid: pub.iconCid,
      pubIconUrl: pub.iconUrl,
      viewUrl: path ? buildViewUrl(pub.url, path) : null,
    };
  }

  // Site is a direct URL (loose document)
  return {
    ...unresolved,
    pubUrl: site,
    viewUrl: path ? buildViewUrl(site, path) : null,
  };
}

/** Deletes the local copy of a record from both tables in a single D1 batch. */
async function deleteLocalRecord(
  db: D1Database,
  uri: string,
  did: string,
  collection: string,
  rkey: string,
): Promise<void> {
  await db.batch([
    db.prepare("DELETE FROM resolved_documents WHERE uri = ?").bind(uri),
    db
      .prepare(
        "DELETE FROM repo_records WHERE did = ? AND collection = ? AND rkey = ?",
      )
      .bind(did, collection, rkey),
  ]);
}

/** Records (or refreshes) the record's CID and sync time in `repo_records`. */
async function upsertRepoRecord(
  db: D1Database,
  did: string,
  collection: string,
  rkey: string,
  cid: string | null,
): Promise<void> {
  await db
    .prepare(
      `INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
       VALUES (?, ?, ?, ?, datetime('now'))
       ON CONFLICT(did, collection, rkey) DO UPDATE SET
         cid = excluded.cid,
         synced_at = excluded.synced_at`,
    )
    .bind(did, rkey, collection, cid)
    .run();
}

/**
 * Inserts or updates a `resolved_documents` row.
 * On conflict every column except uri/did/rkey is overwritten via the
 * `excluded` pseudo-table, so each value is bound exactly once.
 */
async function upsertResolvedDocument(
  db: D1Database,
  row: ResolvedDocumentRow,
): Promise<void> {
  await db
    .prepare(
      `INSERT INTO resolved_documents (
        uri, did, rkey, title, description, path, site, content, text_content,
        cover_image_cid, cover_image_url, bsky_post_ref, tags,
        published_at, updated_at, pub_url, pub_name, pub_description,
        pub_icon_cid, pub_icon_url, view_url, pds_endpoint,
        resolved_at, stale_at, verified
      ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?,
        ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?,
        datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'), ?
      )
      ON CONFLICT(uri) DO UPDATE SET
        title = excluded.title,
        description = excluded.description,
        path = excluded.path,
        site = excluded.site,
        content = excluded.content,
        text_content = excluded.text_content,
        cover_image_cid = excluded.cover_image_cid,
        cover_image_url = excluded.cover_image_url,
        bsky_post_ref = excluded.bsky_post_ref,
        tags = excluded.tags,
        published_at = excluded.published_at,
        updated_at = excluded.updated_at,
        pub_url = excluded.pub_url,
        pub_name = excluded.pub_name,
        pub_description = excluded.pub_description,
        pub_icon_cid = excluded.pub_icon_cid,
        pub_icon_url = excluded.pub_icon_url,
        view_url = excluded.view_url,
        pds_endpoint = excluded.pds_endpoint,
        resolved_at = excluded.resolved_at,
        stale_at = excluded.stale_at,
        verified = excluded.verified`,
    )
    .bind(
      row.uri,
      row.did,
      row.rkey,
      row.title,
      row.description,
      row.path,
      row.site,
      row.content,
      row.textContent,
      row.coverImageCid,
      row.coverImageUrl,
      row.bskyPostRef,
      row.tags,
      row.publishedAt,
      row.updatedAt,
      row.pubUrl,
      row.pubName,
      row.pubDescription,
      row.pubIconCid,
      row.pubIconUrl,
      row.viewUrl,
      row.pdsEndpoint,
      row.verified,
    )
    .run();
}

/**
 * Processes a document record: fetches it from the author's PDS, resolves its
 * publication, verifies it, and stores all fields in `resolved_documents`.
 *
 * - Unresolvable PDS: logs a warning and returns.
 * - 404 from the PDS: the record was deleted, so the local copies in
 *   `resolved_documents` and `repo_records` are removed.
 *
 * @throws On any other fetch failure, a response without a record value, or a
 *   database/verification error. The error is logged and re-thrown so the
 *   queue handler can retry.
 */
export async function processDocument(
  db: D1Database,
  did: string,
  collection: string,
  rkey: string,
) {
  const uri = `at://${did}/${collection}/${rkey}`;
  try {
    // 1. Resolve PDS
    const pds = await resolvePds(db, did);
    if (!pds) {
      console.warn(`Could not resolve PDS for ${did}`);
      return;
    }

    // 2. Fetch Document Record
    const response = await fetch(buildGetRecordUrl(pds, did, collection, rkey));
    if (!response.ok) {
      if (response.status === 404) {
        // Record was deleted from PDS - clean up our local copy
        console.warn(
          `Record not found (deleted): ${did}/${collection}/${rkey}`,
        );
        await deleteLocalRecord(db, uri, did, collection, rkey);
        return; // Not an error, just cleanup
      }
      // Other errors (5xx, rate limits, etc.) should be retried
      throw new Error(
        `Failed to fetch record: ${response.status} ${response.statusText}`,
      );
    }

    const data = (await response.json()) as {
      uri?: string;
      cid?: string;
      value?: DocumentRecord;
    } | null;
    const value = data?.value;
    if (!value || typeof value !== "object") {
      throw new Error(
        `Malformed getRecord response for ${uri}: missing record value`,
      );
    }

    // 3. Update repo_records
    await upsertRepoRecord(db, did, collection, rkey, data?.cid || null);

    // 4. Extract document fields
    const fields = extractDocumentFields(value, pds, did);

    // 5. Resolve publication (at:// site) or direct site URL
    const resolvedSite = await resolveSite(db, fields.site, fields.path);

    // 6. Verify the document
    const verified = await verifyDocumentRecord(
      resolvedSite.pubUrl,
      fields.site,
      resolvedSite.viewUrl,
      uri,
    );

    // 7. Insert/update resolved_documents
    await upsertResolvedDocument(db, {
      uri,
      did,
      rkey,
      pdsEndpoint: pds,
      verified: verified ? 1 : 0,
      ...fields,
      ...resolvedSite,
    });

    console.log(`Processed document: ${uri}`);
  } catch (error) {
    console.error(
      `Error processing document ${did}/${collection}/${rkey}:`,
      error,
    );
    // Re-throw so the queue handler can retry
    throw error;
  }
}