A fullstack app for indexing standard.site documents
at main 270 lines 9.0 kB view raw
import { Hono } from "hono";
import type { Bindings, TapEvent } from "../types";
import { resolveViewUrl } from "../utils";

/** Offset, in hours, added to the current time to compute `stale_at` for a resolved document. */
const STALE_OFFSET_HOURS = 24;

/** The only collection these webhooks act on; events for any other collection are ignored. */
const DOCUMENT_COLLECTION = "site.standard.document";

/** Identifies one record in a repo: the parts that make up its `at://` URI. */
interface DocumentRef {
  did: string;
  collection: string;
  rkey: string;
}

/** The subset of document record fields that is copied into `resolved_documents`. */
interface DocumentFields {
  title?: string;
  path?: string;
  site?: string;
  content?: unknown;
  textContent?: string;
  publishedAt?: string;
}

/** Shape of one entry in the `/tap/batch` request body. */
type BatchEvent = {
  type: string;
  did: string;
  collection?: string;
  rkey?: string;
  cid?: string;
  record?: Record<string, unknown>;
};

/**
 * Compares two strings without returning early at the first mismatching byte,
 * so the time taken does not reveal how much of a guessed credential was correct.
 */
function timingSafeEqual(a: string, b: string): boolean {
  const encoder = new TextEncoder();
  const left = encoder.encode(a);
  const right = encoder.encode(b);
  const length = Math.max(left.length, right.length);

  // Seed with the length difference so unequal lengths can never compare equal.
  let diff = left.length ^ right.length;
  for (let i = 0; i < length; i++) {
    diff |= (left[i] ?? 0) ^ (right[i] ?? 0);
  }
  return diff === 0;
}

/**
 * Checks the `Authorization` header against the configured webhook secret.
 *
 * Accepts both a Bearer token (legacy) and Basic Auth (Tap default); Tap sends
 * Basic Auth as base64("admin:password").
 *
 * @param secret - Value of `TAP_WEBHOOK_SECRET`; when unset or empty, no check is performed.
 * @param authHeader - Raw `Authorization` header of the request, if any.
 * @returns `true` when the request may proceed.
 */
function isAuthorized(
  secret: string | undefined,
  authHeader: string | undefined
): boolean {
  if (!secret) {
    return true;
  }
  if (authHeader === undefined) {
    return false;
  }

  const expectedBasic = `Basic ${btoa(`admin:${secret}`)}`;
  const expectedBearer = `Bearer ${secret}`;

  // Evaluate both comparisons unconditionally rather than short-circuiting.
  const matchesBasic = timingSafeEqual(authHeader, expectedBasic);
  const matchesBearer = timingSafeEqual(authHeader, expectedBearer);
  return matchesBasic || matchesBearer;
}

/** Builds the `at://did/collection/rkey` URI used as the key of `resolved_documents`. */
function documentUri(ref: DocumentRef): string {
  return `at://${ref.did}/${ref.collection}/${ref.rkey}`;
}

/**
 * Stores a created or updated document and queues it for full processing.
 *
 * Steps, in order: upsert the `repo_records` row; if a record body is present,
 * upsert the `resolved_documents` row (resolving `view_url` when both `site`
 * and `path` are set); send `{ did, collection, rkey }` to `RESOLUTION_QUEUE`.
 *
 * @param env - Worker bindings providing `DB` and `RESOLUTION_QUEUE`.
 * @param ref - Identity of the record.
 * @param cid - Record CID; a missing or empty value is stored as NULL.
 * @param record - Record body from the event; when falsy, `resolved_documents` is not touched.
 */
async function upsertDocument(
  env: Bindings,
  ref: DocumentRef,
  cid: string | null | undefined,
  record: unknown
): Promise<void> {
  const db = env.DB;

  // `excluded` refers to the row that failed to insert, so each value is bound once.
  await db
    .prepare(
      `INSERT INTO repo_records (did, rkey, collection, cid, synced_at)
       VALUES (?, ?, ?, ?, datetime('now'))
       ON CONFLICT(did, collection, rkey) DO UPDATE SET
         cid = excluded.cid,
         synced_at = datetime('now')`
    )
    .bind(ref.did, ref.rkey, ref.collection, cid || null)
    .run();

  if (record) {
    const doc = record as DocumentFields;

    let viewUrl: string | null = null;
    if (doc.site && doc.path) {
      viewUrl = await resolveViewUrl(db, doc.site, doc.path);
    }

    await db
      .prepare(
        `INSERT INTO resolved_documents (uri, did, rkey, title, path, site, content, text_content, published_at, view_url, resolved_at, stale_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now', '+${STALE_OFFSET_HOURS} hours'))
         ON CONFLICT(uri) DO UPDATE SET
           title = excluded.title,
           path = excluded.path,
           site = excluded.site,
           content = excluded.content,
           text_content = excluded.text_content,
           published_at = excluded.published_at,
           view_url = excluded.view_url,
           resolved_at = datetime('now'),
           stale_at = datetime('now', '+${STALE_OFFSET_HOURS} hours')`
      )
      .bind(
        documentUri(ref),
        ref.did,
        ref.rkey,
        doc.title || null,
        doc.path || null,
        doc.site || null,
        doc.content ? JSON.stringify(doc.content) : null,
        doc.textContent || null,
        doc.publishedAt || null,
        viewUrl
      )
      .run();
  }

  // Queue for immediate full processing (verification, publication resolution, etc.)
  await env.RESOLUTION_QUEUE.send({
    did: ref.did,
    collection: ref.collection,
    rkey: ref.rkey,
  });
}

