open source is social v-it.org
1// SPDX-License-Identifier: MIT
2// Copyright (c) 2026 sol pbc
3
4import { resolveHandles } from './resolve.js';
5
/** Collection name for cap records; used in at:// URIs and as a Jetstream filter. */
const CAP_COLLECTION = 'org.v-it.cap';

/** Collection name for vouch records; used in at:// URIs and as a Jetstream filter. */
const VOUCH_COLLECTION = 'org.v-it.vouch';

/** Collection name for skill records; used in at:// URIs and as a Jetstream filter. */
const SKILL_COLLECTION = 'org.v-it.skill';

/** WebSocket endpoint that streamEvents subscribes to. */
const JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';

/** How long (in milliseconds) streamEvents keeps its socket open before closing it. */
const STREAM_DURATION_MS = 55 * 1_000;
11
/**
 * Normalizes a raw beacon field.
 *
 * @param {unknown} value - Candidate beacon value (from a record or a DB row).
 * @returns {string|null} The value itself when it is a non-empty string,
 *   otherwise null.
 */
function beaconValue(value) {
  if (typeof value !== 'string') {
    return null;
  }
  return value === '' ? null : value;
}
15
/**
 * Builds the statements that count one additional cap against a beacon.
 *
 * The single statement inserts the beacon row with cap_count = 1, or, when a
 * row with that name already exists, bumps cap_count by one. Either way
 * last_activity is set to the current time. Nothing is executed here; the
 * caller is expected to run the returned statements.
 *
 * @param {object} env - Bindings object whose `DB` exposes `prepare`.
 * @param {string} beacon - Beacon name bound to the statement.
 * @returns {Array} One-element array holding the bound statement.
 */
function incrementCapBeaconStatements(env, beacon) {
  const upsertSql = `INSERT INTO beacons (name, cap_count, last_activity)
       VALUES (?, 1, datetime('now'))
       ON CONFLICT(name) DO UPDATE SET
         cap_count = cap_count + 1,
         last_activity = datetime('now')`;
  const upsert = env.DB.prepare(upsertSql).bind(beacon);
  return [upsert];
}
27
/**
 * Builds the statements that count one additional vouch against a beacon.
 *
 * The single statement inserts the beacon row with vouch_count = 1, or, when
 * a row with that name already exists, bumps vouch_count by one. Either way
 * last_activity is set to the current time. Nothing is executed here; the
 * caller is expected to run the returned statements.
 *
 * @param {object} env - Bindings object whose `DB` exposes `prepare`.
 * @param {string} beacon - Beacon name bound to the statement.
 * @returns {Array} One-element array holding the bound statement.
 */
function incrementVouchBeaconStatements(env, beacon) {
  const upsertSql = `INSERT INTO beacons (name, vouch_count, last_activity)
       VALUES (?, 1, datetime('now'))
       ON CONFLICT(name) DO UPDATE SET
         vouch_count = vouch_count + 1,
         last_activity = datetime('now')`;
  const upsert = env.DB.prepare(upsertSql).bind(beacon);
  return [upsert];
}
39
/**
 * Builds the statement that removes one cap from a beacon's counter.
 *
 * cap_count is lowered by one but never below zero (MAX(0, ...)), and
 * last_activity is set to the current time. Only an existing row with the
 * given name is touched. Nothing is executed here.
 *
 * @param {object} env - Bindings object whose `DB` exposes `prepare`.
 * @param {string} beacon - Beacon name bound to the WHERE clause.
 * @returns {object} The bound statement.
 */
function decrementCapBeaconStatement(env, beacon) {
  const updateSql = `UPDATE beacons
       SET cap_count = MAX(0, cap_count - 1),
           last_activity = datetime('now')
       WHERE name = ?`;
  const prepared = env.DB.prepare(updateSql);
  return prepared.bind(beacon);
}
48
/**
 * Builds the statement that removes one vouch from a beacon's counter.
 *
 * vouch_count is lowered by one but never below zero (MAX(0, ...)), and
 * last_activity is set to the current time. Only an existing row with the
 * given name is touched. Nothing is executed here.
 *
 * @param {object} env - Bindings object whose `DB` exposes `prepare`.
 * @param {string} beacon - Beacon name bound to the WHERE clause.
 * @returns {object} The bound statement.
 */
