1import { RPCChannel } from "@kunkun/kkrpc";
2import { getTransferables } from "@okikio/transferables";
3import { debounceMicrotask } from "@vicary/debounce-microtask";
4import { xxh32 } from "xxh32";
5
6import { BrowserPostMessageIo } from "./worker/rpc.js";
7
8export { getTransferables } from "@okikio/transferables";
9export { transfer } from "@kunkun/kkrpc";
10
11/**
12 * @import {Announcement, Dependencies, MessengerRealm, ProxiedActions, Tunnel} from "./worker.d.ts"
13 */
14
15////////////////////////////////////////////
16// MISC
17////////////////////////////////////////////
18
19/**
20 * Manage incoming connections for a shared worker.
21 * If a regular worker is used instead, it'll just execute the callback immediately.
22 *
23 * @template {MessagePort | Worker | MessengerRealm} T
24 * @param {(context: MessagePort | T, firstConnection: boolean, connectionId: string) => void} callback
25 * @param {T} [context] Uses `globalThis` by default.
26 */
27export function ostiary(
28 callback,
29 context = /** @type {T} */ (/** @type {unknown} */ (globalThis)),
30) {
31 if (/** @type {any} */ (context).onmessage === null) {
32 return callback(context, true, crypto.randomUUID());
33 }
34
35 const c = /** @type {any} */ (context);
36 c.__id ??= crypto.randomUUID();
37
38 context.addEventListener(
39 "connect",
40 /**
41 * @param {any} event
42 */
43 (event) => {
44 /** @type {MessagePort} */
45 const port = event.ports[0];
46 port.start();
47
48 // Initiate setup
49 callback(port, !(c.__initiated ?? false), c.__id);
50 c.__initiated = true;
51 },
52 );
53}
54
55/**
56 * @param {Worker | SharedWorker} worker
57 */
58export function workerLink(worker) {
59 if (worker instanceof SharedWorker) {
60 worker.port.start();
61 return worker.port;
62 } else {
63 return worker;
64 }
65}
66
67/**
68 * @template {Record<string, (...args: any[]) => any>} Actions
69 * @param {() => MessagePort | Worker} workerLinkCreator
70 * @returns {ProxiedActions<Actions>}
71 */
72export function workerProxy(workerLinkCreator) {
73 /** @type {ProxiedActions<Actions> | undefined} */
74 let int_api;
75
76 /** @returns {ProxiedActions<Actions>} */
77 function ensureAPI() {
78 if (!int_api) {
79 const io = new BrowserPostMessageIo(workerLinkCreator);
80
81 /** @type {undefined | RPCChannel<{}, ProxiedActions<Actions>>} */
82 const rpc = new RPCChannel(io, { enableTransfer: true });
83
84 int_api = rpc.getAPI();
85 }
86
87 return int_api;
88 }
89
90 // Create proxy that creates RPC API when needed
91 const proxy = new Proxy(() => {}, {
92 get: (_target, prop) => {
93 /** @param {Parameters<Actions[any]>} args */
94 return (...args) => {
95 const api = ensureAPI();
96 return api[prop.toString()](...args);
97 };
98 },
99 });
100
101 return /** @type {ProxiedActions<Actions>} */ (/** @type {any} */ (proxy));
102}
103
104/**
105 * @param {MessagePort | Worker | SharedWorker} workerOrLink
106 * @param {{ fromWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }>; toWorker?: (message: any) => Promise<{ data: any, transfer?: Transferable[] }> }} [hooks]
107 * @returns {Tunnel}
108 */
109export function workerTunnel(workerOrLink, hooks = {}) {
110 const link = workerOrLink instanceof SharedWorker
111 ? workerLink(workerOrLink)
112 : workerOrLink;
113 const channel = new MessageChannel();
114
115 channel.port1.addEventListener("message", async (event) => {
116 // Send to worker
117 const { data, transfer } = await hooks?.toWorker?.(event.data) ??
118 { data: event.data };
119 link.postMessage(data, { transfer });
120 });
121
122 /**
123 * @param {Event} event
124 */
125 const workerListener = async (event) => {
126 // Receive from worker
127 const msgEvent = /** @type {MessageEvent} */ (event);
128 const { data, transfer } = await hooks?.fromWorker?.(msgEvent.data) ??
129 { data: msgEvent.data };
130 channel.port1.postMessage(data, { transfer });
131 };
132
133 link.addEventListener("message", workerListener);
134
135 channel.port1.start();
136 channel.port2.start();
137
138 return {
139 disconnect: () => {
140 link.removeEventListener("message", workerListener);
141 channel.port1.close();
142 channel.port2.close();
143 },
144 port: channel.port2,
145 };
146}
147
148////////////////////////////////////////////
149// RAW
150////////////////////////////////////////////
151
152/**
153 * @template T
154 * @param {string} name
155 * @param {T} args
156 * @param {MessagePort | Worker | MessengerRealm} [context] Uses `globalThis` by default.
157 */
158export function announce(
159 name,
160 args,
161 context,
162) {
163 const a = announcement(name, args);
164 const transferables = getTransferables(a);
165 (context ?? globalThis).postMessage(a, { transfer: transferables });
166}
167
168/**
169 * @template T
170 * @param {string} name
171 * @param {(args: T) => void} fn
172 * @param {MessagePort | Worker | MessengerRealm} [context]
173 */
174export function listen(
175 name,
176 fn,
177 context = /** @type {MessengerRealm} */ (globalThis),
178) {
179 const c = /** @type {any} */ (context);
180
181 if (!c.__incoming) {
182 context.addEventListener("message", incomingAnnouncementsHandler(context));
183 c.__incoming = {};
184 }
185
186 c.__incoming[name] = debounceMicrotask(fn, { updateArguments: true });
187}
188
189////////////////////////////////////////////
190// RPC
191////////////////////////////////////////////
192
193/**
194 * @template {Record<string, (...args: any[]) => any>} Actions
195 * @param {MessagePort | Worker | MessengerRealm} context
196 * @param {Actions} actions
197 */
198export function rpc(context, actions) {
199 const io = new BrowserPostMessageIo(() => context);
200
201 /** @type {undefined | RPCChannel<Actions, {}>} */
202 return new RPCChannel(io, { enableTransfer: true, expose: actions });
203}
204
205////////////////////////////////////////////
206// ⛔️
207////////////////////////////////////////////
208
209const ANNOUNCEMENT = "announcement";
210
211/**
212 * @template T
213 * @param {string} name
214 * @param {T} args
215 * @returns {Announcement<T>}
216 */
217function announcement(name, args) {
218 return {
219 ns: ANNOUNCEMENT,
220 name,
221 key: xxh32(crypto.randomUUID()),
222
223 type: ANNOUNCEMENT,
224 args,
225 };
226}
227
228/**
229 * @param {MessagePort | Worker | MessengerRealm} context
230 */
231function incomingAnnouncementsHandler(context) {
232 /** @param {any} event */
233 return (event) => {
234 const { ns, type } = event.data;
235 if (ns !== ANNOUNCEMENT || type !== ANNOUNCEMENT) return;
236 const announcement = /** @type {Announcement<any>} */ (event.data);
237 const c = /** @type {any} */ (context);
238 c.__incoming[announcement.name]?.(announcement.args);
239 };
240}