at main 5.8 kB view raw
import { default as lexicons, getBits } from 'lexicons';
import psl from 'psl';
import webpush from 'web-push';
import WebSocket from 'ws';

// kind of silly but right now there's no way to tell spacedust that we want an alive connection
// but don't want the notification firehose (everything filtered out)
// so... the final filter is an absolute on this fake did, effectively filtering all notifs.
// (this is only used when there are no subscribers registered)
const DUMMY_DID = 'did:plc:zzzzzzzzzzzzzzzzzzzzzzzz';

// The currently active spacedust socket; set to null while a reconnect is pending.
let spacedust;
// NOTE(review): nothing in this module reads this flag — confirm whether it is still needed.
let spacedustEverStarted = false;

/**
 * Send the current set of subscribed DIDs to spacedust as an `options_update` message.
 *
 * Does nothing (apart from a warning) when there is no socket or the socket is
 * not open: `ws` throws from `send()` while a socket is still connecting, and
 * `connectSpacedust` calls this function again from `onopen`.
 *
 * @param {object} db - store exposing `getSubscribedDids()`.
 */
const updateSubs = db => {
  if (!spacedust) {
    console.warn('not updating subscription, no spacedust (reconnecting?)');
    return;
  }
  if (spacedust.readyState !== WebSocket.OPEN) {
    console.warn('not updating subscription, spacedust socket is not open yet');
    return;
  }
  // copy so that adding the dummy DID never mutates the array owned by db
  const wantedSubjectDids = [...db.getSubscribedDids()];
  if (wantedSubjectDids.length === 0) {
    wantedSubjectDids.push(DUMMY_DID);
  }
  console.log('updating for wantedSubjectDids', wantedSubjectDids);
  spacedust.send(JSON.stringify({
    type: 'options_update',
    payload: {
      wantedSubjectDids,
    },
  }));
};

/**
 * Deliver one web-push payload to one stored push subscription.
 *
 * - Drops the push when `since_last_push` is non-null and below 1.618
 *   (NOTE(review): unit is presumably seconds — confirm against the db query).
 * - Deletes the session's subscription when its stored JSON cannot be parsed,
 *   or when the push service answers with a 4xx status.
 * - Any other send failure is logged; the last-push time is still updated in
 *   that case, exactly as after a successful send.
 *
 * @param {object} db - store exposing `deleteSub(session)` and `updateLastPush(session)`.
 * @param {{session: *, subscription: string, since_last_push: ?number}} pushSubscription
 * @param {string} payload - body handed to `webpush.sendNotification`.
 * @returns {Promise<void>}
 */
const push = async (db, pushSubscription, payload) => {
  const { session, subscription, since_last_push } = pushSubscription;
  if (since_last_push !== null && since_last_push < 1.618) {
    console.warn(`rate limiter: dropping too-soon push (${since_last_push})`);
    return;
  }

  let sub;
  try {
    sub = JSON.parse(subscription);
  } catch (e) {
    console.error('failed to parse subscription json, dropping session', e);
    db.deleteSub(session);
    return;
  }

  try {
    await webpush.sendNotification(sub, payload);
  } catch (err) {
    if (400 <= err.statusCode && err.statusCode < 500) {
      console.info(`removing sub for ${err.statusCode}`);
      db.deleteSub(session);
      return;
    }
    console.warn('something went wrong for another reason', err);
  }

  db.updateLastPush(session);
};

/**
 * Report whether a link source is listed under `torment_sources` for its app
 * in the lexicons table.
 *
 * The part of `source` before the first ':' is reversed segment-by-segment into
 * a hostname so `psl` can find its registrable domain; that domain, reversed
 * back, is the key into `lexicons`. The remainder of `source` after the app
 * prefix is the key looked up in that app's `torment_sources`.
 *
 * Unknown apps and apps without `torment_sources` yield `false`. Only own
 * properties count, so a source ending in e.g. "constructor" cannot match an
 * inherited key.
 *
 * @param {string} source
 * @returns {boolean} `false` also when the check itself fails.
 */
const isTorment = source => {
  try {
    const [nsid] = source.split(':');
    const hostname = nsid.split('.').reverse().join('.');

    const app = psl.parse(hostname)?.domain ?? 'unknown';
    const appPrefix = app.split('.').reverse().join('.');

    const tormentSources = lexicons[appPrefix]?.torment_sources;
    if (tormentSources === undefined || tormentSources === null) {
      return false;
    }
    const key = source.slice(app.length + 1);
    return Object.prototype.hasOwnProperty.call(tormentSources, key);
  } catch (e) {
    console.error('checking tormentedness failed, allowing through', e);
    return false;
  }
};

/**
 * Extract the DID authority from an at-uri.
 *
 * @param {*} at_uri - expected to be a string of the form `at://did:.../...`.
 * @returns {?string} the DID, or `null` (with a warning) when the value is not
 *   a string, is not an at-uri, has no authority segment, or the authority is
 *   an @handle or otherwise not a DID.
 */
