// open source is social — v-it.org
// at main · 323 lines · 9.3 kB (code-viewer header retained from the page this file was copied from)
// SPDX-License-Identifier: MIT
// Copyright (c) 2026 sol pbc

import { resolveHandles } from './resolve.js';

const CAP_COLLECTION = 'org.v-it.cap';
const VOUCH_COLLECTION = 'org.v-it.vouch';
const SKILL_COLLECTION = 'org.v-it.skill';
const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';
// The socket is closed after this many milliseconds so one streamEvents() call is bounded.
const STREAM_DURATION_MS = 55_000;

/**
 * Normalise a record's `beacon` field.
 *
 * @param {unknown} value - Raw value read from a record or a database row.
 * @returns {string|null} The value when it is a non-empty string, otherwise `null`.
 */
function beaconValue(value) {
  return typeof value === 'string' && value.length > 0 ? value : null;
}

/**
 * Build the statement(s) that add one cap to a beacon, creating the beacon row if needed.
 *
 * @param {object} env - Environment exposing `DB.prepare()` (presumably a Cloudflare D1 binding — confirm).
 * @param {string} beacon - Beacon name.
 * @returns {Array} Prepared, bound statements to include in a batch.
 */
function incrementCapBeaconStatements(env, beacon) {
  return [
    env.DB.prepare(
      `INSERT INTO beacons (name, cap_count, last_activity)
       VALUES (?, 1, datetime('now'))
       ON CONFLICT(name) DO UPDATE SET
         cap_count = cap_count + 1,
         last_activity = datetime('now')`,
    ).bind(beacon),
  ];
}

/**
 * Build the statement(s) that add one vouch to a beacon, creating the beacon row if needed.
 *
 * @param {object} env - Environment exposing `DB.prepare()`.
 * @param {string} beacon - Beacon name.
 * @returns {Array} Prepared, bound statements to include in a batch.
 */
function incrementVouchBeaconStatements(env, beacon) {
  return [
    env.DB.prepare(
      `INSERT INTO beacons (name, vouch_count, last_activity)
       VALUES (?, 1, datetime('now'))
       ON CONFLICT(name) DO UPDATE SET
         vouch_count = vouch_count + 1,
         last_activity = datetime('now')`,
    ).bind(beacon),
  ];
}

/**
 * Build the statement that removes one cap from a beacon's counter (clamped at zero).
 *
 * @param {object} env - Environment exposing `DB.prepare()`.
 * @param {string} beacon - Beacon name.
 * @returns {object} A prepared, bound statement.
 */
function decrementCapBeaconStatement(env, beacon) {
  return env.DB.prepare(
    `UPDATE beacons
     SET cap_count = MAX(0, cap_count - 1),
         last_activity = datetime('now')
     WHERE name = ?`,
  ).bind(beacon);
}

/**
 * Build the statement that removes one vouch from a beacon's counter (clamped at zero).
 *
 * @param {object} env - Environment exposing `DB.prepare()`.
 * @param {string} beacon - Beacon name.
 * @returns {object} A prepared, bound statement.
 */
function decrementVouchBeaconStatement(env, beacon) {
  return env.DB.prepare(
    `UPDATE beacons
     SET vouch_count = MAX(0, vouch_count - 1),
         last_activity = datetime('now')
     WHERE name = ?`,
  ).bind(beacon);
}

/**
 * Apply one `org.v-it.cap` commit to the `caps` table and keep the per-beacon
 * cap counter in step with it.
 *
 * @param {object} env - Environment exposing `DB.prepare()` and `DB.batch()`.
 * @param {string} did - DID of the repository the commit belongs to.
 * @param {object} commit - Commit payload; `operation`, `rkey`, `record` and `cid` are read.
 * @returns {Promise<void>}
 */
