demos for spacedust
1import { default as lexicons, getBits } from 'lexicons';
2import psl from 'psl';
3import webpush from 'web-push';
4import WebSocket from 'ws';
5
6// kind of silly but right now there's no way to tell spacedust that we want an alive connection
7// but don't want the notification firehose (everything filtered out)
8// so... the final filter is an absolute on this fake did, effectively filtering all notifs.
9// (this is only used when there are no subscribers registered)
10const DUMMY_DID = 'did:plc:zzzzzzzzzzzzzzzzzzzzzzzz';
11
12let spacedust;
13let spacedustEverStarted = false;
14
15const updateSubs = db => {
16 if (!spacedust) {
17 console.warn('not updating subscription, no spacedust (reconnecting?)');
18 return;
19 }
20 const wantedSubjectDids = db.getSubscribedDids();
21 if (wantedSubjectDids.length === 0) {
22 wantedSubjectDids.push(DUMMY_DID);
23 }
24 console.log('updating for wantedSubjectDids', wantedSubjectDids);
25 spacedust.send(JSON.stringify({
26 type: 'options_update',
27 payload: {
28 wantedSubjectDids,
29 },
30 }));
31};
32
33const push = async (db, pushSubscription, payload) => {
34 const { session, subscription, since_last_push } = pushSubscription;
35 if (since_last_push !== null && since_last_push < 1.618) {
36 console.warn(`rate limiter: dropping too-soon push (${since_last_push})`);
37 return;
38 }
39
40 let sub;
41 try {
42 sub = JSON.parse(subscription);
43 } catch (e) {
44 console.error('failed to parse subscription json, dropping session', e);
45 db.deleteSub(session);
46 return;
47 }
48
49 try {
50 await webpush.sendNotification(sub, payload);
51 } catch (err) {
52 if (400 <= err.statusCode && err.statusCode < 500) {
53 console.info(`removing sub for ${err.statusCode}`);
54 db.deleteSub(session);
55 return;
56 } else {
57 console.warn('something went wrong for another reason', err);
58 }
59 }
60
61 db.updateLastPush(session);
62};
63
64const isTorment = source => {
65 try {
66 const [nsid, ...rp] = source.split(':');
67
68 let parts = nsid.split('.');
69 parts.reverse();
70 parts = parts.join('.');
71
72 const app = psl.parse(parts)?.domain ?? 'unknown';
73
74 let appPrefix = app.split('.');
75 appPrefix.reverse();
76 appPrefix = appPrefix.join('.')
77
78 return source.slice(app.length + 1) in lexicons[appPrefix]?.torment_sources;
79 } catch (e) {
80 console.error('checking tormentedness failed, allowing through', e);
81 return false;
82 }
83};
84
85const extractUriDid = at_uri => {
86 if (!at_uri.startsWith('at://')) {
87 console.warn(`ignoring non-at-uri: ${at_uri}`);
88 return null;
89 }
90 const [id, ..._] = at_uri.slice('at://'.length).split('/');
91 if (!id) {
92 console.warn(`ignoring at-uri with missing id segment: ${at_uri}`);
93 return null;
94 }
95 if (id.startsWith('@')) {
96 console.warn(`ignoring @handle at-uri: ${at_uri}`);
97 return null;
98 }
99 if (!id.startsWith('did:')) {
100 console.warn(`ignoring non-did at-uri: ${at_uri}`);
101 return null;
102 }
103 return id;
104};
105
106const handleDust = db => async event => {
107 console.log('got', event.data);
108 let data;
109 try {
110 data = JSON.parse(event.data);
111 } catch (err) {
112 console.error(err);
113 return;
114 }
115 const { link: { subject, source, source_record } } = data;
116 if (isTorment(source)) {
117 console.log('nope! not today,', source);
118 return;
119 }
120 const timestamp = +new Date();
121
122 const did = subject.startsWith('did:') ? subject : extractUriDid(subject);
123 if (!did) {
124 console.warn(`ignoring link with non-DID subject: ${subject}`)
125 return;
126 }
127
128 // this works for now since only the account owner is assumed to be a notification target
129 // but for "replies on post" etc that won't hold
130 const { notify_enabled, notify_self } = db.getNotifyAccountGlobals(did);
131 if (!notify_enabled) {
132 console.warn('dropping notification for global not-enabled setting');
133 return;
134 }
135 if (!notify_self) {
136 const source_did = extractUriDid(source_record);
137 if (!source_did) {
138 console.warn(`ignoring link with non-DID source_record: ${source_record}`)
139 return;
140 }
141 if (source_did === did) {
142 console.warn(`ignoring self-notification`);
143 return;
144 }
145 }
146
147 // like above, this over-assumes that did is the only recipient we could care about for now
148 const { app, group } = getBits(source);
149 for (const [selector, selection] of [
150 ['source', source],
151 ['group', group],
152 ['app', app],
153 ]) {
154 const notify = db.getNotificationFilter(did, selector, selection);
155 if (notify === true) {
156 console.info(`explicitly allowing notification by filter for ${selector}=${selection}`);
157 break;
158 };
159 if (notify === false) {
160 console.warn(`ignoring filtered notification for ${selector}=${selection}`);
161 return;
162 }
163 }
164
165 const subs = db.getSubsByDid(did);
166 const payload = JSON.stringify({ subject, source, source_record, timestamp });
167 try {
168 await Promise.all(subs.map(pushSubscription => push(db, pushSubscription, payload)));
169 } catch (e) {
170 console.warn('at least one notification send failed', e);
171 }
172};
173
174export const connectSpacedust = (db, host) => {
175 spacedust = new WebSocket(`${host}/subscribe?instant=true&wantedSubjectDids=${DUMMY_DID}`);
176 let restarting = false;
177
178 const restart = () => {
179 if (restarting) return;
180 restarting = true;
181 let wait = Math.round(500 + (Math.random() * 1000));
182 console.info(`restarting spacedust connection in ${wait}ms...`);
183 setTimeout(() => connectSpacedust(db, host), wait);
184 spacedust = null;
185 }
186
187 spacedust.onopen = () => updateSubs(db);
188 spacedust.onmessage = handleDust(db);
189
190 spacedust.onerror = e => {
191 console.error('spacedust errored:', e);
192 restart();
193 };
194
195 spacedust.onclose = () => {
196 console.log('spacedust closed');
197 restart();
198 };
199
200 return { updateSubs, push };
201};