A music player that connects to your cloud/distributed storage.
at v4 14 kB view raw
1import QS from "query-string"; 2import { decodeMessage, encodeMessage, RPCChannel } from "@kunkun/kkrpc"; 3import { html, render } from "lit-html"; 4 5import { effect, signal } from "@common/signal.js"; 6import { rpc, workerLink, workerProxy, workerTunnel } from "./worker.js"; 7import { BrowserPostMessageIo } from "./worker/rpc.js"; 8 9/** 10 * @import {BroadcastingStatus, WorkerOpts} from "./element.d.ts" 11 * @import {ProxiedActions, Tunnel} from "./worker.d.ts"; 12 * @import {Signal} from "./signal.d.ts" 13 */ 14 15export { nothing } from "lit-html"; 16export const DEFAULT_GROUP = "default"; 17 18/** 19 * Base for custom elements, provides some utility functionality 20 * around rendering and managing signals. 21 */ 22export class DiffuseElement extends HTMLElement { 23 $connected = signal(false) 24 25 #connected = Promise.withResolvers(); 26 #disposables = /** @type {Array<() => void>} */ ([]); 27 28 /** */ 29 constructor() { 30 super(); 31 32 this.worker = this.worker.bind(this); 33 this.workerLink = this.workerLink.bind(this); 34 } 35 36 /** 37 * @param {string} _name 38 * @param {string} oldValue 39 * @param {string} newValue 40 */ 41 attributeChangedCallback(_name, oldValue, newValue) { 42 if (oldValue !== newValue) this.#render(); 43 } 44 45 /** 46 * Effect helper that automatically is disposes 47 * when this element is removed from the DOM. 48 * 49 * @param {() => void} fn 50 */ 51 effect(fn) { 52 this.#disposables.push(effect(fn)); 53 } 54 55 /** */ 56 forceRender() { 57 return this.#render(); 58 } 59 60 /** */ 61 get group() { 62 return this.getAttribute("group") ?? DEFAULT_GROUP; 63 } 64 65 /** */ 66 get label() { 67 return this.getAttribute("label") ?? this.id ?? this.localName; 68 } 69 70 /** */ 71 get nameWithGroup() { 72 return `${this.constructor.prototype.constructor.NAME}/${this.group}`; 73 } 74 75 /** */ 76 root() { 77 return (this.shadowRoot ?? this); 78 } 79 80 /** */ 81 whenConnected() { 82 return this.#connected.promise; 83 } 84 85 /** 86 * Avoid replacing the whole subtree, 87 * morph the existing DOM into the new given tree. 88 */ 89 #render() { 90 if (!("render" in this && typeof this.render === "function")) return; 91 92 const tmp = this.render({ 93 html: html, 94 state: "state" in this ? this.state : undefined, 95 }); 96 97 render(tmp, this.root()); 98 } 99 100 // LIFECYCLE 101 102 connectedCallback() { 103 this.$connected.value = true 104 this.#connected.resolve(null); 105 106 if (!("render" in this && typeof this.render === "function")) return; 107 108 this.effect(() => { 109 if (!("render" in this && typeof this.render === "function")) return; 110 this.#render(); 111 }); 112 } 113 114 disconnectedCallback() { 115 this.$connected.value = false 116 this.#teardown(); 117 } 118 119 #teardown() { 120 this.#disposables.forEach((fn) => fn()); 121 } 122 123 // WORKERS 124 125 /** @type {undefined | Worker | SharedWorker} */ 126 #worker; 127 128 createWorker() { 129 const NAME = this.constructor.prototype.constructor.NAME; 130 const WORKER_URL = this.constructor.prototype.constructor.WORKER_URL; 131 132 if (!NAME) throw new Error("Missing `NAME` static property"); 133 if (!WORKER_URL) throw new Error("Missing `WORKER_URL` static property"); 134 135 // Query 136 const query = QS.stringify( 137 "workerQuery" in this && typeof this.workerQuery === "function" 138 ? this.workerQuery() 139 : {}, 140 ); 141 142 // Setup worker 143 const name = this.nameWithGroup; 144 const url = import.meta.resolve("./" + WORKER_URL) + `?${query}`; 145 146 let worker; 147 148 if (this.hasAttribute("group")) { 149 worker = new SharedWorker(url, { name, type: "module" }); 150 } else { 151 worker = new Worker(url, { name, type: "module" }); 152 } 153 154 return worker; 155 } 156 157 /** */ 158 dependencies() { 159 return Object.fromEntries( 160 Array.from(this.children).flatMap((element) => { 161 if ("nameWithGroup" in element === false) { 162 return []; 163 } 164 165 const d = /** @type {DiffuseElement} */ (element); 166 return [[d.localName, d]]; 167 }), 168 ); 169 } 170 171 worker() { 172 this.#worker ??= this.createWorker(); 173 return this.#worker; 174 } 175 176 workerLink() { 177 const worker = this.worker(); 178 return workerLink(worker); 179 } 180 181 /** 182 * @template {Record<string, (...args: any[]) => any>} Actions 183 * @param {WorkerOpts} [opts] 184 * @returns {ProxiedActions<Actions>} 185 */ 186 workerProxy(opts) { 187 return workerProxy( 188 () => this.workerTunnel(opts).port, 189 ); 190 } 191 192 /** 193 * @param {WorkerOpts} [opts] 194 */ 195 workerTunnel({ forceNew } = {}) { 196 // Creates a MessagePort that is connected to the worker. 197 // All the dependencies are added automatically. 198 const worker = forceNew === true || 199 (typeof forceNew === "object" && forceNew.self === true) 200 ? this.createWorker() 201 : this.worker(); 202 const deps = this.dependencies(); 203 204 let toWorker; 205 206 if (Object.keys(deps).length) { 207 toWorker = 208 /** 209 * @param {any} msg 210 */ 211 async (msg) => { 212 /** @type {Array<[string, Tunnel]>} */ 213 const ports = Object.entries(deps).map( 214 /** @param {[string, DiffuseElement]} _ */ 215 ([k, v]) => { 216 const n = typeof forceNew === "object" 217 ? forceNew.dependencies?.[k] ?? false 218 : false; 219 return [k, v.workerTunnel({ forceNew: n })]; 220 }, 221 ); 222 223 const decoded = await decodeMessage(msg); 224 const data = { 225 data: Array.isArray(decoded.args) ? decoded.args[0] : decoded.args, 226 ports: Object.fromEntries(ports.map(([k, v]) => { 227 return [k, v.port]; 228 })), 229 }; 230 231 const encoded = encodeMessage( 232 { 233 ...decoded, 234 args: Array.isArray(decoded.args) 235 ? [data, ...decoded.args.slice(1)] 236 : decoded.args, 237 }, 238 {}, 239 true, 240 ports.map(([_k, v]) => v.port), 241 ); 242 243 this.#disposables.push(() => { 244 ports.forEach(([_k, v]) => v.disconnect()); 245 }); 246 247 return { 248 data: encoded, 249 transfer: ports.map(([_k, v]) => v.port), 250 }; 251 }; 252 } 253 254 const tunnel = workerTunnel(worker, { toWorker }); 255 return tunnel; 256 } 257} 258 259/** 260 * Broadcastable version of the base class. 261 * 262 * Share the state of an element across multiple tabs 263 * of the same origin and have one instance be the leader. 264 */ 265export class BroadcastableDiffuseElement extends DiffuseElement { 266 broadcasted = false; 267 268 /** @type {{ assumeLeadership?: boolean }} */ 269 #broadcastingOptions = {}; 270 271 #broadcastingStatus; 272 broadcastingStatus; 273 274 /** @type {PromiseWithResolvers<void>} */ 275 #lock = Promise.withResolvers(); 276 277 /** @type {PromiseWithResolvers<BroadcastingStatus>} */ 278 #status = Promise.withResolvers(); 279 280 constructor() { 281 super(); 282 283 this.broadcast = this.broadcast.bind(this); 284 285 /** @type {Signal<Promise<BroadcastingStatus>>} */ 286 this.#broadcastingStatus = signal(this.#status.promise, { eager: true }); 287 this.broadcastingStatus = this.#broadcastingStatus.get; 288 } 289 290 /** 291 * @template {Record<string, { strategy: "leaderOnly" | "replicate", fn: (...args: any[]) => any }>} ActionsWithStrategy 292 * @template {{ [K in keyof ActionsWithStrategy]: ActionsWithStrategy[K]["fn"] }} Actions 293 * @param {string} channelName 294 * @param {ActionsWithStrategy} actionsWithStrategy 295 * @param {{ assumeLeadership?: boolean }} [options] 296 */ 297 broadcast(channelName, actionsWithStrategy, options) { 298 if (this.broadcasted) return; 299 if (options) this.#broadcastingOptions = options; 300 301 const channel = new BroadcastChannel(channelName); 302 const msg = new MessageChannel(); 303 304 /** 305 * @typedef {{ [K in keyof ActionsWithStrategy]: ActionsWithStrategy[K]["fn"] }} A 306 */ 307 308 this.broadcasted = true; 309 this.channelName = channelName; 310 311 const _rpc = rpc( 312 msg.port2, 313 Object.fromEntries( 314 Object.entries(actionsWithStrategy).map(([k, v]) => { 315 return [k, v.fn.bind(this)]; 316 }), 317 ), 318 ); 319 320 channel.addEventListener( 321 "message", 322 async (event) => { 323 if (event.data?.includes('"method":"leader:')) { 324 const status = await this.#status.promise; 325 if (status.leader) { 326 const json = event.data.replace('"method":"leader:', '"method":"'); 327 msg.port1.postMessage(json); 328 } 329 } else { 330 msg.port1.postMessage(event.data); 331 } 332 }, 333 ); 334 335 msg.port1.addEventListener( 336 "message", 337 (event) => channel.postMessage(event.data), 338 ); 339 340 msg.port1.start(); 341 msg.port2.start(); 342 343 async function anyoneWaiting() { 344 const state = await navigator.locks.query(); 345 return !!state.pending?.length; 346 } 347 348 const io = new BrowserPostMessageIo(() => msg.port2); 349 350 /** @type {undefined | RPCChannel<{}, ProxiedActions<Actions>>} */ 351 const proxyChannel = new RPCChannel(io, { enableTransfer: true }); 352 353 /** @type {ProxiedActions<Actions>} */ 354 const proxy = proxyChannel.getAPI(); 355 356 /** @type {any} */ 357 const actions = {}; 358 359 Object.entries(actionsWithStrategy).forEach( 360 ([action, { fn, strategy }]) => { 361 const ogFn = fn.bind(this); 362 let wrapFn = ogFn; 363 364 switch (strategy) { 365 case "leaderOnly": 366 /** @param {Parameters<Actions[action]>} args */ 367 wrapFn = async (...args) => { 368 const status = await this.#status.promise; 369 return status.leader 370 ? ogFn(...args) 371 : proxyChannel.callMethod(`leader:${action}`, args); 372 }; 373 break; 374 375 case "replicate": 376 /** @param {Parameters<Actions[action]>} args */ 377 wrapFn = async (...args) => { 378 anyoneWaiting().then((bool) => { 379 if (bool) proxy[action](...args); 380 }); 381 return ogFn(...args); 382 }; 383 break; 384 } 385 386 actions[action] = wrapFn; 387 }, 388 ); 389 390 return /** @type {ProxiedActions<Actions>} */ (actions); 391 } 392 393 async isLeader() { 394 if (this.broadcasted) { 395 const status = await this.broadcastingStatus(); 396 return status.leader; 397 } else { 398 return true; 399 } 400 } 401 402 // LIFECYCLE 403 404 /** 405 * @override 406 */ 407 connectedCallback() { 408 super.connectedCallback(); 409 410 if (!this.broadcasted) return; 411 412 // Grab a lock if it isn't acquired yet and if needed, 413 // and hold it until `this.lock.promise` resolves. 414 const assumeLeadership = this.#broadcastingOptions?.assumeLeadership; 415 416 if (assumeLeadership === undefined || assumeLeadership === true) { 417 navigator.locks.request( 418 `${this.channelName}/lock`, 419 assumeLeadership === true ? { steal: true } : { ifAvailable: true }, 420 (lock) => { 421 this.#status.resolve( 422 lock ? { leader: true, initialLeader: true } : { leader: false }, 423 ); 424 if (lock) return this.#lock.promise; 425 }, 426 ); 427 } else { 428 this.#status.resolve( 429 { leader: false }, 430 ); 431 } 432 433 // When the lock status is initially determined, log its status. 434 // Additionally, wait for lock if needed. 435 this.#status.promise.then((status) => { 436 if (status.leader) { 437 console.log(`🧙 Elected leader for: ${this.channelName}`); 438 } else { 439 console.log(`🔮 Watching leader: ${this.channelName}`); 440 } 441 442 // Wait for leadership 443 if (status.leader === false) { 444 navigator.locks.request( 445 `${this.channelName}/lock`, 446 () => { 447 this.#status = Promise.withResolvers(); 448 this.#status.resolve({ leader: true, initialLeader: false }); 449 450 this.#broadcastingStatus.value = this.#status.promise; 451 452 return this.#lock.promise; 453 }, 454 ); 455 } 456 }); 457 } 458 459 /** 460 * @override 461 */ 462 disconnectedCallback() { 463 super.disconnectedCallback(); 464 this.#lock.resolve(); 465 } 466} 467 468/** 469 * Component DOM selector. 470 * 471 * Basically `document.querySelector` but returns the element 472 * with the correct type based on the element module given. 473 * 474 * ``` 475 * import * as QueryEngine from "@components/engine/query/element.js" 476 * 477 * const instance = component(QueryEngine) 478 * ``` 479 * 480 * @template {abstract new (...args: any[]) => any} C 481 * @param {{ CLASS: C; NAME: string }} elementModule 482 * @param {string} [id] Optional id to select 483 */ 484export function component(elementModule, id) { 485 const el = document.querySelector( 486 id ? `${elementModule.NAME}#${id}` : elementModule.NAME, 487 ); 488 if (!el) { 489 throw new Error(`Element for selector '${elementModule.NAME}' not found.`); 490 } 491 return /** @type {InstanceType<C>} */ (el); 492} 493 494/** 495 * @template {HTMLElement} T 496 * @param {DiffuseElement} parent 497 * @param {string} attribute 498 * @returns {T} 499 */ 500export function query(parent, attribute) { 501 const selector = parent.getAttribute(attribute); 502 503 if (!selector) { 504 throw new Error(`Missing required '${attribute}' attribute`); 505 } 506 507 /** @type {T | null} */ 508 const element = document.querySelector(selector); 509 if (!element) throw new Error(`Missing required '${selector}' element`); 510 511 return element; 512} 513 514/** 515 * @param {Record<string, Worker | SharedWorker>} workers 516 */ 517export function terminateWorkers(workers) { 518 Object.values(workers).forEach((worker) => { 519 if (worker instanceof Worker) worker.terminate(); 520 }); 521} 522 523/** 524 * @template {Record<string, DiffuseElement>} T 525 * @param {T} elements 526 */ 527export async function whenElementsDefined(elements) { 528 await Promise.all( 529 Object.values(elements).map((element) => 530 customElements.whenDefined(element.localName) 531 ), 532 ); 533}