offline-first, p2p synced, atproto enabled, feed reader
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactoring of client side connection stuff

+1664 -233
+13 -3
src/feedline/client/context/identity.tsx
··· 1 - import {ParentProps, createContext, createMemo, createResource, useContext} from 'solid-js' 1 + import {ParentProps, createContext, createMemo, createResource, onCleanup, useContext} from 'solid-js' 2 2 3 - import {RealmIdentityStore} from '#realm/client/identity-store.js' 3 + import {RealmIdentityStore} from '#realm/client/identity-store' 4 4 5 5 const makeIdentityContext = () => { 6 6 const store = createMemo(() => new RealmIdentityStore()) 7 7 const [identity] = createResource(() => store().ensure()) 8 - return identity 8 + 9 + onCleanup(() => { 10 + identity()?.destroy() 11 + }) 12 + 13 + return { 14 + identity, 15 + store, 16 + register: (upstream: string) => identity()?.register(upstream), 17 + exchange: (upstream: string, invitation: string) => identity()?.exchangeInvite(upstream, invitation), 18 + } 9 19 } 10 20 11 21 export type IdentityContext = ReturnType<typeof makeIdentityContext>
+14 -3
src/feedline/client/index.tsx
··· 12 12 type MyAction = z.infer<typeof myActionSchema> 13 13 14 14 function Name() { 15 + const {identity, store} = useIdentityContext() 16 + 15 17 createEffect(() => { 16 18 console.log('identity:', identity.loading, identity.error, identity()) 17 19 }) 18 20 19 - const identity = useIdentityContext() 20 - const addAction = async () => { 21 - await identity()?.dispatchAction<MyAction>('my-action', identity()?.identid ?? IdentBrand.generate()) 21 + const addAction = () => { 22 + identity() 23 + ?.dispatchAction<MyAction>('my-action', identity()?.identid ?? IdentBrand.generate()) 24 + .catch((exc: unknown) => { 25 + console.error('error dispatching action', exc) 26 + }) 27 + } 28 + 29 + const registerAction = () => { 30 + identity()?.register('ws://localhost:4000/stream') 22 31 } 23 32 24 33 return ( ··· 26 35 <h2>loading: {identity.loading}</h2> 27 36 <h2>error: {identity.error}</h2> 28 37 <button onClick={addAction}>Add</button> 38 + <button onClick={registerAction}>Register</button> 29 39 </> 30 40 ) 31 41 } ··· 35 45 <IdentityProvider> 36 46 <Layout> 37 47 <h2>Hi</h2> 48 + <Name /> 38 49 </Layout> 39 50 </IdentityProvider> 40 51 )
-4
src/feedline/client/layout.tsx
··· 1 1 import {ParentProps} from 'solid-js' 2 2 3 - import {useIdentityContext} from './context/identity' 4 - 5 3 export default function Layout(props: ParentProps) { 6 - const identity = useIdentityContext() 7 4 return ( 8 5 <> 9 6 <header> 10 7 <span>feedline</span> 11 - <span>{identity()?.identid}</span> 12 8 </header> 13 9 <nav>sidebar nav in desktop</nav> 14 10 <main>
+11
src/lib/async/aborts.ts
··· 23 23 return {signal: controller.signal, cancel} 24 24 } 25 25 26 + export function timeoutAbort(fn: () => void, ms: number, signal?: AbortSignal): {cancel: () => void} { 27 + const timeout = setTimeout(fn, ms) 28 + const cancel = () => { 29 + clearTimeout(timeout) 30 + signal?.removeEventListener('abort', cancel) 31 + } 32 + 33 + signal?.addEventListener('abort', cancel) 34 + return {cancel} 35 + } 36 + 26 37 /** 27 38 * @param signals - the list of signals to combine 28 39 * @returns a combined signal controller, which will abort when any given signal does, or be automatically aborted
+14
src/lib/async/events.ts
··· 1 + export function waitEvent(type: string, target: EventTarget, signal?: AbortSignal): Promise<Event> { 2 + const {promise, resolve, reject} = Promise.withResolvers<Event>() 3 + 4 + const handler = (event: Event) => { 5 + signal?.throwIfAborted() 6 + target.removeEventListener(type, handler) 7 + resolve(event) 8 + } 9 + 10 + signal?.addEventListener('abort', reject) 11 + target.addEventListener(type, handler) 12 + 13 + return promise 14 + }
+74
src/lib/async/fence.ts
··· 1 + import {normalizeError} from '#lib/errors' 2 + 3 + /** 4 + * Simple async fence, for blocking many async ops until a condition is met 5 + */ 6 + export class Fence { 7 + #open: boolean 8 + #resolvers: Array<() => void> = [] 9 + 10 + constructor(open = false) { 11 + this.#open = open 12 + } 13 + 14 + get value() { 15 + return this.#open 16 + } 17 + 18 + set value(v: boolean) { 19 + if (v) { 20 + this.open() 21 + } else { 22 + this.close() 23 + } 24 + } 25 + 26 + enter(signal?: AbortSignal): Promise<boolean> { 27 + if (this.#open) return Promise.resolve(true) 28 + 29 + return new Promise((resolve, reject) => { 30 + if (this.#open) { 31 + resolve(true) 32 + } else if (signal?.aborted) { 33 + reject(normalizeError(signal.reason)) 34 + } else { 35 + let cleanup: (() => void) | undefined 36 + const resolver = () => { 37 + cleanup?.() 38 + resolve(true) 39 + } 40 + 41 + if (signal) { 42 + const onabort = () => { 43 + const idx = this.#resolvers.indexOf(resolver) 44 + if (idx >= 0) { 45 + this.#resolvers.splice(idx, 1) 46 + } 47 + 48 + reject(normalizeError(signal.reason)) 49 + } 50 + 51 + signal.addEventListener('abort', onabort, {once: true}) 52 + cleanup = () => { 53 + signal.removeEventListener('abort', onabort) 54 + } 55 + } 56 + 57 + this.#resolvers.push(resolver) 58 + } 59 + }) 60 + } 61 + 62 + open(): void { 63 + this.#open = true 64 + 65 + const waiters = [...this.#resolvers] 66 + this.#resolvers = [] 67 + 68 + waiters.forEach(queueMicrotask) 69 + } 70 + 71 + close(): void { 72 + this.#open = false 73 + } 74 + }
+24 -3
src/lib/async/sleep.ts
··· 26 26 } 27 27 } 28 28 29 - export function backoff(options?: {maxAttempts?: number; baseDelay?: number; maxDelay?: number}) { 29 + export function backoff(options?: { 30 + maxAttempts?: number 31 + baseDelay?: number 32 + maxDelay?: number 33 + }): ((signal?: AbortSignal) => Promise<void>) & {attempts: number; delay: number} { 30 34 const maxAttempts = options?.maxAttempts ?? 10 31 35 const baseDelay = options?.baseDelay ?? 1000 32 36 const maxDelay = options?.maxDelay ?? 30_000 33 37 34 38 let attempts = 0 39 + let currentDelay = 0 40 + 35 41 const nextDelay = () => { 36 42 return attempts === 0 37 43 ? attempts++ // immediate at 0 38 44 : Math.min(baseDelay * Math.pow(2, attempts++ - 1), maxDelay) 39 45 } 40 46 41 - return async (signal?: AbortSignal) => { 47 + const delayFn = async (signal?: AbortSignal) => { 42 48 signal?.throwIfAborted() 43 49 if (attempts > maxAttempts) throw new Error('exceeded max attempts!') 44 50 45 - await sleep(nextDelay(), signal) 51 + currentDelay = nextDelay() 52 + await sleep(currentDelay, signal) 46 53 } 54 + 55 + Object.defineProperty(delayFn, 'attempts', { 56 + get() { 57 + return attempts 58 + }, 59 + }) 60 + 61 + Object.defineProperty(delayFn, 'delay', { 62 + get() { 63 + return currentDelay 64 + }, 65 + }) 66 + 67 + return delayFn as ((signal?: AbortSignal) => Promise<void>) & {attempts: number; delay: number} 47 68 }
+316
src/lib/client/webrtc.ts
··· 1 + import {match} from 'ts-pattern' 2 + import {z} from 'zod/v4' 3 + 4 + import {controllerWithSignals, timeoutAbort} from '#lib/async/aborts' 5 + import {BlockingQueue} from '#lib/async/blocking-queue' 6 + import {Fence} from '#lib/async/fence' 7 + import {normalizeError} from '#lib/errors' 8 + import {TypedEventTarget} from '#lib/events' 9 + 10 + export const rtcSessionDescriptionSchema = z.object({ 11 + sdp: z.string().optional(), 12 + type: z.enum(['answer', 'offer', 'pranswer', 'rollback']), 13 + }) 14 + 15 + export const rtcIceCandidateSchema = z.object({ 16 + candidate: z.string().optional(), 17 + sdpMLineIndex: z.number().nullish(), 18 + sdpMid: z.string().nullish(), 19 + usernameFragment: z.string().nullish(), 20 + }) 21 + 22 + export const rtcSignalDataSchema = z.discriminatedUnion('type', [ 23 + z.object({type: z.literal('offer'), sdp: rtcSessionDescriptionSchema}), 24 + z.object({type: z.literal('answer'), sdp: rtcSessionDescriptionSchema}), 25 + z.object({type: z.literal('candidate'), candidate: rtcIceCandidateSchema}), 26 + ]) 27 + 28 + export const rtcSignalPayloadSchema = z.object({ 29 + signal: rtcSignalDataSchema, 30 + initiator: z.boolean(), 31 + }) 32 + 33 + export const peerConnectionStatsSchema = z.object({ 34 + type: z.literal('candidate-pair'), 35 + state: z.literal('succeeded'), 36 + currentRoundTripTime: z.number().optional(), 37 + bytesReceived: z.number().optional(), 38 + bytesSent: z.number().optional(), 39 + }) 40 + 41 + export type RTCSignalData = z.infer<typeof rtcSignalDataSchema> 42 + export type RTCSignalPayload = z.infer<typeof rtcSignalPayloadSchema> 43 + 44 + export type PeerConnectionStats = z.infer<typeof peerConnectionStatsSchema> 45 + export type DataChannelSendable = string | Blob | ArrayBuffer | ArrayBufferView<ArrayBuffer> 46 + 47 + export type DataChannelEventMap = { 48 + signal: CustomEvent<RTCSignalData> 49 + connect: CustomEvent<never> 50 + close: CustomEvent<never> 51 + error: CustomEvent<Error> 52 + data: CustomEvent<string | ArrayBuffer> 53 + } 54 + 55 + const BUFFER_LOWWATER = 512 * 1024 // ~512KB 56 + 57 + export class DataChannelPeer< 58 + T extends DataChannelEventMap = DataChannelEventMap, 59 + > extends TypedEventTarget<T> { 60 + readonly initiator: boolean 61 + 62 + #peer: RTCPeerConnection 63 + #chan: RTCDataChannel | null = null 64 + #candidates: RTCIceCandidateInit[] = [] 65 + 66 + #abort: AbortController 67 + #queue = new BlockingQueue<DataChannelSendable>() 68 + 69 + #polite: boolean 70 + #pendingOffer: RTCSessionDescriptionInit | null = null 71 + #ignoreOffer = false 72 + 73 + constructor(initiator: boolean, config?: RTCConfiguration, signal?: AbortSignal) { 74 + super() 75 + 76 + this.initiator = initiator 77 + this.#polite = !initiator 78 + 79 + this.#abort = controllerWithSignals(signal) 80 + const opts = {signal: this.#abort.signal} 81 + 82 + this.#peer = new RTCPeerConnection(config) 83 + this.#peer.addEventListener('icecandidate', this.#onPeerCandidate, opts) 84 + this.#peer.addEventListener('connectionstatechange', this.#onPeerState, opts) 85 + 86 + if (initiator) { 87 + this.#chan = this.#peer.createDataChannel('data', {ordered: true, maxRetransmits: 10}) 88 + this.#setupDataChannel() 89 + this.#advertiseDataChannel().catch((exc: unknown) => { 90 + console.error('unexpected error in data channel offer', exc) 91 + this.dispatchCustomEvent('error', normalizeError(exc)) 92 + }) 93 + } else { 94 + this.#peer.addEventListener( 95 + 'datachannel', 96 + (event) => { 97 + this.#chan = event.channel 98 + this.#setupDataChannel() 99 + }, 100 + opts, 101 + ) 102 + } 103 + } 104 + 105 + async connectionStats() { 106 + const stats = await this.#peer.getStats() 107 + for (const report of stats.values()) { 108 + const parsed = peerConnectionStatsSchema.safeParse(report) 109 + if (parsed.success) return parsed.data 110 + } 111 + 112 + return null 113 + } 114 + 115 + #onPeerCandidate = (event: RTCPeerConnectionIceEvent) => { 116 + if (event.candidate) { 117 + this.dispatchCustomEvent('signal', {type: 'candidate', candidate: event.candidate.toJSON()}) 118 + } 119 + } 120 + 121 + #onPeerState = () => { 122 + const state = this.#peer.connectionState 123 + if (state === 'failed') { 124 + this.#restartIce().catch(() => { 125 + this.dispatchCustomEvent('error', new Error('ice restart failed')) 126 + this.dispatchCustomEvent('close') 127 + }) 128 + } else if (state === 'disconnected') { 129 + timeoutAbort( 130 + () => { 131 + if (this.#peer.connectionState === 'disconnected') { 132 + this.dispatchCustomEvent('close') 133 + } 134 + }, 135 + 5_000, 136 + this.#abort.signal, 137 + ) 138 + } else if (state === 'closed') { 139 + this.dispatchCustomEvent('close') 140 + } 141 + } 142 + 143 + async #restartIce() { 144 + console.log('restarting ice negotiation for', this.#peer) 145 + 146 + // nobody yet, we're starting over 147 + this.#candidates = [] 148 + 149 + // trigger ice restart 150 + const offer = await this.#peer.createOffer({iceRestart: true}) 151 + await this.#peer.setLocalDescription(offer) 152 + if (this.#peer.localDescription) { 153 + this.dispatchCustomEvent('signal', {type: 'offer', sdp: this.#peer.localDescription.toJSON()}) 154 + } 155 + } 156 + 157 + #setupDataChannel() { 158 + if (!this.#chan) return 159 + 160 + const opts = {signal: this.#abort.signal} 161 + 162 + this.#chan.addEventListener( 163 + 'open', 164 + () => { 165 + this.dispatchCustomEvent('connect') 166 + }, 167 + opts, 168 + ) 169 + 170 + this.#chan.addEventListener('close', () => this.dispatchCustomEvent('close'), opts) 171 + this.#chan.addEventListener('error', (e) => this.dispatchCustomEvent('error', normalizeError(e)), opts) 172 + this.#chan.addEventListener( 173 + 'message', 174 + (e: MessageEvent<string | ArrayBuffer>) => this.dispatchCustomEvent('data', e.data), 175 + opts, 176 + ) 177 + 178 + // drain loop blocks on lowwater, which it handles internally 179 + void this.#drainLoop(this.#chan).catch((exc: unknown) => { 180 + console.error('unexpected error in peer drain loop!', exc) 181 + this.dispatchCustomEvent('error', normalizeError(exc)) 182 + }) 183 + } 184 + 185 + async #advertiseDataChannel() { 186 + try { 187 + this.#pendingOffer = await this.#peer.createOffer() 188 + await this.#peer.setLocalDescription(this.#pendingOffer) 189 + if (this.#peer.localDescription) { 190 + this.dispatchCustomEvent('signal', {type: 'offer', sdp: this.#peer.localDescription.toJSON()}) 191 + } 192 + } finally { 193 + this.#pendingOffer = null 194 + } 195 + } 196 + 197 + signal(data: unknown) { 198 + const parsed = rtcSignalDataSchema.safeParse(data) 199 + if (!parsed.success) { 200 + this.dispatchCustomEvent('error', parsed.error) 201 + return 202 + } 203 + 204 + const promise = match(parsed.data) 205 + .with({type: 'offer'}, (d) => this.#signalOffer(d.sdp)) 206 + .with({type: 'answer'}, (d) => this.#signalAnswer(d.sdp)) 207 + .with({type: 'candidate'}, (d) => this.#signalCandidate(d.candidate)) 208 + .exhaustive() 209 + 210 + promise 211 + .then(() => this.#flushCandidates()) 212 + .catch((exc: unknown) => this.dispatchCustomEvent('error', normalizeError(exc))) 213 + } 214 + 215 + #signalOffer = async (sdp: RTCSessionDescriptionInit) => { 216 + const offerCollision = 217 + sdp.type === 'offer' && (this.#pendingOffer !== null || this.#peer.signalingState !== 'stable') 218 + 219 + this.#ignoreOffer = !this.#polite && offerCollision 220 + if (this.#ignoreOffer) { 221 + console.log('ignoring offer due to glare (impolite peer)') 222 + return 223 + } 224 + 225 + if (offerCollision) { 226 + console.log('offer collision detected, rolling back (polite peer)') 227 + await this.#peer.setLocalDescription({type: 'rollback'}) 228 + } 229 + 230 + await this.#peer.setRemoteDescription(new RTCSessionDescription(sdp)) 231 + 232 + const answer = await this.#peer.createAnswer() 233 + await this.#peer.setLocalDescription(answer) 234 + if (this.#peer.localDescription) { 235 + this.dispatchCustomEvent('signal', {type: 'answer', sdp: this.#peer.localDescription.toJSON()}) 236 + } 237 + } 238 + 239 + #signalAnswer = async (sdp: RTCSessionDescriptionInit) => { 240 + await this.#peer.setRemoteDescription(new RTCSessionDescription(sdp)) 241 + } 242 + 243 + #signalCandidate = async (candidate: RTCIceCandidateInit) => { 244 + if (this.#peer.remoteDescription) { 245 + await this.#peer.addIceCandidate(new RTCIceCandidate(candidate)) 246 + } else { 247 + this.#candidates.push(candidate) 248 + } 249 + } 250 + 251 + async #flushCandidates() { 252 + if (!this.#peer.remoteDescription) return 253 + 254 + const toflush = this.#candidates.splice(0) 255 + await Promise.all( 256 + toflush.map(async (candidate) => { 257 + try { 258 + await this.#peer.addIceCandidate(candidate) 259 + } catch (exc: unknown) { 260 + this.dispatchCustomEvent('error', normalizeError(exc)) 261 + } 262 + }), 263 + ) 264 + } 265 + 266 + send(data: DataChannelSendable): void { 267 + this.#queue.enqueue(data) 268 + } 269 + 270 + async #drainLoop(chan: RTCDataChannel) { 271 + const lowwater = new Fence(chan.bufferedAmount < BUFFER_LOWWATER) 272 + chan.bufferedAmountLowThreshold = BUFFER_LOWWATER 273 + chan.addEventListener( 274 + 'bufferedamountlow', 275 + () => { 276 + lowwater.open() 277 + }, 278 + {signal: this.#abort.signal}, 279 + ) 280 + 281 + try { 282 + let data: DataChannelSendable 283 + while ((data = await this.#queue.dequeue(this.#abort.signal))) { 284 + await lowwater.enter(this.#abort.signal) // wait for peer buffer capacity 285 + 286 + // channel closed while waiting? 287 + if (chan.readyState !== 'open') { 288 + this.#queue.prequeue(data) 289 + break 290 + } 291 + 292 + try { 293 + // typescript gets hungup on the type check, because of how the dom types for data channel do overloads 294 + // but we know that the data we have can go through, and at runtime it'll do whatever dispatch it was going to anyway 295 + chan.send(data as string) 296 + } catch (exc: unknown) { 297 + this.#queue.prequeue(data) 298 + this.dispatchCustomEvent('error', normalizeError(exc)) 299 + continue 300 + } 301 + 302 + lowwater.value = chan.bufferedAmount < BUFFER_LOWWATER 303 + } 304 + } catch (exc: unknown) { 305 + if (this.#abort.signal.aborted) return 306 + throw exc 307 + } 308 + } 309 + 310 + destroy() { 311 + this.#chan?.close() 312 + this.#peer.close() 313 + 314 + this.#abort.abort('shutting down') 315 + } 316 + }
+40
src/lib/events.ts
··· 1 + export type TypedEventMap = Record<string, CustomEvent> 2 + 3 + export type TypedEventListener<M extends TypedEventMap, K extends keyof M> = ( 4 + evt: M[K], 5 + ) => void | Promise<void> 6 + 7 + export interface TypedEventListenerObject<M extends TypedEventMap, K extends keyof M> { 8 + handleEvent: (evt: M[K]) => void | Promise<void> 9 + } 10 + 11 + export type TypedEventListenerOrEventListenerObject<M extends TypedEventMap, K extends keyof M> = 12 + | TypedEventListener<M, K> 13 + | TypedEventListenerObject<M, K> 14 + 15 + /* eslint-disable @typescript-eslint/no-unsafe-declaration-merging */ 16 + 17 + export interface TypedEventTarget<M extends TypedEventMap> { 18 + addEventListener: <K extends keyof M & string>( 19 + type: K, 20 + listener: TypedEventListenerOrEventListenerObject<M, K> | null, 21 + options?: boolean | AddEventListenerOptions, 22 + ) => void 23 + 24 + removeEventListener: <K extends keyof M & string>( 25 + type: K, 26 + callback: TypedEventListenerOrEventListenerObject<M, K> | null, 27 + options?: EventListenerOptions | boolean, 28 + ) => void 29 + 30 + dispatchEvent: (event: Event) => boolean 31 + } 32 + 33 + export class TypedEventTarget<M extends TypedEventMap> extends EventTarget implements TypedEventTarget<M> { 34 + dispatchCustomEvent<K extends keyof M>(type: K, detail?: M[K]['detail'], opts: Partial<EventInit> = {}) { 35 + const event = new CustomEvent<M[K]['detail']>(type as string, {...opts, detail}) 36 + return super.dispatchEvent(event) 37 + } 38 + } 39 + 40 + /* eslint-enable @typescript-eslint/no-unsafe-declaration-merging */
+129
src/realm/client/action-store.ts
··· 1 + import Dexie, {Collection, Table} from 'dexie' 2 + 3 + import {TypedEventTarget} from '#lib/events' 4 + 5 + import {LogicalClock} from '#realm/logical-clock' 6 + import {RealmAction} from '#realm/schema' 7 + import {IdentBrand, IdentID} from '#realm/schema/brands' 8 + import {PeerClocks, Timestamp, compare, explode, generate} from '#realm/schema/timestamp' 9 + 10 + export interface StoredAction { 11 + identid: IdentID 12 + action: RealmAction 13 + } 14 + 15 + export interface StoredClock { 16 + identid: IdentID 17 + clock: Timestamp 18 + } 19 + 20 + export type ActionEventMap = { 21 + action: CustomEvent<RealmAction> 22 + } 23 + 24 + export class ActionStore extends Dexie implements TypedEventTarget<ActionEventMap> { 25 + private actions!: Table<StoredAction, Timestamp> 26 + private clocks!: Table<StoredClock, IdentID> 27 + 28 + #events = new TypedEventTarget<ActionEventMap>() 29 + addEventListener = this.#events.addEventListener.bind(this.#events) 30 + dispatchEvent = this.#events.dispatchEvent.bind(this.#events) 31 + dispatchCustomEvent = this.#events.dispatchCustomEvent.bind(this.#events) 32 + removeEventListener = this.#events.removeEventListener.bind(this.#events) 33 + 34 + #identid: IdentID 35 + #clock: LogicalClock 36 + 37 + constructor(identid: IdentID, clock: LogicalClock) { 38 + super(`realm-actions-${identid}`) 39 + 40 + this.#identid = identid 41 + this.#clock = clock 42 + 43 + this.version(1).stores({ 44 + actions: '&action.clk, [identid+action.clk]', 45 + clocks: '&identid', 46 + }) 47 + } 48 + 49 + async recordActions(actions: RealmAction[]) { 50 + const [actionrows, clocks] = actions.reduce( 51 + ([actions, clocks], action) => { 52 + const {identid} = explode(action.clk) 53 + if (compare(clocks[identid], action.clk) < 0) { 54 + clocks[identid] = action.clk 55 + } 56 + 57 + return [[...actions, {identid, action}], clocks] 58 + }, 59 + [[] as StoredAction[], {} as PeerClocks], 60 + ) 61 + 62 + let addedkeys: Timestamp[] | undefined 63 + await this.transaction('rw', ['actions', 'clocks'], async () => { 64 + addedkeys = await this.actions.bulkAdd(actionrows, {allKeys: true}) 65 + 66 + for (const [_identid, clock] of Object.entries(clocks)) { 67 + if (clock === null) continue 68 + 69 + const identid = IdentBrand.parse(_identid) 70 + const extant = await this.clocks.get(identid) 71 + 72 + if (!extant) { 73 + await this.clocks.add({identid, clock}) 74 + } else if (compare(clock, extant.clock) > 0) { 75 + await this.clocks.update(identid, {clock}) 76 + } 77 + } 78 + }) 79 + 80 + if (addedkeys) { 81 + const keys = addedkeys 82 + const added = actions.filter((a) => keys.includes(a.clk)).sort((a, b) => compare(a.clk, b.clk)) 83 + 84 + added.forEach((action) => { 85 + this.#events.dispatchCustomEvent('action', action) 86 + }) 87 + } 88 + } 89 + 90 + async actionsSince(timestamp: Timestamp): Promise<RealmAction[]> { 91 + const rows = await this.actions.where('action.clk').above(timestamp).toArray() 92 + return rows.map((a) => a.action) 93 + } 94 + 95 + async buildSyncState(): Promise<PeerClocks> { 96 + const stored = await this.clocks.toArray() 97 + return { 98 + ...Object.fromEntries(stored.map((c) => [c.identid, c.clock])), 99 + [this.#identid]: this.#clock.latest(), 100 + } 101 + } 102 + 103 + async buildSyncDelta(clocks: PeerClocks): Promise<RealmAction[]> { 104 + const state = await this.buildSyncState() 105 + const results = await this.#buildSyncQuery(state, clocks).sortBy('action.clk') 106 + return results.map((row) => row.action) 107 + } 108 + 109 + #buildSyncQuery(state: PeerClocks, clocks: PeerClocks): Collection<StoredAction, Timestamp> { 110 + // in the initial case (they have no peers), just send everything 111 + if (!Object.keys(clocks).length) { 112 + return this.actions.toCollection() 113 + } 114 + 115 + // otherwise, build up a sync state from range queries 116 + const peerids = Object.keys(state) as IdentID[] 117 + const memoQuery = this.actions.where('action.clk').noneOf([]) 118 + return peerids.reduce((query, identid) => { 119 + const lowerbound = clocks[identid] ?? generate(identid, 0, 0) 120 + const upperbound = state[identid] 121 + return query.or('[identid+action.clk]').between( 122 + [identid, lowerbound], 123 + [identid, upperbound], 124 + clocks[identid] == null, // include the lower bound if they don't have it 125 + true, // always include the upper bound 126 + ) 127 + }, memoQuery) 128 + } 129 + }
+742 -8
src/realm/client/connection.ts
··· 1 - import {RealmID} from '#realm/schema/brands' 1 + import {type WebSocket as ISOWS} from 'isomorphic-ws' 2 + import {SignJWT} from 'jose' 3 + import {nanoid} from 'nanoid' 4 + import {z} from 'zod/v4' 5 + 6 + import {TimeoutSignal, combineSignals, controllerWithSignals, timeoutSignal} from '#lib/async/aborts' 7 + import {BlockingQueue} from '#lib/async/blocking-queue' 8 + import {Fence} from '#lib/async/fence' 9 + import {backoff, sleep} from '#lib/async/sleep' 10 + import { 11 + DataChannelEventMap, 12 + DataChannelPeer, 13 + RTCSignalData, 14 + RTCSignalPayload, 15 + rtcSignalPayloadSchema, 16 + } from '#lib/client/webrtc' 17 + import {generateSignableJwt, jwkExport, jwkImport} from '#lib/crypto/jwks' 18 + import {jwtPayload, verifyJwtToken} from '#lib/crypto/jwts' 19 + import {ProtocolError, normalizeError, normalizeProtocolError} from '#lib/errors' 20 + import {TypedEventTarget} from '#lib/events' 21 + import {jsonCodec} from '#lib/schema' 22 + import {streamSocket, takeSocketSchema} from '#lib/socket' 23 + import {StrictMap} from '#lib/strict-map' 24 + 25 + import { 26 + PreauthAuthnRequest, 27 + PreauthExchangeInviteRequest, 28 + PreauthRegisterRequest, 29 + preauthResSchema, 30 + } from '#realm/protocol/messages-preauth' 31 + import { 32 + RealmPingEvent, 33 + realmPeerJoinedEventSchema, 34 + realmPeerLeftEventSchema, 35 + realmPingEventSchema, 36 + realmSignalEventSchema, 37 + } from '#realm/protocol/messages-realm' 38 + import {RealmAction, RealmAddress} from '#realm/schema' 39 + import {IdentBrand, IdentID} from '#realm/schema/brands' 2 40 3 - import {RealmIdentity} from './identity' 41 + import {ActionStore} from './action-store' 4 42 5 - export class RealmConnection { 43 + const realmPeerMessageSchema = z.union([realmPingEventSchema]) 44 + const realmSocketMessageSchema = z.union([ 45 + realmSignalEventSchema, 46 + realmPingEventSchema, 47 + realmPeerJoinedEventSchema, 48 + realmPeerLeftEventSchema, 49 + ]) 50 + const jsonSocketMessageSchema = jsonCodec(realmSocketMessageSchema) 51 + 52 + type RealmSocketMessage = z.infer<typeof realmSocketMessageSchema> 53 + 54 + export type RealmRemoteEventMap = { 55 + wsopen: CustomEvent<RealmAddress> 56 + wsclose: CustomEvent<never> 57 + wserror: CustomEvent<ProtocolError> 58 + wsdata: CustomEvent<unknown> 59 + 60 + peerjoined: CustomEvent<{identid: IdentID}> 61 + peerleft: CustomEvent<{identid: IdentID}> 62 + } 63 + 64 + type RealmPeerEventMap = DataChannelEventMap & { 65 + peeropen: CustomEvent<{identid: IdentID}> 66 + peerclose: CustomEvent<{identid: IdentID}> 67 + peererror: CustomEvent<{identid: IdentID; error: Error}> 68 + peerdata: CustomEvent<{identid: IdentID; data: string | ArrayBuffer}> 69 + } 70 + 71 + const SOCKET_HIGHWATER = 1024 * 1024 // 1Mb 72 + 73 + /** 74 + * a realm connection is the persistent connection to a signalling realm server and a collection of peers 75 + * 76 + * @summary 77 + * * initial connection 78 + * - `new RealmConnection(address)` starts the re/connection loop 79 + * - `#connectLoop` manages the socket connection (with backoff on failures) 80 + * - `#connectedSocket` creates the websocket, and sets up event handlers, blocking until the socket closes 81 + * - `#socketOnOpen` fires, and starts authentication handshake 82 + * - authentication succeeds: 83 + * - peer connections started based on peers from authentication response 84 + * - socket carries signals via signed JWT 85 + * - `#peerMessageLoop` starts, handles peer messages, (dispatching upstream, except pings) 86 + * - `#peerPingLoop` starts, handles pings to the peer (every 30s) 87 + * - `#socketMessageLoop` starts, taking messages off the socket and into a queue 88 + * - processes `pings`, and dispatches others up to event listeners 89 + * - `#socketPingLoop` starts, pinging the server with sync requests every 30s 90 + * * disconnect/reconnect 91 + * - socket closes (for whatever reason) 92 + * - `#connectedSocket` promise (see `#connectLoop`) resolves 93 + * - `#socketOnClose()`, closes the `#ready` gate 94 + * - socket loops exit (`while (#ready.value)`) 95 + * - `#connectLoop` does backoff delay 96 + * - loops back and create fresh socket and socket-bound abort signal 97 + * 98 + * peers should survive socket reconnects, since they're separate connections. 99 + */ 100 + export class RealmConnection extends TypedEventTarget<RealmRemoteEventMap & RealmPeerEventMap> { 101 + #abort: AbortController 102 + #ready = new Fence() 103 + 104 + #address: RealmAddress 105 + #identid: IdentID 106 + #keypair: CryptoKeyPair 107 + #actions: ActionStore 108 + 109 + #identities = new StrictMap<IdentID, CryptoKey>() 110 + #peers = new StrictMap<IdentID, RealmPeer>() 111 + #peerConfig: Partial<RTCConfiguration> | undefined 112 + 113 + #socket: WebSocket | null = null 114 + #socketQueue = new BlockingQueue<string>() 115 + #socketTimeout: TimeoutSignal | null = null 116 + #socketAbort: AbortController | null = null 117 + 6 118 constructor( 7 - public readonly upstream: string, 8 - public readonly realmid: RealmID, 9 - public readonly identity: RealmIdentity, 10 - ) {} 119 + address: RealmAddress, 120 + identid: IdentID, 121 + keypair: CryptoKeyPair, 122 + actions: ActionStore, 123 + config?: Partial<RTCConfiguration>, 124 + signal?: AbortSignal, 125 + ) { 126 + super() 127 + this.#abort = controllerWithSignals(signal) 128 + 129 + this.#address = address 130 + this.#identid = identid 131 + this.#keypair = keypair 132 + this.#actions = actions 133 + 134 + this.#peerConfig = config 135 + 136 + this.#connectLoop().catch((exc: unknown) => { 137 + console.error('fatal error in connection loop', exc) 138 + this.dispatchCustomEvent('error', normalizeError(exc)) 139 + }) 140 + } 141 + 142 + get address() { 143 + return this.#address 144 + } 145 + 146 + get connected() { 147 + return this.#ready.value 148 + } 149 + 150 + async #connectLoop() { 151 + const delay = backoff({baseDelay: 1000, maxDelay: 30_000}) 152 + while (!this.#abort.signal.aborted) { 153 + try { 154 + await this.#connectedSocket(this.#abort.signal) 155 + 156 + console.log('reconnecting', delay.attempts, 'after', delay.delay) 157 + await delay(this.#abort.signal) 158 + } catch (exc: unknown) { 159 + console.error('unexpected error in connect loop, continuing...', exc) 160 + this.dispatchCustomEvent('error', normalizeError(exc)) 161 + await delay(this.#abort.signal) 162 + } 163 + } 164 + } 165 + 166 + async #connectedSocket(signal?: AbortSignal) { 167 + const {promise, resolve, reject} = Promise.withResolvers<void>() 168 + 169 + this.#socketAbort?.abort('reconnecting') 170 + this.#socketTimeout = timeoutSignal(60_000) 171 + this.#socketAbort = controllerWithSignals(signal, this.#socketTimeout.signal) 172 + 173 + try { 174 + const opts = {signal: this.#socketAbort.signal, once: true} 175 + this.#socket = new WebSocket(this.#address.upstream) 176 + 177 + this.#socket.addEventListener( 178 + 'open', 179 + () => { 180 + this.#socketTimeout?.cancel() 181 + this.#socketOnOpen().catch((exc: unknown) => { 182 + reject(normalizeError(exc)) 183 + }) 184 + }, 185 + opts, 186 + ) 187 + 188 + this.#socket.addEventListener( 189 + 'error', 190 + () => { 191 + this.#socketTimeout?.cancel() 192 + reject(new Error('websocket error')) 193 + }, 194 + opts, 195 + ) 196 + 197 + this.#socket.addEventListener( 198 + 'close', 199 + () => { 200 + resolve() 201 + }, 202 + opts, 203 + ) 204 + 205 + this.#socketAbort.signal.addEventListener('abort', () => { 206 + this.#socketTimeout?.cancel() 207 + reject(normalizeError(this.#socketAbort?.signal.reason)) 208 + }) 209 + 210 + await promise 211 + this.#socketOnClose() 212 + } catch (exc: unknown) { 213 + this.#socketTimeout.cancel() 214 + this.#socketOnError() 215 + throw exc 216 + } 217 + } 218 + 219 + broadcast<T>(payload: T) { 220 + if (!this.#ready.value) return 221 + 222 + const json = JSON.stringify(payload) 223 + const recipients = Array.of(...this.#peers.keys()) 224 + 225 + recipients.forEach((identid) => { 226 + const peer = this.#peers.get(identid) 227 + peer?.send(json) 228 + }) 229 + 230 + // broadcast through to the server so that it can store actions 231 + this.#socketQueue.enqueue(json) 232 + } 233 + 234 + destroy() { 235 + // shut down any ongoing loops 236 + if (!this.#abort.signal.aborted) { 237 + this.#abort.abort('shutting down') 238 + } 239 + 240 + // close connections 241 + for (const peer of this.#peers.values()) { 242 + peer.destroy() 243 + } 244 + 245 + if (this.#socket?.readyState !== WebSocket.CLOSED) { 246 + this.#socket?.close() 247 + } 248 + 249 + // empty state 250 + this.#peers.clear() 251 + } 252 + 253 + chooseSyncPeer(): IdentID | null { 254 + let max: IdentID | null = null 255 + for (const key of this.#peers.keys()) { 256 + if (key === this.#identid) continue 257 + if (max === null || max.localeCompare(key) < 0) { 258 + max = key 259 + } 260 + } 261 + 262 + return max 263 + } 264 + 265 + /// 266 + 267 + #socketOnOpen = async () => { 268 + this.dispatchCustomEvent('wsopen', this.#address) 269 + 270 + // drain loop is special, it doesn't wait for ready 271 + this.#socketDrainLoop().catch((exc: unknown) => { 272 + console.error('uncaught error in socket drain loop', exc) 273 + }) 274 + 275 + try { 276 + const authresp = await this.#socketAuthenticate() 277 + if (authresp.typ === 'err') { 278 + this.#ready.close() 279 + throw new ProtocolError(authresp.err.detail ?? 'unknown error', authresp.err.code) 280 + } 281 + 282 + this.#address = { 283 + realmid: this.#address.realmid, 284 + upstream: this.#address.upstream, 285 + // invitation/register intentionally omitted 286 + } 287 + 288 + // explicitly, we don't unlock authenticated until we've setup peers 289 + // otherwise various loops will start before connections are ready 290 + 291 + // store identities for message verification 292 + for (const [identid_, pubkey_] of Object.entries(authresp.dat.identities)) { 293 + try { 294 + const identid = IdentBrand.parse(identid_) 295 + const pubkey = await jwkImport.parseAsync(pubkey_) 296 + this.#identities.set(identid, pubkey) 297 + } catch (exc: unknown) { 298 + console.warn(`couldn't import remote identity: ${identid_}`, exc) 299 + } 300 + } 301 + 302 + // initiate outbound connections when starting up 303 + for (const peerid of authresp.dat.peers) { 304 + if (peerid === this.#identid) continue 305 + this.#peerConnect(peerid, true) 306 + } 307 + 308 + // we're ready! 309 + this.#ready.open() 310 + 311 + // start the loops 312 + this.#socketMessageLoop().catch((exc: unknown) => { 313 + console.error('uncaught error in message loop', exc) 314 + }) 315 + 316 + this.#socketPingLoop().catch((exc: unknown) => { 317 + console.error('uncaught error in ping loop', exc) 318 + }) 319 + } catch (exc: unknown) { 320 + console.warn('unknown error in socket outhentication or setup', exc) 321 + this.#socket?.close() 322 + this.dispatchCustomEvent('error', normalizeError(exc)) 323 + throw exc 324 + } 325 + } 326 + 327 + #socketOnClose = () => { 328 + this.#ready.close() 329 + this.dispatchCustomEvent('wsclose') 330 + } 331 + 332 + #socketOnError = () => { 333 + this.dispatchCustomEvent('wserror', normalizeProtocolError('dunno', 400)) 334 + } 335 + 336 + /// 337 + 338 + #socketPingLoop = async () => { 339 + await this.#ready.enter(this.#socketAbort?.signal) 340 + 341 + while (this.#ready.value) { 342 + await this.#socketPing() 343 + await sleep(30_000, this.#socketAbort?.signal) 344 + } 345 + } 11 346 12 - destroy() {} 347 + #socketMessageLoop = async () => { 348 + await this.#ready.enter(this.#socketAbort?.signal) 349 + 350 + const stream = streamSocket(this.#socket as unknown as ISOWS, {signal: this.#socketAbort?.signal}) 351 + try { 352 + for await (const raw of stream) { 353 + await this.#socketOnData(raw) 354 + } 355 + } catch (exc: unknown) { 356 + console.error('socket stream error', exc) 357 + this.dispatchCustomEvent('error', normalizeError(exc)) 358 + } 359 + } 360 + 361 + #socketDrainLoop = async () => { 362 + if (this.#socket == null) { 363 + throw new Error('unexpectedly ready with no socket?') 364 + } 365 + 366 + try { 367 + let data 368 + while ((data = await this.#socketQueue.dequeue(this.#socketAbort?.signal))) { 369 + while (this.#socket.bufferedAmount > SOCKET_HIGHWATER) { 370 + await sleep(100, this.#socketAbort?.signal) 371 + } 372 + 373 + // channel closed while waiting? 374 + if (this.#socket.readyState !== WebSocket.OPEN) { 375 + this.#socketQueue.prequeue(data) 376 + break 377 + } 378 + 379 + try { 380 + this.#socket.send(data) 381 + } catch (exc: unknown) { 382 + this.#socketQueue.prequeue(data) 383 + this.dispatchCustomEvent('error', normalizeError(exc)) 384 + continue 385 + } 386 + } 387 + } catch (exc: unknown) { 388 + if (this.#socketAbort?.signal.aborted) return 389 + throw exc 390 + } 391 + } 392 + 393 + #socketOnData = async (data: unknown) => { 394 + const parsed = await jsonSocketMessageSchema.safeParseAsync(data) 395 + if (parsed.success) { 396 + await this.#socketOnMessage(parsed.data) 397 + } else { 398 + this.dispatchCustomEvent('wsdata', data) 399 + } 400 + } 401 + 402 + /// 403 + 404 + #socketAuthenticate = async () => { 405 + let token: SignJWT 406 + 407 + // send the appropriate type of request 408 + if (this.#address.register) { 409 + token = await this.#buildAuthenticateRegister() 410 + } else if (this.#address.invitation) { 411 + token = await this.#buildAuthenticateExchange(this.#address.invitation) 412 + } else { 413 + token = this.#buildAuthenticateAuthn() 414 + } 415 + 416 + const signed = await token.sign(this.#keypair.privateKey) 417 + this.#socketQueue.enqueue(signed) 418 + 419 + // now, we're expecting an authenticated responese 420 + const timeout = timeoutSignal(5000) 421 + const signal = combineSignals(this.#socketAbort?.signal, timeout.signal) 422 + return await takeSocketSchema(this.#socket as unknown as ISOWS, jsonCodec(preauthResSchema), signal) 423 + } 424 + 425 + #buildAuthenticateAuthn = () => { 426 + return generateSignableJwt<PreauthAuthnRequest>({ 427 + iss: this.#identid, 428 + aud: this.#address.realmid, 429 + typ: 'req', 430 + msg: 'preauth.authn', 431 + seq: nanoid(), 432 + }) 433 + } 434 + 435 + #buildAuthenticateRegister = async () => { 436 + const pubkey = await jwkExport.parseAsync(this.#keypair.publicKey) 437 + return generateSignableJwt<PreauthRegisterRequest>({ 438 + iss: this.#identid, 439 + aud: this.#address.realmid, 440 + typ: 'req', 441 + msg: 'preauth.register', 442 + seq: nanoid(), 443 + dat: {pubkey}, 444 + }) 445 + } 446 + 447 + #buildAuthenticateExchange = async (inviteJwt: string) => { 448 + const pubkey = await jwkExport.parseAsync(this.#keypair.publicKey) 449 + return generateSignableJwt<PreauthExchangeInviteRequest>({ 450 + iss: this.#identid, 451 + aud: this.#address.realmid, 452 + typ: 'req', 453 + msg: 'preauth.exchange', 454 + seq: nanoid(), 455 + dat: {pubkey, inviteJwt}, 456 + }) 457 + } 458 + 459 + #socketPing = async () => { 460 + const clocks = await this.#actions.buildSyncState() 461 + const ping: RealmPingEvent = { 462 + typ: 'evt', 463 + msg: 'realm.ping', 464 + dat: {clocks, requestSync: true}, 465 + } 466 + 467 + this.#socketQueue.enqueue(JSON.stringify(ping)) 468 + } 469 + 470 + // anything we don't handle directly at the realm level gets handled by the server peer 471 + #socketOnMessage = async (data: RealmSocketMessage) => { 472 + switch (data.msg) { 473 + case 'realm.peer-joined': 474 + this.#identities.set(data.dat.identid, await jwkImport.parseAsync(data.dat.pubkey)) 475 + this.dispatchCustomEvent('peerjoined', {identid: data.dat.identid}) 476 + return 477 + 478 + case 'realm.peer-left': 479 + this.#peerDisconnect(data.dat.identid) 480 + this.dispatchCustomEvent('peerleft', {identid: data.dat.identid}) 481 + return 482 + 483 + case 'realm.ping': { 484 + if (data.dat.requestSync) { 485 + this.#socketSendActions(await this.#actions.buildSyncDelta(data.dat.clocks)) 486 + } 487 + 488 + return 489 + } 490 + 491 + case 'realm.signal': { 492 + // find the referenced peer and hand off the signal to it 493 + if (data.dat.remoteid !== this.#identid) { 494 + console.warn('got a socket signal for a different identity?') 495 + return 496 + } 497 + 498 + const pubkey = this.#identities.get(data.dat.localid) 499 + if (pubkey == null) { 500 + console.warn('signal from an unknown identity (we cannot verify!)') 501 + return 502 + } 503 + 504 + const token = await jwtPayload(rtcSignalPayloadSchema).parseAsync(data.dat.signed) 505 + await verifyJwtToken(data.dat.signed, pubkey) 506 + 507 + let peer = this.#peers.get(data.dat.localid) 508 + if (!peer && token.payload.initiator) { 509 + // false because _we_ are not the initiator, they are (they did initial outbound connections) 510 + peer = this.#peerConnect(data.dat.localid, false) 511 + } 512 + 513 + // might not have a connection yet (if we're waiting for an answer) 514 + peer?.signal(token.payload.signal) 515 + return 516 + } 517 + } 518 + } 519 + 520 + #socketSendActions(actions: RealmAction[], batch_size = 100) { 521 + for (let i = 0; i < actions.length; i += batch_size) { 522 + const batch = actions.slice(i, i + batch_size) 523 + this.#socketQueue.enqueue(JSON.stringify(batch)) 524 + } 525 + } 526 + 527 + /// 528 + 529 + #peerConnect(remoteid: IdentID, initiator: boolean): RealmPeer { 530 + return this.#peers.ensure(remoteid, () => { 531 + // important: we use the _connection_ signal, not the socket signal 532 + // the websocket dying and getting reconnected shouldn't effect webrtc peers 533 + const timeout = timeoutSignal(60_000) 534 + const signal = combineSignals(timeout.signal, this.#abort.signal) 535 + const opts = {signal} 536 + 537 + try { 538 + const peer = new RealmPeer(this, this.#actions, remoteid, initiator, this.#peerConfig, signal) 539 + this.#abort.signal.addEventListener('abort', () => { 540 + timeout.cancel() 541 + }) 542 + 543 + peer.addEventListener( 544 + 'signal', 545 + (e) => { 546 + this.#peerOnSignal(peer, e) 547 + }, 548 + opts, 549 + ) 550 + 551 + peer.addEventListener( 552 + 'connect', 553 + () => { 554 + timeout.cancel() 555 + this.dispatchCustomEvent('peeropen', {identid: remoteid}) 556 + }, 557 + opts, 558 + ) 559 + 560 + peer.addEventListener( 561 + 'peerdata', 562 + (e) => { 563 + this.dispatchCustomEvent('peerdata', e.detail) 564 + }, 565 + opts, 566 + ) 567 + 568 + peer.addEventListener( 569 + 'close', 570 + () => { 571 + timeout.cancel() 572 + this.dispatchCustomEvent('peerclose', {identid: remoteid}) 573 + }, 574 + opts, 575 + ) 576 + 577 + peer.addEventListener( 578 + 'error', 579 + (e) => { 580 + timeout.cancel() 581 + this.dispatchCustomEvent('peererror', { 582 + identid: remoteid, 583 + error: normalizeProtocolError(e.detail), 584 + }) 585 + }, 586 + opts, 587 + ) 588 + 589 + return peer 590 + } catch (exc) { 591 + timeout.cancel() 592 + throw exc 593 + } 594 + }) 595 + } 596 + 597 + #peerDisconnect(remoteid: IdentID) { 598 + const peer = this.#peers.remove(remoteid) 599 + if (peer) { 600 + peer.destroy() 601 + } 602 + } 603 + 604 + #peerOnSignal = (peer: RealmPeer, event: CustomEvent<RTCSignalData>) => { 605 + const go = async () => { 606 + const token = generateSignableJwt<RTCSignalPayload>({ 607 + aud: peer.identid, 608 + iss: this.#identid, 609 + exp: Math.floor(Date.now() / 1000) + 60, 610 + signal: event.detail, 611 + initiator: peer.initiator, 612 + }) 613 + 614 + const signed = await token.sign(this.#keypair.privateKey) 615 + this.#socketQueue.enqueue(signed) 616 + } 617 + 618 + go().catch((exc: unknown) => { 619 + console.error('error during peer signal send!', exc) 620 + this.dispatchCustomEvent('error', normalizeError(exc)) 621 + }) 622 + } 623 + } 624 + 625 + export class RealmPeer extends DataChannelPeer<RealmPeerEventMap> { 626 + #abort: AbortController 627 + #connected: Fence 628 + #queue: BlockingQueue<string> 629 + 630 + constructor( 631 + readonly realm: RealmConnection, 632 + readonly actions: ActionStore, 633 + readonly identid: IdentID, 634 + readonly initiator: boolean, 635 + config?: Partial<RTCConfiguration>, 636 + signal?: AbortSignal, 637 + ) { 638 + super(initiator, { 639 + iceServers: [{urls: 'stun:stun.l.google.com:19302'}, {urls: 'stun:stun1.l.google.com:19302'}], 640 + iceCandidatePoolSize: 0, // no pre-gather, trickle ico 641 + bundlePolicy: 'balanced', 642 + iceTransportPolicy: 'all', 643 + ...(config || {}), 644 + }) 645 + 646 + this.#abort = controllerWithSignals(signal) 647 + this.#connected = new Fence() 648 + this.#queue = new BlockingQueue() 649 + 650 + const opts = {signal: this.#abort.signal} 651 + this.addEventListener('connect', this.#peerConnected, opts) 652 + this.addEventListener('close', this.#peerClosed, opts) 653 + this.addEventListener('data', this.#peerData, opts) 654 + 655 + this.#pingLoop().catch((exc: unknown) => { 656 + console.warn('unexpected error in ping loop', exc) 657 + }) 658 + 659 + this.#receiveLoop().catch((exc: unknown) => { 660 + console.warn('unexpected error in receive loop', exc) 661 + }) 662 + } 663 + 664 + destroy(error?: Error) { 665 + this.#abort.abort(error) 666 + super.destroy() 667 + } 668 + 669 + /// 670 + 671 + #peerConnected = () => { 672 + this.#connected.open() 673 + } 674 + 675 + #peerClosed = () => { 676 + this.#connected.close() 677 + } 678 + 679 + #peerData = (event: CustomEvent<string | ArrayBuffer>) => { 680 + const data = event.detail 681 + const string = typeof data === 'string' ? data : new TextDecoder().decode(data) 682 + this.#queue.enqueue(string) 683 + } 684 + 685 + /// 686 + 687 + #receiveLoop = async () => { 688 + this.#abort.signal.throwIfAborted() 689 + 690 + while (await this.#connected.enter(this.#abort.signal)) { 691 + const payload = await this.#queue.dequeue(this.#abort.signal) 692 + if (!this.#connected.value) break 693 + 694 + const parsed = await jsonCodec(realmPeerMessageSchema).safeParseAsync(payload) 695 + if (parsed.success) { 696 + await this.#receivePing(parsed.data) 697 + } else { 698 + this.dispatchCustomEvent('peerdata', {identid: this.identid, data: payload}) 699 + } 700 + } 701 + } 702 + 703 + #pingLoop = async () => { 704 + while (await this.#connected.enter(this.#abort.signal)) { 705 + // wait a moment to let the connect settle before ping 706 + await sleep(100, this.#abort.signal) 707 + if (!this.#connected.value) { 708 + break 709 + } 710 + 711 + await this.#sendPing() 712 + await sleep(30_000, this.#abort.signal) 713 + } 714 + } 715 + 716 + #sendPing = async () => { 717 + if (!this.#connected.value) return 718 + 719 + const clocks = await this.actions.buildSyncState() 720 + const syncPeer = this.realm.chooseSyncPeer() 721 + 722 + const message: RealmPingEvent = { 723 + typ: 'evt', 724 + msg: 'realm.ping', 725 + dat: {clocks, requestSync: syncPeer === this.identid}, 726 + } 727 + 728 + this.send(JSON.stringify(message)) 729 + } 730 + 731 + #receivePing = async (ping: RealmPingEvent) => { 732 + if (!ping.dat.requestSync) return 733 + 734 + // TODO: record last received ping 735 + 736 + this.#sendActions(await this.actions.buildSyncDelta(ping.dat.clocks)) 737 + } 738 + 739 + /// 740 + 741 + #sendActions(actions: RealmAction[], batch_size = 100) { 742 + for (let i = 0; i < actions.length; i += batch_size) { 743 + const batch = actions.slice(i, i + batch_size) 744 + this.send(JSON.stringify(batch)) 745 + } 746 + } 13 747 }
-111
src/realm/client/context.tsx
··· 1 - import {PropsWithChildren, createContext, useCallback, useContext, useEffect, useMemo, useState} from 'react' 2 - 3 - import {normalizeError} from '#lib/errors' 4 - 5 - import {RealmID} from '#realm/schema/brands' 6 - 7 - import {RealmConnection} from './connection' 8 - import {RealmIdentity} from './identity' 9 - import {RealmContextStorage} from './storage' 10 - 11 - // loading -> local 12 - // \ v ^ 13 - // -> remote 14 - 15 - export type RealmState = 16 - | {status: 'loading'} 17 - | {status: 'local'; identity: RealmIdentity} 18 - | {status: 'remote'; identity: RealmIdentity; remote: RealmRemote; connection?: RealmConnection} 19 - | {status: 'error'; identity: RealmIdentity | null; error: Error} 20 - 21 - export type RealmRemote = { 22 - realmid: RealmID 23 - upstream: string 24 - register?: boolean 25 - invitation?: string 26 - } 27 - 28 - export const RealmContext = createContext<RealmContext | null>(null) 29 - export interface RealmContext { 30 - state: RealmState 31 - 32 - connect: (opts: RealmRemote) => Promise<void> 33 - disconnect: () => Promise<void> 34 - } 35 - 36 - export type RealmContextProviderProps = { 37 - url: string 38 - } 39 - 40 - export const RealmContextProvider = (props: PropsWithChildren<RealmContextProviderProps>) => { 41 - const store = useMemo(() => new RealmContextStorage(), []) 42 - const [state, setState] = useState<RealmState>({status: 'loading'}) 43 - 44 - const status = state.status 45 - const identity = status !== 'error' && status !== 'loading' ? state.identity : null 46 - const remote = status === 'remote' ? state.remote : null 47 - const connection = status === 'remote' ? state.connection : null 48 - 49 - const connect = useCallback( 50 - async (remote: RealmRemote) => { 51 - if (!identity) return 52 - 53 - await store.connect(remote.upstream, remote.realmid) 54 - setState({status: 'remote', identity, remote}) 55 - }, 56 - [identity, store], 57 - ) 58 - 59 - const disconnect = useCallback(async () => { 60 - if (!identity) return 61 - 62 - await store.disconnect() 63 - setState({status: 'local', identity}) 64 - }, [identity, store]) 65 - 66 - // on mount, pull our connection info from storage 67 - useEffect(() => { 68 - if (status !== 'loading') return 69 - 70 - const go = async () => { 71 - const row = await store.fetch() 72 - const identity = new RealmIdentity(row.identid, row.keypair, row.clock) 73 - const {realmid, upstream} = row 74 - 75 - if (realmid && upstream) { 76 - setState({status: 'remote', identity, remote: {realmid, upstream}}) 77 - } else { 78 - setState({status: 'local', identity}) 79 - } 80 - } 81 - 82 - go().catch((exc: unknown) => { 83 - console.error(exc) 84 - setState({status: 'error', error: normalizeError(exc), identity: null}) 85 - }) 86 - }, [status, identity, store]) 87 - 88 - // whenever we're in connecting state, connect! 89 - useEffect(() => { 90 - if (status !== 'remote' || !identity || !remote) return 91 - if (connection) return 92 - 93 - const conn = new RealmConnection(remote.upstream, remote.realmid, identity) 94 - setState({status: 'remote', identity, remote, connection: conn}) 95 - 96 - return () => { 97 - conn.destroy() 98 - setState({status: 'remote', identity, remote}) 99 - // TODO: reconnect counter, backoff, etc. 100 - } 101 - }, [status, identity, remote, connection]) 102 - 103 - return <RealmContext.Provider value={{state, connect, disconnect}}>{props.children}</RealmContext.Provider> 104 - } 105 - 106 - export function useRealmContext(): RealmContext { 107 - const context = useContext(RealmContext) 108 - if (context == null) throw new Error('expected to be under a realm context!') 109 - 110 - return context 111 - }
+90
src/realm/client/identity-store.ts
··· 1 + import Dexie, {Table} from 'dexie' 2 + 3 + import {generateSigningJwkPair} from '#lib/crypto/jwks' 4 + 5 + import {RealmAddress} from '#realm/schema' 6 + import {IdentBrand, IdentID} from '#realm/schema/brands' 7 + import {Timestamp} from '#realm/schema/timestamp' 8 + 9 + import {RealmIdentity} from './identity' 10 + 11 + interface StoredIdentity { 12 + identid: IdentID 13 + keypair: CryptoKeyPair 14 + clock?: Timestamp 15 + remote?: RealmAddress 16 + lastUsed?: number 17 + } 18 + 19 + export class RealmIdentityStore extends Dexie { 20 + private identities!: Table<StoredIdentity> 21 + 22 + constructor() { 23 + super('realm-identities') 24 + 25 + this.version(1).stores({ 26 + identities: '&identid, lastUsed', 27 + }) 28 + } 29 + 30 + async list(): Promise<IdentID[]> { 31 + const keys = await this.identities.toCollection().keys() 32 + return keys.map((k) => IdentBrand.parse(k)) 33 + } 34 + 35 + async load(identid: IdentID): Promise<RealmIdentity | undefined> { 36 + const row = await this.identities.get(identid) 37 + return row ? this.#loadIdentity(row) : undefined 38 + } 39 + 40 + ensure(): Promise<RealmIdentity> { 41 + return this.transaction('rw', [this.identities], async () => { 42 + let row = await this.identities.orderBy('lastUsed').reverse().first() 43 + if (!row) { 44 + const identid = IdentBrand.generate() 45 + const keypair = await generateSigningJwkPair() 46 + 47 + row = {identid, keypair, lastUsed: Date.now()} 48 + await this.identities.put(row) 49 + } else { 50 + await this.identities.update(row.identid, {lastUsed: Date.now()}) 51 + } 52 + 53 + return this.#loadIdentity(row) 54 + }) 55 + } 56 + 57 + #loadIdentity(row: StoredIdentity) { 58 + const abort = new AbortController() 59 + const identity = new RealmIdentity(row.identid, row.keypair, row.clock) 60 + identity.addEventListener('tick', this.#onTick.bind(this, identity), {signal: abort.signal}) 61 + identity.addEventListener('connected', this.#onConnected.bind(this, identity), {signal: abort.signal}) 62 + 63 + // this will abort when the identity destroys itself, clearing event listeners 64 + identity.addEventListener('destroyed', abort.abort.bind(abort), {signal: abort.signal}) 65 + 66 + if (row.remote) { 67 + identity.connect(row.remote) 68 + } 69 + 70 + return identity 71 + } 72 + 73 + #onConnected = (identity: RealmIdentity, event: CustomEvent<RealmAddress>) => { 74 + this.identities 75 + .update(identity.identid, { 76 + 'remote.realmid': event.detail.realmid, 77 + 'remote.upstream': event.detail.upstream, 78 + lastUsed: Date.now(), 79 + }) 80 + .catch((exc: unknown) => { 81 + console.error('uncaught error in identity connected handler', exc) 82 + }) 83 + } 84 + 85 + #onTick = (identity: RealmIdentity, event: CustomEvent<Timestamp>) => { 86 + this.identities.update(identity.identid, {clock: event.detail}).catch((exc: unknown) => { 87 + console.error('uncaught error in identity tick handler', exc) 88 + }) 89 + } 90 + }
+150 -4
src/realm/client/identity.ts
··· 1 + import {z} from 'zod/v4' 2 + 3 + import {controllerWithSignals} from '#lib/async/aborts' 4 + import {jwtSchema} from '#lib/crypto/jwts' 5 + import {TypedEventTarget} from '#lib/events' 6 + import {jsonCodec} from '#lib/schema' 7 + 1 8 import {LogicalClock} from '#realm/logical-clock' 2 - import {IdentID} from '#realm/schema/brands' 9 + import {RealmAction, RealmAddress, actionSchema} from '#realm/schema' 10 + import {IdentID, RealmBrand} from '#realm/schema/brands' 3 11 import {Timestamp} from '#realm/schema/timestamp' 4 12 5 - export class RealmIdentity { 13 + import {ActionStore} from './action-store' 14 + import {RealmConnection} from './connection' 15 + 16 + export type RealmIdentityEventMap = { 17 + connected: CustomEvent<RealmAddress> 18 + disconnected: CustomEvent<never> 19 + destroyed: CustomEvent<never> 20 + 21 + tick: CustomEvent<Timestamp> 22 + action: CustomEvent<RealmAction> 23 + 24 + peerjoined: CustomEvent<{identid: IdentID}> 25 + peerleft: CustomEvent<{identid: IdentID}> 26 + } 27 + 28 + const realmIdentityMessageSchema = z.union([ 29 + z.array(actionSchema), 30 + // other messages (for now) are handled by connection directly 31 + ]) 32 + 33 + export class RealmIdentity extends TypedEventTarget<RealmIdentityEventMap> { 6 34 readonly identid: IdentID 7 35 readonly keypair: CryptoKeyPair 8 36 readonly clock: LogicalClock 37 + readonly actions: ActionStore 38 + 39 + #abort = new AbortController() 9 40 10 - constructor(identid: IdentID, keypair: CryptoKeyPair, timestamp?: Timestamp) { 41 + #connection?: RealmConnection 42 + #connectionAbort?: AbortController 43 + 44 + constructor(identid: IdentID, keypair: CryptoKeyPair, latest?: Timestamp) { 45 + super() 46 + 11 47 this.identid = identid 12 48 this.keypair = keypair 13 - this.clock = new LogicalClock(this.identid, timestamp ?? null) 49 + 50 + this.clock = new LogicalClock(identid, latest) 51 + this.clock.addEventListener('tick', this.#onTick, {signal: this.#abort.signal}) 52 + 53 + this.actions = new ActionStore(identid, this.clock) 54 + this.actions.addEventListener('action', this.#onAction, {signal: this.#abort.signal}) 55 + } 56 + 57 + async dispatchAction< 58 + A extends RealmAction, 59 + K extends A['msg'] = A['msg'], 60 + T extends Extract<A, {msg: K}>['dat'] = A['dat'], 61 + >(msg: K, dat: T) { 62 + const action: RealmAction<T> = {typ: 'act', clk: this.clock.now(), msg, dat} 63 + 64 + await this.actions.recordActions([action]) 65 + this.#connection?.broadcast([action]) 66 + } 67 + 68 + #onTick = (e: CustomEvent<Timestamp>) => { 69 + this.dispatchCustomEvent('tick', e.detail) 70 + } 71 + 72 + #onAction = (e: CustomEvent<RealmAction>) => { 73 + this.dispatchCustomEvent('action', e.detail) 74 + } 75 + 76 + connect(remote: RealmAddress) { 77 + this.disconnect() 78 + 79 + this.#connectionAbort = controllerWithSignals(this.#abort.signal) 80 + const opts = {signal: this.#connectionAbort.signal} 81 + const dispatch = this.dispatchEvent.bind(this) as EventListener 82 + 83 + this.#connection = new RealmConnection( 84 + remote, 85 + this.identid, 86 + this.keypair, 87 + this.actions, 88 + {}, 89 + this.#abort.signal, 90 + ) 91 + this.#connection.addEventListener('wsopen', this.#onWSOpen, opts) 92 + this.#connection.addEventListener('wsclose', this.#onWSClose, opts) 93 + this.#connection.addEventListener('peerjoined', dispatch, opts) 94 + this.#connection.addEventListener('peerleft', dispatch, opts) 95 + this.#connection.addEventListener('peerdata', this.#onPeerData, opts) 96 + this.#connection.addEventListener('wsdata', this.#onWSData, opts) 97 + } 98 + 99 + register(upstream: string): void { 100 + const realmid = RealmBrand.generate() 101 + this.connect({upstream, realmid, register: true}) 102 + } 103 + 104 + exchangeInvite(upstream: string, invitation: string) { 105 + const token = jwtSchema.parse(invitation) 106 + const realmid = RealmBrand.parse(token.claims.aud) 107 + 108 + const now = Math.floor(Date.now() / 1000) 109 + if (!token.claims.exp || token.claims.exp < now) throw new Error('invitation has expired') 110 + 111 + this.connect({realmid, upstream, invitation}) 112 + } 113 + 114 + get remote(): RealmAddress | undefined { 115 + return this.#connection?.address 116 + } 117 + 118 + get connected(): boolean { 119 + return this.#connection != null && this.#connection.connected 120 + } 121 + 122 + #onWSOpen = (e: CustomEvent<RealmAddress>) => { 123 + this.dispatchCustomEvent('connected', e.detail) 124 + } 125 + 126 + #onWSClose = () => { 127 + this.dispatchCustomEvent('disconnected') 128 + } 129 + 130 + #onWSData = (e: CustomEvent<unknown>) => { 131 + this.#onIncomingData(e.detail) 132 + } 133 + 134 + #onPeerData = (e: CustomEvent<{identid: IdentID; data: unknown}>) => { 135 + this.#onIncomingData(e.detail.data) 136 + } 137 + 138 + #onIncomingData = (data: unknown) => { 139 + const parsed = jsonCodec(realmIdentityMessageSchema).safeParse(data) 140 + if (parsed.success) { 141 + this.actions.recordActions(parsed.data).catch((exc: unknown) => { 142 + console.error('error on identity action recording', exc, parsed.data) 143 + }) 144 + } else { 145 + console.warn('unknown message in identity:', data, parsed.error) 146 + } 147 + } 148 + 149 + disconnect() { 150 + this.#connection?.destroy() 151 + this.#connectionAbort?.abort('shutting down') 152 + 153 + this.#connection = undefined 154 + this.#connectionAbort = undefined 155 + } 156 + 157 + destroy() { 158 + this.disconnect() 159 + this.#abort.abort('shutting down') 14 160 } 15 161 }
-56
src/realm/client/storage.ts
··· 1 - import Dexie, {Table} from 'dexie' 2 - 3 - import {generateSigningJwkPair} from '#lib/crypto/jwks.js' 4 - 5 - import {IdentBrand, IdentID, RealmID} from '#realm/schema/brands' 6 - import {Timestamp} from '#realm/schema/timestamp' 7 - 8 - export interface RealmContextRow { 9 - identid: IdentID 10 - keypair: CryptoKeyPair 11 - clock?: Timestamp 12 - realmid?: RealmID 13 - upstream?: string 14 - } 15 - 16 - class ContextDB extends Dexie { 17 - context!: Table<RealmContextRow> 18 - 19 - constructor() { 20 - super('realm-context') 21 - this.version(1).stores({ 22 - context: '&identid', 23 - }) 24 - } 25 - } 26 - 27 - export class RealmContextStorage { 28 - #db: ContextDB 29 - 30 - constructor() { 31 - this.#db = new ContextDB() 32 - } 33 - 34 - async fetch(): Promise<RealmContextRow> { 35 - const extant = await this.#db.context.toCollection().first() 36 - if (extant) return extant 37 - 38 - const identid = IdentBrand.generate() 39 - const keypair = await generateSigningJwkPair() 40 - 41 - const row = {identid, keypair} 42 - await this.#db.context.put(row) 43 - 44 - return row 45 - } 46 - 47 - async connect(upstream: string, realmid: RealmID) { 48 - const extant = await this.fetch() // make sure there's an existing row 49 - await this.#db.context.update(extant.identid, {upstream, realmid}) 50 - } 51 - 52 - async disconnect() { 53 - const extant = await this.fetch() // make sure there's an existing row 54 - await this.#db.context.update(extant.identid, {upstream: undefined, realmid: undefined}) 55 - } 56 - }
+8 -13
src/realm/logical-clock.ts
··· 1 + import {TypedEventTarget} from '#lib/events' 2 + 1 3 import {IdentID} from '#realm/schema/brands' 2 4 import {Timestamp, compare, generate} from '#realm/schema/timestamp' 5 + 6 + export type LogicalClockEventMap = { 7 + tick: CustomEvent<Timestamp> 8 + } 3 9 4 10 /** 5 11 * A logical clock that generates monotonically increasing timestamps, with identity as tiebreaking. ··· 23 29 * clock.tick(receivedTimestamp) 24 30 * ``` 25 31 */ 26 - export class LogicalClock extends EventTarget { 32 + export class LogicalClock extends TypedEventTarget<LogicalClockEventMap> { 27 33 #identid: IdentID 28 34 #counter = 0 29 35 #seconds = 0 ··· 94 100 tick(timestamp: Timestamp) { 95 101 if (!this.#latest || compare(this.#latest, timestamp) < 0) { 96 102 this.#latest = timestamp 97 - this.dispatchEvent(new TickEvent(LogicalClock.TICK, timestamp)) 103 + this.dispatchCustomEvent('tick', timestamp) 98 104 } 99 105 100 106 return this.#latest 101 107 } 102 108 } 103 - 104 - /** 105 - * An event emmitted when the logical clock advances. 106 - * @see {@link LogicalClock.tick} 107 - * @event 108 - */ 109 - export class TickEvent extends CustomEvent<Timestamp> { 110 - constructor(name: string, timestamp: Timestamp) { 111 - super(name, {detail: timestamp}) 112 - } 113 - }
+2 -2
src/realm/protocol/messages-preauth.ts
··· 3 3 import {jwkSchema} from '#lib/crypto/jwks' 4 4 import {makeEmptyRequestSchema, makeErrorSchema, makeRequestSchema, makeResponseSchema} from '#lib/schema' 5 5 6 - import {peerIdSchema} from '#realm/schema/brands' 6 + import {IdentBrand} from '#realm/schema/brands' 7 7 8 8 /** preauth message: given a public key, request a new realm */ 9 9 export const preauthRegisterReqSchema = makeRequestSchema('preauth.register', z.object({pubkey: jwkSchema})) ··· 24 24 export const preauthSuccessSchema = makeResponseSchema( 25 25 'preauth.authn', 26 26 z.object({ 27 - peers: z.array(peerIdSchema), 27 + peers: z.array(IdentBrand.schema), 28 28 identities: z.record(z.string(), jwkSchema), 29 29 }), 30 30 )
-7
src/realm/protocol/messages-realm.ts
··· 27 27 }), 28 28 ) 29 29 30 - export const realmSignalEventPayloadSchema = z.object({ 31 - signal: z.string(), 32 - initiator: z.boolean(), 33 - }) 34 - 35 30 export const realmPeerJoinedEventSchema = makeEventSchema( 36 31 'realm.peer-joined', 37 32 z.object({ ··· 56 51 ) 57 52 58 53 export type RealmSignalEvent = z.infer<typeof realmSignalEventSchema> 59 - export type RealmSignalEventPayload = z.infer<typeof realmSignalEventPayloadSchema> 60 - 61 54 export type RealmBroadcastEvent = z.infer<typeof realmBroadcastEventSchema> 62 55 export type RealmPeerJoinedEvent = z.infer<typeof realmPeerJoinedEventSchema> 63 56 export type RealmPeerLeftEvent = z.infer<typeof realmPeerLeftEventSchema>
+15
src/realm/protocol/messages-rtc.ts
··· 1 + import {z} from 'zod/v4' 2 + 3 + export const rtcSignalDataSchema = z.discriminatedUnion('type', [ 4 + z.object({type: z.literal('offer'), sdp: z.any()}), 5 + z.object({type: z.literal('answer'), sdp: z.any()}), 6 + z.object({type: z.literal('candidate'), candidate: z.any()}), 7 + ]) 8 + 9 + export const rtcSignalPayloadSchema = z.object({ 10 + signal: rtcSignalDataSchema, 11 + initiator: z.boolean(), 12 + }) 13 + 14 + export type RTCSignalData = z.infer<typeof rtcSignalDataSchema> 15 + export type RTCSignalPayload = z.infer<typeof rtcSignalPayloadSchema>
-8
src/realm/schema/brands.ts
··· 1 - import {z} from 'zod/v4' 2 - 3 1 import {Brand} from '#lib/schema/brand' 4 2 5 3 const ident = Symbol('ident') ··· 9 7 const realm = Symbol('realm') 10 8 export const RealmBrand = new Brand<typeof realm>(realm, 'rlm') 11 9 export type RealmID = ReturnType<typeof RealmBrand.generate> 12 - 13 - export const serverPeerIdSchema = z.literal('server') 14 - export type ServerPeerID = z.infer<typeof serverPeerIdSchema> 15 - 16 - export const peerIdSchema = z.union([IdentBrand.schema, serverPeerIdSchema]) 17 - export type PeerID = z.infer<typeof peerIdSchema>
+10
src/realm/schema/index.ts
··· 1 1 import {z} from 'zod/v4' 2 2 3 + import {RealmBrand} from './brands' 3 4 import {logicalClockSchema} from './timestamp' 4 5 5 6 export const actionSchema = z.object({ ··· 22 23 dat: schema, 23 24 }) 24 25 } 26 + 27 + export const realmAddressSchema = z.object({ 28 + realmid: RealmBrand.schema, 29 + upstream: z.url(), 30 + invitation: z.string().optional(), 31 + register: z.boolean().optional(), 32 + }) 33 + 34 + export type RealmAddress = z.infer<typeof realmAddressSchema>
+4 -4
src/realm/schema/timestamp.ts
··· 58 58 * compare(ts1, ts2) // Returns negative (ts1 happened before ts2) 59 59 * ``` 60 60 */ 61 - export function compare(a?: Timestamp, b?: Timestamp): number { 62 - if (a === undefined && b == undefined) return 0 63 - if (a === undefined) return 1 64 - if (b === undefined) return -1 61 + export function compare(a?: Timestamp | null, b?: Timestamp | null): number { 62 + if (a == null && b == null) return 0 63 + if (a == null) return 1 64 + if (b == null) return -1 65 65 66 66 const [_a, asec, acounter, aident] = a.split(':') 67 67 const [_b, bsec, bcounter, bident] = b.split(':')
+3 -3
src/realm/server/driver-preauth.spec.ts
··· 70 70 typ: 'res', 71 71 msg: 'preauth.authn', 72 72 dat: { 73 - peers: ['server'], 73 + peers: [], 74 74 identities: { 75 75 [identid]: pubkeyJwk, 76 76 }, ··· 159 159 typ: 'res', 160 160 msg: 'preauth.authn', 161 161 dat: expect.objectContaining({ 162 - peers: expect.arrayContaining(['server']), 162 + peers: expect.arrayContaining([]), 163 163 identities: expect.objectContaining({ 164 164 [identid]: pubkeyJwk, 165 165 }), ··· 345 345 typ: 'res', 346 346 msg: 'preauth.authn', 347 347 dat: expect.objectContaining({ 348 - peers: expect.arrayContaining(['server']), 348 + peers: expect.arrayContaining([]), 349 349 identities: expect.objectContaining({ 350 350 [inviterIdentid]: expect.anything(), 351 351 }),
+1 -1
src/realm/server/driver-preauth.ts
··· 76 76 msg: 'preauth.authn', 77 77 seq: jwt.payload.seq, 78 78 dat: { 79 - peers: [...realm.peers, 'server'], 79 + peers: realm.peers, 80 80 identities: realm.knownIdentities(), 81 81 }, 82 82 })
+2 -2
src/realm/server/driver-realm.ts
··· 121 121 122 122 // and also send sync actions (server is always a sync partner) 123 123 if (req.dat.requestSync) { 124 - const actions = realm.buildSyncDelta(clocks) 124 + const actions = realm.buildSyncDelta(req.dat.clocks) 125 125 if (actions.length) { 126 - putSocket(ws, actions) 126 + putSocket(ws, actions) 127 127 } 128 128 } 129 129 }
+2 -1
src/realm/server/realm.ts
··· 239 239 * @returns array of actions the peer has not seen 240 240 */ 241 241 buildSyncDelta(clocks: PeerClocks): RealmAction[] { 242 - const entries = Object.entries(clocks) 243 242 const schema = jsonCodec(actionSchema) 244 243 244 + // a null clock is the same as no clock 245 + const entries = Object.entries(clocks).filter(([_, v]) => !!v) 245 246 if (entries.length === 0) { 246 247 const stmt = this.#database.prepare(`SELECT action FROM action ORDER BY clock ASC`) 247 248 const rows = stmt.all()