offline-first, p2p synced, atproto enabled, feed reader
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

most of the driver is tested

+1340 -78
+1
__mocks__/isomorphic-ws.js
··· 1 + export {WebSocket} from 'mock-socket'
+1 -1
package-lock.json
··· 40 40 "ts-pattern": "^5.9.0", 41 41 "tsx": "^4.19.2", 42 42 "webtorrent": "^2.6.8", 43 - "ws": "^8.18.2", 43 + "ws": "^8.18.3", 44 44 "zod": "^4.1.12" 45 45 }, 46 46 "devDependencies": {
+1 -1
package.json
··· 52 52 "ts-pattern": "^5.9.0", 53 53 "tsx": "^4.19.2", 54 54 "webtorrent": "^2.6.8", 55 - "ws": "^8.18.2", 55 + "ws": "^8.18.3", 56 56 "zod": "^4.1.12" 57 57 }, 58 58 "devDependencies": {
+1 -1
src/lib/crypto/errors.ts
··· 6 6 /** Thrown when failing to verify a JWT signature */ 7 7 export class JWTBadSignatureError extends CryptoError { 8 8 constructor(options?: BaseErrorOpts) { 9 - super('could not verify token signature', options) 9 + super('bad signature', options) 10 10 } 11 11 }
+3 -9
src/lib/crypto/jwts.spec.ts
··· 39 39 describe('jwtPayload', () => { 40 40 it('extracts and validates payload', async () => { 41 41 const pair = await generateSigningJwkPair() 42 - const token = await generateSignableJwt({ 43 - sub: 'test', 44 - payload: {userId: '123', name: 'Alice'}, 45 - }).sign(pair.privateKey) 42 + const token = await generateSignableJwt({sub: 'test', userId: '123', name: 'Alice'}).sign(pair.privateKey) 46 43 47 44 const schema = jwtPayload(z.object({userId: z.string(), name: z.string()})) 48 45 const result = await schema.parseAsync(token) ··· 64 61 65 62 it('includes token and claims in result', async () => { 66 63 const pair = await generateSigningJwkPair() 67 - const token = await generateSignableJwt({ 68 - sub: 'test', 69 - payload: {data: 'value'}, 70 - }).sign(pair.privateKey) 64 + const token = await generateSignableJwt({sub: 'test', data: 'value'}).sign(pair.privateKey) 71 65 72 66 const schema = jwtPayload(z.object({data: z.string()})) 73 67 const result = await schema.parseAsync(token) ··· 131 125 throw new Error('Should have thrown') 132 126 } catch (err) { 133 127 expect(err).toHaveProperty('message') 134 - expect((err as Error).message).toContain('could not verify token signature') 128 + expect((err as Error).message).toContain('bad signature') 135 129 } 136 130 }) 137 131 })
+3 -4
src/lib/crypto/jwts.ts
··· 40 40 * schema describing a verified payload in a JWT. 41 41 * **important** - this does no claims validation, only decoding from string to JWT! 42 42 */ 43 - export const jwtPayload = <T>(schema: z.ZodType<T>): z.ZodType<JWTTokenPayload<T>> => { 44 - const parser = z.looseObject({payload: schema}) 43 + export const jwtPayload = <T>(schema: z.ZodType<T>): z.ZodType<JWTTokenPayload<T>, string> => { 45 44 return jwtSchema.transform(async (payload, ctx) => { 46 - const result = await parser.safeParseAsync(payload.claims) 47 - if (result.success) return {...payload, payload: result.data.payload} 45 + const result = await schema.safeParseAsync(payload.claims) 46 + if (result.success) return {...payload, payload: result.data} 48 47 49 48 ctx.addIssue({code: 'invalid_type', expected: 'custom'}) 50 49 return z.NEVER
+3 -3
src/lib/errors.ts
··· 55 55 * normalizes the given error into a protocol error 56 56 * passes through input that is already protocol errors. 57 57 */ 58 - export function normalizeProtocolError(cause: unknown): ProtocolError { 58 + export function normalizeProtocolError(cause: unknown, status = 500): ProtocolError { 59 59 if (cause instanceof ProtocolError) return cause 60 60 if (cause instanceof ZodError) return new ProtocolError(prettifyError(cause), 400, {cause}) 61 61 ··· 63 63 if (cause.name === 'TimeoutError') return new ProtocolError('operation timed out', 408, {cause}) 64 64 if (cause.name === 'AbortError') return new ProtocolError('operation was aborted', 499, {cause}) 65 65 66 - return new ProtocolError(cause.message, 500, {cause}) 66 + return new ProtocolError(cause.message, status, {cause}) 67 67 } 68 68 69 69 // fallback, unknown 70 70 const options = cause == undefined ? undefined : {cause: normalizeError(cause)} 71 - return new ProtocolError(`Error! ${cause}`, 500, options) 71 + return new ProtocolError(`Error! ${cause}`, status, options) 72 72 } 73 73 74 74 /** error wrapper for unknown errors (not an Error?) */
+10 -6
src/lib/socket.spec.ts
··· 1 1 import {afterEach, beforeEach, describe, expect, it} from '@jest/globals' 2 + import {WebSocket} from 'isomorphic-ws' 2 3 import WS from 'jest-websocket-mock' 3 4 import {z} from 'zod/v4' 4 5 5 6 import {streamSocket, streamSocketSchema, takeSocket, takeSocketSchema} from '#lib/socket' 6 7 import {jsonCodec} from './schema' 8 + import {assertParsed} from './utils' 7 9 8 10 let server: WS 9 11 const TEST_URL = 'ws://localhost:1234' ··· 11 13 beforeEach(() => { 12 14 server = new WS(TEST_URL) 13 15 }) 16 + 14 17 afterEach(() => { 15 18 WS.clean() 16 19 }) ··· 208 211 209 212 const streamPromise = (async () => { 210 213 for await (const msg of streamSocketSchema(ws, jsonCodec(schema))) { 211 - messages.push(msg) 214 + messages.push(assertParsed(msg)) 212 215 if (messages.length === 2) { 213 216 break 214 217 } ··· 223 226 expect(messages).toEqual([{id: 1}, {id: 2}]) 224 227 }) 225 228 226 - it('rejects invalid messages', async () => { 229 + it('sends errors for invalid messages', async () => { 227 230 const ws = await createConnectedWebSocket() 228 231 const schema = z.object({id: z.number()}) 229 232 230 233 const streamPromise = (async () => { 231 - for await (const _ of streamSocketSchema(ws, jsonCodec(schema))) { 232 - // Should reject 234 + for await (const msg of streamSocketSchema(ws, jsonCodec(schema))) { 235 + expect(msg.success).toBe(false) 236 + expect(msg.error?.message).toContain('id') 237 + return true 233 238 } 234 239 })() 235 240 236 241 server.send('{"id":"not a number"}') 237 - 238 - await expect(streamPromise).rejects.toThrow() 242 + await expect(streamPromise).resolves.toBe(true) 239 243 }) 240 244 })
+6 -2
src/lib/socket.ts
··· 1 + import {MessageEvent, WebSocket} from 'isomorphic-ws' 1 2 import {z} from 'zod/v4' 2 3 3 4 import {combineSignals} from '#lib/async/aborts' ··· 87 88 export interface ConfigProps { 88 89 /** maximum size of the stream buffer */ 89 90 maxDepth: number 91 + 92 + /** skip errors, or raise? */ 93 + skipErrors?: boolean 90 94 91 95 /** signal for abort */ 92 96 signal?: AbortSignal ··· 201 205 ws: WebSocket, 202 206 schema: z.ZodType<T>, 203 207 config?: Partial<ConfigProps>, 204 - ): AsyncGenerator<T> { 208 + ): AsyncGenerator<z.ZodSafeParseResult<T>> { 205 209 for await (const message of streamSocket(ws, config)) { 206 - yield await schema.parseAsync(message) 210 + yield await schema.safeParseAsync(message) 207 211 } 208 212 }
+57
src/lib/specs/helpers-socket.ts
··· 1 + import {afterEach, beforeEach} from '@jest/globals' 2 + import {WebSocket} from 'isomorphic-ws' 3 + import WS from 'jest-websocket-mock' 4 + 5 + export type SocketServerState = { 6 + url: string 7 + server?: WS 8 + } 9 + 10 + export function mockSocketServer(url = 'ws://localhost:1234') { 11 + const state: SocketServerState = {url} 12 + 13 + beforeEach(() => { 14 + state.server = new WS(url) 15 + }) 16 + 17 + afterEach(() => { 18 + WS.clean() 19 + }) 20 + 21 + return { 22 + url, 23 + state, 24 + 25 + get nextMessage() { 26 + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion 27 + return this.state.server!.nextMessage 28 + }, 29 + 30 + send(data: string | object) { 31 + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion 32 + this.state.server!.send(data) 33 + }, 34 + 35 + async connect(): Promise<WebSocket> { 36 + const {promise, resolve, reject} = Promise.withResolvers<WebSocket>() 37 + const client = new WebSocket(this.state.url) 38 + 39 + const open = () => { 40 + resolve(client) 41 + client.removeEventListener('open', open) 42 + client.removeEventListener('error', error) 43 + } 44 + 45 + const error = (e: unknown) => { 46 + reject(e) 47 + client.removeEventListener('open', open) 48 + client.removeEventListener('error', error) 49 + } 50 + 51 + client.addEventListener('open', open) 52 + client.addEventListener('error', error) 53 + 54 + return promise 55 + }, 56 + } 57 + }
+7
src/lib/utils.ts
··· 1 + import {z} from 'zod/v4' 2 + 1 3 export function assert<V>(value: V | undefined | null): V { 2 4 if (value == null) throw new Error('unexpected nullish!') 3 5 return value 4 6 } 7 + 8 + export function assertParsed<T>(value: z.ZodSafeParseResult<T>): T { 9 + if (!value.success) throw value.error 10 + return value.data 11 + }
+1 -7
src/realm/protocol/messages-realm.ts
··· 1 1 import {jwkSchema} from '#lib/crypto/jwks' 2 - import {jwtPayload} from '#lib/crypto/jwts' 3 2 import {z} from 'zod/v4' 4 3 5 4 import {makeEventSchema, makeRequestSchema, makeResponseSchema} from '#lib/schema' ··· 24 23 z.object({ 25 24 localid: IdentBrand.schema, 26 25 remoteid: IdentBrand.schema, 27 - signed: jwtPayload( 28 - z.object({ 29 - signal: z.string(), 30 - initiator: z.boolean(), 31 - }), 32 - ), 26 + signed: z.string(), 33 27 }), 34 28 ) 35 29
+682
src/realm/server/driver-preauth.spec.ts
··· 1 + import {describe, expect, it} from '@jest/globals' 2 + 3 + import {generateSignableJwt, generateSigningJwkPair, jwkExport} from '#lib/crypto/jwks' 4 + import {jsonCodec} from '#lib/schema' 5 + import {mockSocketServer} from '#lib/specs/helpers-socket' 6 + 7 + import { 8 + PreauthAuthnRequest, 9 + PreauthExchangeInviteRequest, 10 + PreauthRegisterRequest, 11 + preauthResSchema, 12 + } from '#realm/protocol/messages-preauth' 13 + import {IdentBrand, RealmBrand} from '#realm/schema/brands' 14 + 15 + import {drivePreauth} from './driver-preauth' 16 + import {ensureRealm, fetchRealm} from './storage' 17 + 18 + // Note: With jest-websocket-mock, the "server" is our test harness (WS), 19 + // and the "client" is what we're testing (the drivePreauth function). 20 + // This feels backwards because we're testing a server-side component. 21 + 22 + const socket = mockSocketServer() 23 + 24 + describe('drivePreauth', () => { 25 + describe('preauth.register', () => { 26 + it('should successfully register a new realm', async () => { 27 + const ws = await socket.connect() 28 + const realmid = RealmBrand.generate() 29 + const identid = IdentBrand.generate() 30 + const keyPair = await generateSigningJwkPair() 31 + 32 + // Export public key as JWK 33 + const pubkeyJwk = await jwkExport.parseAsync(keyPair.publicKey) 34 + 35 + // Create and sign the registration request 36 + const payload = { 37 + typ: 'req', 38 + msg: 'preauth.register', 39 + seq: 'sequence', 40 + dat: { 41 + pubkey: pubkeyJwk, 42 + }, 43 + } satisfies PreauthRegisterRequest 44 + 45 + const jwt = await generateSignableJwt(payload) 46 + .setAudience(realmid) 47 + .setIssuer(identid) 48 + .setJti('nonce') 49 + .sign(keyPair.privateKey) 50 + 51 + // Start the preauth driver 52 + const authPromise = drivePreauth(ws) 53 + 54 + // Send the JWT from the "server" (test harness) to "client" (code under test) 55 + socket.send(jwt) 56 + 57 + // Wait for response 58 + const response = await socket.nextMessage 59 + const result = await authPromise 60 + 61 + // Verify the response 62 + expect(result).not.toBeNull() 63 + expect(result?.realm.realmid).toBe(realmid) 64 + expect(result?.identid).toBe(identid) 65 + 66 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 67 + expect(responseData.success).toBeTruthy() 68 + expect(responseData.data).toMatchObject({ 69 + typ: 'res', 70 + msg: 'preauth.authn', 71 + dat: { 72 + peers: ['server'], 73 + identities: { 74 + [identid]: pubkeyJwk, 75 + }, 76 + }, 77 + }) 78 + 79 + // Verify realm was created in storage 80 + const realm = await fetchRealm(realmid) 81 + expect(realm).not.toBeNull() 82 + expect(realm?.realmid).toBe(realmid) 83 + }, 1000) 84 + 85 + it('should return existing realm if already registered', async () => { 86 + const ws = await socket.connect() 87 + const realmid = RealmBrand.generate() 88 + const identid = IdentBrand.generate() 89 + const keyPair = await generateSigningJwkPair() 90 + 91 + // Pre-create the realm 92 + await ensureRealm(realmid, identid, keyPair.publicKey) 93 + const pubkeyJwk = await crypto.subtle.exportKey('jwk', keyPair.publicKey) 94 + const payload = { 95 + typ: 'req', 96 + msg: 'preauth.register', 97 + seq: 'sequence', 98 + dat: { 99 + pubkey: pubkeyJwk, 100 + }, 101 + } satisfies PreauthRegisterRequest 102 + 103 + const jwt = await generateSignableJwt(payload) 104 + .setAudience(realmid) 105 + .setIssuer(identid) 106 + .setJti('nonce') 107 + .sign(keyPair.privateKey) 108 + 109 + const authPromise = drivePreauth(ws) 110 + socket.send(jwt) 111 + 112 + await socket.nextMessage 113 + const result = await authPromise 114 + 115 + expect(result).not.toBeNull() 116 + expect(result?.realm.realmid).toBe(realmid) 117 + expect(result?.identid).toBe(identid) 118 + }, 1000) 119 + }) 120 + 121 + describe('preauth.authn', () => { 122 + it('should successfully authenticate an existing identity', async () => { 123 + const ws = await socket.connect() 124 + const realmid = RealmBrand.generate() 125 + const identid = IdentBrand.generate() 126 + const keyPair = await generateSigningJwkPair() 127 + const pubkeyJwk = await jwkExport.parseAsync(keyPair.publicKey) 128 + 129 + // Pre-create the realm and identity 130 + await ensureRealm(realmid, identid, keyPair.publicKey) 131 + 132 + const payload = { 133 + typ: 'req', 134 + msg: 'preauth.authn', 135 + seq: 'sequence', 136 + } satisfies PreauthAuthnRequest 137 + 138 + const jwt = await generateSignableJwt(payload) 139 + .setAudience(realmid) 140 + .setIssuer(identid) 141 + .setJti('nonce') 142 + .sign(keyPair.privateKey) 143 + 144 + const authPromise = drivePreauth(ws) 145 + socket.send(jwt) 146 + 147 + const response = await socket.nextMessage 148 + const result = await authPromise 149 + 150 + expect(result).not.toBeNull() 151 + expect(result?.realm.realmid).toBe(realmid) 152 + expect(result?.identid).toBe(identid) 153 + expect(result?.pubkey).toBeDefined() 154 + 155 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 156 + expect(responseData.success).toBeTruthy() 157 + expect(responseData.data).toMatchObject({ 158 + typ: 'res', 159 + msg: 'preauth.authn', 160 + dat: expect.objectContaining({ 161 + peers: expect.arrayContaining(['server']), 162 + identities: expect.objectContaining({ 163 + [identid]: pubkeyJwk, 164 + }), 165 + }), 166 + }) 167 + }, 1000) 168 + 169 + it('should reject authentication for unknown realm', async () => { 170 + const ws = await socket.connect() 171 + const realmid = RealmBrand.generate() 172 + const identid = IdentBrand.generate() 173 + const keyPair = await generateSigningJwkPair() 174 + 175 + const payload = { 176 + typ: 'req', 177 + msg: 'preauth.authn', 178 + seq: 'sequence', 179 + } satisfies PreauthAuthnRequest 180 + 181 + const jwt = await generateSignableJwt(payload) 182 + .setAudience(realmid) 183 + .setIssuer(identid) 184 + .setJti('nonce') 185 + .sign(keyPair.privateKey) 186 + 187 + const authPromise = drivePreauth(ws) 188 + socket.send(jwt) 189 + 190 + const response = await socket.nextMessage 191 + const result = await authPromise 192 + 193 + expect(result).toBeNull() 194 + 195 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 196 + expect(responseData.success).toBeTruthy() 197 + expect(responseData.data).toMatchObject({ 198 + typ: 'err', 199 + msg: 'preauth.authn', 200 + err: { 201 + code: 404, 202 + detail: '404 Not Found: unknown realm', 203 + }, 204 + }) 205 + }, 1000) 206 + 207 + it('should reject authentication for unknown identity', async () => { 208 + const ws = await socket.connect() 209 + const realmid = RealmBrand.generate() 210 + const ownerIdentid = IdentBrand.generate() 211 + const unknownIdentid = IdentBrand.generate() 212 + const ownerKeyPair = await generateSigningJwkPair() 213 + const unknownKeyPair = await generateSigningJwkPair() 214 + 215 + // Create realm with owner identity 216 + await ensureRealm(realmid, ownerIdentid, ownerKeyPair.publicKey) 217 + 218 + // Try to authenticate with unknown identity 219 + const payload = { 220 + typ: 'req', 221 + msg: 'preauth.authn', 222 + seq: 'sequence', 223 + } satisfies PreauthAuthnRequest 224 + 225 + const jwt = await generateSignableJwt(payload) 226 + .setAudience(realmid) 227 + .setIssuer(unknownIdentid) 228 + .setJti('nonce') 229 + .sign(unknownKeyPair.privateKey) 230 + 231 + const authPromise = drivePreauth(ws) 232 + socket.send(jwt) 233 + 234 + const response = await socket.nextMessage 235 + const result = await authPromise 236 + 237 + expect(result).toBeNull() 238 + 239 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 240 + expect(responseData.success).toBeTruthy() 241 + expect(responseData.data).toMatchObject({ 242 + typ: 'err', 243 + msg: 'preauth.authn', 244 + err: { 245 + code: 401, 246 + detail: '401 Unauthorized: bad signature', 247 + }, 248 + }) 249 + }, 1000) 250 + 251 + it('should reject authentication with invalid signature', async () => { 252 + const ws = await socket.connect() 253 + const realmid = RealmBrand.generate() 254 + const identid = IdentBrand.generate() 255 + const keyPair = await generateSigningJwkPair() 256 + const wrongKeyPair = await generateSigningJwkPair() 257 + 258 + // Create realm with correct identity 259 + await ensureRealm(realmid, identid, keyPair.publicKey) 260 + 261 + // Sign with wrong private key 262 + const payload = { 263 + typ: 'req', 264 + msg: 'preauth.authn', 265 + seq: 'sequence', 266 + } satisfies PreauthAuthnRequest 267 + 268 + const jwt = await generateSignableJwt(payload) 269 + .setAudience(realmid) 270 + .setIssuer(identid) 271 + .setJti('nonce') 272 + .sign(wrongKeyPair.privateKey) 273 + 274 + const authPromise = drivePreauth(ws) 275 + socket.send(jwt) 276 + 277 + const response = await socket.nextMessage 278 + const result = await authPromise 279 + 280 + expect(result).toBeNull() 281 + 282 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 283 + expect(responseData.success).toBeTruthy() 284 + expect(responseData.data).toMatchObject({ 285 + typ: 'err', 286 + msg: 'preauth.authn', 287 + err: { 288 + code: 401, 289 + detail: '401 Unauthorized: bad signature', 290 + }, 291 + }) 292 + }, 1000) 293 + }) 294 + 295 + describe('preauth.exchange', () => { 296 + it('should successfully exchange an invitation', async () => { 297 + const ws = await socket.connect() 298 + const realmid = RealmBrand.generate() 299 + const inviterIdentid = IdentBrand.generate() 300 + const inviteeIdentid = IdentBrand.generate() 301 + const inviterKeyPair = await generateSigningJwkPair() 302 + const inviteeKeyPair = await generateSigningJwkPair() 303 + 304 + // create realm with inviter identity 305 + const realm = await ensureRealm(realmid, inviterIdentid, inviterKeyPair.publicKey) 306 + const nonce = crypto.randomUUID() 307 + 308 + // create the invitation; payload doesn't matter, just claims 309 + const inviteJwt = await generateSignableJwt({}) 310 + .setAudience(realmid) 311 + .setIssuer(inviterIdentid) 312 + .setJti(nonce) 313 + .sign(inviterKeyPair.privateKey) 314 + 315 + // create exchange request signed by invitee 316 + const inviteePubkeyJwk = await jwkExport.parseAsync(inviteeKeyPair.publicKey) 317 + const payload = { 318 + typ: 'req', 319 + msg: 'preauth.exchange', 320 + seq: 'sequence', 321 + dat: { 322 + pubkey: inviteePubkeyJwk, 323 + inviteJwt: inviteJwt, 324 + }, 325 + } satisfies PreauthExchangeInviteRequest 326 + 327 + const jwt = await generateSignableJwt(payload) 328 + .setAudience(realmid) 329 + .setIssuer(inviteeIdentid) 330 + .sign(inviteeKeyPair.privateKey) 331 + 332 + const authPromise = drivePreauth(ws) 333 + socket.send(jwt) 334 + 335 + const result = await authPromise 336 + expect(result).not.toBeNull() 337 + expect(result?.realm.realmid).toBe(realmid) 338 + expect(result?.identid).toBe(inviteeIdentid) 339 + 340 + const response = await socket.nextMessage 341 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 342 + expect(responseData.success).toBeTruthy() 343 + expect(responseData.data).toMatchObject({ 344 + typ: 'res', 345 + msg: 'preauth.authn', 346 + dat: expect.objectContaining({ 347 + peers: expect.arrayContaining(['server']), 348 + identities: expect.objectContaining({ 349 + [inviterIdentid]: expect.anything(), 350 + }), 351 + }), 352 + }) 353 + 354 + // verify nonce was consumed 355 + const nonceValid = realm.validateNonce(nonce) 356 + expect(nonceValid).toBe(false) 357 + }, 1000) 358 + 359 + it('should reject invitation without nonce', async () => { 360 + const ws = await socket.connect() 361 + const realmid = RealmBrand.generate() 362 + const inviterIdentid = IdentBrand.generate() 363 + const inviteeIdentid = IdentBrand.generate() 364 + const inviterKeyPair = await generateSigningJwkPair() 365 + const inviteeKeyPair = await generateSigningJwkPair() 366 + 367 + await ensureRealm(realmid, inviterIdentid, inviterKeyPair.publicKey) 368 + 369 + // create the invitation; payload doesn't matter, just claims 370 + // NO NONCE FOR THIS TEST 371 + const inviteJwt = await generateSignableJwt({}) 372 + .setAudience(realmid) 373 + .setIssuer(inviterIdentid) 374 + .sign(inviterKeyPair.privateKey) 375 + 376 + const inviteePubkeyJwk = await crypto.subtle.exportKey('jwk', inviteeKeyPair.publicKey) 377 + const payload = { 378 + typ: 'req', 379 + msg: 'preauth.exchange', 380 + seq: 'sequence', 381 + dat: { 382 + pubkey: inviteePubkeyJwk, 383 + inviteJwt: inviteJwt, 384 + }, 385 + } satisfies PreauthExchangeInviteRequest 386 + 387 + const jwt = await generateSignableJwt(payload) 388 + .setAudience(realmid) 389 + .setIssuer(inviteeIdentid) 390 + .sign(inviteeKeyPair.privateKey) 391 + 392 + const authPromise = drivePreauth(ws) 393 + socket.send(jwt) 394 + 395 + const response = await socket.nextMessage 396 + const result = await authPromise 397 + 398 + expect(result).toBeNull() 399 + 400 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 401 + expect(responseData.success).toBeTruthy() 402 + expect(responseData.data).toMatchObject({ 403 + typ: 'err', 404 + msg: 'preauth.authn', 405 + err: { 406 + code: 400, 407 + detail: '400 Bad Request: missing nonce', 408 + }, 409 + }) 410 + }, 1000) 411 + 412 + it('should reject invitation with reused nonce', async () => { 413 + const ws = await socket.connect() 414 + const realmid = RealmBrand.generate() 415 + const inviterIdentid = IdentBrand.generate() 416 + const inviteeIdentid = IdentBrand.generate() 417 + const inviterKeyPair = await generateSigningJwkPair() 418 + const inviteeKeyPair = await generateSigningJwkPair() 419 + 420 + const realm = await ensureRealm(realmid, inviterIdentid, inviterKeyPair.publicKey) 421 + 422 + // consume the nonce 423 + const nonce = crypto.randomUUID() 424 + realm.validateNonce(nonce) 425 + 426 + // create the invitation; payload doesn't matter, just claims 427 + const inviteJwt = await generateSignableJwt({}) 428 + .setAudience(realmid) 429 + .setIssuer(inviterIdentid) 430 + .setJti(nonce) 431 + .sign(inviterKeyPair.privateKey) 432 + 433 + const inviteePubkeyJwk = await crypto.subtle.exportKey('jwk', inviteeKeyPair.publicKey) 434 + const payload = { 435 + typ: 'req', 436 + msg: 'preauth.exchange', 437 + seq: 'sequence', 438 + dat: { 439 + pubkey: inviteePubkeyJwk, 440 + inviteJwt: inviteJwt, 441 + }, 442 + } satisfies PreauthExchangeInviteRequest 443 + 444 + const jwt = await generateSignableJwt(payload) 445 + .setAudience(realmid) 446 + .setIssuer(inviteeIdentid) 447 + .sign(inviteeKeyPair.privateKey) 448 + 449 + const authPromise = drivePreauth(ws) 450 + socket.send(jwt) 451 + 452 + const response = await socket.nextMessage 453 + const result = await authPromise 454 + 455 + expect(result).toBeNull() 456 + 457 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 458 + expect(responseData.success).toBeTruthy() 459 + expect(responseData.data).toMatchObject({ 460 + typ: 'err', 461 + msg: 'preauth.authn', 462 + err: { 463 + code: 403, 464 + detail: '403 Forbidden: nonce already seen', 465 + }, 466 + }) 467 + }, 1000) 468 + 469 + it('should reject invitation from unknown identity', async () => { 470 + const ws = await socket.connect() 471 + const realmid = RealmBrand.generate() 472 + const ownerIdentid = IdentBrand.generate() 473 + const unknownInviterIdentid = IdentBrand.generate() 474 + const inviteeIdentid = IdentBrand.generate() 475 + const ownerKeyPair = await generateSigningJwkPair() 476 + const unknownInviterKeyPair = await generateSigningJwkPair() 477 + const inviteeKeyPair = await generateSigningJwkPair() 478 + const nonce = crypto.randomUUID() 479 + 480 + // create realm with owner only 481 + await ensureRealm(realmid, ownerIdentid, ownerKeyPair.publicKey) 482 + 483 + // create the invitation; payload doesn't matter, just claims 484 + const inviteJwt = await generateSignableJwt({}) 485 + .setAudience(realmid) 486 + .setIssuer(unknownInviterIdentid) 487 + .setJti(nonce) 488 + .sign(unknownInviterKeyPair.privateKey) 489 + 490 + const inviteePubkeyJwk = await crypto.subtle.exportKey('jwk', inviteeKeyPair.publicKey) 491 + const payload = { 492 + typ: 'req', 493 + msg: 'preauth.exchange', 494 + seq: 'sequence', 495 + dat: { 496 + pubkey: inviteePubkeyJwk, 497 + inviteJwt: inviteJwt, 498 + }, 499 + } satisfies PreauthExchangeInviteRequest 500 + 501 + const jwt = await generateSignableJwt(payload) 502 + .setAudience(realmid) 503 + .setIssuer(inviteeIdentid) 504 + .sign(inviteeKeyPair.privateKey) 505 + 506 + const authPromise = drivePreauth(ws) 507 + socket.send(jwt) 508 + 509 + const response = await socket.nextMessage 510 + const result = await authPromise 511 + 512 + expect(result).toBeNull() 513 + 514 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 515 + expect(responseData.success).toBeTruthy() 516 + expect(responseData.data).toMatchObject({ 517 + typ: 'err', 518 + msg: 'preauth.authn', 519 + err: { 520 + code: 401, 521 + detail: '401 Unauthorized: bad signature', 522 + }, 523 + }) 524 + }, 1000) 525 + 526 + it('should reject invitation with invalid signature', async () => { 527 + const ws = await socket.connect() 528 + const realmid = RealmBrand.generate() 529 + const inviterIdentid = IdentBrand.generate() 530 + const inviteeIdentid = IdentBrand.generate() 531 + const inviterKeyPair = await generateSigningJwkPair() 532 + const wrongKeyPair = await generateSigningJwkPair() 533 + const inviteeKeyPair = await generateSigningJwkPair() 534 + 535 + await ensureRealm(realmid, inviterIdentid, inviterKeyPair.publicKey) 536 + const nonce = crypto.randomUUID() 537 + 538 + // create the invitation; payload doesn't matter, just claims 539 + const inviteJwt = await generateSignableJwt({}) 540 + .setAudience(realmid) 541 + .setIssuer(inviterIdentid) 542 + .setJti(nonce) 543 + .sign(wrongKeyPair.privateKey) 544 + 545 + const inviteePubkeyJwk = await crypto.subtle.exportKey('jwk', inviteeKeyPair.publicKey) 546 + const payload = { 547 + typ: 'req', 548 + msg: 'preauth.exchange', 549 + seq: 'sequence', 550 + dat: { 551 + pubkey: inviteePubkeyJwk, 552 + inviteJwt: inviteJwt, 553 + }, 554 + } satisfies PreauthExchangeInviteRequest 555 + 556 + const jwt = await generateSignableJwt(payload) 557 + .setAudience(realmid) 558 + .setIssuer(inviteeIdentid) 559 + .sign(inviteeKeyPair.privateKey) 560 + 561 + const authPromise = drivePreauth(ws) 562 + socket.send(jwt) 563 + 564 + const response = await socket.nextMessage 565 + const result = await authPromise 566 + 567 + expect(result).toBeNull() 568 + 569 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 570 + expect(responseData.success).toBeTruthy() 571 + expect(responseData.data).toMatchObject({ 572 + typ: 'err', 573 + msg: 'preauth.authn', 574 + err: { 575 + code: 401, 576 + detail: '401 Unauthorized: bad signature', 577 + }, 578 + }) 579 + }, 1000) 580 + 581 + it('should reject exchange for unknown realm', async () => { 582 + const ws = await socket.connect() 583 + const realmid = RealmBrand.generate() 584 + const inviterIdentid = IdentBrand.generate() 585 + const inviteeIdentid = IdentBrand.generate() 586 + const inviterKeyPair = await generateSigningJwkPair() 587 + const inviteeKeyPair = await generateSigningJwkPair() 588 + 589 + // don't create realm 590 + const nonce = crypto.randomUUID() 591 + 592 + // create the invitation; payload doesn't matter, just claims 593 + const inviteJwt = await generateSignableJwt({}) 594 + .setAudience(realmid) 595 + .setIssuer(inviterIdentid) 596 + .setJti(nonce) 597 + .sign(inviterKeyPair.privateKey) 598 + 599 + const inviteePubkeyJwk = await crypto.subtle.exportKey('jwk', inviteeKeyPair.publicKey) 600 + const payload = { 601 + typ: 'req', 602 + msg: 'preauth.exchange', 603 + seq: 'sequence', 604 + dat: { 605 + pubkey: inviteePubkeyJwk, 606 + inviteJwt: inviteJwt, 607 + }, 608 + } satisfies PreauthExchangeInviteRequest 609 + 610 + const jwt = await generateSignableJwt(payload) 611 + .setAudience(realmid) 612 + .setIssuer(inviteeIdentid) 613 + .sign(inviteeKeyPair.privateKey) 614 + 615 + const authPromise = drivePreauth(ws) 616 + socket.send(jwt) 617 + 618 + const response = await socket.nextMessage 619 + const result = await authPromise 620 + 621 + expect(result).toBeNull() 622 + 623 + const responseData = jsonCodec(preauthResSchema).safeParse(response) 624 + expect(responseData.success).toBeTruthy() 625 + expect(responseData.data).toMatchObject({ 626 + typ: 'err', 627 + msg: 'preauth.authn', 628 + err: { 629 + code: 404, 630 + detail: '404 Not Found: unknown realm', 631 + }, 632 + }) 633 + }, 1000) 634 + }) 635 + 636 + describe('timeout handling', () => { 637 + it('should timeout if no message received within timeout', async () => { 638 + const ws = await socket.connect() 639 + const authPromise = drivePreauth(ws, 250) 640 + await expect(authPromise).rejects.toThrow() 641 + }, 500) 642 + 643 + it('should respect external abort signal', async () => { 644 + const ws = await socket.connect() 645 + const controller = new AbortController() 646 + const authPromise = drivePreauth(ws, undefined, controller.signal) 647 + 648 + setTimeout(() => { 649 + controller.abort() 650 + }, 100) 651 + await expect(authPromise).rejects.toThrow() 652 + }, 500) 653 + }) 654 + 655 + describe('malformed requests', () => { 656 + it('should reject non-JWT messages', async () => { 657 + const ws = await socket.connect() 658 + const authPromise = drivePreauth(ws) 659 + 660 + socket.send('not a jwt') 661 + 662 + await expect(authPromise).rejects.toThrow() 663 + }, 1000) 664 + 665 + it('should reject JWT with invalid payload schema', async () => { 666 + const ws = await socket.connect() 667 + const realmid = RealmBrand.generate() 668 + const identid = IdentBrand.generate() 669 + const keyPair = await generateSigningJwkPair() 670 + 671 + const jwt = await generateSignableJwt({huh: 'wat'}) 672 + .setAudience(realmid) 673 + .setIssuer(identid) 674 + .sign(keyPair.privateKey) 675 + 676 + const authPromise = drivePreauth(ws) 677 + socket.send(jwt) 678 + 679 + await expect(authPromise).rejects.toThrow() 680 + }, 1000) 681 + }) 682 + })
+56 -14
src/realm/server/driver-preauth.ts
··· 1 + import WebSocket from 'isomorphic-ws' 1 2 import {match} from 'ts-pattern' 2 3 3 4 import {combineSignals, timeoutSignal} from '#lib/async/aborts' 4 - import {jwkImport} from '#lib/crypto/jwks' 5 + import {JWK, jwkExport, jwkImport} from '#lib/crypto/jwks' 5 6 import {jwtPayload, jwtSchema, JWTTokenPayload, verifyJwtToken} from '#lib/crypto/jwts' 6 7 import {normalizeProtocolError, ProtocolError} from '#lib/errors' 7 8 import {putSocket, takeSocket} from '#lib/socket' ··· 18 19 import {Realm} from './realm' 19 20 import {ensureRealm, fetchRealm} from './storage' 20 21 21 - export async function drivePreauth(ws: WebSocket, _signal?: AbortSignal) { 22 - const timeout = timeoutSignal(3_000) 22 + export type AuthenticatedIdentity = { 23 + realm: Realm 24 + identid: IdentID 25 + pubkey: CryptoKey 26 + pubjwk: JWK 27 + } 28 + 29 + export type PreauthOptions = { 30 + timeout?: number 31 + signal?: AbortSignal 32 + } 33 + 34 + /** 35 + * drives a websocket through the authentication state machine 36 + * 37 + * - within timeout (default 3 seconds) 38 + * - expect an authn, invite, or registration message, and handle appriately 39 + * 40 + * @param ws - the socket to drive 41 + * @param timeout - options 42 + * @param signal - an optional abort signal to cancel the machine 43 + * @returns a promise which resolves when the machine has completed (socket has disconnected) 44 + */ 45 + export async function drivePreauth( 46 + ws: WebSocket, 47 + timeoutms = 3_000, 48 + _signal?: AbortSignal, 49 + ): Promise<AuthenticatedIdentity | null> { 50 + const timeout = timeoutSignal(timeoutms) 23 51 const signal = combineSignals(_signal, timeout.signal) 24 52 25 53 try { ··· 54 82 }) 55 83 return auth 56 84 } catch (exc: unknown) { 57 - const error = normalizeProtocolError(exc) 85 + const error = normalizeProtocolError(exc, 401) 86 + if (error.status >= 500) throw error 87 + 58 88 putSocket<PreauthErrorResponse>(ws, { 59 89 typ: 'err', 60 90 msg: 'preauth.authn', ··· 64 94 detail: error.message, 65 95 }, 66 96 }) 67 - 68 - throw error 97 + return null 69 98 } 70 99 } finally { 71 100 timeout.cancel() 72 101 } 73 102 } 74 103 75 - async function preauthAuthenticate(realm: Realm, identid: IdentID, token: string) { 104 + async function preauthAuthenticate( 105 + realm: Realm, 106 + identid: IdentID, 107 + token: string, 108 + ): Promise<AuthenticatedIdentity> { 76 109 const pubkey = await realm.fetchIdentity(identid) 77 - if (pubkey == null) throw new ProtocolError('unknown identity', 401) 110 + if (pubkey == null) throw new ProtocolError('bad signature', 401) 78 111 79 112 await verifyJwtToken(token, pubkey) 80 - return {realm, identid, pubkey} 113 + 114 + const pubjwk = await jwkExport.parseAsync(pubkey) 115 + return {realm, identid, pubkey, pubjwk} 81 116 } 82 117 83 118 async function preauthRegister( ··· 95 130 realmid: RealmID, 96 131 jwt: JWTTokenPayload<PreauthExchangeInviteRequest>, 97 132 ): Promise<Realm | null> { 98 - const token = jwtSchema.parse(jwt.payload.dat.inviteJwt) 99 133 const realm = await fetchRealm(realmid) 100 134 if (!realm) return null 101 135 102 - // invitation has an unused nonce 103 - if (!token.claims.jti) throw new ProtocolError('invitation missing nonce!', 400) 104 - if (!realm.validateNonce(token.claims.jti)) throw new ProtocolError('invitation nonce already used!', 403) 136 + // invitation must have an unused nonce 137 + const token = jwtSchema.parse(jwt.payload.dat.inviteJwt) 138 + if (!token.claims.jti) throw new ProtocolError('missing nonce', 400) 139 + if (!realm.validateNonce(token.claims.jti)) throw new ProtocolError('nonce already seen', 403) 105 140 106 141 // invitation is issued and signed by the inviter 107 142 const inviter = IdentBrand.parse(token.claims.iss) 108 143 const inviterkey = await realm.fetchIdentity(inviter) 109 - if (inviterkey == null) throw new ProtocolError('invitation from unknown identity?', 403) 144 + if (inviterkey == null) throw new ProtocolError('bad signature', 401) 110 145 111 146 // looked everything up, but we still need check the signature 112 147 await verifyJwtToken(jwt.payload.dat.inviteJwt, inviterkey) 148 + 149 + // invitation is valid, add them to the realm! 150 + const invitee = IdentBrand.parse(jwt.claims.iss) 151 + const inviteekey = await jwkImport.parseAsync(jwt.payload.dat.pubkey) 152 + await realm.admitIdentity(invitee, inviteekey) 153 + 154 + // invitee is valid, add them to the realm! 113 155 return realm 114 156 }
+471
src/realm/server/driver-realm.spec.ts
··· 1 + import {afterEach, beforeEach, describe, expect, it} from '@jest/globals' 2 + import {WebSocket} from 'isomorphic-ws' 3 + import {z} from 'zod/v4' 4 + 5 + import {sleep} from '#lib/async/sleep' 6 + import {generateSignableJwt, generateSigningJwkPair, jwkExport} from '#lib/crypto/jwks' 7 + import {jwtPayload} from '#lib/crypto/jwts' 8 + import {exceptionMessageSchema, jsonCodec} from '#lib/schema' 9 + import {mockSocketServer} from '#lib/specs/helpers-socket' 10 + 11 + import { 12 + RealmAnnounceRequest, 13 + realmAnnounceResponseSchema, 14 + RealmBroadcastEvent, 15 + RealmPeerLeftEvent, 16 + realmPeerLeftEventSchema, 17 + RealmPingRequest, 18 + RealmPingResponse, 19 + realmPingResponseSchema, 20 + RealmSignalEvent, 21 + realmSignalEventSchema, 22 + } from '#realm/protocol/messages-realm' 23 + import {actionSchema, RealmAction} from '#realm/schema' 24 + import {IdentBrand, RealmBrand, RealmID} from '#realm/schema/brands' 25 + import {generate as generateTimestamp} from '#realm/schema/timestamp' 26 + 27 + import {AuthenticatedIdentity} from './driver-preauth' 28 + import {driveRealm} from './driver-realm' 29 + import {Realm} from './realm' 30 + 31 + const socket = mockSocketServer() 32 + 33 + async function createAuthenticatedConnection( 34 + realm: Realm, 35 + signal?: AbortSignal, 36 + ): Promise<{ws: WebSocket; auth: AuthenticatedIdentity; promise: Promise<unknown>}> { 37 + const identid = IdentBrand.generate() 38 + const keyPair = await generateSigningJwkPair() 39 + const pubkey = keyPair.publicKey 40 + const pubjwk = await jwkExport.parseAsync(pubkey) 41 + await realm.admitIdentity(identid, pubkey) 42 + 43 + const ws = await socket.connect() 44 + realm.attachSocket(identid, ws) 45 + 46 + const auth = {realm, identid, pubkey, pubjwk} 47 + const promise = driveRealm(ws, auth, signal) 48 + 49 + return {ws, auth, promise} 50 + } 51 + 52 + describe('driveRealm', () => { 53 + let realm: Realm 54 + let realmid: RealmID 55 + 56 + beforeEach(() => { 57 + realmid = RealmBrand.generate() 58 + realm = new Realm(realmid, ':memory:') 59 + }) 60 + 61 + afterEach(() => { 62 + realm.shutdown() 63 + }) 64 + 65 + describe('connection lifecycle', () => { 66 + it('should broadcast peer-left event when connection closes', async () => { 67 + const {ws: ws1, auth: auth1, promise: promise1} = await createAuthenticatedConnection(realm) 68 + const {ws: ws2, promise: promise2} = await createAuthenticatedConnection(realm) 69 + await socket.nextMessage // peer2 -> 1 70 + 71 + // close first connection 72 + ws1.close() 73 + await promise1 74 + 75 + // second peer should receive peer-left event 76 + const message = await socket.nextMessage 77 + const event = jsonCodec(realmPeerLeftEventSchema).parse(message) 78 + expect(event).toMatchObject({ 79 + typ: 'evt', 80 + msg: 'realm.peer-left', 81 + dat: { 82 + identid: auth1.identid, 83 + }, 84 + } satisfies RealmPeerLeftEvent) 85 + 86 + // cleanup 87 + ws2.close() 88 + await promise2 89 + }, 1000) 90 + }) 91 + 92 + describe('realm.ping', () => { 93 + it('should respond to ping request with current clocks', async () => { 94 + const {ws, auth, promise} = await createAuthenticatedConnection(realm) 95 + 96 + // record some actions to populate clocks 97 + const timestamp = generateTimestamp(auth.identid, Math.floor(Date.now() / 1000), 0) 98 + realm.recordActions([ 99 + { 100 + typ: 'act', 101 + msg: 'test.action', 102 + clk: timestamp, 103 + dat: {value: 1}, 104 + } satisfies RealmAction, 105 + ]) 106 + 107 + const pingRequest: RealmPingRequest = { 108 + typ: 'req', 109 + msg: 'realm.ping', 110 + seq: 'ping-1', 111 + dat: { 112 + clocks: {}, 113 + requestsSync: false, 114 + }, 115 + } 116 + socket.send(JSON.stringify(pingRequest)) 117 + 118 + // check response 119 + 120 + const message = await socket.nextMessage 121 + const response = jsonCodec(realmPingResponseSchema).parse(message) 122 + expect(response).toMatchObject({ 123 + typ: 'res', 124 + msg: 'realm.ping', 125 + seq: 'ping-1', 126 + dat: { 127 + clocks: { 128 + [auth.identid]: timestamp, 129 + }, 130 + }, 131 + } satisfies RealmPingResponse) 132 + 133 + ws.close() 134 + await promise 135 + }, 1000) 136 + 137 + it('should send sync delta when ping requests sync', async () => { 138 + const {ws, auth, promise} = await createAuthenticatedConnection(realm) 139 + const timestamp = generateTimestamp(auth.identid, Math.floor(Date.now() / 1000), 0) 140 + realm.recordActions([ 141 + { 142 + typ: 'act', 143 + msg: 'test.action', 144 + clk: timestamp, 145 + dat: {value: 1}, 146 + } satisfies RealmAction, 147 + ]) 148 + 149 + const pingRequest: RealmPingRequest = { 150 + typ: 'req', 151 + msg: 'realm.ping', 152 + seq: 'ping-sync', 153 + dat: { 154 + clocks: {}, 155 + requestsSync: true, 156 + }, 157 + } 158 + socket.send(JSON.stringify(pingRequest)) 159 + 160 + // should receive ping response 161 + const pingResponseMessage = await socket.nextMessage 162 + const pingResponse = jsonCodec(realmPingResponseSchema).safeParse(pingResponseMessage) 163 + expect(pingResponse.success).toBeTruthy() 164 + 165 + // should also receive sync delta (array of actions) 166 + const syncDeltaMessage = await socket.nextMessage 167 + const syncDeltaResponse = jsonCodec(z.array(actionSchema)).safeParse(syncDeltaMessage) 168 + expect(syncDeltaResponse.success).toBeTruthy() 169 + expect(syncDeltaResponse.data).toHaveLength(1) 170 + expect(syncDeltaResponse.data).toContainEqual({ 171 + typ: 'act', 172 + msg: 'test.action', 173 + clk: timestamp, 174 + dat: {value: 1}, 175 + }) 176 + 177 + ws.close() 178 + await promise 179 + }, 1000) 180 + }) 181 + 182 + describe('realm.announce', () => { 183 + it('should respond to announce request with server info', async () => { 184 + const {ws, promise} = await createAuthenticatedConnection(realm) 185 + 186 + // send announce request 187 + const announceRequest: RealmAnnounceRequest = { 188 + typ: 'req', 189 + msg: 'realm.announce', 190 + seq: 'announce-1', 191 + dat: { 192 + clocks: {}, 193 + requestsSync: false, 194 + deviceCaps: { 195 + corsFetch: false, 196 + networkQuality: 3, 197 + }, 198 + deviceInfo: { 199 + name: 'test-client', 200 + ua: 'test-ua', 201 + }, 202 + }, 203 + } 204 + socket.send(JSON.stringify(announceRequest)) 205 + 206 + // should receive announce response 207 + const message = await socket.nextMessage 208 + const response = jsonCodec(realmAnnounceResponseSchema).parse(message) 209 + 210 + expect(response).toMatchObject({ 211 + typ: 'res', 212 + msg: 'realm.announce', 213 + seq: 'announce-1', 214 + dat: expect.objectContaining({ 215 + clocks: expect.any(Object), 216 + deviceCaps: { 217 + corsFetch: true, 218 + networkQuality: 5, 219 + }, 220 + deviceInfo: expect.objectContaining({ 221 + name: expect.any(String), 222 + ua: expect.any(String), 223 + }), 224 + }), 225 + }) 226 + 227 + ws.close() 228 + await promise 229 + }, 1000) 230 + 231 + it('should send sync delta when announce requests sync', async () => { 232 + const {ws, auth, promise} = await createAuthenticatedConnection(realm) 233 + const timestamp = generateTimestamp(auth.identid, Math.floor(Date.now() / 1000), 0) 234 + realm.recordActions([ 235 + { 236 + typ: 'act', 237 + msg: 'test.action', 238 + clk: timestamp, 239 + dat: {value: 1}, 240 + } satisfies RealmAction, 241 + ]) 242 + 243 + const announceRequest: RealmAnnounceRequest = { 244 + typ: 'req', 245 + msg: 'realm.announce', 246 + seq: 'announce-sync', 247 + dat: { 248 + clocks: {}, 249 + requestsSync: true, 250 + }, 251 + } 252 + socket.send(JSON.stringify(announceRequest)) 253 + 254 + // Should receive announce response 255 + const announceMessage = await socket.nextMessage 256 + const announceResponse = jsonCodec(realmAnnounceResponseSchema).safeParse(announceMessage) 257 + expect(announceResponse.success).toBeTruthy() 258 + 259 + // Should also receive sync delta 260 + const syncMessage = await socket.nextMessage 261 + const syncResponse = jsonCodec(z.array(actionSchema)).safeParse(syncMessage) 262 + expect(syncResponse.success).toBeTruthy() 263 + expect(syncResponse.data).toHaveLength(1) 264 + 265 + ws.close() 266 + await promise 267 + }, 1000) 268 + }) 269 + 270 + describe('action recording', () => { 271 + it('should record actions sent by client', async () => { 272 + const {ws, auth, promise} = await createAuthenticatedConnection(realm) 273 + 274 + // send actions 275 + const timestamp = generateTimestamp(auth.identid, Math.floor(Date.now() / 1000), 0) 276 + const actions: RealmAction[] = [ 277 + { 278 + typ: 'act', 279 + msg: 'test.action', 280 + clk: timestamp, 281 + dat: {value: 1}, 282 + }, 283 + ] 284 + socket.send(JSON.stringify(actions)) 285 + 286 + // verify actions were recorded 287 + await sleep(50) // let it flow 288 + const syncState = realm.buildSyncState() 289 + expect(syncState[auth.identid]).toBe(timestamp) 290 + 291 + ws.close() 292 + await promise 293 + }, 1000) 294 + 295 + it('should handle multiple actions in a batch', async () => { 296 + const {ws, auth, promise} = await createAuthenticatedConnection(realm) 297 + 298 + // send multiple actions 299 + const ts1 = generateTimestamp(auth.identid, Math.floor(Date.now() / 1000), 0) 300 + const ts2 = generateTimestamp(auth.identid, Math.floor(Date.now() / 1000), 1) 301 + const actions: RealmAction[] = [ 302 + {typ: 'act', msg: 'test.action1', clk: ts1, dat: {value: 1}}, 303 + {typ: 'act', msg: 'test.action2', clk: ts2, dat: {value: 2}}, 304 + ] 305 + socket.send(JSON.stringify(actions)) 306 + 307 + // Give it a moment to process 308 + await new Promise((resolve) => setTimeout(resolve, 50)) 309 + 310 + // verify both actions were recorded 311 + const syncState = realm.buildSyncState() 312 + expect(syncState[auth.identid]).toBe(ts2) 313 + 314 + const delta = realm.buildSyncDelta({}) 315 + expect(delta).toHaveLength(2) 316 + 317 + ws.close() 318 + await promise 319 + }, 1000) 320 + }) 321 + 322 + describe('realm.broadcast', () => { 323 + it('should broadcast message to other peers', async () => { 324 + const {ws: ws1, promise: promise1} = await createAuthenticatedConnection(realm) 325 + const {ws: ws2, promise: promise2} = await createAuthenticatedConnection(realm) 326 + await socket.nextMessage // peer-joined (peer2 -> peer1) 327 + 328 + // send broadcast from peer1 329 + const broadcastEvent: RealmBroadcastEvent = { 330 + typ: 'evt', 331 + msg: 'realm.broadcast', 332 + dat: { 333 + payload: {message: 'hello world'}, 334 + recipients: false, // broadcast to all except sender 335 + }, 336 + } 337 + socket.send(JSON.stringify(broadcastEvent)) 338 + await sleep(50) 339 + 340 + // TODO: make sure we only get one more message on the socket 341 + 342 + ws1.close() 343 + ws2.close() 344 + await promise1 345 + await promise2 346 + }, 1000) 347 + 348 + it('should broadcast to specific recipients', async () => { 349 + const {ws: ws1, promise: promise1} = await createAuthenticatedConnection(realm) 350 + const {ws: ws2, auth: auth2, promise: promise2} = await createAuthenticatedConnection(realm) 351 + await socket.nextMessage // peer joined -> 1 352 + 353 + const {ws: ws3, promise: promise3} = await createAuthenticatedConnection(realm) 354 + await socket.nextMessage // peer joined -> 2 355 + await socket.nextMessage // peer joined -> 1 356 + 357 + // Send broadcast to specific recipient 358 + const broadcastEvent: RealmBroadcastEvent = { 359 + typ: 'evt', 360 + msg: 'realm.broadcast', 361 + dat: { 362 + payload: {message: 'direct message'}, 363 + recipients: [auth2.identid], 364 + }, 365 + } 366 + 367 + socket.send(JSON.stringify(broadcastEvent)) 368 + await sleep(50) 369 + 370 + // TODO: make sure there's only one message on the server 371 + 372 + ws1.close() 373 + ws2.close() 374 + ws3.close() 375 + await promise1 376 + await promise2 377 + await promise3 378 + }, 1000) 379 + }) 380 + 381 + describe('realm.signal', () => { 382 + it('should relay signal to specific peer', async () => { 383 + const {ws: ws1, auth: auth1, promise: promise1} = await createAuthenticatedConnection(realm) 384 + const {ws: ws2, auth: auth2, promise: promise2} = await createAuthenticatedConnection(realm) 385 + await socket.nextMessage // peer joined -> 1 386 + 387 + // create a signed signal 388 + const keyPair = await generateSigningJwkPair() 389 + const signedPayload = {initiator: true} 390 + const signedJwt = await generateSignableJwt(signedPayload) 391 + .setIssuer(auth1.identid) 392 + .setAudience(auth2.identid) 393 + .sign(keyPair.privateKey) 394 + 395 + // send signal event 396 + const signalEvent = { 397 + typ: 'evt', 398 + msg: 'realm.signal', 399 + dat: { 400 + localid: auth1.identid, 401 + remoteid: auth2.identid, 402 + signed: signedJwt, 403 + }, 404 + } satisfies RealmSignalEvent 405 + socket.send(JSON.stringify(signalEvent)) 406 + await sleep(50) 407 + 408 + // we got the signal 409 + const signalData = await socket.nextMessage 410 + const signalMessage = jsonCodec(realmSignalEventSchema).safeParse(signalData) 411 + expect(signalMessage.success).toBeTruthy() 412 + 413 + // and it's got the payload in the signed jwt 414 + const message = await jwtPayload(z.object({initiator: z.boolean()})).safeParseAsync( 415 + signalMessage.data?.dat.signed, 416 + ) 417 + expect(message.success).toBeTruthy() 418 + expect(message.data?.payload.initiator).toBe(true) 419 + 420 + ws1.close() 421 + ws2.close() 422 + await promise1 423 + await promise2 424 + }, 1000) 425 + }) 426 + 427 + describe('error handling', () => { 428 + it('should handle malformed messages gracefully', async () => { 429 + const {ws, promise} = await createAuthenticatedConnection(realm) 430 + 431 + socket.send('not valid json') 432 + 433 + const errorMessage = await socket.nextMessage 434 + const errorResult = jsonCodec(exceptionMessageSchema).safeParse(errorMessage) 435 + expect(errorResult.success).toBeTruthy() 436 + 437 + ws.close() 438 + await promise 439 + }, 1000) 440 + 441 + it('should handle unknown message types', async () => { 442 + const {ws, promise} = await createAuthenticatedConnection(realm) 443 + 444 + socket.send( 445 + JSON.stringify({ 446 + typ: 'req', 447 + msg: 'realm.unknown', 448 + seq: 'unknown-1', 449 + dat: {}, 450 + }), 451 + ) 452 + 453 + const errorMessage = await socket.nextMessage 454 + const errorResult = jsonCodec(exceptionMessageSchema).safeParse(errorMessage) 455 + expect(errorResult.success).toBeTruthy() 456 + 457 + ws.close() 458 + await promise 459 + }, 1000) 460 + }) 461 + 462 + describe('abort signal handling', () => { 463 + it('should stop processing when abort signal is triggered', async () => { 464 + const controller = new AbortController() 465 + const {promise} = await createAuthenticatedConnection(realm, controller.signal) 466 + 467 + controller.abort() 468 + await expect(promise).rejects.toThrow() 469 + }, 1000) 470 + }) 471 + })
+22 -24
src/realm/server/driver-realm.ts
··· 1 - import {JWK, jwkExport} from '#lib/crypto/jwks' 1 + import WebSocket from 'isomorphic-ws' 2 + 3 + import {JWK} from '#lib/crypto/jwks' 2 4 import {normalizeProtocolError, ProtocolError} from '#lib/errors' 3 - import {makeExceptionMessage} from '#lib/schema' 5 + import {jsonCodec, makeExceptionMessage} from '#lib/schema' 4 6 import {putSocket, streamSocketSchema} from '#lib/socket' 5 7 6 8 import { ··· 16 18 import {IdentID} from '#realm/schema/brands' 17 19 import {PeerClocks} from '#realm/schema/timestamp' 18 20 21 + import {AuthenticatedIdentity} from './driver-preauth' 19 22 import {Realm} from './realm' 20 23 21 - export async function driveRealm( 22 - ws: WebSocket, 23 - realm: Realm, 24 - identid: IdentID, 25 - pubkey: CryptoKey, 26 - signal?: AbortSignal, 27 - ) { 28 - const pubjwk = await jwkExport.parseAsync(pubkey) 29 - broadcastPeerJoined(realm, identid, pubjwk) 24 + const realmServerParser = jsonCodec(realmServerMessageSchema) 30 25 26 + export async function driveRealm(ws: WebSocket, auth: AuthenticatedIdentity, signal?: AbortSignal) { 31 27 try { 32 - const stream = streamSocketSchema(ws, realmServerMessageSchema, {signal}) 28 + broadcastPeerJoined(auth.realm, auth.identid, auth.pubjwk) 29 + 30 + const stream = streamSocketSchema(ws, realmServerParser, {signal, skipErrors: true}) 33 31 for await (const data of stream) { 34 - driveMessage(ws, realm, identid, data, signal) 32 + if (data.success) { 33 + driveMessage(ws, auth.realm, auth.identid, data.data) 34 + } else { 35 + putError(ws, data.error) 36 + } 35 37 } 36 38 } catch (exc: unknown) { 37 39 putError(ws, exc) // error in socket stream 40 + throw exc 38 41 } finally { 39 - broadcastPeerLeft(realm, identid) 42 + broadcastPeerLeft(auth.realm, auth.identid) 40 43 } 41 44 } 42 45 ··· 51 54 return 52 55 53 56 case 'realm.signal': 54 - realmBroadcast(realm, identid, data.dat, [data.dat.remoteid]) 57 + realmBroadcast(realm, identid, data, [data.dat.remoteid]) 55 58 return 56 59 57 60 case 'realm.announce': 58 - realmAnnounce(ws, realm, identid, data) 61 + realmAnnounce(ws, realm, data) 59 62 return 60 63 61 64 case 'realm.ping': 62 - realmPing(ws, realm, identid, data) 65 + realmPing(ws, realm, data) 63 66 return 64 67 65 68 default: ··· 107 110 } 108 111 } 109 112 110 - function realmPing(ws: WebSocket, realm: Realm, identid: IdentID, req: RealmPingRequest) { 111 - console.debug('ping from', identid, req.dat) 112 - 113 - // respond immediately 113 + function realmPing(ws: WebSocket, realm: Realm, req: RealmPingRequest) { 114 114 const clocks = realm.buildSyncState() 115 115 putSocket<RealmPingResponse>(ws, { 116 116 typ: 'res', ··· 125 125 } 126 126 } 127 127 128 - function realmAnnounce(ws: WebSocket, realm: Realm, identid: IdentID, req: RealmAnnounceRequest) { 129 - console.debug('announce from', identid, req.dat) 128 + function realmAnnounce(ws: WebSocket, realm: Realm, req: RealmAnnounceRequest) { 130 129 // TODO: store device caps in the identities table? 131 130 132 - // respond immediately 133 131 const clocks = realm.buildSyncState() 134 132 putSocket<RealmAnnounceResponse>(ws, { 135 133 typ: 'res',
+11 -5
src/realm/server/driver.ts
··· 1 1 import {normalizeProtocolError} from '#lib/errors' 2 2 import {makeExceptionMessage} from '#lib/schema' 3 3 import {putSocket} from '#lib/socket' 4 + import {WebSocket} from 'isomorphic-ws' 4 5 5 6 import {drivePreauth} from './driver-preauth' 6 7 import {driveRealm} from './driver-realm' ··· 12 13 * @param ws - the socket to drive through the machine 13 14 * @returns a promise which resolves when the socket session completes 14 15 */ 15 - export async function machine(ws: WebSocket, signal?: AbortSignal) { 16 + export async function driveSocket(ws: WebSocket, signal?: AbortSignal) { 16 17 try { 17 - const {realm, identid, pubkey} = await drivePreauth(ws, signal) 18 + const auth = await drivePreauth(ws, 3_000, signal) 19 + if (auth == null) return 20 + 21 + const {realm} = auth 18 22 try { 19 - realm.attachSocket(identid, ws) 20 - await driveRealm(ws, realm, identid, pubkey, signal) 23 + realm.attachSocket(auth.identid, ws) 24 + console.log('attached socket:', auth.identid) 25 + await driveRealm(ws, auth, signal) 21 26 } finally { 22 - realm.detachSocket(identid, ws) 27 + console.log('detached socket:', auth.identid) 28 + realm.detachSocket(auth.identid, ws) 23 29 } 24 30 } catch (exc: unknown) { 25 31 const error = normalizeProtocolError(exc)
+3 -1
src/realm/server/realm.spec.ts
··· 1 + import {beforeEach, describe, expect, it} from '@jest/globals' 2 + import {WebSocket} from 'isomorphic-ws' 3 + 1 4 import {RealmAction} from '#realm/schema' 2 5 import {IdentBrand, RealmBrand} from '#realm/schema/brands' 3 6 import {generate as generateTimestamp} from '#realm/schema/timestamp' 4 - import {beforeEach, describe, expect, it} from '@jest/globals' 5 7 6 8 import {Realm} from './realm' 7 9
+1
src/realm/server/realm.ts
··· 1 + import {WebSocket} from 'isomorphic-ws' 1 2 import {DatabaseSync} from 'node:sqlite' 2 3 3 4 import {JWK, jwkExport, jwkImport, jwkSchema} from '#lib/crypto/jwks'