revamped socket protocol handling

+2
eslint.config.js
··· 50 50 '@typescript-eslint/no-unused-vars': ['warn', {varsIgnorePattern: '(?:^_)'}], 51 51 '@typescript-eslint/no-unnecessary-condition': 'off', 52 52 '@typescript-eslint/restrict-template-expressions': 'off', 53 + '@typescript-eslint/no-unnecessary-type-parameters': 'off', 54 + '@typescript-eslint/no-unnecessary-type-constraint': 'off', 53 55 'tsdoc/syntax': 'warn', 54 56 }, 55 57 },
+2 -2
readme.org
··· 22 22 Run the pwa+server with: 23 23 24 24 #+BEGIN_SRC bash 25 - $ pnpm install 26 - $ pnpm run dev # lots of stuff concurrently with wireit 25 + $ npm install 26 + $ npm run dev # lots of stuff concurrently with wireit 27 27 #+END_SRC 28 28 29 29 - Common
+3 -2
src/client/components/peer-list.tsx
··· 1 1 import {useEffect, useState} from 'preact/hooks' 2 2 3 - import {PeerState, RealmConnection} from '#client/realm/connection' 3 + import {RealmConnection} from '#client/realm/connection' 4 + import {PeerState} from '#client/realm/types' 4 5 import {IdentID} from '#common/protocol' 5 6 6 7 export const PeerList: preact.FunctionComponent<{webrtcManager: RealmConnection}> = (props) => { ··· 13 14 const updatePeers = () => { 14 15 queueMicrotask(() => { 15 16 const states = webrtcManager.getPeerStates() 16 - console.log('updating peers', states) 17 + console.debug('updating peers', states) 17 18 setPeers(states) 18 19 }) 19 20 }
+195 -244
src/client/realm/connection.ts
··· 1 1 import WebSocket from 'isomorphic-ws' 2 2 import {nanoid} from 'nanoid' 3 - import SimplePeer from 'simple-peer' 3 + import SimplePeer, {SimplePeerData} from 'simple-peer' 4 4 import {z} from 'zod/v4' 5 5 6 6 import {generateSignableJwt, jwkExport} from '#common/crypto/jwks' 7 7 import {normalizeError, normalizeProtocolError, ProtocolError} from '#common/errors' 8 8 import { 9 9 IdentID, 10 - parseJson, 11 - PreauthRegisterMessage, 12 - RealmBroadcastMessage, 13 - realmFromServerMessageSchema, 14 - RealmID, 15 - RealmRtcPeerWelcomeMessage, 16 - realmRtcPeerWelcomeMessageSchema, 17 - RealmRtcSignalMessage, 10 + PreauthRegisterRequest, 11 + preauthRespSchema, 12 + realmRtcPeerJoinedEventSchema, 13 + realmRtcPeerLeftEventSchema, 14 + realmRtcPingRequestSchema, 15 + RealmRtcPongResponse, 16 + realmRtcPongResponseSchema, 17 + RealmRtcSignalEvent, 18 + realmRtcSignalEventSchema, 18 19 } from '#common/protocol' 19 - import {sendSocket, streamSocketJson, takeSocketJson} from '#common/socket' 20 + import {streamSocketJson, takeSocketJson} from '#common/socket' 21 + import {RealmPeer} from './peer' 22 + import {ConnectionIdentity, PeerState} from './types' 20 23 21 - /** the state of a specific peer */ 22 - export interface PeerState { 23 - /** if the peer connection is active */ 24 - connected: boolean 25 - 26 - /** if the peer connection has been destroyed */ 27 - destroyed: boolean 28 - 29 - /** the peer's address (ip and port) */ 30 - address: ReturnType<SimplePeer.Instance['address']> 31 - } 32 - 33 - /** identity info for connecting to a realm */ 34 - export interface RealmIdentity { 35 - realmid: RealmID 36 - identid: IdentID 37 - keypair: CryptoKeyPair 38 - } 24 + const realmRtcMessagesSchema = z.union([ 25 + realmRtcSignalEventSchema, 26 + realmRtcPeerJoinedEventSchema, 27 + realmRtcPeerLeftEventSchema, 28 + realmRtcPongResponseSchema, 29 + ]) 39 30 40 31 /** manages websocket and webrtc connections for a realm */ 41 32 export class RealmConnection extends EventTarget { 42 33 #url: string 43 - #identity: RealmIdentity 34 + #identity: ConnectionIdentity 44 35 45 36 #socket: WebSocket 46 - #peers: Map<IdentID, SimplePeer.Instance> 37 + #peers: Map<IdentID, RealmPeer> 47 38 #nonces: Map<IdentID, string> 48 39 49 - constructor(url: string, identity: RealmIdentity) { 40 + constructor(url: string, identity: ConnectionIdentity) { 50 41 super() 51 42 52 43 this.#url = url ··· 61 52 this.#socket.onerror = this.#handleSocketError 62 53 } 63 54 64 - #handleSocketOpen: WebSocket['onopen'] = async () => { 65 - if (this.#socket == undefined) throw new Error('socket open handler called with no socket?') 66 - 67 - try { 68 - console.debug('realm connection, socket loop open') 69 - this.#dispatchCustomEvent('wsopen') 70 - 71 - // do the auth dance 72 - // TODO: this should be a state machine 73 - 74 - const pubkey = await jwkExport.parseAsync(this.#identity.keypair.publicKey) 75 - this.#socket.send( 76 - await this.#signJwt({msg: 'preauth.register', pubkey} as PreauthRegisterMessage), 77 - ) 78 - 79 - // the next message should be a welcome message 80 - // this will throw (authenticated) otherwise 81 - 82 - let welcome: RealmRtcPeerWelcomeMessage 83 - try { 84 - welcome = await takeSocketJson(this.#socket, realmRtcPeerWelcomeMessageSchema) 85 - } catch (exc) { 86 - const err = normalizeError(exc) 87 - throw new ProtocolError('failure on authentication', 401, {cause: err}) 88 - } 89 - 90 - this.#dispatchCustomEvent('wsauth', {welcome}) 91 - 92 - // we initiate connections outbound when starting up 93 - // we never initiate connections to other peers otherwise 94 - 95 - for (const peerid of welcome.peers) { 96 - if (peerid === this.#identity.identid) continue 97 - this.connectToPeer(peerid, true) 98 - } 99 - 100 - // then continue looping over messages on the socket and handle them 101 - 102 - for await (const data of streamSocketJson(this.#socket)) { 103 - const parse = realmFromServerMessageSchema.safeParse(data) 104 - 105 - // not a known rtc specific message, so we don't capture it 106 - if (!parse.success) { 107 - this.#dispatchCustomEvent('wsdata', {data}) 108 - continue 109 - } 110 - 111 - // otherwise, we handle it 112 - switch (parse.data.msg) { 113 - case 'realm.rtc.peer-joined': 114 - // new peers connect to us, we don't connect to them 115 - this.#dispatchCustomEvent('peerjoined', {identid: parse.data.identid}) 116 - continue 117 - 118 - case 'realm.rtc.peer-left': 119 - // peer is gone, disconnect 120 - this.disconnectPeer(parse.data.identid) 121 - this.#dispatchCustomEvent('peerleft', {identid: parse.data.identid}) 122 - continue 55 + get connected() { 56 + return this.#socket.readyState === this.#socket.OPEN 57 + } 123 58 124 - case 'realm.rtc.signal': { 125 - // some other peer is trying to send us some rtc data 126 - // only connect if _they_ are the initiator - our initiating conns were above 127 - let peer = this.#peers.get(parse.data.sender) 128 - if (!peer && parse.data.initiator) { 129 - peer = this.connectToPeer(parse.data.sender, false) 130 - } 59 + send<T extends unknown>(identid: IdentID, data: T) { 60 + const peer = this.#peers.get(identid) 61 + if (!peer?.connected) throw new Error(`Not connected to peer: ${identid}`) 131 62 132 - // may not have a connection yet if we're waiting for them to answer 133 - if (peer) { 134 - peer.signal(parse.data.payload) 135 - } 63 + peer.send(JSON.stringify(data)) 64 + } 136 65 137 - continue 138 - } 139 - } 140 - } 141 - } catch (exc) { 142 - const err = normalizeProtocolError(exc) 66 + sendRaw(identid: IdentID, data: SimplePeerData) { 67 + const peer = this.#peers.get(identid) 68 + if (!peer?.connected) throw new Error(`Not connected to peer: ${identid}`) 143 69 144 - console.error('realm connection, socket loop error', err) 145 - this.#dispatchCustomEvent('wserror', {error: err}) 146 - } finally { 147 - console.debug('realm connection, socket loop ended') 148 - this.destroy() 149 - } 70 + peer.send(data) 150 71 } 151 72 152 - #handleSocketError: WebSocket['onerror'] = (exc) => { 153 - this.#dispatchCustomEvent('wserror', {error: normalizeProtocolError(exc)}) 154 - this.destroy() 73 + broadcast(data: unknown, self = false) { 74 + this.#peers.forEach((peer, identid) => { 75 + if (self || identid !== this.#identity.identid) peer.send(JSON.stringify(data)) 76 + }) 155 77 } 156 78 157 - #handleSocketClose: WebSocket['onclose'] = () => { 158 - this.#dispatchCustomEvent('wsclose') 159 - this.destroy() 160 - } 79 + getPeerStates(): Record<IdentID, PeerState> { 80 + const states: Record<IdentID, PeerState> = {} 81 + for (const [identid, peer] of this.#peers) { 82 + states[identid] = { 83 + identid, 84 + address: peer.address(), 85 + connected: peer.connected, 86 + destroyed: peer.destroyed, 87 + } 88 + } 161 89 162 - get connected() { 163 - return this.#socket.readyState === this.#socket.OPEN 90 + return states 164 91 } 165 92 166 93 destroy() { 167 - console.debug('realm connection #destroy!') 94 + console.debug('realm connection destroy!') 168 95 169 96 if (this.connected) this.#socket.close() 170 - 171 97 for (const peer of this.#peers.values()) peer.destroy() 172 98 173 99 this.#peers.clear() 174 100 this.#nonces.clear() 175 101 } 176 102 177 - #dispatchCustomEvent(type: string, detail?: object) { 178 - this.dispatchEvent(new CustomEvent(type, {detail})) 179 - } 103 + // do the auth dance 104 + // TODO: this should be a state machine 105 + #connectionAuthenticate = async () => { 106 + try { 107 + const pubkey = await jwkExport.parseAsync(this.#identity.keypair.publicKey) 108 + await this.#socketSignedWrite<PreauthRegisterRequest>({ 109 + typ: 'req', 110 + msg: 'preauth.register', 111 + dat: {pubkey}, 112 + }) 180 113 181 - /** generates and signs a JWT scoped to this identity/realm containing the given payload */ 182 - async #signJwt(payload: object): Promise<string> { 183 - return await generateSignableJwt({ 184 - aud: this.#identity.realmid, 185 - iss: this.#identity.identid, 186 - payload, 187 - }).sign(this.#identity.keypair.privateKey) 114 + const resp = await takeSocketJson(this.#socket, preauthRespSchema) 115 + this.#dispatchCustomEvent('wsauth', resp) 116 + 117 + // we initiate connections outbound when starting up 118 + // we never initiate connections to other peers otherwise 119 + for (const peerid of resp.dat.activePeers) { 120 + if (peerid === this.#identity.identid) continue 121 + this.#connectPeer(peerid, true) 122 + } 123 + } catch (exc) { 124 + const err = normalizeError(exc) 125 + throw new ProtocolError('failure on authentication', 401, {cause: err}) 126 + } 188 127 } 189 128 190 - connectToPeer(remoteid: IdentID, initiator: boolean): SimplePeer.Instance { 191 - let peer = this.#peers.get(remoteid) 192 - if (peer) { 193 - console.log(`already connected to ${remoteid}`) 194 - return peer 129 + // handle some message coming in on the _websocket_ 130 + #connectionMessage = (data: unknown) => { 131 + const parse = realmRtcMessagesSchema.safeParse(data) 132 + if (!parse.success) { 133 + // publish non-handled data to the listeners 134 + this.#dispatchCustomEvent('wsdata', {data}) 135 + return 195 136 } 196 137 197 - peer = new RealmConnectionPeer(this, nanoid(), this.#identity.identid, remoteid, initiator) 138 + switch (parse.data.msg) { 139 + case 'realm.rtc.pong': 140 + console.debug('got a pong response', parse) 141 + return 198 142 199 - peer.on('connect', () => { 200 - console.log(`connected to ${remoteid}`) 143 + case 'realm.rtc.signal': { 144 + // only connect if _they_ are the initiator - our initiating conns were above 145 + let peer = this.#peers.get(parse.data.dat.localid) 146 + if (!peer && parse.data.dat.initiator) { 147 + peer = this.#connectPeer(parse.data.dat.localid, false) 148 + } 201 149 202 - this.#dispatchCustomEvent('peeropen', {remoteid}) 203 - }) 150 + // may not have a connection yet if we're waiting for them to answer 151 + peer?.signal(parse.data.dat.payload) 152 + return 153 + } 204 154 205 - peer.on('close', () => { 206 - console.log(`Disconnected from ${remoteid}`) 155 + case 'realm.rtc.peer-joined': 156 + // new peers connect to us, we don't connect to them 157 + this.#dispatchCustomEvent('peerjoined', {identid: parse.data.dat.identid}) 158 + return 207 159 208 - this.#peers.delete(remoteid) 209 - this.#nonces.delete(remoteid) 210 - this.#dispatchCustomEvent('peerclose', {remoteid}) 211 - }) 160 + case 'realm.rtc.peer-left': 161 + // peer is gone, disconnect 162 + this.#disconnectPeer(parse.data.dat.identid) 163 + this.#dispatchCustomEvent('peerleft', {identid: parse.data.dat.identid}) 164 + return 165 + } 166 + } 212 167 213 - peer.on('error', (err) => { 214 - console.error(`Error with peer ${remoteid}:`, err) 168 + #connectPeer(remoteid: IdentID, initiator: boolean): RealmPeer { 169 + let peer = this.#peers.get(remoteid) 170 + if (!peer) { 171 + peer = new RealmPeer(remoteid, nanoid(), initiator) 172 + peer.on('connect', this.#handlePeerConnect.bind(this, peer)) 173 + peer.on('close', this.#handlePeerClose.bind(this, peer)) 174 + peer.on('error', this.#handlePeerError.bind(this, peer)) 175 + peer.on('data', this.#handlePeerData.bind(this, peer)) 176 + peer.on('signal', this.#handlePeerSignal.bind(this, peer)) 215 177 216 - this.#dispatchCustomEvent('peererror', {remoteid, error: err}) 217 - }) 178 + this.#peers.set(remoteid, peer) 179 + } 218 180 219 - peer.on('message', (data: unknown) => { 220 - this.#dispatchCustomEvent('peerdata', {remoteid, data}) 221 - }) 222 - 223 - this.#peers.set(remoteid, peer) 224 181 return peer 225 182 } 226 183 227 - disconnectPeer(identid: IdentID) { 184 + #disconnectPeer(identid: IdentID) { 228 185 const peer = this.#peers.get(identid) 229 186 if (peer) { 230 187 peer.destroy() ··· 233 190 } 234 191 } 235 192 236 - sendToPeer(identid: IdentID, data: unknown) { 237 - const peer = this.#peers.get(identid) 238 - if (peer && peer.connected) { 239 - peer.send(JSON.stringify(data)) 240 - } else { 241 - throw new Error(`Not connected to peer: ${identid}`) 242 - } 243 - } 193 + // handle socket open is the main loop for the sendSocket 194 + #handleSocketOpen: WebSocket['onopen'] = async () => { 195 + if (this.#socket == undefined) throw new Error('socket open handler called with no socket?') 244 196 245 - sendToServer(data: unknown) { 246 - sendSocket(this.#socket, data) 247 - } 197 + console.debug('realm connection, socket loop open') 198 + this.#dispatchCustomEvent('wsopen') 248 199 249 - broadcast(data: unknown) { 250 - const message = JSON.stringify(data) 251 - for (const [_, peer] of this.#peers) { 252 - if (peer.connected) { 253 - peer.send(message) 200 + try { 201 + await this.#connectionAuthenticate() 202 + for await (const data of streamSocketJson(this.#socket)) { 203 + this.#connectionMessage(data) 254 204 } 205 + } catch (exc) { 206 + const err = normalizeProtocolError(exc) 207 + 208 + console.error('realm connection, socket loop error', err) 209 + this.#dispatchCustomEvent('wserror', {error: err}) 210 + } finally { 211 + this.destroy() 255 212 } 256 213 } 257 214 258 - broadcastViaServer(data: unknown) { 259 - const resp: RealmBroadcastMessage = { 260 - msg: 'realm.broadcast', 261 - payload: data, 262 - recipients: false, 263 - } 215 + #handleSocketError: WebSocket['onerror'] = (exc) => { 216 + this.#dispatchCustomEvent('wserror', {error: normalizeProtocolError(exc)}) 217 + this.destroy() 218 + } 264 219 265 - sendSocket(this.#socket, resp) 220 + #handleSocketClose: WebSocket['onclose'] = () => { 221 + this.#dispatchCustomEvent('wsclose') 222 + this.destroy() 266 223 } 267 224 268 - getPeerStates(): Record<IdentID, PeerState> { 269 - const states: Record<IdentID, PeerState> = {} 270 - for (const [peerId, peer] of this.#peers) { 271 - states[peerId] = { 272 - address: peer.address(), 273 - connected: peer.connected, 274 - destroyed: peer.destroyed, 275 - } 225 + #handlePeerSignal = (peer: RealmPeer, payload: SimplePeer.SignalData) => { 226 + const msg: RealmRtcSignalEvent = { 227 + typ: 'evt', 228 + msg: 'realm.rtc.signal', 229 + dat: { 230 + initiator: peer.initiator, 231 + payload: JSON.stringify(payload), 232 + localid: this.#identity.identid, 233 + remoteid: peer.identid, 234 + }, 276 235 } 277 236 278 - return states 237 + this.#socket.send(JSON.stringify(msg)) 279 238 } 280 - } 281 239 282 - const peerPingSchema = z.object({type: z.literal('ping'), timestamp: z.number()}) 240 + #handlePeerConnect = (peer: RealmPeer) => { 241 + console.debug(`connected to ${peer.identid}`) 242 + this.#dispatchCustomEvent('peeropen', {identid: peer.identid}) 243 + } 283 244 284 - /** a single webrtc peer connection within a realm */ 285 - export class RealmConnectionPeer extends SimplePeer { 286 - #connection: RealmConnection 245 + #handlePeerClose = (peer: RealmPeer) => { 246 + console.debug(`disconnected from ${peer.identid}`) 287 247 288 - initiator: boolean 289 - localid: IdentID 290 - remoteid: IdentID 291 - nonce: string 248 + this.#peers.delete(peer.identid) 249 + this.#nonces.delete(peer.identid) 250 + this.#dispatchCustomEvent('peerclose', {identid: peer.identid}) 251 + } 292 252 293 - constructor( 294 - connection: RealmConnection, 295 - nonce: string, 296 - localid: IdentID, 297 - remoteid: IdentID, 298 - initiator: boolean, 299 - ) { 300 - super({ 301 - initiator, 302 - config: { 303 - iceServers: [ 304 - {urls: 'stun:stun.l.google.com:19302'}, 305 - {urls: 'stun:stun1.l.google.com:19302'}, 306 - ], 307 - }, 308 - }) 253 + #handlePeerError = (peer: RealmPeer, err: Error) => { 254 + console.error(`Error with peer ${peer.identid}:`, err) 255 + this.#dispatchCustomEvent('peererror', {identid: peer.identid, error: err}) 256 + } 309 257 310 - this.initiator = initiator 311 - this.localid = localid 312 - this.remoteid = remoteid 313 - this.nonce = nonce 258 + #handlePeerData = (peer: RealmPeer, chunk: string | Uint8Array) => { 259 + const data = typeof chunk === 'string' ? chunk : new TextDecoder().decode(chunk) 260 + const parsed = realmRtcPingRequestSchema.safeParse(chunk) 261 + if (parsed.success) { 262 + // reply to pings with pongs 263 + this.send<RealmRtcPongResponse>(peer.identid, { 264 + typ: 'res', 265 + msg: 'realm.rtc.pong', 266 + seq: parsed.data.seq, 267 + }) 268 + } else { 269 + this.#dispatchCustomEvent('peerdata', {identid: peer.identid, data}) 270 + } 271 + } 314 272 315 - this.#connection = connection 273 + // helpers that are just too damn long 316 274 317 - this.on('signal', this.#handlePeerSignal) 318 - this.on('data', this.#handlePeerData) 275 + #dispatchCustomEvent(type: string, detail?: object) { 276 + this.dispatchEvent(new CustomEvent(type, {detail})) 319 277 } 320 278 321 - #handlePeerSignal = (e: SimplePeer.SignalData) => { 322 - this.#connection.sendToServer({ 323 - msg: 'realm.rtc.signal', 324 - initiator: this.initiator, 325 - sender: this.localid, 326 - recipient: this.remoteid, 327 - payload: JSON.stringify(e), 328 - } satisfies RealmRtcSignalMessage) 329 - } 279 + // typed helpers 280 + // @example 281 + // this.#socketWrite<YourType>({ 282 + // ... // <- get type errors here 283 + // }) 284 + 285 + #socketSignedWrite = async <T extends object>(payload: T) => { 286 + const token = await generateSignableJwt({ 287 + aud: this.#identity.realmid, 288 + iss: this.#identity.identid, 289 + payload, 290 + }).sign(this.#identity.keypair.privateKey) 330 291 331 - #handlePeerData = (chunk: string) => { 332 - try { 333 - const ping = parseJson.pipe(peerPingSchema).safeParse(chunk) 334 - if (ping.success) { 335 - this.send(JSON.stringify({type: 'pong', timestamp: ping.data.timestamp})) 336 - } else { 337 - this.emit('message', chunk) 338 - } 339 - } catch (err) { 340 - console.error('Failed to parse message:', err) 341 - } 292 + this.#socket.send(token) 342 293 } 343 294 }
+6 -6
src/client/realm/context.tsx
··· 1 1 import {createContext} from 'preact' 2 2 import {useCallback, useEffect, useState} from 'preact/hooks' 3 3 4 - import {RealmConnection, RealmIdentity} from '#client/realm/connection' 4 + import {RealmConnection} from '#client/realm/connection' 5 + import {ConnectionIdentity} from './types' 5 6 6 7 interface RealmConnectionContext { 7 8 realm?: RealmConnection 8 - identity?: RealmIdentity 9 - setIdentity: (ident: RealmIdentity) => void 9 + identity?: ConnectionIdentity 10 + setIdentity: (ident: ConnectionIdentity) => void 10 11 } 11 12 12 13 export const RealmConnectionContext = createContext<RealmConnectionContext | null>(null) ··· 15 16 url: string 16 17 children: preact.ComponentChildren 17 18 }> = (props) => { 18 - const [identity$, setIdentity$] = useState<RealmIdentity>() 19 + const [identity$, setIdentity$] = useState<ConnectionIdentity>() 19 20 const [connection$, setConnection$] = useState<RealmConnection>() 20 21 21 22 const connect = useCallback(() => { ··· 29 30 connection$?.destroy() 30 31 }, [connection$]) 31 32 33 + // connect on mount, or identity change 32 34 useEffect(() => { 33 - console.log('use effect in provider', identity$) 34 - // connect on mount, or identity change 35 35 if (identity$) connect() 36 36 37 37 // disconnect on unsubscribe
+25
src/client/realm/peer.ts
··· 1 + import {IdentID} from '#common/protocol.js' 2 + import SimplePeer from 'simple-peer' 3 + 4 + /** a single webrtc peer connection within a realm */ 5 + export class RealmPeer extends SimplePeer { 6 + identid: IdentID 7 + nonce: string 8 + initiator: boolean 9 + 10 + constructor(identid: IdentID, nonce: string, initiator: boolean) { 11 + super({ 12 + initiator, 13 + config: { 14 + iceServers: [ 15 + {urls: 'stun:stun.l.google.com:19302'}, 16 + {urls: 'stun:stun1.l.google.com:19302'}, 17 + ], 18 + }, 19 + }) 20 + 21 + this.identid = identid 22 + this.nonce = nonce 23 + this.initiator = initiator 24 + } 25 + }
+17
src/client/realm/types.ts
··· 1 + import {IdentID, RealmID} from '#common/protocol' 2 + import SimplePeer from 'simple-peer' 3 + 4 + /** identity info for connecting to a realm */ 5 + export interface ConnectionIdentity { 6 + realmid: RealmID 7 + identid: IdentID 8 + keypair: CryptoKeyPair 9 + } 10 + 11 + /** the state of a specific peer */ 12 + export interface PeerState { 13 + identid: IdentID 14 + connected: boolean 15 + destroyed: boolean 16 + address: ReturnType<SimplePeer.Instance['address']> 17 + }
+18 -2
src/common/protocol.ts
··· 1 1 import {z} from 'zod/v4' 2 + import {ProtocolError} from './errors' 3 + import {errorMessageSchema} from './protocol/schema' 2 4 3 5 export * from './protocol/brands' 4 6 export * from './protocol/messages' 5 - export * from './protocol/messages-preauth' 6 - export * from './protocol/messages-realm' 7 7 8 8 /** a zod transformer for parsing json */ 9 9 export const parseJson = z.transform<string, unknown>((input, ctx) => { ··· 19 19 return z.NEVER 20 20 } 21 21 }) 22 + 23 + export function makeError( 24 + error: ProtocolError, 25 + detail: string, 26 + seq?: number, 27 + ): z.infer<typeof errorMessageSchema> { 28 + return { 29 + typ: 'err', 30 + msg: error.message, 31 + seq, 32 + dat: { 33 + code: error.status, 34 + detail, 35 + }, 36 + } 37 + }
-22
src/common/protocol/messages-preauth.ts
··· 1 - import {jwkSchema} from '#common/crypto/jwks' 2 - import {z} from 'zod/v4' 3 - 4 - /** zod schema for `preauth.authn` message */ 5 - export const preauthRegisterMessageSchema = z.object({ 6 - msg: z.literal('preauth.register'), 7 - pubkey: jwkSchema, 8 - }) 9 - export type PreauthRegisterMessage = z.infer<typeof preauthRegisterMessageSchema> 10 - 11 - /** zod schema for `preauth.authn` message */ 12 - export const preauthAuthnMessageSchema = z.object({ 13 - msg: z.literal('preauth.authn'), 14 - }) 15 - export type PreauthAuthnMessage = z.infer<typeof preauthAuthnMessageSchema> 16 - 17 - /** zod schema for any `preauth` messages */ 18 - export const preauthMessageSchema = z.discriminatedUnion('msg', [ 19 - preauthRegisterMessageSchema, 20 - preauthAuthnMessageSchema, 21 - ]) 22 - export type PreauthMessage = z.infer<typeof preauthMessageSchema>
-65
src/common/protocol/messages-realm.ts
··· 1 - import {z} from 'zod/v4' 2 - 3 - import {IdentBrand} from './brands' 4 - import {responseOkSchema} from './messages' 5 - 6 - /** 7 - * zod schema for `realm.broadcast` message 8 - * 9 - * recipients = true, include self in broadcast 10 - * recipients = false, exclude self in broadcast (default) 11 - * recipients = [], use these exact recipients 12 - */ 13 - export const realmBroadcastMessageSchema = z.object({ 14 - msg: z.literal('realm.broadcast'), 15 - payload: z.any(), 16 - recipients: z.union([z.boolean(), z.array(IdentBrand.schema)]).default(false), 17 - }) 18 - 19 - export type RealmBroadcastMessage = z.infer<typeof realmBroadcastMessageSchema> 20 - 21 - // rtc messages 22 - 23 - export const realmRtcSignalMessageSchema = z.object({ 24 - msg: z.literal('realm.rtc.signal'), 25 - payload: z.string(), 26 - sender: IdentBrand.schema, 27 - recipient: IdentBrand.schema, 28 - initiator: z.boolean(), 29 - }) 30 - 31 - export const realmRtcPeerWelcomeMessageSchema = responseOkSchema.extend({ 32 - msg: z.literal('realm.rtc.peer-welcome'), 33 - peers: z.array(IdentBrand.schema), 34 - }) 35 - 36 - export const realmRtcPeerJoinedMessageSchema = responseOkSchema.extend({ 37 - msg: z.literal('realm.rtc.peer-joined'), 38 - identid: IdentBrand.schema, 39 - }) 40 - 41 - export const realmRtcPeerLeftMessageSchema = responseOkSchema.extend({ 42 - msg: z.literal('realm.rtc.peer-left'), 43 - identid: IdentBrand.schema, 44 - }) 45 - 46 - export type RealmRtcSignalMessage = z.infer<typeof realmRtcSignalMessageSchema> 47 - export type RealmRtcPeerWelcomeMessage = z.infer<typeof realmRtcPeerWelcomeMessageSchema> 48 - export type RealmRtcPeerJoinedMessage = z.infer<typeof realmRtcPeerJoinedMessageSchema> 49 - export type RealmRtcPeerLeftMessage = z.infer<typeof realmRtcPeerLeftMessageSchema> 50 - 51 - /// useful unions 52 - 53 - export const realmToServerMessageSchema = z.discriminatedUnion('msg', [ 54 - realmBroadcastMessageSchema, 55 - realmRtcSignalMessageSchema, 56 - ]) 57 - 58 - export const realmFromServerMessageSchema = z.discriminatedUnion('msg', [ 59 - realmRtcPeerJoinedMessageSchema, 60 - realmRtcPeerLeftMessageSchema, 61 - realmRtcSignalMessageSchema, 62 - ]) 63 - 64 - export type RealmToServerMessage = z.infer<typeof realmToServerMessageSchema> 65 - export type RealmFromServerMessage = z.infer<typeof realmFromServerMessageSchema>
+74 -12
src/common/protocol/messages.ts
··· 1 + import {jwkSchema} from '#common/crypto/jwks' 1 2 import {z} from 'zod/v4' 2 3 3 - /** zod schema for `ok` message */ 4 - export const responseOkSchema = z.object({ 5 - ok: z.literal(true), 6 - }) 7 - export type ResponseOk = z.infer<typeof responseOkSchema> 4 + import {IdentBrand} from './brands' 5 + import { 6 + makeEmptyRequestSchema, 7 + makeEmptyResponseSchema, 8 + makeEventSchema, 9 + makeRequestSchema, 10 + makeResponseSchema, 11 + } from './schema' 12 + 13 + /// preauth 14 + 15 + export const preauthRegisterReqSchema = makeRequestSchema( 16 + 'preauth.register', 17 + z.object({pubkey: jwkSchema}), 18 + ) 19 + 20 + export const preauthAuthnReqSchema = makeEmptyRequestSchema('preauth.authn') 21 + 22 + export const preauthRespSchema = makeResponseSchema( 23 + 'preauth.authn', 24 + z.object({ 25 + activePeers: z.array(IdentBrand.schema), 26 + }), 27 + ) 28 + 29 + export const preauthReqSchema = z.union([preauthAuthnReqSchema, preauthRegisterReqSchema]) 30 + 31 + export type PreauthRegisterRequest = z.infer<typeof preauthRegisterReqSchema> 32 + export type PreauthAuthnRequest = z.infer<typeof preauthAuthnReqSchema> 33 + export type PreauthResponse = z.infer<typeof preauthRespSchema> 8 34 9 - /** zod schema for `error` message */ 10 - export const responseErrorSchema = z.object({ 11 - ok: z.literal(false), 12 - status: z.number(), 13 - message: z.string(), 14 - }) 15 - export type ErrorResponse = z.infer<typeof responseErrorSchema> 35 + /// realms 36 + 37 + export const realmBroadcastEventSchema = makeEventSchema( 38 + 'realm.broadcast', 39 + z.object({ 40 + payload: z.unknown(), 41 + recipients: z.union([z.boolean(), z.array(IdentBrand.schema)]).default(false), 42 + }), 43 + ) 44 + 45 + export const realmRtcPingRequestSchema = makeEmptyRequestSchema('realm.rtc.ping') 46 + export const realmRtcPongResponseSchema = makeEmptyResponseSchema('realm.rtc.pong') 47 + export const realmRtcPingPongMessageSchema = z.union([ 48 + realmRtcPingRequestSchema, 49 + realmRtcPongResponseSchema, 50 + ]) 51 + 52 + export const realmRtcSignalEventSchema = makeEventSchema( 53 + 'realm.rtc.signal', 54 + z.object({ 55 + payload: z.string(), 56 + initiator: z.boolean(), 57 + localid: IdentBrand.schema, 58 + remoteid: IdentBrand.schema, 59 + }), 60 + ) 61 + 62 + export const realmRtcPeerJoinedEventSchema = makeEventSchema( 63 + 'realm.rtc.peer-joined', 64 + z.object({identid: IdentBrand.schema}), 65 + ) 66 + 67 + export const realmRtcPeerLeftEventSchema = makeEventSchema( 68 + 'realm.rtc.peer-left', 69 + z.object({identid: IdentBrand.schema}), 70 + ) 71 + 72 + export type RealmBroadcastEvent = z.infer<typeof realmBroadcastEventSchema> 73 + export type RealmRtcPingRequest = z.infer<typeof realmRtcPingRequestSchema> 74 + export type RealmRtcPongResponse = z.infer<typeof realmRtcPongResponseSchema> 75 + export type RealmRtcSignalEvent = z.infer<typeof realmRtcSignalEventSchema> 76 + export type RealmRtcPeerJoinedEvent = z.infer<typeof realmRtcPeerJoinedEventSchema> 77 + export type RealmRtcPeerLeftEvent = z.infer<typeof realmRtcPeerLeftEventSchema>
+70
src/common/protocol/schema.ts
··· 1 + import {z} from 'zod/v4' 2 + 3 + const errorSchema = z.object({code: z.number(), detail: z.string().optional()}) 4 + 5 + export const eventMessageSchema = z.object({ 6 + typ: z.literal('evt'), 7 + msg: z.string(), 8 + dat: z.unknown(), 9 + }) 10 + 11 + export const requestMessageSchema = z.object({ 12 + typ: z.literal('req'), 13 + msg: z.string(), 14 + seq: z.number().optional(), 15 + dat: z.unknown(), 16 + }) 17 + 18 + export const responseMessageSchema = z.object({ 19 + typ: z.literal('res'), 20 + msg: z.string(), 21 + seq: z.number().optional(), 22 + dat: z.unknown(), 23 + }) 24 + 25 + export const errorMessageSchema = z.object({ 26 + typ: z.literal('err'), 27 + msg: z.string(), 28 + seq: z.number().optional(), 29 + dat: errorSchema, 30 + }) 31 + 32 + export const messageSchema = z.discriminatedUnion('typ', [ 33 + eventMessageSchema, 34 + requestMessageSchema, 35 + responseMessageSchema, 36 + errorMessageSchema, 37 + ]) 38 + 39 + export const makeEventSchema = <N extends string, Z extends z.ZodType>(name: N, schema: Z) => { 40 + return eventMessageSchema.extend({ 41 + msg: z.literal(name), 42 + dat: schema, 43 + }) 44 + } 45 + 46 + export const makeEmptyEventSchema = <N extends string>(name: N) => { 47 + return eventMessageSchema.extend({msg: z.literal(name)}).omit({dat: true}) 48 + } 49 + 50 + export const makeRequestSchema = <N extends string, Z extends z.ZodType>(name: N, schema: Z) => { 51 + return requestMessageSchema.extend({ 52 + msg: z.literal(name), 53 + dat: schema, 54 + }) 55 + } 56 + 57 + export const makeEmptyRequestSchema = <N extends string>(name: N) => { 58 + return requestMessageSchema.extend({msg: z.literal(name)}).omit({dat: true}) 59 + } 60 + 61 + export const makeResponseSchema = <N extends string, Z extends z.ZodType>(name: N, schema: Z) => { 62 + return responseMessageSchema.extend({ 63 + msg: z.literal(name), 64 + dat: schema, 65 + }) 66 + } 67 + 68 + export const makeEmptyResponseSchema = <N extends string>(name: N) => { 69 + return responseMessageSchema.extend({msg: z.literal(name)}).omit({dat: true}) 70 + }
-5
src/common/socket.ts
··· 9 9 10 10 import {parseJson} from './protocol' 11 11 12 - /** send some data in json format down the wire */ 13 - export function sendSocket(ws: WebSocket, data: unknown): void { 14 - ws.send(JSON.stringify(data)) 15 - } 16 - 17 12 /** 18 13 * given a websocket, wait and take a single message off and return it 19 14 *
+1
src/common/types.ts
··· 1 + export type Replace<K extends keyof U, U, V> = Omit<U, K> & Record<K, V>
+3 -7
src/server/index.ts
··· 32 32 // WebSocket handling 33 33 const wss = new WebSocketServer({server, path: '/stream'}) 34 34 wss.on('connection', (ws) => { 35 - socketHandler(ws) 36 - .catch((e: unknown) => { 37 - console.error('uncaught error from websocket', e) 38 - }) 39 - .finally(() => { 40 - console.log('socket handler complete') 41 - }) 35 + socketHandler(ws).catch((e: unknown) => { 36 + console.error('uncaught error from websocket', e) 37 + }) 42 38 }) 43 39 44 40 return server
-1
src/server/routes-error.ts
··· 1 1 import * as express from 'express' 2 2 3 3 export const notFoundHandler: express.RequestHandler = (req, res) => { 4 - console.log(req.url) 5 4 res.status(404).send('wut') 6 5 }
+26 -4
src/server/routes-socket/handler-preauth.ts
··· 4 4 import {jwkImport} from '#common/crypto/jwks' 5 5 import {jwtPayload, verifyJwtToken} from '#common/crypto/jwts' 6 6 import {normalizeError, ProtocolError} from '#common/errors' 7 - import {IdentBrand, IdentID, preauthMessageSchema, RealmBrand, RealmID} from '#common/protocol' 7 + import { 8 + IdentBrand, 9 + IdentID, 10 + preauthReqSchema, 11 + PreauthResponse, 12 + RealmBrand, 13 + RealmID, 14 + } from '#common/protocol' 8 15 import {takeSocket} from '#common/socket' 9 16 10 17 import * as realms from './state' ··· 23 30 24 31 try { 25 32 const data = await takeSocket(ws, combinedSignal) 33 + const jwt = await jwtPayload(preauthReqSchema).parseAsync(data) 26 34 27 35 // if any of the parsing fails, it'll throw a zod error 28 - const jwt = await jwtPayload(preauthMessageSchema).parseAsync(data) 29 36 const identid = IdentBrand.parse(jwt.claims.iss) 30 37 const realmid = RealmBrand.parse(jwt.claims.aud) 31 38 32 39 // if we're registering, make sure the realm exists 33 40 if (jwt.payload.msg === 'preauth.register') { 34 - const registrantkey = await jwkImport.parseAsync(jwt.payload.pubkey) 41 + const registrantkey = await jwkImport.parseAsync(jwt.payload.dat.pubkey) 35 42 realms.ensureRegisteredRealm(realmid, identid, registrantkey) 36 43 } 37 44 38 - return await authenticatePreauth(realmid, identid, jwt.token) 45 + const auth = await authenticatePreauth(realmid, identid, jwt.token) 46 + const msg = preauthResponse(auth, jwt.payload.seq) 47 + ws.send(JSON.stringify(msg)) 48 + 49 + return auth 39 50 } finally { 40 51 timeout.cancel() 41 52 } ··· 59 70 throw new ProtocolError('jwt verification failed', 401, {cause: err}) 60 71 } 61 72 } 73 + 74 + function preauthResponse(auth: realms.AuthenticatedIdentity, seq?: number): PreauthResponse { 75 + return { 76 + typ: 'res', 77 + msg: 'preauth.authn', 78 + dat: { 79 + activePeers: Array.from(auth.realm.sockets.keys()), 80 + }, 81 + seq, 82 + } 83 + }
+24 -36
src/server/routes-socket/handler-realm.ts
··· 1 1 import {WebSocket} from 'isomorphic-ws' 2 + import {z} from 'zod/v4' 2 3 3 4 import {normalizeProtocolError, ProtocolError} from '#common/errors' 4 5 import * as protocol from '#common/protocol' 5 - import {sendSocket, streamSocket} from '#common/socket' 6 + import {streamSocket} from '#common/socket' 6 7 import * as realm from '#server/routes-socket/state' 7 8 9 + // what can the server handle? 10 + const incomingMessageSchema = z.union([ 11 + protocol.realmBroadcastEventSchema, 12 + protocol.realmRtcSignalEventSchema, 13 + ]) 14 + 8 15 /** 9 16 * ance we've retrieved authentication details, we go into the main realm loop. 10 17 * read messages as they come in and dispatch actions. ··· 15 22 signal?: AbortSignal, 16 23 ) { 17 24 realmBroadcast(auth, buildRtcPeerJoined(auth)) 18 - sendSocket(ws, buildRtcPeerWelcome(auth)) 19 - 20 - const parser = protocol.parseJson.pipe(protocol.realmToServerMessageSchema) 21 25 22 26 try { 23 - for await (const data of streamSocket(ws, {signal})) { 27 + const incomingParser = protocol.parseJson.pipe(incomingMessageSchema) 28 + for await (const msg of streamSocket(ws, {signal})) { 24 29 try { 25 - const msg = await parser.parseAsync(data) 26 - switch (msg.msg) { 30 + const data = await incomingParser.parseAsync(msg) 31 + switch (data.msg) { 27 32 case 'realm.broadcast': 28 - realmBroadcast(auth, msg.payload, msg.recipients) 33 + realmBroadcast(auth, data.dat, data.dat.recipients) 29 34 continue 30 35 31 36 case 'realm.rtc.signal': 32 - realmBroadcast(auth, msg, [msg.recipient]) 37 + realmBroadcast(auth, data, [data.dat.remoteid]) 33 38 continue 34 39 35 40 default: ··· 41 46 if (error.status >= 500) throw error 42 47 43 48 if (ws.readyState === ws.OPEN) { 44 - sendSocket(ws, buildRealmError(error)) 49 + const err = protocol.makeError(error, 'error in realm loop') 50 + ws.send(JSON.stringify(err)) 45 51 } 46 52 } 47 53 } 48 54 } finally { 49 - console.log('client left!', auth) 50 55 realmBroadcast(auth, buildRtcPeerLeft(auth)) 51 56 } 52 57 } 53 58 54 - function buildRtcPeerWelcome( 55 - auth: realm.AuthenticatedIdentity, 56 - ): protocol.RealmRtcPeerWelcomeMessage { 59 + function buildRtcPeerJoined(auth: realm.AuthenticatedIdentity): protocol.RealmRtcPeerJoinedEvent { 57 60 return { 58 - ok: true, 59 - msg: 'realm.rtc.peer-welcome', 60 - peers: Array.from(auth.realm.sockets.keys()), 61 - } 62 - } 63 - 64 - function buildRtcPeerJoined(auth: realm.AuthenticatedIdentity): protocol.RealmRtcPeerJoinedMessage { 65 - return { 66 - ok: true, 61 + typ: 'evt', 67 62 msg: 'realm.rtc.peer-joined', 68 - identid: auth.identid, 63 + dat: {identid: auth.identid}, 69 64 } 70 65 } 71 66 72 - function buildRtcPeerLeft(auth: realm.AuthenticatedIdentity): protocol.RealmRtcPeerLeftMessage { 67 + function buildRtcPeerLeft(auth: realm.AuthenticatedIdentity): protocol.RealmRtcPeerLeftEvent { 73 68 return { 74 - ok: true, 69 + typ: 'evt', 75 70 msg: 'realm.rtc.peer-left', 76 - identid: auth.identid, 77 - } 78 - } 79 - 80 - function buildRealmError(error: ProtocolError): protocol.ErrorResponse { 81 - return { 82 - ok: false, 83 - message: error.message, 84 - status: error.status, 71 + dat: {identid: auth.identid}, 85 72 } 86 73 } 87 74 ··· 100 87 ) { 101 88 const echo = recipients === true || Array.isArray(recipients) 102 89 const recips = Array.isArray(recipients) ? recipients : Array.from(auth.realm.identities.keys()) 90 + const json = JSON.stringify(payload) 103 91 104 92 for (const recip of recips) { 105 93 if (!echo && recip === auth.identid) continue ··· 107 95 const sockets = auth.realm.sockets.get(recip) 108 96 if (sockets) { 109 97 for (const socket of sockets) { 110 - sendSocket(socket, payload) 98 + socket.send(json) 111 99 } 112 100 } 113 101 }
-2
src/server/routes-socket/handler.ts
··· 9 9 10 10 /** when the socket connects, we drive our protocol through handlers */ 11 11 export async function socketHandler(ws: WebSocket) { 12 - console.log('WebSocket connection established') 13 - 14 12 try { 15 13 const auth = await preauthHandler(ws) 16 14 try {