const extractUriDid = at_uri => {
  if (typeof at_uri !== 'string' || !at_uri.startsWith('at://')) {
    console.warn(`ignoring non-at-uri: ${at_uri}`);
    return null;
  }
  const [id] = at_uri.slice('at://'.length).split('/');
  if (!id) {
    console.warn(`ignoring at-uri with missing id segment: ${at_uri}`);
    return null;
  }
  if (id.startsWith('@')) {
    console.warn(`ignoring @handle at-uri: ${at_uri}`);
    return null;
  }
  if (!id.startsWith('did:')) {
    console.warn(`ignoring non-did at-uri: ${at_uri}`);
    return null;
  }
  return id;
};

/**
 * Apply the torment check, the account-level settings and the per-account
 * filters to one parsed event, then push to every subscription of the subject DID.
 *
 * @param {object} db - store exposing `getNotifyAccountGlobals`, `getNotificationFilter`
 *   and `getSubsByDid`, plus whatever `push` needs.
 * @param {*} data - parsed event; events without a string `link.subject` and
 *   `link.source` are ignored.
 * @returns {Promise<void>}
 */
const notifyForLink = async (db, data) => {
  const { subject, source, source_record } = data?.link ?? {};
  if (typeof subject !== 'string' || typeof source !== 'string') {
    console.warn('ignoring event without a link subject/source', data);
    return;
  }
  if (isTorment(source)) {
    console.log('nope! not today,', source);
    return;
  }
  const timestamp = Date.now();

  const did = subject.startsWith('did:') ? subject : extractUriDid(subject);
  if (!did) {
    console.warn(`ignoring link with non-DID subject: ${subject}`);
    return;
  }

  // this works for now since only the account owner is assumed to be a notification target
  // but for "replies on post" etc that won't hold
  const { notify_enabled, notify_self } = db.getNotifyAccountGlobals(did);
  if (!notify_enabled) {
    console.warn('dropping notification for global not-enabled setting');
    return;
  }
  if (!notify_self) {
    const source_did = extractUriDid(source_record);
    if (!source_did) {
      console.warn(`ignoring link with non-DID source_record: ${source_record}`);
      return;
    }
    if (source_did === did) {
      console.warn(`ignoring self-notification`);
      return;
    }
  }

  // like above, this over-assumes that did is the only recipient we could care about for now
  const { app, group } = getBits(source);
  // most specific selector first: the first explicit true/false answer decides
  for (const [selector, selection] of [
    ['source', source],
    ['group', group],
    ['app', app],
  ]) {
    const notify = db.getNotificationFilter(did, selector, selection);
    if (notify === true) {
      console.info(`explicitly allowing notification by filter for ${selector}=${selection}`);
      break;
    }
    if (notify === false) {
      console.warn(`ignoring filtered notification for ${selector}=${selection}`);
      return;
    }
  }

  const subs = db.getSubsByDid(did);
  const payload = JSON.stringify({ subject, source, source_record, timestamp });
  try {
    await Promise.all(subs.map(pushSubscription => push(db, pushSubscription, payload)));
  } catch (e) {
    console.warn('at least one notification send failed', e);
  }
};

/**
 * Build the websocket `onmessage` handler for spacedust events.
 *
 * The returned handler never rejects: the socket ignores the promise an async
 * listener returns, so any error escaping it would surface as an unhandled
 * rejection. Failures are logged instead.
 *
 * @param {object} db - store passed through to the notification logic.
 * @returns {(event: {data: string}) => Promise<void>}
 */
const handleDust = db => async event => {
  console.log('got', event.data);
  let data;
  try {
    data = JSON.parse(event.data);
  } catch (err) {
    console.error(err);
    return;
  }

  try {
    await notifyForLink(db, data);
  } catch (err) {
    console.error('failed to handle spacedust event', err);
  }
};

/**
 * Open the spacedust websocket and keep it alive.
 *
 * The connection starts out filtered to `DUMMY_DID`; the real subscription set
 * is sent once the socket opens. On error or close a new connection is
 * scheduled after a randomized 500–1500ms delay, and the module-level socket
 * is cleared in the meantime.
 *
 * @param {object} db - store used by `updateSubs` and the message handler.
 * @param {string} host - websocket origin that `/subscribe?...` is appended to.
 * @returns {{updateSubs: Function, push: Function}}
 */
export const connectSpacedust = (db, host) => {
  spacedust = new WebSocket(`${host}/subscribe?instant=true&wantedSubjectDids=${DUMMY_DID}`);
  let restarting = false;

  const restart = () => {
    // error and close can both fire for the same socket; only reconnect once
    if (restarting) return;
    restarting = true;
    const wait = Math.round(500 + (Math.random() * 1000));
    console.info(`restarting spacedust connection in ${wait}ms...`);
    setTimeout(() => connectSpacedust(db, host), wait);
    spacedust = null;
  };

  spacedust.onopen = () => updateSubs(db);
  spacedust.onmessage = handleDust(db);

  spacedust.onerror = e => {
    console.error('spacedust errored:', e);
    restart();
  };

  spacedust.onclose = () => {
    console.log('spacedust closed');
    restart();
  };

  return { updateSubs, push };
};