···11/** @module common/socket */
2233+import * as z_types from 'zod/v4'
44+35import { combineSignals } from '#common/async/aborts.js'
46import { BlockingAtom } from '#common/async/blocking-atom.js'
57import { BlockingQueue } from '#common/async/blocking-queue.js'
68import { Breaker } from '#common/breaker.js'
79import { ProtocolError } from '#common/errors.js'
81099-import * as protocol_types from './protocol.js'
1111+import { parseJson } from './protocol.js'
10121113/**
1214 * Send some data in JSON format down the wire.
1315 *
1416 * @param {WebSocket} ws the socket to send on
1515- * @param {protocol_types.OkResponse | protocol_types.ErrorResponse} data the data to send
1717+ * @param {unknown} data the data to send
1618 */
1719export function sendSocket(ws, data) {
1820 ws.send(JSON.stringify(data))
···6870 ws.removeEventListener('error', onError)
6971 ws.removeEventListener('close', onClose)
7072 }
7373+}
7474+7575+/**
7676+ * exactly take socket, but will additionally apply a json decoding
7777+ *
7878+ * @template T the schema's type
7979+ * @param {WebSocket} ws the socket to read
8080+ * @param {z_types.ZodSchema<T>} schema an a schema to execute
8181+ * @param {AbortSignal} [signal] an abort signal to cancel the block
8282+ * @returns {Promise<T>} the message off the socket
8383+ */
8484+export async function takeSocketJson(ws, schema, signal) {
8585+ const data = await takeSocket(ws, signal)
8686+ return parseJson.pipe(schema).parseAsync(data)
7187}
72887389/**
···189205 ws.removeEventListener('close', onClose)
190206 }
191207}
208208+209209+/**
210210+ * exactly stream socket, but will additionally apply a json decoding
211211+ * messages not validating will end the stream with an error
212212+ *
213213+ * @param {WebSocket} ws the socket to read
214214+ * @param {Partial<ConfigProps>} [config] stream configuration to merge into defaults
215215+ * @returns {AsyncGenerator<unknown>} an async generator
216216+ * @yields the message off the socket
217217+ */
218218+export async function* streamSocketJson(ws, config) {
219219+ for await (const message of streamSocket(ws, config)) {
220220+ yield parseJson.parseAsync(message)
221221+ }
222222+}
223223+224224+/**
225225+ * exactly stream socket, but will additionally apply a json decoding
226226+ * messages not validating will end the stream with an error
227227+ *
228228+ * @template T the schema's type
229229+ * @param {WebSocket} ws the socket to read
230230+ * @param {z_types.ZodSchema<T>} schema an a schema to execute
231231+ * @param {Partial<ConfigProps>} [config] stream configuration to merge into defaults
232232+ * @returns {AsyncGenerator<T>} an async generator
233233+ * @yields the message off the socket
234234+ */
235235+export async function* streamSocketSchema(ws, schema, config) {
236236+ const parser = parseJson.pipe(schema)
237237+238238+ for await (const message of streamSocket(ws, config)) {
239239+ yield await parser.parseAsync(message)
240240+ }
241241+}
+12-3
src/common/strict-map.js
···35353636 /**
3737 * @param {K} key to update in the map
3838- * @param {function(V=): V} update function which returns the new value for the map
3838+ * @param {function(V=): V | undefined} update
3939+ * function which returns the new value for the map
4040+ * if the return value is REMOVE_KEY, the whole entry in the map will be removed.
3941 */
4042 update(key, update) {
4141- const current = this.get(key)
4242- this.set(key, update(current))
4343+ const prev = this.get(key)
4444+ const next = update(prev)
4545+4646+ if (next === undefined) {
4747+ this.delete(key)
4848+ }
4949+ else {
5050+ this.set(key, next)
5151+ }
4352 }
44534554}
+6-7
src/server/routes-socket/handler-preauth.js
···11import { combineSignals, timeoutSignal } from '#common/async/aborts.js'
22import { jwkImport } from '#common/crypto/jwks.js'
33-import { jwtSchema, verifyJwtToken } from '#common/crypto/jwts.js'
33+import { jwtPayload, verifyJwtToken } from '#common/crypto/jwts.js'
44import { normalizeError, ProtocolError } from '#common/errors.js'
55import { IdentBrand, preauthMessageSchema, RealmBrand } from '#common/protocol.js'
66import { takeSocket } from '#common/socket.js'
···2525 const data = await takeSocket(ws, combinedSignal)
26262727 // if any of the parsing fails, it'll throw a zod error
2828- const jwt = jwtSchema.parse(data)
2929- const msg = await preauthMessageSchema.parseAsync(jwt.payload)
3030- const identid = IdentBrand.parse(jwt.payload.iss)
3131- const realmid = RealmBrand.parse(jwt.payload.aud)
2828+ const jwt = await jwtPayload(preauthMessageSchema).parseAsync(data)
2929+ const identid = IdentBrand.parse(jwt.claims.iss)
3030+ const realmid = RealmBrand.parse(jwt.claims.aud)
32313332 // if we're registering, make sure the realm exists
3434- if (msg.msg === 'preauth.register') {
3535- const registrantkey = await jwkImport.parseAsync(msg.pubkey)
3333+ if (jwt.payload.msg === 'preauth.register') {
3434+ const registrantkey = await jwkImport.parseAsync(jwt.payload.pubkey)
3635 realms.ensureRegisteredRealm(realmid, identid, registrantkey)
3736 }
3837
+82-51
src/server/routes-socket/handler-realm.js
···11import { normalizeProtocolError, ProtocolError } from '#common/errors.js'
22-import { realmMessageSchema, parseJson } from '#common/protocol.js'
22+import { realmToServerMessageSchema, parseJson } from '#common/protocol.js'
33import { sendSocket, streamSocket } from '#common/socket.js'
44-import { format } from 'node:util'
5465import * as protocol_types from '#common/protocol.js'
76import * as realm_types from '#server/routes-socket/state.js'
···1514 * @param {AbortSignal} [signal] an optional signal to abort the blocking loop
1615 */
1716export async function realmHandler(ws, auth, signal) {
1818- respondWithRealmStatus(ws, auth)
1919- broadcastToRealm(undefined, { ok: true, msg: 'welcome', ident: auth.identid }, auth)
1717+ realmBroadcast(auth, buildRtcPeerJoined(auth))
1818+ sendSocket(ws, buildRtcPeerWelcome(auth))
20192121- for await (const data of streamSocket(ws, { signal })) {
2222- try {
2323- const msg = await parseJson.pipe(realmMessageSchema).parseAsync(data)
2424- switch (msg.msg) {
2525- case 'realm.status':
2626- respondWithRealmStatus(ws, auth)
2727- continue
2020+ try {
2121+ for await (const data of streamSocket(ws, { signal })) {
2222+ try {
2323+ const msg = await parseJson.pipe(realmToServerMessageSchema).parseAsync(data)
2424+ switch (msg.msg) {
2525+ case 'realm.broadcast':
2626+ realmBroadcast(auth, msg.payload, msg.recipients)
2727+ continue
28282929- case 'realm.broadcast':
3030- broadcastToRealm(msg.recipients, msg.payload, auth)
3131- continue
2929+ case 'realm.rtc.signal':
3030+ realmBroadcast(auth, msg, [msg.recipient])
3131+ continue
32323333- default:
3434- throw new ProtocolError('unknown message type: ${msg.msg}', 400)
3333+ default:
3434+ throw new ProtocolError(`unknown message type: ${msg}`, 400)
3535+ }
3536 }
3636- }
3737- catch (exc) {
3838- const error = normalizeProtocolError(exc)
3939- if (error.status >= 500) throw error
3737+ catch (exc) {
3838+ const error = normalizeProtocolError(exc)
3939+ if (error.status >= 500)
4040+ throw error
40414141- if (ws.readyState === ws.OPEN) {
4242- /** @type {protocol_types.ErrorResponse} */
4343- const resp = {
4444- ok: false,
4545- message: format('Error: %s', error.message),
4646- status: error.status,
4242+ if (ws.readyState === ws.OPEN) {
4343+ sendSocket(ws, buildRealmError(error))
4744 }
4848-4949- sendSocket(ws, resp)
5045 }
5146 }
5247 }
4848+ finally {
4949+ console.log('client left!', auth)
5050+ realmBroadcast(auth, buildRtcPeerLeft(auth))
5151+ }
5352}
54535554/**
5655 * @private
5757- * @param {WebSocket} ws the socket to communicate on
5856 * @param {realm_types.AuthenticatedConnection} auth the current identity
5757+ * @returns {protocol_types.RealmRtcPeerWelcomeMessage} a realm welcome response
5958 */
6060-function respondWithRealmStatus(ws, auth) {
6161- /** @type {protocol_types.RealmStatusResponse} */
6262- const resp = {
5959+function buildRtcPeerWelcome(auth) {
6060+ return {
6361 ok: true,
6464- msg: 'realm.status',
6565- realm: auth.realmid,
6666- identities: Array.from(auth.realm.identities.keys()),
6262+ msg: 'realm.rtc.peer-welcome',
6363+ peers: Array.from(auth.realm.sockets.keys()),
6764 }
6565+}
68666969- sendSocket(ws, resp)
6767+/**
6868+ * @private
6969+ * @param {realm_types.AuthenticatedConnection} auth the current identity
7070+ * @returns {protocol_types.RealmRtcPeerJoinedMessage} a realm status response
7171+ */
7272+function buildRtcPeerJoined(auth) {
7373+ return {
7474+ ok: true,
7575+ msg: 'realm.rtc.peer-joined',
7676+ identid: auth.identid,
7777+ }
7078}
71797280/**
7381 * @private
7474- * @param {protocol_types.IdentID[] | undefined} recipients array of recips, or undef to send all
7575- * @param {unknown} payload the payload to send
7682 * @param {realm_types.AuthenticatedConnection} auth the current identity
8383+ * @returns {protocol_types.RealmRtcPeerLeftMessage} a realm status response
7784 */
7878-function broadcastToRealm(recipients, payload, auth) {
7979- /** @type {protocol_types.RealmBroadcastResponse} */
8080- const resp = {
8585+function buildRtcPeerLeft(auth) {
8686+ return {
8187 ok: true,
8282- msg: 'realm.broadcast',
8383- sender: auth.identid,
8484- payload: payload,
8888+ msg: 'realm.rtc.peer-left',
8989+ identid: auth.identid,
8590 }
9191+}
86928787- recipients ??= Array.from(auth.realm.identities.keys())
8888- for (const recip of recipients) {
8989- if (recip === auth.identid) continue
9393+/**
9494+ * @private
9595+ * @param {ProtocolError} error the error to report
9696+ * @returns {protocol_types.ErrorResponse} a realm error response
9797+ */
9898+function buildRealmError(error) {
9999+ return {
100100+ ok: false,
101101+ message: error.message,
102102+ status: error.status,
103103+ }
104104+}
901059191- const sockets = auth.realm.sockets.get(recip)
9292- if (sockets == null) continue
106106+/**
107107+ * @private
108108+ * @param {realm_types.AuthenticatedConnection} auth the current identity
109109+ * @param {unknown} payload the payload to send
110110+ * @param {protocol_types.IdentID[] | boolean} [recipients]
111111+ * when true, send to the whole realm, including self
112112+ * when false, send to the whole realm, excluding self
113113+ * when an array of recipients, send to those recipients explicitly
114114+ */
115115+function realmBroadcast(auth, payload, recipients) {
116116+ const echo = recipients === true || Array.isArray(recipients)
117117+ const recips = Array.isArray(recipients) ? recipients : Array.from(auth.realm.identities.keys())
931189494- for (const socket of sockets) {
9595- sendSocket(socket, resp)
119119+ for (const recip of recips) {
120120+ if (!echo && recip === auth.identid) continue
121121+122122+ const sockets = auth.realm.sockets.get(recip)
123123+ if (sockets) {
124124+ for (const socket of sockets) {
125125+ sendSocket(socket, payload)
126126+ }
96127 }
97128 }
98129}
+9-2
src/server/routes-socket/state.js
···3434 * @returns {Realm} a registered realm, possibly newly created with the registrant
3535 */
3636export function ensureRegisteredRealm(realmid, registrantid, registrantkey) {
3737- return realmMap.ensure(realmid, () => ({
3737+ const realm = realmMap.ensure(realmid, () => ({
3838 realmid,
3939 sockets: new StrictMap(),
4040 identities: new StrictMap([[registrantid, registrantkey]]),
4141 }))
4242+4343+ // hack for now, allow any registration to work
4444+ realm.identities.ensure(registrantid, () => registrantkey)
4545+ return realm
4246}
43474448/**
···5660 * @param {WebSocket} socket the socket to dettach
5761 */
5862export function detachSocket(realm, ident, socket) {
5959- realm.sockets.update(ident, ss => ss ? ss.filter(s => s !== socket) : [])
6363+ realm.sockets.update(ident, (sockets) => {
6464+ const next = sockets?.filter(s => s !== socket)
6565+ return next?.length ? next : undefined
6666+ })
6067}