A music player that connects to your cloud/distributed storage.
at v4 6.4 kB view raw
import { RPCChannel } from "@kunkun/kkrpc";
import { getTransferables } from "@okikio/transferables";
import { debounceMicrotask } from "@vicary/debounce-microtask";
import { xxh32 } from "xxh32";

import { BrowserPostMessageIo } from "./worker/rpc.js";

export { getTransferables } from "@okikio/transferables";
export { transfer } from "@kunkun/kkrpc";

/**
 * @import {Announcement, Dependencies, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
 */

////////////////////////////////////////////
// MISC
////////////////////////////////////////////

/**
 * Manage incoming connections for a shared worker.
 * If a regular worker is used instead, it'll just execute the callback immediately.
 *
 * A context whose `onmessage` property is exactly `null` is treated as a
 * direct (non-shared) connection: the callback runs once, synchronously, with
 * a fresh random id, and its return value is returned. Any other context gets
 * a `"connect"` listener; each connect event starts the first port of the
 * event and hands it to the callback.
 *
 * In the `"connect"` path the id is stored on the context (`__id`) and is
 * therefore the same for every port connecting to this context, and
 * `firstConnection` is `true` only for the first connect event (tracked via
 * `__initiated` on the context).
 *
 * @template {MessagePort | Worker | MessengerRealm} T
 * @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
 * @param {T} [context] Uses `globalThis` by default.
 */
export function ostiary(
  callback,
  context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
) {
  if (/** @type {any} */ (context).onmessage === null) {
    return callback(context, true, crypto.randomUUID());
  }

  const c = /** @type {any} */ (context);
  c.__id ??= crypto.randomUUID();

  context.addEventListener(
    "connect",
    /**
     * @param {any} event
     */
    (event) => {
      /** @type {MessagePort} */
      const port = event.ports[0];
      port.start();

      // Initiate setup
      callback(port, !(c.__initiated ?? false), c.__id);
      c.__initiated = true;
    },
  );
}

/**
 * Resolve the object that messages should be posted to / listened on.
 *
 * For a `SharedWorker` this starts and returns its `port`; anything else is
 * returned unchanged.
 *
 * @param {Worker | SharedWorker} worker
 */
export function workerLink(worker) {
  if (isSharedWorker(worker)) {
    worker.port.start();
    return worker.port;
  }

  return worker;
}

/**
 * Create a lazy proxy for the actions exposed on the other side of a worker link.
 *
 * Nothing is set up until the first action is actually *called*: only then is
 * `workerLinkCreator` handed to a `BrowserPostMessageIo` and an `RPCChannel`
 * created. The resulting API object is cached and reused for later calls.
 *
 * @template {Record<string, (...args: any[]) => any>} Actions
 * @param {() => MessagePort | Worker} workerLinkCreator
 * @returns {ProxiedActions<Actions>}
 */
export function workerProxy(workerLinkCreator) {
  /** @type {ProxiedActions<Actions> | undefined} */
  let cachedApi;

  /** @returns {ProxiedActions<Actions>} */
  function ensureAPI() {
    if (!cachedApi) {
      const io = new BrowserPostMessageIo(workerLinkCreator);

      /** @type {undefined | RPCChannel<{}, ProxiedActions<Actions>>} */
      const rpc = new RPCChannel(io, { enableTransfer: true });

      cachedApi = rpc.getAPI();
    }

    return cachedApi;
  }

  // Create proxy that creates RPC API when needed
  const proxy = new Proxy(() => {}, {
    get: (_target, prop) => {
      // Only string keys can name an action. Answering `then` (or a symbol
      // key) with a function would make the proxy look like a thenable, so
      // awaiting it or returning it from an async function would invoke
      // `then` as if it were a remote action.
      if (typeof prop !== "string" || prop === "then") return undefined;

      /** @param {Parameters<Actions[any]>} args */
      return (...args) => {
        const api = /** @type {any} */ (ensureAPI());
        return api[prop](...args);
      };
    },
  });

  return /** @type {ProxiedActions<Actions>} */ (/** @type {any} */ (proxy));
}

/**
 * Bridge a worker link and a fresh `MessageChannel`.
 *
 * Messages posted on the returned `port` are forwarded to the worker link
 * (optionally rewritten by `hooks.toWorker`); messages coming from the worker
 * link are forwarded to the returned `port` (optionally rewritten by
 * `hooks.fromWorker`). A hook resolves to `{ data, transfer }`; when a hook is
 * absent, or resolves to `null`/`undefined`, the original message data is
 * forwarded without a transfer list.
 *
 * @param {MessagePort | Worker | SharedWorker} workerOrLink
 * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
 * @returns {Tunnel} The `port` to talk through and a `disconnect` function
 *   that removes the worker listener and closes both channel ports.
 */