/**
 * Removes a deleted document from `repo_records` and then from `resolved_documents`.
 *
 * @param db - The `DB` binding.
 * @param ref - Identity of the record to remove.
 */
async function deleteDocument(
  db: Bindings["DB"],
  ref: DocumentRef
): Promise<void> {
  await db
    .prepare(
      "DELETE FROM repo_records WHERE did = ? AND collection = ? AND rkey = ?"
    )
    .bind(ref.did, ref.collection, ref.rkey)
    .run();

  await db
    .prepare("DELETE FROM resolved_documents WHERE uri = ?")
    .bind(documentUri(ref))
    .run();
}

const webhook = new Hono<{ Bindings: Bindings }>();

/**
 * POST /tap — handles a single Tap event.
 *
 * Only events of type "record" in the document collection are acted on:
 * "create"/"update" actions are upserted and queued, "delete" actions are
 * removed. Every other event is acknowledged with `{ ok: true }` and ignored.
 * Responds 401 when a secret is configured and the credentials do not match,
 * and 500 with the stringified error when anything throws.
 */
webhook.post("/tap", async (c) => {
  try {
    if (
      !isAuthorized(c.env.TAP_WEBHOOK_SECRET, c.req.header("Authorization"))
    ) {
      return c.json({ error: "Unauthorized" }, 401);
    }

    const event = (await c.req.json()) as TapEvent;

    if (event.type === "record") {
      const { record } = event;

      if (record.collection === DOCUMENT_COLLECTION) {
        const ref: DocumentRef = {
          did: record.did,
          collection: record.collection,
          rkey: record.rkey,
        };

        if (record.action === "create" || record.action === "update") {
          await upsertDocument(c.env, ref, record.cid, record.record);
        } else if (record.action === "delete") {
          await deleteDocument(c.env.DB, ref);
        }
      }
    }

    return c.json({ ok: true });
  } catch (error) {
    console.error("Webhook error:", error);
    return c.json(
      { error: "Failed to process webhook", details: String(error) },
      500
    );
  }
});

/**
 * POST /tap/batch — handles an array of events in one request.
 *
 * Events are processed sequentially and independently: a failure in one event
 * is logged and counted in `errors` without aborting the rest. Events that are
 * not in the document collection, or that lack `did` or `rkey`, are skipped and
 * counted in neither total. Responds 400 when the body is not a JSON array.
 */
webhook.post("/tap/batch", async (c) => {
  try {
    if (
      !isAuthorized(c.env.TAP_WEBHOOK_SECRET, c.req.header("Authorization"))
    ) {
      return c.json({ error: "Unauthorized" }, 401);
    }

    const payload: unknown = await c.req.json();
    if (!Array.isArray(payload)) {
      return c.json({ error: "Expected a JSON array of events" }, 400);
    }
    const events = payload as BatchEvent[];

    let processed = 0;
    let errors = 0;

    for (const event of events) {
      try {
        const { type, did, collection, rkey, cid, record } = event;

        if (collection !== DOCUMENT_COLLECTION || !did || !rkey) {
          continue;
        }
        const ref: DocumentRef = { did, collection, rkey };

        if (type === "commit" || type === "create" || type === "update") {
          await upsertDocument(c.env, ref, cid, record);
          processed++;
        } else if (type === "delete") {
          await deleteDocument(c.env.DB, ref);
          processed++;
        }
      } catch (error) {
        errors++;
        // Keep going with the remaining events, but leave a trace of what failed.
        console.error("Batch webhook event error:", error);
      }
    }

    return c.json({ ok: true, processed, errors });
  } catch (error) {
    console.error("Batch webhook error:", error);
    return c.json(
      { error: "Failed to process batch webhook", details: String(error) },
      500
    );
  }
});

export default webhook;