bring in devenv and move more processing into the peer

+398
.devenv.flake.nix
··· 1 + { 2 + inputs = 3 + let 4 + version = "1.10.0"; 5 + system = "x86_64-linux"; 6 + devenv_root = "/home/jonathan/code/skypod-node"; 7 + devenv_dotfile = "/home/jonathan/code/skypod-node/.devenv"; 8 + devenv_dotfile_path = ./.devenv; 9 + devenv_tmpdir = "/run/user/1000"; 10 + devenv_runtime = "/run/user/1000/devenv-d48b567"; 11 + devenv_istesting = false; 12 + devenv_direnvrc_latest_version = 1; 13 + container_name = null; 14 + active_profiles = [ ]; 15 + hostname = "buzz"; 16 + username = "jonathan"; 17 + git_root = "/home/jonathan/code/skypod-node"; 18 + 19 + in { 20 + git-hooks.url = "github:cachix/git-hooks.nix"; 21 + git-hooks.inputs.nixpkgs.follows = "nixpkgs"; 22 + pre-commit-hooks.follows = "git-hooks"; 23 + nixpkgs.url = "github:cachix/devenv-nixpkgs/rolling"; 24 + devenv.url = "github:cachix/devenv?dir=src/modules"; 25 + } // (if builtins.pathExists (devenv_dotfile_path + "/flake.json") 26 + then builtins.fromJSON (builtins.readFile (devenv_dotfile_path + "/flake.json")) 27 + else { }); 28 + 29 + outputs = { nixpkgs, ... }@inputs: 30 + let 31 + version = "1.10.0"; 32 + system = "x86_64-linux"; 33 + devenv_root = "/home/jonathan/code/skypod-node"; 34 + devenv_dotfile = "/home/jonathan/code/skypod-node/.devenv"; 35 + devenv_dotfile_path = ./.devenv; 36 + devenv_tmpdir = "/run/user/1000"; 37 + devenv_runtime = "/run/user/1000/devenv-d48b567"; 38 + devenv_istesting = false; 39 + devenv_direnvrc_latest_version = 1; 40 + container_name = null; 41 + active_profiles = [ ]; 42 + hostname = "buzz"; 43 + username = "jonathan"; 44 + git_root = "/home/jonathan/code/skypod-node"; 45 + 46 + devenv = 47 + if builtins.pathExists (devenv_dotfile_path + "/devenv.json") 48 + then builtins.fromJSON (builtins.readFile (devenv_dotfile_path + "/devenv.json")) 49 + else { }; 50 + 51 + systems = [ "x86_64-linux" "aarch64-linux" "x86_64-darwin" "aarch64-darwin" ]; 52 + 53 + # Function to create devenv configuration for a specific system with profiles support 54 + mkDevenvForSystem = targetSystem: 55 + let 56 + getOverlays = inputName: inputAttrs: 57 + map 58 + (overlay: 59 + let 60 + input = inputs.${inputName} or (throw "No such input `${inputName}` while trying to configure overlays."); 61 + in 62 + input.overlays.${overlay} or (throw "Input `${inputName}` has no overlay called `${overlay}`. Supported overlays: ${nixpkgs.lib.concatStringsSep ", " (builtins.attrNames input.overlays)}")) 63 + inputAttrs.overlays or [ ]; 64 + overlays = nixpkgs.lib.flatten (nixpkgs.lib.mapAttrsToList getOverlays (devenv.inputs or { })); 65 + permittedUnfreePackages = devenv.nixpkgs.per-platform."${targetSystem}".permittedUnfreePackages or devenv.nixpkgs.permittedUnfreePackages or [ ]; 66 + pkgs = import nixpkgs { 67 + system = targetSystem; 68 + config = { 69 + allowUnfree = devenv.nixpkgs.per-platform."${targetSystem}".allowUnfree or devenv.nixpkgs.allowUnfree or devenv.allowUnfree or false; 70 + allowBroken = devenv.nixpkgs.per-platform."${targetSystem}".allowBroken or devenv.nixpkgs.allowBroken or devenv.allowBroken or false; 71 + cudaSupport = devenv.nixpkgs.per-platform."${targetSystem}".cudaSupport or devenv.nixpkgs.cudaSupport or false; 72 + cudaCapabilities = devenv.nixpkgs.per-platform."${targetSystem}".cudaCapabilities or devenv.nixpkgs.cudaCapabilities or [ ]; 73 + permittedInsecurePackages = devenv.nixpkgs.per-platform."${targetSystem}".permittedInsecurePackages or devenv.nixpkgs.permittedInsecurePackages or devenv.permittedInsecurePackages or [ ]; 74 + allowUnfreePredicate = if (permittedUnfreePackages != [ ]) then (pkg: builtins.elem (nixpkgs.lib.getName pkg) permittedUnfreePackages) else (_: false); 75 + }; 76 + inherit overlays; 77 + }; 78 + lib = pkgs.lib; 79 + importModule = path: 80 + if lib.hasPrefix "./" path 81 + then if lib.hasSuffix ".nix" path 82 + then ./. + (builtins.substring 1 255 path) 83 + else ./. + (builtins.substring 1 255 path) + "/devenv.nix" 84 + else if lib.hasPrefix "../" path 85 + then 86 + # For parent directory paths, concatenate with /. 87 + # ./. refers to the directory containing this file (project root) 88 + # So ./. + "/../shared" = <project-root>/../shared 89 + if lib.hasSuffix ".nix" path 90 + then ./. + "/${path}" 91 + else ./. + "/${path}/devenv.nix" 92 + else 93 + let 94 + paths = lib.splitString "/" path; 95 + name = builtins.head paths; 96 + input = inputs.${name} or (throw "Unknown input ${name}"); 97 + subpath = "/${lib.concatStringsSep "/" (builtins.tail paths)}"; 98 + devenvpath = "${input}" + subpath; 99 + devenvdefaultpath = devenvpath + "/devenv.nix"; 100 + in 101 + if lib.hasSuffix ".nix" devenvpath 102 + then devenvpath 103 + else if builtins.pathExists devenvdefaultpath 104 + then devenvdefaultpath 105 + else throw (devenvdefaultpath + " file does not exist for input ${name}."); 106 + 107 + # Phase 1: Base evaluation to extract profile definitions 108 + baseProject = pkgs.lib.evalModules { 109 + specialArgs = inputs // { inherit inputs; }; 110 + modules = [ 111 + ({ config, ... }: { 112 + _module.args.pkgs = pkgs.appendOverlays (config.overlays or [ ]); 113 + }) 114 + (inputs.devenv.modules + /top-level.nix) 115 + ({ options, ... }: { 116 + config.devenv = lib.mkMerge [ 117 + { 118 + cliVersion = version; 119 + root = devenv_root; 120 + dotfile = devenv_dotfile; 121 + } 122 + (pkgs.lib.optionalAttrs (builtins.hasAttr "tmpdir" options.devenv) { 123 + tmpdir = devenv_tmpdir; 124 + }) 125 + (pkgs.lib.optionalAttrs (builtins.hasAttr "isTesting" options.devenv) { 126 + isTesting = devenv_istesting; 127 + }) 128 + (pkgs.lib.optionalAttrs (builtins.hasAttr "runtime" options.devenv) { 129 + runtime = devenv_runtime; 130 + }) 131 + (pkgs.lib.optionalAttrs (builtins.hasAttr "direnvrcLatestVersion" options.devenv) { 132 + direnvrcLatestVersion = devenv_direnvrc_latest_version; 133 + }) 134 + ]; 135 + }) 136 + ({ options, ... }: { 137 + config = lib.mkMerge [ 138 + (pkgs.lib.optionalAttrs (builtins.hasAttr "git" options) { 139 + git.root = git_root; 140 + }) 141 + ]; 142 + }) 143 + (pkgs.lib.optionalAttrs (container_name != null) { 144 + container.isBuilding = pkgs.lib.mkForce true; 145 + containers.${container_name}.isBuilding = true; 146 + }) 147 + ] ++ (map importModule (devenv.imports or [ ])) ++ [ 148 + (if builtins.pathExists ./devenv.nix then ./devenv.nix else { }) 149 + (devenv.devenv or { }) 150 + (if builtins.pathExists ./devenv.local.nix then ./devenv.local.nix else { }) 151 + (if builtins.pathExists (devenv_dotfile_path + "/cli-options.nix") then import (devenv_dotfile_path + "/cli-options.nix") else { }) 152 + ]; 153 + }; 154 + 155 + # Phase 2: Extract and apply profiles using extendModules with priority overrides 156 + project = 157 + let 158 + # Build ordered list of profile names: hostname -> user -> manual 159 + manualProfiles = active_profiles; 160 + currentHostname = hostname; 161 + currentUsername = username; 162 + hostnameProfiles = lib.optional (currentHostname != "" && builtins.hasAttr currentHostname (baseProject.config.profiles.hostname or { })) "hostname.${currentHostname}"; 163 + userProfiles = lib.optional (currentUsername != "" && builtins.hasAttr currentUsername (baseProject.config.profiles.user or { })) "user.${currentUsername}"; 164 + 165 + # Ordered list of profiles to activate 166 + orderedProfiles = hostnameProfiles ++ userProfiles ++ manualProfiles; 167 + 168 + # Resolve profile extends with cycle detection 169 + resolveProfileExtends = profileName: visited: 170 + if builtins.elem profileName visited then 171 + throw "Circular dependency detected in profile extends: ${lib.concatStringsSep " -> " visited} -> ${profileName}" 172 + else 173 + let 174 + profile = getProfileConfig profileName; 175 + extends = profile.extends or [ ]; 176 + newVisited = visited ++ [ profileName ]; 177 + extendedProfiles = lib.flatten (map (name: resolveProfileExtends name newVisited) extends); 178 + in 179 + extendedProfiles ++ [ profileName ]; 180 + 181 + # Get profile configuration by name from baseProject 182 + getProfileConfig = profileName: 183 + if lib.hasPrefix "hostname." profileName then 184 + let name = lib.removePrefix "hostname." profileName; 185 + in baseProject.config.profiles.hostname.${name} 186 + else if lib.hasPrefix "user." profileName then 187 + let name = lib.removePrefix "user." profileName; 188 + in baseProject.config.profiles.user.${name} 189 + else 190 + let 191 + availableProfiles = builtins.attrNames (baseProject.config.profiles or { }); 192 + hostnameProfiles = map (n: "hostname.${n}") (builtins.attrNames (baseProject.config.profiles.hostname or { })); 193 + userProfiles = map (n: "user.${n}") (builtins.attrNames (baseProject.config.profiles.user or { })); 194 + allAvailableProfiles = availableProfiles ++ hostnameProfiles ++ userProfiles; 195 + in 196 + baseProject.config.profiles.${profileName} or (throw "Profile '${profileName}' not found. Available profiles: ${lib.concatStringsSep ", " allAvailableProfiles}"); 197 + 198 + # Fold over ordered profiles to build final list with extends 199 + expandedProfiles = lib.foldl' 200 + (acc: profileName: 201 + let 202 + allProfileNames = resolveProfileExtends profileName [ ]; 203 + in 204 + acc ++ allProfileNames 205 + ) [ ] 206 + orderedProfiles; 207 + 208 + # Map over expanded profiles and apply priorities 209 + allPrioritizedModules = lib.imap0 210 + (index: profileName: 211 + let 212 + # Decrement priority for each profile (lower = higher precedence) 213 + # Start with the next lowest priority after the default priority for values (100) 214 + profilePriority = (lib.modules.defaultOverridePriority - 1) - index; 215 + profileConfig = getProfileConfig profileName; 216 + 217 + # Check if an option type needs explicit override to resolve conflicts 218 + # Only apply overrides to LEAF values (scalars), not collection types that can merge 219 + typeNeedsOverride = type: 220 + if type == null then false 221 + else 222 + let 223 + typeName = type.name or type._type or ""; 224 + 225 + # True leaf types that need priority resolution when they conflict 226 + isLeafType = builtins.elem typeName [ 227 + "str" 228 + "int" 229 + "bool" 230 + "enum" 231 + "path" 232 + "package" 233 + "float" 234 + "anything" 235 + ]; 236 + in 237 + if isLeafType then true 238 + else if typeName == "nullOr" then 239 + # For nullOr, check the wrapped type recursively 240 + let 241 + innerType = type.elemType or 242 + (if type ? nestedTypes && type.nestedTypes ? elemType 243 + then type.nestedTypes.elemType 244 + else null); 245 + in 246 + if innerType != null then typeNeedsOverride innerType else false 247 + else 248 + # Everything else (collections, submodules, etc.) should merge naturally 249 + false; 250 + 251 + # Check if a config path needs explicit override 252 + pathNeedsOverride = optionPath: 253 + let 254 + # Try direct option first 255 + directOption = lib.attrByPath optionPath null baseProject.options; 256 + in 257 + if directOption != null && lib.isOption directOption then 258 + typeNeedsOverride directOption.type 259 + else if optionPath != [ ] then 260 + # Check parent for freeform type 261 + let 262 + parentPath = lib.init optionPath; 263 + parentOption = lib.attrByPath parentPath null baseProject.options; 264 + in 265 + if parentOption != null && lib.isOption parentOption then 266 + let 267 + # Look for freeform type: 268 + # 1. Standard location: type.freeformType (primary) 269 + # 2. Nested location: type.nestedTypes.freeformType (evaluated form) 270 + freeformType = parentOption.type.freeformType or 271 + parentOption.type.nestedTypes.freeformType or 272 + null; 273 + elementType = 274 + if freeformType ? elemType then freeformType.elemType 275 + else if freeformType ? nestedTypes && freeformType.nestedTypes ? elemType then freeformType.nestedTypes.elemType 276 + else freeformType; 277 + in 278 + typeNeedsOverride elementType 279 + else false 280 + else false; 281 + 282 + # Support overriding both plain attrset modules and functions 283 + applyModuleOverride = config: 284 + if builtins.isFunction config 285 + then 286 + let 287 + wrapper = args: applyOverrideRecursive (config args) [ ]; 288 + in 289 + lib.mirrorFunctionArgs config wrapper 290 + else applyOverrideRecursive config [ ]; 291 + 292 + # Apply overrides recursively based on option types 293 + applyOverrideRecursive = config: optionPath: 294 + if lib.isAttrs config && config ? _type then 295 + config # Don't touch values with existing type metadata 296 + else if lib.isAttrs config then 297 + lib.mapAttrs (name: value: applyOverrideRecursive value (optionPath ++ [ name ])) config 298 + else if pathNeedsOverride optionPath then 299 + lib.mkOverride profilePriority config 300 + else 301 + config; 302 + 303 + # Apply priority overrides recursively to the deferredModule imports structure 304 + prioritizedConfig = ( 305 + profileConfig.module // { 306 + imports = lib.map 307 + (importItem: 308 + importItem // { 309 + imports = lib.map 310 + (nestedImport: 311 + applyModuleOverride nestedImport 312 + ) 313 + (importItem.imports or [ ]); 314 + } 315 + ) 316 + (profileConfig.module.imports or [ ]); 317 + } 318 + ); 319 + in 320 + prioritizedConfig 321 + ) 322 + expandedProfiles; 323 + in 324 + if allPrioritizedModules == [ ] 325 + then baseProject 326 + else baseProject.extendModules { modules = allPrioritizedModules; }; 327 + 328 + config = project.config; 329 + 330 + options = pkgs.nixosOptionsDoc { 331 + options = builtins.removeAttrs project.options [ "_module" ]; 332 + warningsAreErrors = false; 333 + # Unpack Nix types, e.g. literalExpression, mDoc. 334 + transformOptions = 335 + let isDocType = v: builtins.elem v [ "literalDocBook" "literalExpression" "literalMD" "mdDoc" ]; 336 + in lib.attrsets.mapAttrs (_: v: 337 + if v ? _type && isDocType v._type then 338 + v.text 339 + else if v ? _type && v._type == "derivation" then 340 + v.name 341 + else 342 + v 343 + ); 344 + }; 345 + 346 + # Recursively search for outputs in the config. 347 + # This is used when not building a specific output by attrpath. 348 + build = options: config: 349 + lib.concatMapAttrs 350 + (name: option: 351 + if lib.isOption option then 352 + let typeName = option.type.name or ""; 353 + in 354 + if builtins.elem typeName [ "output" "outputOf" ] then 355 + { ${name} = config.${name}; } 356 + else { } 357 + else if builtins.isAttrs option && !lib.isDerivation option then 358 + let v = build option config.${name}; 359 + in if v != { } then { 360 + ${name} = v; 361 + } else { } 362 + else { } 363 + ) 364 + options; 365 + in 366 + { 367 + inherit config options build project; 368 + shell = config.shell; 369 + packages = { 370 + optionsJSON = options.optionsJSON; 371 + # deprecated 372 + inherit (config) info procfileScript procfileEnv procfile; 373 + ci = config.ciDerivation; 374 + }; 375 + }; 376 + 377 + # Generate per-system devenv configurations 378 + perSystem = nixpkgs.lib.genAttrs systems mkDevenvForSystem; 379 + 380 + # Default devenv for the current system 381 + currentSystemDevenv = perSystem.${system}; 382 + in 383 + { 384 + devShell = nixpkgs.lib.genAttrs systems (s: perSystem.${s}.shell); 385 + packages = nixpkgs.lib.genAttrs systems (s: perSystem.${s}.packages); 386 + 387 + # Per-system devenv configurations 388 + devenv = { 389 + # Default devenv for the current system 390 + inherit (currentSystemDevenv) config options build shell packages project; 391 + # Per-system devenv configurations 392 + inherit perSystem; 393 + }; 394 + 395 + # Legacy build output 396 + build = currentSystemDevenv.build currentSystemDevenv.options currentSystemDevenv.config; 397 + }; 398 + }
+1
.devenv/devenv.json
··· 1 + {"inputs":{"nixpkgs":{"url":"github:cachix/devenv-nixpkgs/rolling"}}}
+1
.devenv/flake.json
··· 1 + {"nixpkgs":{"url":"github:cachix/devenv-nixpkgs/rolling"}}
+1
.devenv/gc/shell
··· 1 + shell-5-link
.devenv/imports.txt

This is a binary file and will not be displayed.

+7
.devenv/input-paths.txt
··· 1 + /home/jonathan/code/skypod-node/.devenv/flake.json 2 + /home/jonathan/code/skypod-node/.devenv.flake.nix 3 + /home/jonathan/code/skypod-node/.env 4 + /home/jonathan/code/skypod-node/devenv.local.nix 5 + /home/jonathan/code/skypod-node/devenv.lock 6 + /home/jonathan/code/skypod-node/devenv.nix 7 + /home/jonathan/code/skypod-node/devenv.yaml
+1
.devenv/load-exports
··· 1 +
.devenv/nix-eval-cache.db

This is a binary file and will not be displayed.

.devenv/nix-eval-cache.db-shm

This is a binary file and will not be displayed.

+1
.devenv/profile
··· 1 + /nix/store/wmh3hwran6m1ln7n4ci6794xyrrgcs6w-devenv-profile
+1
.devenv/run
··· 1 + /run/user/1000/devenv-d48b567
.devenv/tasks.db

This is a binary file and will not be displayed.

.devenv/tasks.db-shm

This is a binary file and will not be displayed.

.devenv/tasks.db-wal

This is a binary file and will not be displayed.

+123 -72
src/client/realm/service-connection-peer.ts
··· 3 3 import * as protocol from '#common/protocol' 4 4 import {IdentID} from '#common/protocol' 5 5 6 + import {BlockingQueue} from '#common/async/blocking-queue.js' 7 + import {sleep} from '#common/async/sleep.js' 8 + import z from 'zod/v4' 6 9 import {RealmSyncManager} from './service-connection-sync' 7 10 11 + const realmRtcAutohandleSchema = z.union([ 12 + protocol.realmRtcPingRequestSchema, 13 + protocol.realmRtcPongResponseSchema, 14 + ]) 15 + 8 16 /** a single webrtc peer connection within a realm */ 9 17 export class RealmPeer extends SimplePeer { 10 18 initiator: boolean 11 19 identid: IdentID 12 20 21 + #seq: number 13 22 #sync: RealmSyncManager 14 - 15 - #pingSeq: number 16 - #pingAbort: AbortController 17 - #pingTimer: ReturnType<typeof setInterval> 23 + #queue: BlockingQueue<unknown> 24 + #abort: AbortController 18 25 19 26 constructor(sync: RealmSyncManager, identid: IdentID, initiator: boolean) { 20 27 super({ ··· 30 37 this.initiator = initiator 31 38 this.identid = identid 32 39 40 + this.#seq = 0 33 41 this.#sync = sync 42 + this.#queue = new BlockingQueue() 43 + this.#abort = new AbortController() 34 44 35 - this.#pingSeq = 0 36 - this.#pingAbort = new AbortController() 37 - this.#pingAbort.signal.addEventListener('abort', this.#pingCancel) 38 - this.#pingTimer = setInterval(this.#pingSync, 30_000) 45 + // we start the loops and more event handlers in the connect handler 39 46 40 - this.on('connect', this.#handlePeerConnect.bind(this)) 41 - this.on('close', this.#handlePeerClose.bind(this)) 42 - this.on('error', this.#handlePeerError.bind(this)) 47 + this.on('connect', this.#connected) 48 + this.on('close', this.#closed) 49 + this.on('error', this.#errored) 43 50 44 - if (initiator) { 45 - this.on('connect', this.#pingSync) 46 - } 51 + this.#abort.signal.addEventListener('abort', this.#aborted) 47 52 } 48 53 49 54 destroy(error?: Error) { 50 - this.#pingAbort.abort(error) 55 + this.#abort.abort(error) 56 + 51 57 super.destroy(error) 52 58 } 53 59 54 60 sendJson<T extends unknown>(data: T) { 55 61 console.debug('sending:', this.identid, data) 62 + 56 63 this.send(JSON.stringify(data)) 57 64 } 58 65 59 - async ping(signal?: AbortSignal) { 60 - signal?.throwIfAborted() 61 - if (!this.connected) return 66 + #dispatch(type: string, detail?: object) { 67 + this.emit(type, new CustomEvent(type, {bubbles: true, detail})) 68 + } 62 69 63 - const peerSync = this.identid === this.#sync.chooseSyncPeer(this.#sync.knownPeers) 64 - const peerClocks = await this.#sync.buildSyncState() 70 + #connected = () => { 71 + console.debug(`connected to ${this.identid}`) 65 72 66 - console.log('sending ping to: ', this.identid, peerClocks, peerSync) 67 - this.sendJson<protocol.RealmRtcPingRequest>({ 68 - typ: 'req', 69 - msg: 'realm.rtc.ping', 70 - seq: this.#pingSeq++, 71 - dat: { 72 - peerClocks, 73 - peerSync, 74 - }, 73 + // start loops, then start listening 74 + // loops go first so we don't have messages without handling 75 + 76 + this.#pingLoop().catch((err: unknown) => { 77 + this.#errored(err as Error) 78 + }) 79 + this.#receiveLoop().catch((err: unknown) => { 80 + this.#errored(err as Error) 75 81 }) 82 + 83 + this.on('data', this.#receive) 84 + 85 + // announce 86 + this.#dispatch('peeropen', {identid: this.identid}) 76 87 } 77 88 78 - #pingSync = () => { 79 - this.ping(this.#pingAbort.signal).catch((err: unknown) => { 80 - console.error(err) 81 - }) 89 + #aborted = () => { 90 + console.log('peer connected aborted...', this.identid) 91 + 92 + // stop listeners; loops were stopped by the abort signal 93 + this.off('data', this.#receive) 94 + 95 + // announce 96 + this.#dispatch('peerabort', {identid: this.identid}) 97 + } 98 + 99 + #closed = () => { 100 + console.debug(`disconnected from ${this.identid}`) 101 + 102 + this.#abort.abort() 103 + this.#dispatch('peerclose', {identid: this.identid}) 104 + } 105 + 106 + #errored = (err: Error) => { 107 + console.error(`Error with peer ${this.identid}:`, err) 108 + 109 + this.#dispatch('peererror', {identid: this.identid, error: err}) 110 + } 111 + 112 + /// receives go into a queue 113 + 114 + #receive = (chunk: string | Uint8Array) => { 115 + const string = typeof chunk === 'string' ? chunk : new TextDecoder().decode(chunk) 116 + const data: unknown = JSON.parse(string) 117 + 118 + const messages: unknown[] = Array.isArray(data) ? data : [data] 119 + this.#queue.enqueue(...messages) 82 120 } 83 121 84 - #pingCancel = () => { 85 - console.log('shutting down ping...', this.identid) 122 + #receiveLoop = async () => { 123 + this.#abort.signal.throwIfAborted() 124 + console.debug('receiver loop running for:', this.identid) 125 + 126 + while (!this.#abort.signal.aborted) { 127 + const message = await this.#queue.dequeue(this.#abort.signal) 128 + const parsed = await realmRtcAutohandleSchema.safeParseAsync(message) 129 + 130 + if (!parsed.success) { 131 + this.#dispatch('peerdata', {identid: this.identid, data: message}) 132 + continue 133 + } 134 + 135 + switch (parsed.data.msg) { 136 + case 'realm.rtc.ping': 137 + await this.#receivePing(parsed.data) 138 + continue 86 139 87 - clearInterval(this.#pingTimer) 88 - this.#pingAbort.signal.removeEventListener('abort', this.#pingCancel) 140 + case 'realm.rtc.pong': 141 + await this.#receivePong(parsed.data) 142 + continue 143 + } 144 + } 89 145 } 90 146 91 - // when a peer sends a ping 92 - incomingPing = async (ping: protocol.RealmRtcPingRequest) => { 147 + #receivePing = async (ping: protocol.RealmRtcPingRequest) => { 93 148 const peerClocks = await this.#sync.buildSyncState() 94 149 95 150 // reply to pings with pongs ··· 104 159 if (ping.dat.peerSync) { 105 160 const actions = await this.#sync.fetchSyncDelta(ping.dat.peerClocks) 106 161 if (actions.length) { 107 - console.debug('ping request is sending data:', actions) 108 162 this.sendJson(actions.map((a) => a.action)) 109 163 } 110 164 } 165 + 166 + this.#dispatch('peerping', {identid: this.identid}) 111 167 } 112 168 113 169 // when a peer responds to a ping (lazy sync) 114 - incomingPong = async (pong: protocol.RealmRtcPongResponse) => { 170 + #receivePong = async (pong: protocol.RealmRtcPongResponse) => { 115 171 const actions = await this.#sync.fetchSyncDelta(pong.dat.peerClocks) 116 172 if (actions.length) { 117 - console.debug('pong is sending data:', actions) 118 173 this.sendJson(actions.map((a) => a.action)) 119 174 } 120 175 } 121 176 122 - #dispatchCustomEvent(type: string, detail?: object) { 123 - this.emit(type, new CustomEvent(type, {bubbles: true, detail})) 124 - } 177 + /// ping/pong 125 178 126 - #handlePeerConnect = () => { 127 - console.debug(`connected to ${this.identid}`) 128 - this.on('data', this.#handlePeerData.bind(this)) 179 + #pingLoop = async () => { 180 + this.#abort.signal.throwIfAborted() 181 + console.debug('receiver loop running for:', this.identid) 129 182 130 - this.#dispatchCustomEvent('peeropen', {identid: this.identid}) 131 - } 132 - 133 - #handlePeerClose = () => { 134 - console.debug(`disconnected from ${this.identid}`) 135 - this.off('data', this.#handlePeerData.bind(this)) 183 + if (this.initiator) { 184 + await this.#ping() 185 + } 136 186 137 - this.#dispatchCustomEvent('peerclose', {identid: this.identid}) 187 + while (!this.#abort.signal.aborted) { 188 + await sleep(30_000, this.#abort.signal) 189 + await this.#ping() 190 + } 138 191 } 139 192 140 - #handlePeerError = (err: Error) => { 141 - console.error(`Error with peer ${this.identid}:`, err) 142 - 143 - this.#dispatchCustomEvent('peererror', {identid: this.identid, error: err}) 144 - } 193 + #ping = async () => { 194 + this.#abort.signal.throwIfAborted() 195 + if (!this.connected) return 145 196 146 - #handlePeerData = (chunk: string | Uint8Array) => { 147 - try { 148 - const string = typeof chunk === 'string' ? chunk : new TextDecoder().decode(chunk) 149 - const data: unknown = JSON.parse(string) 197 + const peerSync = this.identid === this.#sync.chooseSyncPeer(this.#sync.knownPeers) 198 + const peerClocks = await this.#sync.buildSyncState() 150 199 151 - const messages: unknown[] = Array.isArray(data) ? data : [data] 152 - for (const message of messages) { 153 - this.#dispatchCustomEvent('peermessage', {peer: this, message}) 154 - } 155 - } catch (er) { 156 - console.error(er) 157 - throw er 158 - } 200 + console.log('sending ping to: ', this.identid, peerClocks, peerSync) 201 + this.sendJson<protocol.RealmRtcPingRequest>({ 202 + typ: 'req', 203 + msg: 'realm.rtc.ping', 204 + seq: this.#seq++, 205 + dat: { 206 + peerClocks, 207 + peerSync, 208 + }, 209 + }) 159 210 } 160 211 }
+5 -51
src/client/realm/service-connection.ts
··· 3 3 import {z} from 'zod/v4' 4 4 5 5 import {timeoutSignal} from '#common/async/aborts.js' 6 - import {BlockingQueue} from '#common/async/blocking-queue.js' 7 6 import {generateSignableJwt, jwkImport} from '#common/crypto/jwks' 8 7 import {jwtPayload, verifyJwtToken} from '#common/crypto/jwts.js' 9 8 import {normalizeError, normalizeProtocolError, ProtocolError} from '#common/errors' ··· 26 25 protocol.realmRtcPongResponseSchema, 27 26 ]) 28 27 29 - const realmRtcAutohandleSchema = z.union([ 30 - protocol.realmRtcPingRequestSchema, 31 - protocol.realmRtcPongResponseSchema, 32 - ]) 33 - 34 28 export interface ConnectionOptions { 35 29 realmid?: RealmID 36 30 register?: boolean ··· 48 42 49 43 #identity: RealmIdentity 50 44 #sync: RealmSyncManager 51 - #queue: BlockingQueue<{peer: RealmPeer; message: unknown}> 52 - #queueAbort: AbortController 53 45 54 46 constructor( 55 47 url: string, ··· 68 60 this.#nonces = new Map() 69 61 70 62 this.#sync = new RealmSyncManager(db, clock, identity.identid, this.#peers) 71 - 72 - this.#queue = new BlockingQueue() 73 - this.#queueAbort = new AbortController() 74 - this.#queueRun(this.#queueAbort.signal).catch((err: unknown) => { 75 - console.error('error in the queue runner!', err) 76 - }) 77 - 78 63 this.#socket = new WebSocket(url) 79 64 this.#socket.onopen = this.#handleSocketOpen 80 65 this.#socket.onclose = this.#handleSocketClose ··· 118 103 const peer = this.#peers.get(identid) 119 104 if (!peer) throw new Error(`Not connected to peer: ${identid}`) 120 105 121 - console.debug('sending:', identid, data) 122 - peer.send(JSON.stringify(data)) 106 + peer.sendJson(data) 123 107 } 124 108 125 109 broadcast(data: unknown, self = false) { ··· 133 117 134 118 destroy() { 135 119 console.debug('realm connection destroy!') 136 - this.#queueAbort.abort() 137 120 138 121 if (this.connected) { 139 122 this.#socket.close() ··· 152 135 this.dispatchEvent(new CustomEvent(type, {bubbles: true, detail})) 153 136 } 154 137 155 - async #queueRun(signal?: AbortSignal) { 156 - signal?.throwIfAborted() 157 - console.debug('realm message queue running!') 158 - 159 - while (!signal?.aborted) { 160 - const {peer, message} = await this.#queue.dequeue(signal) 161 - const parsed = realmRtcAutohandleSchema.safeParse(message) 162 - console.log('processing...', message, parsed) 163 - 164 - // this is not autorespondable - pass along 165 - if (!parsed.success) { 166 - this.#dispatchCustomEvent('peerdata', {identid: peer.identid, data: message}) 167 - continue 168 - } 169 - 170 - // otherwise do the auto handling 171 - switch (parsed.data.msg) { 172 - case 'realm.rtc.ping': 173 - await peer.incomingPing(parsed.data) 174 - continue 175 - 176 - case 'realm.rtc.pong': 177 - await peer.incomingPong(parsed.data) 178 - continue 179 - } 180 - } 181 - } 182 - 183 138 #socketSend<T extends object>(payload: T) { 184 139 this.#socket.send(JSON.stringify(payload)) 185 140 } ··· 366 321 peer = new RealmPeer(this.#sync, remoteid, initiator) 367 322 this.#peers.set(remoteid, peer) 368 323 324 + // handle websocket signalling in here 369 325 peer.on('signal', this.#handlePeerSignal.bind(this, peer)) 370 - peer.on('peermessage', this.#handlePeerMessage.bind(this)) 326 + 327 + // forward everything 328 + peer.on('peerdata', this.dispatchEvent.bind(this)) 371 329 peer.on('peeropen', this.dispatchEvent.bind(this)) 372 330 peer.on('peerclose', this.dispatchEvent.bind(this)) 373 331 peer.on('peererror', this.dispatchEvent.bind(this)) ··· 385 343 this.#peers.delete(identid) 386 344 this.#nonces.delete(identid) 387 345 } 388 - } 389 - 390 - #handlePeerMessage = (event: CustomEvent<{peer: RealmPeer; message: unknown}>) => { 391 - this.#queue.enqueue(event.detail) 392 346 } 393 347 394 348 #handlePeerSignal = (peer: RealmPeer, payload: SimplePeer.SignalData) => {