a demonstration replicated social networking web app built with anproto
wiredove.net/
social
ed25519
protocols
1import { apds } from 'apds'
2import { render } from './render.js'
3import { noteReceived, registerNetworkSenders } from './network_queue.js'
4import { getModerationState, isBlockedAuthor } from './moderation.js'
5import { adaptiveConcurrency } from './adaptive_concurrency.js'
6import { perfMeasure, perfStart, perfEnd } from './perf.js'
7
8const pubs = new Set()
9const wsBackoff = new Map()
10const HTTP_POLL_INTERVAL_MS = 5000
11const RECENT_LATEST_WINDOW_MS = 24 * 60 * 60 * 1000
12const INCOMING_BATCH_CONCURRENCY = adaptiveConcurrency({ base: 6, min: 2, max: 10, type: 'network' })
13const httpState = {
14 baseUrl: null,
15 ready: false,
16 pollTimer: null,
17 lastSince: 0
18}
19const DEFAULT_CONFIRM_TIMEOUT_MS = 12000
20const DEFAULT_CONFIRM_INTERVAL_MS = 600
21const DEFAULT_CONFIRM_LOOKBACK_MS = 2 * 60 * 1000
22
23let wsReadyResolver
24const createWsReadyPromise = () => new Promise(resolve => {
25 wsReadyResolver = resolve
26})
27export let wsReady = createWsReadyPromise()
28
29const isWsOpen = (ws) => ws && ws.readyState === WebSocket.OPEN
30
31const safeWsSend = (ws, msg) => {
32 if (!isWsOpen(ws)) { return false }
33 try {
34 ws.send(msg)
35 return true
36 } catch (err) {
37 console.warn('ws send failed', err)
38 return false
39 }
40}
41
42const deliverWs = (msg) => {
43 pubs.forEach(pub => {
44 const sent = safeWsSend(pub, msg)
45 if (!sent && pub.readyState !== WebSocket.CONNECTING) {
46 pubs.delete(pub)
47 }
48 })
49}
50
51const isHash = (msg) => typeof msg === 'string' && msg.length === 44
52const parseOpenedTimestamp = (opened) => {
53 if (typeof opened !== 'string' || opened.length < 13) { return 0 }
54 const ts = Number.parseInt(opened.substring(0, 13), 10)
55 return Number.isFinite(ts) ? ts : 0
56}
57
58const mapLimit = async (items, limit, worker) => {
59 if (!Array.isArray(items) || !items.length) { return [] }
60 const results = new Array(items.length)
61 let cursor = 0
62 const lanes = Math.max(1, Math.min(limit, items.length))
63 const runLane = async () => {
64 while (true) {
65 const index = cursor
66 if (index >= items.length) { return }
67 cursor += 1
68 results[index] = await worker(items[index], index)
69 }
70 }
71 await Promise.all(Array.from({ length: lanes }, () => runLane()))
72 return results
73}
74
75const processIncomingBatch = async (messages) => {
76 if (!Array.isArray(messages) || !messages.length) { return }
77 const token = perfStart('net.batch.process')
78 const seen = new Set()
79 const deduped = messages.filter((msg) => {
80 if (typeof msg !== 'string' || !msg.length) { return false }
81 if (seen.has(msg)) { return false }
82 seen.add(msg)
83 return true
84 })
85 await mapLimit(deduped, INCOMING_BATCH_CONCURRENCY, async (msg) => {
86 await handleIncoming(msg)
87 })
88 perfEnd(token)
89}
90
91const handleIncoming = async (msg) => {
92 noteReceived(msg)
93 if (isHash(msg)) {
94 const blob = await apds.get(msg)
95 if (blob) {
96 if (pubs.size) {
97 deliverWs(blob)
98 } else {
99 await sendHttp(blob)
100 }
101 }
102 return
103 }
104 const author = msg.substring(0, 44)
105 if (await isBlockedAuthor(author)) { return }
106 await render.shouldWe(msg)
107 await apds.make(msg)
108 await apds.add(msg)
109 await render.blob(msg)
110}
111
112const toHttpBase = (wsUrl) => wsUrl.replace(/^ws:/, 'http:').replace(/^wss:/, 'https:')
113const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
114const normalizeSince = (value) => {
115 const parsed = Number(value)
116 if (!Number.isFinite(parsed)) { return 0 }
117 return Math.max(0, Math.floor(parsed))
118}
119
120const pollHttpSince = async (since) => {
121 if (!httpState.ready || !httpState.baseUrl) { return null }
122 try {
123 const url = new URL('/gossip/poll', httpState.baseUrl)
124 url.searchParams.set('since', String(normalizeSince(since)))
125 const res = await perfMeasure('net.http.poll', async () => fetch(url.toString(), { cache: 'no-store' }))
126 if (!res.ok) { return { ok: false, status: res.status } }
127 const data = await res.json()
128 return {
129 ok: true,
130 messages: Array.isArray(data.messages) ? data.messages : [],
131 nextSince: Number.isFinite(data.nextSince) ? data.nextSince : null
132 }
133 } catch (err) {
134 console.warn('http gossip poll failed', err)
135 return { ok: false, error: err }
136 }
137}
138
139const scheduleHttpPoll = () => {
140 if (httpState.pollTimer) { return }
141 httpState.pollTimer = setTimeout(pollHttp, HTTP_POLL_INTERVAL_MS)
142}
143
144const pollHttp = async () => {
145 httpState.pollTimer = null
146 if (!httpState.ready || pubs.size) {
147 scheduleHttpPoll()
148 return
149 }
150 try {
151 const polled = await pollHttpSince(httpState.lastSince)
152 if (polled?.ok) {
153 await processIncomingBatch(polled.messages)
154 if (Number.isFinite(polled.nextSince)) {
155 httpState.lastSince = Math.max(httpState.lastSince, polled.nextSince)
156 }
157 }
158 } finally {
159 scheduleHttpPoll()
160 }
161}
162
163const sendHttp = async (msg) => {
164 if (!httpState.ready) { return }
165 try {
166 const url = new URL('/gossip', httpState.baseUrl)
167 const res = await perfMeasure('net.http.send', async () => fetch(url.toString(), {
168 method: 'POST',
169 headers: { 'Content-Type': 'text/plain' },
170 body: msg
171 }))
172 if (!res.ok) { return }
173 const data = await res.json()
174 const messages = Array.isArray(data.messages) ? data.messages : []
175 await processIncomingBatch(messages)
176 } catch (err) {
177 console.warn('http gossip send failed', err)
178 }
179}
180
181export const startHttpGossip = async (baseUrl) => {
182 if (httpState.ready) { return }
183 httpState.baseUrl = baseUrl
184 httpState.ready = true
185 try {
186 const q = await apds.query()
187 if (q && q.length) {
188 let ts = 0
189 q.forEach((entry) => {
190 const value = Number.parseInt(entry?.ts || '0', 10)
191 if (Number.isFinite(value) && value > ts) { ts = value }
192 })
193 if (Number.isFinite(ts)) {
194 httpState.lastSince = ts
195 }
196 }
197 } catch (err) {
198 console.warn('http gossip seed failed', err)
199 }
200 void pollHttp()
201 scheduleHttpPoll()
202}
203
204export const sendWs = async (msg) => {
205 if (pubs.size) {
206 deliverWs(msg)
207 } else {
208 await sendHttp(msg)
209 }
210}
211
212export const hasWs = () => pubs.size > 0 || httpState.ready
213
214export const confirmMessagesPersisted = async (messages, options = {}) => {
215 const targets = Array.from(new Set((messages || []).filter((msg) => typeof msg === 'string' && msg.length)))
216 if (!targets.length) { return { ok: true, missing: [], attempts: 0 } }
217 if (!httpState.ready || !httpState.baseUrl) {
218 return { ok: false, missing: targets, attempts: 0, reason: 'unconfirmed' }
219 }
220
221 const timeoutMs = Number.isFinite(options.timeoutMs) ? options.timeoutMs : DEFAULT_CONFIRM_TIMEOUT_MS
222 const intervalMs = Number.isFinite(options.intervalMs) ? options.intervalMs : DEFAULT_CONFIRM_INTERVAL_MS
223 const lookbackMs = Number.isFinite(options.lookbackMs) ? options.lookbackMs : DEFAULT_CONFIRM_LOOKBACK_MS
224 const deadline = Date.now() + Math.max(500, timeoutMs)
225 const pending = new Set(targets)
226 let cursor = normalizeSince(options.since ?? (Date.now() - lookbackMs))
227 let attempts = 0
228
229 while (Date.now() <= deadline && pending.size) {
230 attempts += 1
231 const polled = await pollHttpSince(cursor)
232 if (polled?.ok) {
233 polled.messages.forEach((msg) => pending.delete(msg))
234 if (Number.isFinite(polled.nextSince)) {
235 cursor = Math.max(cursor, normalizeSince(polled.nextSince))
236 }
237 if (!pending.size) {
238 return { ok: true, missing: [], attempts }
239 }
240 }
241 if (Date.now() + intervalMs > deadline) { break }
242 await sleep(intervalMs)
243 }
244
245 return { ok: false, missing: Array.from(pending), attempts, reason: 'unconfirmed' }
246}
247
248registerNetworkSenders({
249 sendWs,
250 hasWs
251})
252
253export const makeWs = async (pub) => {
254 const httpBase = toHttpBase(pub)
255 await startHttpGossip(httpBase)
256
257 const getBackoff = () => {
258 let state = wsBackoff.get(pub)
259 if (!state) {
260 state = { delayMs: 1000, timer: null }
261 wsBackoff.set(pub, state)
262 }
263 return state
264 }
265
266 const scheduleReconnect = () => {
267 const state = getBackoff()
268 if (state.timer) return
269 state.timer = setTimeout(() => {
270 state.timer = null
271 connectWs()
272 state.delayMs *= 2
273 }, state.delayMs)
274 }
275
276 const resetBackoff = () => {
277 const state = getBackoff()
278 if (state.timer) {
279 clearTimeout(state.timer)
280 state.timer = null
281 }
282 state.delayMs = 1000
283 }
284
285 const connectWs = () => {
286 const ws = new WebSocket(pub)
287
288 ws.onopen = async () => {
289 console.log('OPEN')
290 pubs.add(ws)
291 resetBackoff()
292 wsReadyResolver?.()
293 const now = Date.now()
294 let pubkeys = []
295 try {
296 const next = await apds.getPubkeys()
297 pubkeys = Array.isArray(next) ? next : []
298 } catch (err) {
299 console.warn('getPubkeys failed', err)
300 pubkeys = []
301 }
302 const moderation = await getModerationState()
303 const blocked = new Set(moderation.blockedAuthors || [])
304 const announceable = pubkeys.filter((pub) => !blocked.has(pub))
305 await mapLimit(announceable, INCOMING_BATCH_CONCURRENCY, async (pub) => {
306 if (!safeWsSend(ws, pub)) { return }
307 const latest = await apds.getLatest(pub)
308 if (!latest) { return }
309 const openedTs = parseOpenedTimestamp(latest.opened)
310 if (!openedTs || now - openedTs > RECENT_LATEST_WINDOW_MS) { return }
311 if (!latest.sig) { return }
312 safeWsSend(ws, latest.sig)
313 })
314 //below sends everything in the client to a dovepub pds server
315 //const log = await apds.query()
316 //if (log) {
317 // const ar = []
318 // for (const msg of log) {
319 // ws.send(msg.sig)
320 // if (msg.text) {
321 // ws.send(msg.text)
322 // const yaml = await apds.parseYaml(msg.text)
323 // //console.log(yaml)
324 // if (yaml.image && !ar.includes(yaml.image)) {
325 // const get = await apds.get(yaml.image)
326 // if (get) {
327 // ws.send(get)
328 // ar.push(yaml.image)
329 // }
330 // }
331 // }
332 // if (!msg.text) {
333 // const get = await apds.get(msg.opened.substring(13))
334 // if (get) {ws.send(get)}
335 // }
336 // }
337 //}
338 }
339
340 ws.onmessage = async (m) => {
341 await handleIncoming(m.data)
342 }
343
344 ws.onerror = () => {
345 scheduleReconnect()
346 }
347
348 ws.onclose = async () => {
349 console.log('CLOSED')
350 pubs.delete(ws)
351 if (!pubs.size) {
352 wsReady = createWsReadyPromise()
353 }
354 scheduleReconnect()
355 }
356 }
357
358 connectWs()
359 return wsReady
360}