Experiment to rebuild Diffuse using web applets.
at main 6.8 kB view raw
1import * as Uint8 from "uint8arrays"; 2import * as Comlink from "comlink"; 3import { xxh32 } from "xxh32"; 4import { getTransferables } from "@okikio/transferables"; 5 6import type { Track } from "@applets/core/types"; 7import type { DiffuseApplet } from "./applet/common"; 8 9// export { SharedWorkerPolyfill as SharedWorker } from "@okikio/sharedworker"; 10export const SharedWorker = globalThis.SharedWorker; 11 12//////////////////////////////////////////// 13// 🌳 14//////////////////////////////////////////// 15 16export type WorkerTasks = { 17 _listen: ReturnType<typeof _listen>; 18 _manage: ReturnType<typeof _manage>; 19}; 20 21//////////////////////////////////////////// 22// 🛠️ 23//////////////////////////////////////////// 24 25export function arrayShuffle<T>(array: Array<T>): Array<T> { 26 if (array.length === 0) { 27 return []; 28 } 29 30 array = [...array]; 31 32 for (let index = array.length - 1; index > 0; index--) { 33 const randArr = crypto.getRandomValues(new Uint32Array(1)); 34 const randVal = randArr[0] / 2 ** 32; 35 const newIndex = Math.floor(randVal * (index + 1)); 36 [array[index], array[newIndex]] = [array[newIndex], array[index]]; 37 } 38 39 return array; 40} 41 42export function cleanUndefinedValuesForTracks(tracks: Track[]): Track[] { 43 return tracks.map((track) => { 44 const t = { ...track }; 45 46 if (t.tags) { 47 if ("album" in t.tags && t.tags.album === undefined) delete t.tags.album; 48 if ("artist" in t.tags && t.tags.artist === undefined) delete t.tags.artist; 49 if ("genre" in t.tags && t.tags.genre === undefined) delete t.tags.genre; 50 if ("year" in t.tags && t.tags.year === undefined) delete t.tags.year; 51 52 if ("of" in t.tags.disc && t.tags.disc.of === undefined) delete t.tags.disc.of; 53 if ("of" in t.tags.track && t.tags.track.of === undefined) delete t.tags.track.of; 54 } 55 56 return t; 57 }); 58} 59 60export function comparable(value: unknown) { 61 return xxh32(JSON.stringify(value)); 62} 63 64export function endpoint<T extends Record<string, any> = WorkerTasks>(ini: Comlink.Endpoint) { 65 const e = Comlink.wrap<T>(ini); 66 if ("start" in ini && typeof ini.start === "function") ini.start(); 67 return e; 68} 69 70export function expose<A extends Record<string, any>>( 71 tasks: A, 72 opts?: { 73 ports?: { 74 applets: MessagePort[]; 75 consumers: MessagePort[]; 76 }; 77 }, 78): A { 79 if (globalThis.SharedWorkerGlobalScope && self instanceof SharedWorkerGlobalScope) { 80 self.onconnect = (event: MessageEvent) => { 81 const port = event.ports[0]; 82 opts?.ports?.applets?.push(port); 83 Comlink.expose(tasks, port); 84 port.start(); 85 }; 86 87 (self as any).connected = true; 88 } else { 89 Comlink.expose(tasks, self); 90 } 91 92 return tasks; 93} 94 95export function groupTracksPerScheme( 96 tracks: Track[], 97 initial: Record<string, Track[]> = {}, 98): Record<string, Track[]> { 99 const acc: Record<string, Track[]> = initial; 100 101 tracks.forEach((track) => { 102 const scheme = track.uri.substring(0, track.uri.indexOf(":")); 103 acc[scheme] ??= []; 104 acc[scheme].push(track); 105 }); 106 107 return acc; 108} 109 110export function inIframe() { 111 return window.self !== window.top; 112} 113 114export function initialConnections<C extends Record<string, any>>(ids: string[]) { 115 const connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>> = {}; 116 117 ids.forEach((c) => { 118 connections[c] = Promise.withResolvers<Comlink.Remote<C>>(); 119 }); 120 121 return connections; 122} 123 124export function isPrimitive(test: unknown) { 125 return test !== Object(test); 126} 127 128export function jsonDecode<T>(a: any): T { 129 return JSON.parse(new TextDecoder().decode(a)); 130} 131 132export function jsonEncode<T>(a: T): Uint8Array { 133 return new TextEncoder().encode(JSON.stringify(a)); 134} 135 136export function postMessages<D, T>({ 137 data, 138 ports, 139 transfer, 140}: { 141 data: D; 142 ports: MessagePort[]; 143 transfer?: Transferable[]; 144}) { 145 ports.forEach((port) => { 146 port.postMessage(data, transfer ?? []); 147 }); 148} 149 150export function provide< 151 C extends Record<string, any>, 152 A extends Record<string, any>, 153 T extends Record<string, any>, 154>({ 155 actions, 156 connections, 157 tasks, 158}: { 159 actions?: A; 160 connections?: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>; 161 tasks?: T; 162}) { 163 const portsHolder = { 164 applets: [] as MessagePort[], 165 consumers: [] as MessagePort[], 166 }; 167 168 const allTasks = expose<WorkerTasks & T>( 169 { 170 _listen: _listen<A>(actions || ({} as A), portsHolder), 171 _manage: _manage<C>(connections || {}), 172 ...(tasks || ({} as T)), 173 }, 174 { 175 ports: portsHolder, 176 }, 177 ); 178 179 return { 180 connections: connections || ({} as Record<string, PromiseWithResolvers<Comlink.Remote<C>>>), 181 ports: portsHolder, 182 tasks: allTasks, 183 }; 184} 185 186export function sync<DataType = unknown>( 187 context: DiffuseApplet<DataType>, 188 port: MessagePort | Worker, 189 options: { groupId?: string } = {}, 190) { 191 port.onmessage = (event) => { 192 if ( 193 event.data?.type === "data" && 194 (options.groupId ? event.data?.groupId === options.groupId : true) 195 ) { 196 context.data = event.data.data; 197 } 198 }; 199} 200 201export async function trackArtworkCacheId(track: Track): Promise<string> { 202 return await crypto.subtle 203 .digest("SHA-256", new TextEncoder().encode(track.uri)) 204 .then((a) => Uint8.toString(new Uint8Array(a), "base64url")); 205} 206 207export function transfer<T = unknown>(a: T) { 208 const b = getTransferables(a); 209 return Comlink.transfer(a, b); 210} 211 212// PRIVATE 213 214function _listen<A extends Record<string, any>>( 215 actions: A, 216 portsHolder: { 217 applets: MessagePort[]; 218 consumers: MessagePort[]; 219 }, 220) { 221 async function handleAction( 222 port: MessagePort, 223 action: { 224 type: "action"; 225 id: string; 226 actionId: string; 227 arguments: any; 228 }, 229 ) { 230 const result = await actions[action.actionId]?.(action.arguments); 231 return postMessage(port, action.id, result); 232 } 233 234 function postMessage<T>(port: MessagePort, id: string, result: T) { 235 port.postMessage( 236 { 237 type: "actioncomplete", 238 id, 239 result, 240 }, 241 { 242 transfer: getTransferables(result), 243 }, 244 ); 245 } 246 247 return (port: MessagePort) => { 248 Comlink.expose(actions, port); 249 portsHolder.consumers.push(port); 250 251 port.onmessage = async (message) => { 252 switch (message.data?.type) { 253 case "action": 254 return handleAction(port, message.data); 255 } 256 }; 257 }; 258} 259 260function _manage<C extends Record<string, any>>( 261 connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>, 262) { 263 return (connectionId: string, workerPort: MessagePort) => { 264 let conn = connections[connectionId]; 265 const remote = endpoint<C>(workerPort); 266 267 if (!conn) { 268 connections[connectionId] = Promise.withResolvers<Comlink.Remote<C>>(); 269 conn = connections[connectionId]; 270 } 271 272 conn.resolve(remote); 273 }; 274}