function decrementVouchBeaconStatement(env, beacon) {
  const updateSql = `UPDATE beacons
       SET vouch_count = MAX(0, vouch_count - 1),
           last_activity = datetime('now')
       WHERE name = ?`;
  const prepared = env.DB.prepare(updateSql);
  return prepared.bind(beacon);
}
57
/**
 * Applies a single cap-collection commit to the database.
 *
 * For `create`/`update` the cap row is upserted and, in the same batch, the
 * per-beacon cap counters are adjusted: incremented when a brand-new row
 * carries a beacon, or moved from the old beacon to the new one when an
 * existing row's beacon changes. For `delete` the row is removed and the
 * counter of the beacon it carried is decremented. Other operations are
 * ignored.
 *
 * @param {object} env - Bindings object; `env.DB` must expose `prepare` and
 *   `batch` (presumably a Cloudflare D1 database — confirm against the
 *   worker configuration).
 * @param {string} did - DID of the repository the commit belongs to.
 * @param {object} commit - Commit payload with `operation` and `rkey`, plus
 *   `record` (and optionally `cid`) for create/update.
 * @returns {Promise<void>}
 * @throws {TypeError} If a create/update commit carries no record object.
 */
async function processCapEvent(env, did, commit) {
  const { operation, rkey, record, cid } = commit;

  if (operation === 'create' || operation === 'update') {
    // Fail fast with a descriptive error instead of an opaque property-access
    // TypeError raised only after a SELECT has already been issued.
    if (record === null || typeof record !== 'object') {
      throw new TypeError(`cap ${operation} commit at ${did}/${rkey} is missing its record`);
    }

    const uri = `at://${did}/${CAP_COLLECTION}/${rkey}`;
    const nextBeacon = beaconValue(record.beacon);

    // The previously stored beacon decides whether counters must move.
    const existing = await env.DB.prepare('SELECT beacon FROM caps WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();
    const prevBeacon = beaconValue(existing?.beacon);

    const capKind = typeof record.kind === 'string' && record.kind.length > 0 ? record.kind : null;

    const stmts = [
      env.DB.prepare(
        `INSERT INTO caps (did, rkey, uri, cid, title, description, ref, beacon, kind, record_json, created_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(did, rkey) DO UPDATE SET
           cid = excluded.cid,
           title = excluded.title,
           description = excluded.description,
           ref = excluded.ref,
           beacon = excluded.beacon,
           kind = excluded.kind,
           record_json = excluded.record_json,
           created_at = excluded.created_at`,
      ).bind(
        did,
        rkey,
        uri,
        cid ?? null,
        record.title,
        record.description || '',
        record.ref,
        nextBeacon,
        capKind,
        JSON.stringify(record),
        record.createdAt,
      ),
    ];

    if (!existing && nextBeacon) {
      // New row: only the new beacon gains a cap.
      stmts.push(...incrementCapBeaconStatements(env, nextBeacon));
    } else if (existing && prevBeacon !== nextBeacon) {
      // Existing row whose beacon changed: move the count across.
      if (prevBeacon) {
        stmts.push(decrementCapBeaconStatement(env, prevBeacon));
      }
      if (nextBeacon) {
        stmts.push(...incrementCapBeaconStatements(env, nextBeacon));
      }
    }

    await env.DB.batch(stmts);
    return;
  }

  if (operation === 'delete') {
    const existing = await env.DB.prepare('SELECT beacon FROM caps WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();

    const stmts = [
      env.DB.prepare('DELETE FROM caps WHERE did = ? AND rkey = ?').bind(did, rkey),
    ];

    const prevBeacon = beaconValue(existing?.beacon);
    if (prevBeacon) {
      // The counter adjustment is placed ahead of the row deletion in the batch.
      stmts.unshift(decrementCapBeaconStatement(env, prevBeacon));
    }

    await env.DB.batch(stmts);
  }
}
131
/**
 * Applies a single vouch-collection commit to the database.
 *
 * For `create`/`update` the vouch row is upserted and, in the same batch,
 * the per-beacon vouch counters are adjusted: incremented when a brand-new
 * row carries a beacon, or moved from the old beacon to the new one when an
 * existing row's beacon changes. For `delete` the row is removed and the
 * counter of the beacon it carried is decremented. Other operations are
 * ignored.
 *
 * The stored `beacon` column holds the normalized beacon (non-empty string
 * or null), i.e. exactly the value the counter logic acts on, so a later
 * update or delete reads back the same beacon that was counted.
 *
 * @param {object} env - Bindings object; `env.DB` must expose `prepare` and
 *   `batch` (presumably a Cloudflare D1 database — confirm against the
 *   worker configuration).
 * @param {string} did - DID of the repository the commit belongs to.
 * @param {object} commit - Commit payload with `operation` and `rkey`, plus
 *   `record` (and optionally `cid`) for create/update.
 * @returns {Promise<void>}
 * @throws {TypeError} If a create/update commit carries no record object.
 */
async function processVouchEvent(env, did, commit) {
  const { operation, rkey, record, cid } = commit;

  if (operation === 'create' || operation === 'update') {
    // Fail fast with a descriptive error instead of an opaque property-access
    // TypeError raised only after a SELECT has already been issued.
    if (record === null || typeof record !== 'object') {
      throw new TypeError(`vouch ${operation} commit at ${did}/${rkey} is missing its record`);
    }

    const uri = `at://${did}/${VOUCH_COLLECTION}/${rkey}`;
    const nextBeacon = beaconValue(record.beacon);

    // The previously stored beacon decides whether counters must move.
    const existing = await env.DB.prepare('SELECT beacon FROM vouches WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();
    const prevBeacon = beaconValue(existing?.beacon);

    // A vouch without an explicit kind is treated as an endorsement.
    const vouchKind = typeof record.kind === 'string' && record.kind.length > 0 ? record.kind : 'endorse';

    const stmts = [
      env.DB.prepare(
        `INSERT INTO vouches (did, rkey, uri, cid, cap_uri, ref, beacon, kind, record_json, created_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(did, rkey) DO UPDATE SET
           cid = excluded.cid,
           cap_uri = excluded.cap_uri,
           ref = excluded.ref,
           beacon = excluded.beacon,
           kind = excluded.kind,
           record_json = excluded.record_json,
           created_at = excluded.created_at`,
      ).bind(
        did,
        rkey,
        uri,
        cid ?? null,
        record.subject?.uri,
        record.ref,
        // Store the normalized beacon, not the raw field: a raw '' or
        // non-string value is never counted, so it must not be persisted as
        // something a later event could read back and decrement.
        nextBeacon,
        vouchKind,
        JSON.stringify(record),
        record.createdAt,
      ),
    ];

    if (!existing && nextBeacon) {
      // New row: only the new beacon gains a vouch.
      stmts.push(...incrementVouchBeaconStatements(env, nextBeacon));
    } else if (existing && prevBeacon !== nextBeacon) {
      // Existing row whose beacon changed: move the count across.
      if (prevBeacon) {
        stmts.push(decrementVouchBeaconStatement(env, prevBeacon));
      }
      if (nextBeacon) {
        stmts.push(...incrementVouchBeaconStatements(env, nextBeacon));
      }
    }

    await env.DB.batch(stmts);
    return;
  }

  if (operation === 'delete') {
    const existing = await env.DB.prepare('SELECT beacon FROM vouches WHERE did = ? AND rkey = ?')
      .bind(did, rkey)
      .first();

    const stmts = [
      env.DB.prepare('DELETE FROM vouches WHERE did = ? AND rkey = ?').bind(did, rkey),
    ];

    const prevBeacon = beaconValue(existing?.beacon);
    if (prevBeacon) {
      // The counter adjustment is placed ahead of the row deletion in the batch.
      stmts.unshift(decrementVouchBeaconStatement(env, prevBeacon));
    }

    await env.DB.batch(stmts);
  }
}
203
/**
 * Applies a single skill-collection commit to the database.
 *
 * `create` and `update` upsert the skill row keyed by (did, rkey); `delete`
 * removes it. Any other operation is a no-op. The stored `ref` is the record
 * name prefixed with "skill-", and `tags` is stored as a comma-joined string.
 *
 * @param {object} env - Bindings object; `env.DB` must expose `prepare` and
 *   `batch`.
 * @param {string} did - DID of the repository the commit belongs to.
 * @param {object} commit - Commit payload with `operation` and `rkey`, plus
 *   `record` (and optionally `cid`) for create/update.
 * @returns {Promise<void>}
 */
async function processSkillEvent(env, did, commit) {
  const { operation, rkey, record, cid } = commit;

  switch (operation) {
    case 'create':
    case 'update': {
      const skillUri = `at://${did}/${SKILL_COLLECTION}/${rkey}`;
      const upsertSql = `INSERT INTO skills (did, rkey, uri, cid, name, description, ref, version, tags, record_json, created_at)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
         ON CONFLICT(did, rkey) DO UPDATE SET
           cid = excluded.cid,
           name = excluded.name,
           description = excluded.description,
           ref = excluded.ref,
           version = excluded.version,
           tags = excluded.tags,
           record_json = excluded.record_json,
           created_at = excluded.created_at`;
      // Prepare first, then read the record fields for the bind call.
      const prepared = env.DB.prepare(upsertSql);
      const upsert = prepared.bind(
        did,
        rkey,
        skillUri,
        cid ?? null,
        record.name,
        record.description || '',
        `skill-${record.name}`,
        record.version || null,
        (record.tags || []).join(','),
        JSON.stringify(record),
        record.createdAt,
      );
      await env.DB.batch([upsert]);
      return;
    }

    case 'delete': {
      const removal = env.DB.prepare('DELETE FROM skills WHERE did = ? AND rkey = ?').bind(did, rkey);
      await removal.run();
      return;
    }

    default:
      return;
  }
}
245
/**
 * Subscribes to the Jetstream endpoint for the cap, vouch and skill
 * collections, applies every commit received to the database, and resolves
 * once the socket has closed and all in-flight work has drained.
 *
 * The socket is closed after STREAM_DURATION_MS, or as soon as it reports an
 * error. After it closes, every DID seen on a commit message is passed to
 * `resolveHandles`.
 *
 * Ordering: commits that target the same record (same DID, collection and
 * rkey) are applied strictly in arrival order, because the per-collection
 * handlers read the current row before writing and would otherwise race with
 * each other. Commits for different records still run concurrently.
 *
 * Failures: an error while applying one commit, or while resolving handles,
 * is logged with `console.error` and does not prevent the returned promise
 * from resolving. The cursor advances on receipt of a commit message, whether
 * or not applying it succeeds.
 *
 * @param {object} env - Bindings object handed through to the event handlers
 *   and to `resolveHandles`.
 * @param {string|number|null|undefined} cursor - Jetstream cursor to resume
 *   from; omitted from the request when falsy.
 * @returns {Promise<{latestCursor: string|null}>} `time_us` of the last
 *   commit message received, as a string; otherwise the cursor passed in, or
 *   null when none was given.
 */
export async function streamEvents(env, cursor) {
  const url = new URL(JETSTREAM_URL);
  for (const collection of [CAP_COLLECTION, VOUCH_COLLECTION, SKILL_COLLECTION]) {
    url.searchParams.append('wantedCollections', collection);
  }
  if (cursor) {
    url.searchParams.set('cursor', cursor);
  }

  // A Map (not an object literal) so an attacker-chosen collection name such
  // as "constructor" can never resolve to an inherited property.
  const handlers = new Map([
    [CAP_COLLECTION, processCapEvent],
    [VOUCH_COLLECTION, processVouchEvent],
    [SKILL_COLLECTION, processSkillEvent],
  ]);

  return await new Promise((resolve) => {
    let latestCursor = cursor || null;
    let finished = false;
    const newDids = new Set();
    // Record key -> promise for the most recently queued commit on that record.
    // These promises never reject (failures are caught and logged below).
    const tails = new Map();
    const ws = new WebSocket(url.toString());

    const timeout = setTimeout(() => {
      ws.close();
    }, STREAM_DURATION_MS);

    const finish = async () => {
      if (finished) {
        return;
      }
      finished = true;
      clearTimeout(timeout);
      try {
        await Promise.all([...tails.values()]);
        if (newDids.size > 0) {
          await resolveHandles([...newDids], env);
        }
      } catch (err) {
        // Handle resolution is secondary; do not lose the cursor over it.
        console.error('streamEvents: resolving handles failed', err);
      } finally {
        // Always settle, otherwise the caller would wait forever.
        resolve({ latestCursor });
      }
    };

    ws.addEventListener('message', (event) => {
      let msg;
      try {
        msg = JSON.parse(event.data);
      } catch {
        // Not JSON: nothing to process.
        return;
      }

      if (msg?.kind !== 'commit') {
        return;
      }

      if (msg.time_us) {
        latestCursor = String(msg.time_us);
      }

      if (msg.did) {
        newDids.add(msg.did);
      }

      const commit = msg.commit;
      if (!commit) {
        return;
      }

      const handler = handlers.get(commit.collection);
      if (!handler) {
        return;
      }

      // Chain behind any earlier commit for the same record so that, e.g., a
      // create followed by a delete can never be applied in reverse.
      const key = `${msg.did}/${commit.collection}/${commit.rkey}`;
      const previous = tails.get(key) ?? Promise.resolve();
      const tail = previous
        .then(() => handler(env, msg.did, commit))
        .catch((err) => {
          console.error(`streamEvents: failed to apply ${commit.operation} for ${key}`, err);
        })
        .finally(() => {
          // Drop the entry only if nothing newer was queued behind it.
          if (tails.get(key) === tail) {
            tails.delete(key);
          }
        });
      tails.set(key, tail);
    });

    ws.addEventListener('close', () => {
      // finish() handles its own errors and never rejects.
      void finish();
    });

    ws.addEventListener('error', () => {
      ws.close();
    });
  });
}