a demonstration replicated social networking web app built with anproto wiredove.net/
social ed25519 protocols
at master 360 lines 10 kB view raw
1import { apds } from 'apds' 2import { render } from './render.js' 3import { noteReceived, registerNetworkSenders } from './network_queue.js' 4import { getModerationState, isBlockedAuthor } from './moderation.js' 5import { adaptiveConcurrency } from './adaptive_concurrency.js' 6import { perfMeasure, perfStart, perfEnd } from './perf.js' 7 8const pubs = new Set() 9const wsBackoff = new Map() 10const HTTP_POLL_INTERVAL_MS = 5000 11const RECENT_LATEST_WINDOW_MS = 24 * 60 * 60 * 1000 12const INCOMING_BATCH_CONCURRENCY = adaptiveConcurrency({ base: 6, min: 2, max: 10, type: 'network' }) 13const httpState = { 14 baseUrl: null, 15 ready: false, 16 pollTimer: null, 17 lastSince: 0 18} 19const DEFAULT_CONFIRM_TIMEOUT_MS = 12000 20const DEFAULT_CONFIRM_INTERVAL_MS = 600 21const DEFAULT_CONFIRM_LOOKBACK_MS = 2 * 60 * 1000 22 23let wsReadyResolver 24const createWsReadyPromise = () => new Promise(resolve => { 25 wsReadyResolver = resolve 26}) 27export let wsReady = createWsReadyPromise() 28 29const isWsOpen = (ws) => ws && ws.readyState === WebSocket.OPEN 30 31const safeWsSend = (ws, msg) => { 32 if (!isWsOpen(ws)) { return false } 33 try { 34 ws.send(msg) 35 return true 36 } catch (err) { 37 console.warn('ws send failed', err) 38 return false 39 } 40} 41 42const deliverWs = (msg) => { 43 pubs.forEach(pub => { 44 const sent = safeWsSend(pub, msg) 45 if (!sent && pub.readyState !== WebSocket.CONNECTING) { 46 pubs.delete(pub) 47 } 48 }) 49} 50 51const isHash = (msg) => typeof msg === 'string' && msg.length === 44 52const parseOpenedTimestamp = (opened) => { 53 if (typeof opened !== 'string' || opened.length < 13) { return 0 } 54 const ts = Number.parseInt(opened.substring(0, 13), 10) 55 return Number.isFinite(ts) ? ts : 0 56} 57 58const mapLimit = async (items, limit, worker) => { 59 if (!Array.isArray(items) || !items.length) { return [] } 60 const results = new Array(items.length) 61 let cursor = 0 62 const lanes = Math.max(1, Math.min(limit, items.length)) 63 const runLane = async () => { 64 while (true) { 65 const index = cursor 66 if (index >= items.length) { return } 67 cursor += 1 68 results[index] = await worker(items[index], index) 69 } 70 } 71 await Promise.all(Array.from({ length: lanes }, () => runLane())) 72 return results 73} 74 75const processIncomingBatch = async (messages) => { 76 if (!Array.isArray(messages) || !messages.length) { return } 77 const token = perfStart('net.batch.process') 78 const seen = new Set() 79 const deduped = messages.filter((msg) => { 80 if (typeof msg !== 'string' || !msg.length) { return false } 81 if (seen.has(msg)) { return false } 82 seen.add(msg) 83 return true 84 }) 85 await mapLimit(deduped, INCOMING_BATCH_CONCURRENCY, async (msg) => { 86 await handleIncoming(msg) 87 }) 88 perfEnd(token) 89} 90 91const handleIncoming = async (msg) => { 92 noteReceived(msg) 93 if (isHash(msg)) { 94 const blob = await apds.get(msg) 95 if (blob) { 96 if (pubs.size) { 97 deliverWs(blob) 98 } else { 99 await sendHttp(blob) 100 } 101 } 102 return 103 } 104 const author = msg.substring(0, 44) 105 if (await isBlockedAuthor(author)) { return } 106 await render.shouldWe(msg) 107 await apds.make(msg) 108 await apds.add(msg) 109 await render.blob(msg) 110} 111 112const toHttpBase = (wsUrl) => wsUrl.replace(/^ws:/, 'http:').replace(/^wss:/, 'https:') 113const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)) 114const normalizeSince = (value) => { 115 const parsed = Number(value) 116 if (!Number.isFinite(parsed)) { return 0 } 117 return Math.max(0, Math.floor(parsed)) 118} 119 120const pollHttpSince = async (since) => { 121 if (!httpState.ready || !httpState.baseUrl) { return null } 122 try { 123 const url = new URL('/gossip/poll', httpState.baseUrl) 124 url.searchParams.set('since', String(normalizeSince(since))) 125 const res = await perfMeasure('net.http.poll', async () => fetch(url.toString(), { cache: 'no-store' })) 126 if (!res.ok) { return { ok: false, status: res.status } } 127 const data = await res.json() 128 return { 129 ok: true, 130 messages: Array.isArray(data.messages) ? data.messages : [], 131 nextSince: Number.isFinite(data.nextSince) ? data.nextSince : null 132 } 133 } catch (err) { 134 console.warn('http gossip poll failed', err) 135 return { ok: false, error: err } 136 } 137} 138 139const scheduleHttpPoll = () => { 140 if (httpState.pollTimer) { return } 141 httpState.pollTimer = setTimeout(pollHttp, HTTP_POLL_INTERVAL_MS) 142} 143 144const pollHttp = async () => { 145 httpState.pollTimer = null 146 if (!httpState.ready || pubs.size) { 147 scheduleHttpPoll() 148 return 149 } 150 try { 151 const polled = await pollHttpSince(httpState.lastSince) 152 if (polled?.ok) { 153 await processIncomingBatch(polled.messages) 154 if (Number.isFinite(polled.nextSince)) { 155 httpState.lastSince = Math.max(httpState.lastSince, polled.nextSince) 156 } 157 } 158 } finally { 159 scheduleHttpPoll() 160 } 161} 162 163const sendHttp = async (msg) => { 164 if (!httpState.ready) { return } 165 try { 166 const url = new URL('/gossip', httpState.baseUrl) 167 const res = await perfMeasure('net.http.send', async () => fetch(url.toString(), { 168 method: 'POST', 169 headers: { 'Content-Type': 'text/plain' }, 170 body: msg 171 })) 172 if (!res.ok) { return } 173 const data = await res.json() 174 const messages = Array.isArray(data.messages) ? data.messages : [] 175 await processIncomingBatch(messages) 176 } catch (err) { 177 console.warn('http gossip send failed', err) 178 } 179} 180 181export const startHttpGossip = async (baseUrl) => { 182 if (httpState.ready) { return } 183 httpState.baseUrl = baseUrl 184 httpState.ready = true 185 try { 186 const q = await apds.query() 187 if (q && q.length) { 188 let ts = 0 189 q.forEach((entry) => { 190 const value = Number.parseInt(entry?.ts || '0', 10) 191 if (Number.isFinite(value) && value > ts) { ts = value } 192 }) 193 if (Number.isFinite(ts)) { 194 httpState.lastSince = ts 195 } 196 } 197 } catch (err) { 198 console.warn('http gossip seed failed', err) 199 } 200 void pollHttp() 201 scheduleHttpPoll() 202} 203 204export const sendWs = async (msg) => { 205 if (pubs.size) { 206 deliverWs(msg) 207 } else { 208 await sendHttp(msg) 209 } 210} 211 212export const hasWs = () => pubs.size > 0 || httpState.ready 213 214export const confirmMessagesPersisted = async (messages, options = {}) => { 215 const targets = Array.from(new Set((messages || []).filter((msg) => typeof msg === 'string' && msg.length))) 216 if (!targets.length) { return { ok: true, missing: [], attempts: 0 } } 217 if (!httpState.ready || !httpState.baseUrl) { 218 return { ok: false, missing: targets, attempts: 0, reason: 'unconfirmed' } 219 } 220 221 const timeoutMs = Number.isFinite(options.timeoutMs) ? options.timeoutMs : DEFAULT_CONFIRM_TIMEOUT_MS 222 const intervalMs = Number.isFinite(options.intervalMs) ? options.intervalMs : DEFAULT_CONFIRM_INTERVAL_MS 223 const lookbackMs = Number.isFinite(options.lookbackMs) ? options.lookbackMs : DEFAULT_CONFIRM_LOOKBACK_MS 224 const deadline = Date.now() + Math.max(500, timeoutMs) 225 const pending = new Set(targets) 226 let cursor = normalizeSince(options.since ?? (Date.now() - lookbackMs)) 227 let attempts = 0 228 229 while (Date.now() <= deadline && pending.size) { 230 attempts += 1 231 const polled = await pollHttpSince(cursor) 232 if (polled?.ok) { 233 polled.messages.forEach((msg) => pending.delete(msg)) 234 if (Number.isFinite(polled.nextSince)) { 235 cursor = Math.max(cursor, normalizeSince(polled.nextSince)) 236 } 237 if (!pending.size) { 238 return { ok: true, missing: [], attempts } 239 } 240 } 241 if (Date.now() + intervalMs > deadline) { break } 242 await sleep(intervalMs) 243 } 244 245 return { ok: false, missing: Array.from(pending), attempts, reason: 'unconfirmed' } 246} 247 248registerNetworkSenders({ 249 sendWs, 250 hasWs 251}) 252 253export const makeWs = async (pub) => { 254 const httpBase = toHttpBase(pub) 255 await startHttpGossip(httpBase) 256 257 const getBackoff = () => { 258 let state = wsBackoff.get(pub) 259 if (!state) { 260 state = { delayMs: 1000, timer: null } 261 wsBackoff.set(pub, state) 262 } 263 return state 264 } 265 266 const scheduleReconnect = () => { 267 const state = getBackoff() 268 if (state.timer) return 269 state.timer = setTimeout(() => { 270 state.timer = null 271 connectWs() 272 state.delayMs *= 2 273 }, state.delayMs) 274 } 275 276 const resetBackoff = () => { 277 const state = getBackoff() 278 if (state.timer) { 279 clearTimeout(state.timer) 280 state.timer = null 281 } 282 state.delayMs = 1000 283 } 284 285 const connectWs = () => { 286 const ws = new WebSocket(pub) 287 288 ws.onopen = async () => { 289 console.log('OPEN') 290 pubs.add(ws) 291 resetBackoff() 292 wsReadyResolver?.() 293 const now = Date.now() 294 let pubkeys = [] 295 try { 296 const next = await apds.getPubkeys() 297 pubkeys = Array.isArray(next) ? next : [] 298 } catch (err) { 299 console.warn('getPubkeys failed', err) 300 pubkeys = [] 301 } 302 const moderation = await getModerationState() 303 const blocked = new Set(moderation.blockedAuthors || []) 304 const announceable = pubkeys.filter((pub) => !blocked.has(pub)) 305 await mapLimit(announceable, INCOMING_BATCH_CONCURRENCY, async (pub) => { 306 if (!safeWsSend(ws, pub)) { return } 307 const latest = await apds.getLatest(pub) 308 if (!latest) { return } 309 const openedTs = parseOpenedTimestamp(latest.opened) 310 if (!openedTs || now - openedTs > RECENT_LATEST_WINDOW_MS) { return } 311 if (!latest.sig) { return } 312 safeWsSend(ws, latest.sig) 313 }) 314 //below sends everything in the client to a dovepub pds server 315 //const log = await apds.query() 316 //if (log) { 317 // const ar = [] 318 // for (const msg of log) { 319 // ws.send(msg.sig) 320 // if (msg.text) { 321 // ws.send(msg.text) 322 // const yaml = await apds.parseYaml(msg.text) 323 // //console.log(yaml) 324 // if (yaml.image && !ar.includes(yaml.image)) { 325 // const get = await apds.get(yaml.image) 326 // if (get) { 327 // ws.send(get) 328 // ar.push(yaml.image) 329 // } 330 // } 331 // } 332 // if (!msg.text) { 333 // const get = await apds.get(msg.opened.substring(13)) 334 // if (get) {ws.send(get)} 335 // } 336 // } 337 //} 338 } 339 340 ws.onmessage = async (m) => { 341 await handleIncoming(m.data) 342 } 343 344 ws.onerror = () => { 345 scheduleReconnect() 346 } 347 348 ws.onclose = async () => { 349 console.log('CLOSED') 350 pubs.delete(ws) 351 if (!pubs.size) { 352 wsReady = createWsReadyPromise() 353 } 354 scheduleReconnect() 355 } 356 } 357 358 connectWs() 359 return wsReady 360}