async function processCapEvent(env, did, commit) {
  const { operation, rkey, record, cid } = commit;
  const uri = `at://${did}/${CAP_COLLECTION}/${rkey}`;

  if (operation === 'create' || operation === 'update') {
    const nextBeacon = beaconValue(record?.beacon);
    const existing = await env.DB.prepare('SELECT beacon FROM caps WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();
    const prevBeacon = beaconValue(existing?.beacon);

    const capKind = typeof record?.kind === 'string' && record.kind.length > 0 ? record.kind : null;

    const stmts = [
      env.DB.prepare(
        `INSERT INTO caps (did, rkey, uri, cid, title, description, ref, beacon, kind, record_json, created_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(did, rkey) DO UPDATE SET
           cid = excluded.cid,
           title = excluded.title,
           description = excluded.description,
           ref = excluded.ref,
           beacon = excluded.beacon,
           kind = excluded.kind,
           record_json = excluded.record_json,
           created_at = excluded.created_at`,
      ).bind(
        did,
        rkey,
        uri,
        cid ?? null,
        record.title,
        record.description || '',
        record.ref,
        nextBeacon,
        capKind,
        JSON.stringify(record),
        record.createdAt,
      ),
    ];

    // A missing row normalises to prevBeacon === null, so this single comparison
    // covers both "new record with a beacon" and "existing record moved beacons".
    if (prevBeacon !== nextBeacon) {
      if (prevBeacon) {
        stmts.push(decrementCapBeaconStatement(env, prevBeacon));
      }
      if (nextBeacon) {
        stmts.push(...incrementCapBeaconStatements(env, nextBeacon));
      }
    }

    await env.DB.batch(stmts);
    return;
  }

  if (operation === 'delete') {
    const existing = await env.DB.prepare('SELECT beacon FROM caps WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();
    const prevBeacon = beaconValue(existing?.beacon);

    // The counter decrement (if any) is placed ahead of the row delete in the batch.
    const stmts = [];
    if (prevBeacon) {
      stmts.push(decrementCapBeaconStatement(env, prevBeacon));
    }
    stmts.push(env.DB.prepare('DELETE FROM caps WHERE did = ? AND rkey = ?').bind(did, rkey));

    await env.DB.batch(stmts);
  }
}

/**
 * Apply one `org.v-it.vouch` commit to the `vouches` table and keep the
 * per-beacon vouch counter in step with it.
 *
 * @param {object} env - Environment exposing `DB.prepare()` and `DB.batch()`.
 * @param {string} did - DID of the repository the commit belongs to.
 * @param {object} commit - Commit payload; `operation`, `rkey`, `record` and `cid` are read.
 * @returns {Promise<void>}
 */
async function processVouchEvent(env, did, commit) {
  const { operation, rkey, record, cid } = commit;
  const uri = `at://${did}/${VOUCH_COLLECTION}/${rkey}`;

  if (operation === 'create' || operation === 'update') {
    const nextBeacon = beaconValue(record?.beacon);
    const existing = await env.DB.prepare('SELECT beacon FROM vouches WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();
    const prevBeacon = beaconValue(existing?.beacon);

    // Vouches without an explicit kind are stored as 'endorse'.
    const vouchKind = typeof record?.kind === 'string' && record.kind.length > 0 ? record.kind : 'endorse';

    const stmts = [
      env.DB.prepare(
        `INSERT INTO vouches (did, rkey, uri, cid, cap_uri, ref, beacon, kind, record_json, created_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(did, rkey) DO UPDATE SET
           cid = excluded.cid,
           cap_uri = excluded.cap_uri,
           ref = excluded.ref,
           beacon = excluded.beacon,
           kind = excluded.kind,
           record_json = excluded.record_json,
           created_at = excluded.created_at`,
      ).bind(
        did,
        rkey,
        uri,
        cid ?? null,
        record.subject?.uri,
        record.ref,
        // Store the normalised beacon (as processCapEvent does) so the stored value
        // always agrees with the value the counters were computed from.
        nextBeacon,
        vouchKind,
        JSON.stringify(record),
        record.createdAt,
      ),
    ];

    // A missing row normalises to prevBeacon === null, so this single comparison
    // covers both "new record with a beacon" and "existing record moved beacons".
    if (prevBeacon !== nextBeacon) {
      if (prevBeacon) {
        stmts.push(decrementVouchBeaconStatement(env, prevBeacon));
      }
      if (nextBeacon) {
        stmts.push(...incrementVouchBeaconStatements(env, nextBeacon));
      }
    }

    await env.DB.batch(stmts);
    return;
  }

  if (operation === 'delete') {
    const existing = await env.DB.prepare('SELECT beacon FROM vouches WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();
    const prevBeacon = beaconValue(existing?.beacon);

    // The counter decrement (if any) is placed ahead of the row delete in the batch.
    const stmts = [];
    if (prevBeacon) {
      stmts.push(decrementVouchBeaconStatement(env, prevBeacon));
    }
    stmts.push(env.DB.prepare('DELETE FROM vouches WHERE did = ? AND rkey = ?').bind(did, rkey));

    await env.DB.batch(stmts);
  }
}

/**
 * Apply one `org.v-it.skill` commit to the `skills` table.
 *
 * @param {object} env - Environment exposing `DB.prepare()` and `DB.batch()`.
 * @param {string} did - DID of the repository the commit belongs to.
 * @param {object} commit - Commit payload; `operation`, `rkey`, `record` and `cid` are read.
 * @returns {Promise<void>}
 */
async function processSkillEvent(env, did, commit) {
  const { operation, rkey, record, cid } = commit;
  const uri = `at://${did}/${SKILL_COLLECTION}/${rkey}`;

  if (operation === 'create' || operation === 'update') {
    await env.DB.batch([
      env.DB.prepare(
        `INSERT INTO skills (did, rkey, uri, cid, name, description, ref, version, tags, record_json, created_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(did, rkey) DO UPDATE SET
           cid = excluded.cid,
           name = excluded.name,
           description = excluded.description,
           ref = excluded.ref,
           version = excluded.version,
           tags = excluded.tags,
           record_json = excluded.record_json,
           created_at = excluded.created_at`,
      ).bind(
        did,
        rkey,
        uri,
        cid ?? null,
        record.name,
        record.description || '',
        // The ref column for a skill is derived from its name.
        `skill-${record.name}`,
        record.version || null,
        // Tags are stored as a single comma-separated string.
        (record.tags || []).join(','),
        JSON.stringify(record),
        record.createdAt,
      ),
    ]);
    return;
  }

  if (operation === 'delete') {
    await env.DB.prepare('DELETE FROM skills WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .run();
  }
}

/**
 * Subscribe to Jetstream for the cap, vouch and skill collections, apply every
 * commit received to the database, and stop after STREAM_DURATION_MS (or when
 * the socket closes or errors).
 *
 * Once the socket has closed, all in-flight event handlers are awaited and the
 * DIDs seen during the run are passed to `resolveHandles`. A failure in either
 * step is logged; the returned promise still resolves so the caller always gets
 * the cursor back.
 *
 * @param {object} env - Environment exposing `DB`; also forwarded to `resolveHandles`.
 * @param {string|number|null|undefined} cursor - Cursor to resume from; omitted from the URL when falsy.
 * @returns {Promise<{latestCursor: (string|number|null)}>} `time_us` (as a string) of the last
 *   commit message received, or the incoming cursor (or `null`) when none arrived.
 */
export async function streamEvents(env, cursor) {
  const url = new URL(JETSTREAM_URL);
  url.searchParams.append('wantedCollections', CAP_COLLECTION);
  url.searchParams.append('wantedCollections', VOUCH_COLLECTION);
  url.searchParams.append('wantedCollections', SKILL_COLLECTION);
  if (cursor) {
    url.searchParams.set('cursor', cursor);
  }

  return await new Promise((resolve) => {
    let latestCursor = cursor || null;
    const newDids = new Set();
    const pending = new Set();
    const ws = new WebSocket(url.toString());

    const timeout = setTimeout(() => {
      ws.close();
    }, STREAM_DURATION_MS);

    const finish = async () => {
      clearTimeout(timeout);
      try {
        if (pending.size > 0) {
          await Promise.allSettled([...pending]);
        }
        if (newDids.size > 0) {
          await resolveHandles([...newDids], env);
        }
      } catch (err) {
        console.error('streamEvents: post-stream work failed', err);
      } finally {
        // Always settle: without this a rejected resolveHandles() would leave
        // the promise returned by streamEvents() pending forever.
        resolve({ latestCursor });
      }
    };

    // Everything up to the first `await` runs synchronously inside the message
    // callback, so the cursor and DID bookkeeping happen in arrival order.
    // NOTE(review): the database work of different events still overlaps; two
    // rapid events for the same record could interleave — confirm that is acceptable.
    const handleMessage = async (event) => {
      let msg;
      try {
        msg = JSON.parse(event.data);
      } catch {
        // Frames that are not valid JSON are skipped.
        return;
      }

      if (msg.kind !== 'commit') {
        return;
      }

      if (msg.time_us) {
        latestCursor = String(msg.time_us);
      }

      if (msg.did) {
        newDids.add(msg.did);
      }

      const commit = msg.commit;
      if (!commit) {
        return;
      }

      if (commit.collection === CAP_COLLECTION) {
        await processCapEvent(env, msg.did, commit);
      } else if (commit.collection === VOUCH_COLLECTION) {
        await processVouchEvent(env, msg.did, commit);
      } else if (commit.collection === SKILL_COLLECTION) {
        await processSkillEvent(env, msg.did, commit);
      }
    };

    ws.addEventListener('message', (event) => {
      // Catch per task: a failed write must not surface as an unhandled
      // rejection (the promise derived by `.finally()` would otherwise reject).
      const task = handleMessage(event).catch((err) => {
        console.error('streamEvents: failed to process event', err);
      });

      pending.add(task);
      void task.finally(() => pending.delete(task));
    });

    ws.addEventListener('close', () => {
      void finish();
    });

    ws.addEventListener('error', () => {
      // Closing triggers the 'close' listener above, which finishes the run.
      ws.close();
    });
  });
}