Offload functions to worker threads with shared memory primitives for Node.js.

feat: auto-detect AsyncGenerator and StreamTask args

Adds auto-detection in prepareArg so AsyncGenerator and StreamTask
values are automatically piped via MessageChannel without needing
a channel() wrapper.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+82
+22
src/execute.ts
··· 70 70 })(); 71 71 } 72 72 73 + function isAsyncGenerator(arg: unknown): boolean { 74 + return typeof arg === 'object' && arg !== null && 75 + (arg as any)[Symbol.toStringTag] === 'AsyncGenerator'; 76 + } 77 + 73 78 function prepareArg(arg: unknown): unknown { 79 + // Auto-detect AsyncGenerator args — pipe via MessageChannel 80 + if (isAsyncGenerator(arg)) { 81 + const { port1, port2 } = new MessageChannel(); 82 + port1.unref(); 83 + pipeToPort(arg as AsyncIterable<unknown>, port1, DEFAULT_HIGH_WATER); 84 + streamPortStack[streamPortStack.length - 1].push(port2); 85 + return port2; 86 + } 87 + // Auto-detect StreamTask args — dispatch to dedicated worker, pipe output 88 + if (arg instanceof StreamTask) { 89 + const iterable = runStreamOnDedicated(arg.id, arg.args); 90 + const { port1, port2 } = new MessageChannel(); 91 + port1.unref(); 92 + pipeToPort(iterable, port1, DEFAULT_HIGH_WATER); 93 + streamPortStack[streamPortStack.length - 1].push(port2); 94 + return port2; 95 + } 74 96 if (typeof arg === 'object' && arg !== null && CHANNEL in arg) { 75 97 const data = (arg as any)[CHANNEL]; 76 98 let iterable = data.iterable;
+41
test/channel-autodetect.test.ts
··· 1 + import { describe, it } from 'node:test'; 2 + import assert from 'node:assert/strict'; 3 + import { workers } from 'moroutine'; 4 + import { sumStream, uppercaseStream, generate } from './fixtures/channel-autodetect.ts'; 5 + 6 + describe('AsyncGenerator auto-detection', () => { 7 + it('auto-detects AsyncGenerator arg without channel()', async () => { 8 + async function* numbers() { yield 1; yield 2; yield 3; } 9 + const run = workers(1); 10 + try { 11 + const result = await run(sumStream(numbers())); 12 + assert.equal(result, 6); 13 + } finally { 14 + run[Symbol.dispose](); 15 + } 16 + }); 17 + 18 + it('auto-detects with streaming output (bidirectional)', async () => { 19 + async function* words() { yield 'hello'; yield 'world'; } 20 + const run = workers(1); 21 + try { 22 + const results: string[] = []; 23 + for await (const word of run(uppercaseStream(words()))) { 24 + results.push(word); 25 + } 26 + assert.deepEqual(results, ['HELLO', 'WORLD']); 27 + } finally { 28 + run[Symbol.dispose](); 29 + } 30 + }); 31 + 32 + it('StreamTask arg is auto-detected without channel()', async () => { 33 + const run = workers(1); 34 + try { 35 + const result = await run(sumStream(generate(5))); 36 + assert.equal(result, 10); 37 + } finally { 38 + run[Symbol.dispose](); 39 + } 40 + }); 41 + });
+19
test/fixtures/channel-autodetect.ts
··· 1 + import { mo } from 'moroutine'; 2 + 3 + export const sumStream = mo(import.meta, async (input: AsyncIterable<number>): Promise<number> => { 4 + let sum = 0; 5 + for await (const n of input) { 6 + sum += n; 7 + } 8 + return sum; 9 + }); 10 + 11 + export const uppercaseStream = mo(import.meta, async function* (input: AsyncIterable<string>) { 12 + for await (const s of input) { 13 + yield s.toUpperCase(); 14 + } 15 + }); 16 + 17 + export const generate = mo(import.meta, async function* (n: number) { 18 + for (let i = 0; i < n; i++) yield i; 19 + });