Experiment to rebuild Diffuse using web applets.
1import * as Uint8 from "uint8arrays";
2import * as Comlink from "comlink";
3import { xxh32 } from "xxh32";
4import { getTransferables } from "@okikio/transferables";
5
6import type { Track } from "@applets/core/types";
7import type { DiffuseApplet } from "./applet/common";
8
9// export { SharedWorkerPolyfill as SharedWorker } from "@okikio/sharedworker";
10export const SharedWorker = globalThis.SharedWorker;
11
12////////////////////////////////////////////
13// 🌳
14////////////////////////////////////////////
15
16export type WorkerTasks = {
17 _listen: ReturnType<typeof _listen>;
18 _manage: ReturnType<typeof _manage>;
19};
20
21////////////////////////////////////////////
22// 🛠️
23////////////////////////////////////////////
24
25export function arrayShuffle<T>(array: Array<T>): Array<T> {
26 if (array.length === 0) {
27 return [];
28 }
29
30 array = [...array];
31
32 for (let index = array.length - 1; index > 0; index--) {
33 const randArr = crypto.getRandomValues(new Uint32Array(1));
34 const randVal = randArr[0] / 2 ** 32;
35 const newIndex = Math.floor(randVal * (index + 1));
36 [array[index], array[newIndex]] = [array[newIndex], array[index]];
37 }
38
39 return array;
40}
41
42export function cleanUndefinedValuesForTracks(tracks: Track[]): Track[] {
43 return tracks.map((track) => {
44 const t = { ...track };
45
46 if (t.tags) {
47 if ("album" in t.tags && t.tags.album === undefined) delete t.tags.album;
48 if ("artist" in t.tags && t.tags.artist === undefined) delete t.tags.artist;
49 if ("genre" in t.tags && t.tags.genre === undefined) delete t.tags.genre;
50 if ("year" in t.tags && t.tags.year === undefined) delete t.tags.year;
51
52 if ("of" in t.tags.disc && t.tags.disc.of === undefined) delete t.tags.disc.of;
53 if ("of" in t.tags.track && t.tags.track.of === undefined) delete t.tags.track.of;
54 }
55
56 return t;
57 });
58}
59
60export function comparable(value: unknown) {
61 return xxh32(JSON.stringify(value));
62}
63
64export function endpoint<T extends Record<string, any> = WorkerTasks>(ini: Comlink.Endpoint) {
65 const e = Comlink.wrap<T>(ini);
66 if ("start" in ini && typeof ini.start === "function") ini.start();
67 return e;
68}
69
70export function expose<A extends Record<string, any>>(
71 tasks: A,
72 opts?: {
73 ports?: {
74 applets: MessagePort[];
75 consumers: MessagePort[];
76 };
77 },
78): A {
79 if (globalThis.SharedWorkerGlobalScope && self instanceof SharedWorkerGlobalScope) {
80 self.onconnect = (event: MessageEvent) => {
81 const port = event.ports[0];
82 opts?.ports?.applets?.push(port);
83 Comlink.expose(tasks, port);
84 port.start();
85 };
86
87 (self as any).connected = true;
88 } else {
89 Comlink.expose(tasks, self);
90 }
91
92 return tasks;
93}
94
95export function groupTracksPerScheme(
96 tracks: Track[],
97 initial: Record<string, Track[]> = {},
98): Record<string, Track[]> {
99 const acc: Record<string, Track[]> = initial;
100
101 tracks.forEach((track) => {
102 const scheme = track.uri.substring(0, track.uri.indexOf(":"));
103 acc[scheme] ??= [];
104 acc[scheme].push(track);
105 });
106
107 return acc;
108}
109
110export function inIframe() {
111 return window.self !== window.top;
112}
113
114export function initialConnections<C extends Record<string, any>>(ids: string[]) {
115 const connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>> = {};
116
117 ids.forEach((c) => {
118 connections[c] = Promise.withResolvers<Comlink.Remote<C>>();
119 });
120
121 return connections;
122}
123
124export function isPrimitive(test: unknown) {
125 return test !== Object(test);
126}
127
128export function jsonDecode<T>(a: any): T {
129 return JSON.parse(new TextDecoder().decode(a));
130}
131
132export function jsonEncode<T>(a: T): Uint8Array {
133 return new TextEncoder().encode(JSON.stringify(a));
134}
135
136export function postMessages<D, T>({
137 data,
138 ports,
139 transfer,
140}: {
141 data: D;
142 ports: MessagePort[];
143 transfer?: Transferable[];
144}) {
145 ports.forEach((port) => {
146 port.postMessage(data, transfer ?? []);
147 });
148}
149
150export function provide<
151 C extends Record<string, any>,
152 A extends Record<string, any>,
153 T extends Record<string, any>,
154>({
155 actions,
156 connections,
157 tasks,
158}: {
159 actions?: A;
160 connections?: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>;
161 tasks?: T;
162}) {
163 const portsHolder = {
164 applets: [] as MessagePort[],
165 consumers: [] as MessagePort[],
166 };
167
168 const allTasks = expose<WorkerTasks & T>(
169 {
170 _listen: _listen<A>(actions || ({} as A), portsHolder),
171 _manage: _manage<C>(connections || {}),
172 ...(tasks || ({} as T)),
173 },
174 {
175 ports: portsHolder,
176 },
177 );
178
179 return {
180 connections: connections || ({} as Record<string, PromiseWithResolvers<Comlink.Remote<C>>>),
181 ports: portsHolder,
182 tasks: allTasks,
183 };
184}
185
186export function sync<DataType = unknown>(
187 context: DiffuseApplet<DataType>,
188 port: MessagePort | Worker,
189 options: { groupId?: string } = {},
190) {
191 port.onmessage = (event) => {
192 if (
193 event.data?.type === "data" &&
194 (options.groupId ? event.data?.groupId === options.groupId : true)
195 ) {
196 context.data = event.data.data;
197 }
198 };
199}
200
201export async function trackArtworkCacheId(track: Track): Promise<string> {
202 return await crypto.subtle
203 .digest("SHA-256", new TextEncoder().encode(track.uri))
204 .then((a) => Uint8.toString(new Uint8Array(a), "base64url"));
205}
206
207export function transfer<T = unknown>(a: T) {
208 const b = getTransferables(a);
209 return Comlink.transfer(a, b);
210}
211
212// PRIVATE
213
214function _listen<A extends Record<string, any>>(
215 actions: A,
216 portsHolder: {
217 applets: MessagePort[];
218 consumers: MessagePort[];
219 },
220) {
221 async function handleAction(
222 port: MessagePort,
223 action: {
224 type: "action";
225 id: string;
226 actionId: string;
227 arguments: any;
228 },
229 ) {
230 const result = await actions[action.actionId]?.(action.arguments);
231 return postMessage(port, action.id, result);
232 }
233
234 function postMessage<T>(port: MessagePort, id: string, result: T) {
235 port.postMessage(
236 {
237 type: "actioncomplete",
238 id,
239 result,
240 },
241 {
242 transfer: getTransferables(result),
243 },
244 );
245 }
246
247 return (port: MessagePort) => {
248 Comlink.expose(actions, port);
249 portsHolder.consumers.push(port);
250
251 port.onmessage = async (message) => {
252 switch (message.data?.type) {
253 case "action":
254 return handleAction(port, message.data);
255 }
256 };
257 };
258}
259
260function _manage<C extends Record<string, any>>(
261 connections: Record<string, PromiseWithResolvers<Comlink.Remote<C>>>,
262) {
263 return (connectionId: string, workerPort: MessagePort) => {
264 let conn = connections[connectionId];
265 const remote = endpoint<C>(workerPort);
266
267 if (!conn) {
268 connections[connectionId] = Promise.withResolvers<Comlink.Remote<C>>();
269 conn = connections[connectionId];
270 }
271
272 conn.resolve(remote);
273 };
274}