export function workerTunnel(workerOrLink, hooks = {}) {
  const link = isSharedWorker(workerOrLink)
    ? workerLink(workerOrLink)
    : workerOrLink;
  const channel = new MessageChannel();

  channel.port1.addEventListener("message", async (event) => {
    // Send to worker
    const { data, transfer } = (await hooks?.toWorker?.(event.data)) ??
      { data: event.data };
    link.postMessage(data, { transfer });
  });

  /**
   * @param {Event} event
   */
  const workerListener = async (event) => {
    // Receive from worker
    const msgEvent = /** @type {MessageEvent} */ (event);
    const { data, transfer } = (await hooks?.fromWorker?.(msgEvent.data)) ??
      { data: msgEvent.data };
    channel.port1.postMessage(data, { transfer });
  };

  link.addEventListener("message", workerListener);

  // Ports with `addEventListener`-based listeners only deliver after `start()`.
  channel.port1.start();
  channel.port2.start();

  return {
    disconnect: () => {
      link.removeEventListener("message", workerListener);
      channel.port1.close();
      channel.port2.close();
    },
    port: channel.port2,
  };
}

////////////////////////////////////////////
// RAW
////////////////////////////////////////////

/**
 * Post a named announcement carrying `args` to the given context.
 *
 * Whatever `getTransferables` finds in the announcement is passed as the
 * transfer list of the `postMessage` call.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
 */
export function announce(
  name,
  args,
  context,
) {
  const a = announcement(name, args);
  const transferables = getTransferables(a);
  (context ?? globalThis).postMessage(a, { transfer: transferables });
}

/**
 * Register `fn` as the handler for announcements called `name` on a context.
 *
 * The first call for a context installs a single `"message"` listener and
 * creates the handler registry (`__incoming`) on that context. Registering the
 * same name again replaces the previous handler. The handler is wrapped with
 * `debounceMicrotask(..., { updateArguments: true })`.
 * NOTE(review): presumably this collapses announcements arriving within one
 * microtask into a single call with the latest args; confirm against the
 * debounce library.
 *
 * @template T
 * @param {string} name
 * @param {(args: T) => void} fn
 * @param {MessagePort | Worker | MessengerRealm} [context]
 */
export function listen(
  name,
  fn,
  context = /** @type {MessengerRealm} */ (globalThis),
) {
  const c = /** @type {any} */ (context);

  if (!c.__incoming) {
    context.addEventListener("message", incomingAnnouncementsHandler(context));
    // Prototype-less registry: announcement names come off the wire, so a
    // name such as "constructor" or "toString" must not resolve to an
    // inherited `Object.prototype` member.
    c.__incoming = Object.create(null);
  }

  c.__incoming[name] = debounceMicrotask(fn, { updateArguments: true });
}

////////////////////////////////////////////
// RPC
////////////////////////////////////////////

/**
 * Expose `actions` over an `RPCChannel` bound to `context`.
 *
 * @template {Record<string, (...args: any[]) => any>} Actions
 * @param {MessagePort | Worker | MessengerRealm} context
 * @param {Actions} actions
 */
export function rpc(context, actions) {
  const io = new BrowserPostMessageIo(() => context);

  /** @type {undefined | RPCChannel<Actions, {}>} */
  return new RPCChannel(io, { enableTransfer: true, expose: actions });
}

////////////////////////////////////////////
// ⛔️
////////////////////////////////////////////

const ANNOUNCEMENT = "announcement";

/**
 * Tell whether `value` is a `SharedWorker`.
 *
 * The `typeof` guard matters: where the `SharedWorker` global is not defined,
 * a bare `instanceof SharedWorker` throws a `ReferenceError`.
 *
 * @param {unknown} value
 * @returns {value is SharedWorker}
 */
function isSharedWorker(value) {
  return typeof SharedWorker !== "undefined" && value instanceof SharedWorker;
}

/**
 * Build the message object sent by `announce`.
 *
 * Both `ns` and `type` are set to the announcement marker; `key` is the
 * `xxh32` hash of a fresh random UUID.
 *
 * @template T
 * @param {string} name
 * @param {T} args
 * @returns {Announcement<T>}
 */
function announcement(name, args) {
  return {
    ns: ANNOUNCEMENT,
    name,
    key: xxh32(crypto.randomUUID()),

    type: ANNOUNCEMENT,
    args,
  };
}

/**
 * Create the `"message"` listener that dispatches announcements to the
 * handlers registered on `context` through `listen`.
 *
 * Messages that are not announcements are ignored, including `null`,
 * `undefined` and primitive payloads.
 *
 * @param {MessagePort | Worker | MessengerRealm} context
 */
function incomingAnnouncementsHandler(context) {
  /** @param {any} event */
  return (event) => {
    const data = event.data;

    // Optional chaining: `data` may legitimately be `null`/`undefined`, which
    // a plain destructuring would turn into a TypeError.
    if (data?.ns !== ANNOUNCEMENT || data?.type !== ANNOUNCEMENT) return;

    const incoming = /** @type {Announcement<any>} */ (data);
    const c = /** @type {any} */ (context);

    c.__incoming[incoming.name]?.(incoming.args);
  };
}