moroutine#
Offload functions to worker threads with shared memory primitives for Node.js.
Installation#
npm install moroutine
Requires Node.js v24+.
Quick Start#
// main.ts
import { isPrime } from './is-prime.ts';
const result = await isPrime(999_999_937);
console.log(result); // true
// is-prime.ts
import { mo } from 'moroutine';
export const isPrime = mo(import.meta, (n: number): boolean => {
  if (n < 2) return false;
  for (let i = 2; i * i <= n; i++) {
    if (n % i === 0) return false;
  }
  return true;
});
Define a function with mo() in its own module, then import and run it on a worker pool. Moroutine modules must be side-effect free — workers import them to find the registered functions.
Core API#
mo(import.meta, fn)#
Wraps a function so it runs on a worker thread. The function must be defined at module scope (not dynamically).
// math.ts
import { mo } from 'moroutine';
export const add = mo(import.meta, (a: number, b: number): number => {
  return a + b;
});
workers(size)#
Creates a pool of worker threads. Returns a Runner that dispatches tasks with round-robin scheduling. Disposable via using or [Symbol.dispose](). Defaults to os.availableParallelism() workers when size is omitted.
import { workers } from 'moroutine';
import { add } from './math.ts';
{
  using run = workers(2);
  const result = await run(add(3, 4)); // single task
  const [a, b] = await run([add(1, 2), add(3, 4)]); // batch
}
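Round-robin scheduling needs no machinery beyond a wrapping index. A toy sketch of the dispatch order (plain synchronous functions stand in for worker threads; `makeRunner` and the task shape are invented for illustration, not the library's internals):

```typescript
// Toy round-robin dispatcher: consecutive tasks land on consecutive slots.
// makeRunner and the task shape are illustrative, not moroutine internals.
function makeRunner(size: number) {
  const log: number[] = []; // records which slot each task was dispatched to
  let next = 0;
  const run = <T>(task: () => T): T => {
    const slot = next;
    next = (next + 1) % size; // advance and wrap around the pool
    log.push(slot);
    return task(); // a real pool would postMessage to worker `slot`
  };
  return { run, log };
}

const pool = makeRunner(2);
pool.run(() => 1 + 2);
pool.run(() => 3 + 4);
pool.run(() => 5 + 6);
console.log(pool.log); // [ 0, 1, 0 ]
```

The third task wraps back to slot 0, which is why a batch larger than the pool still uses every worker evenly.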
Dedicated Workers#
Awaiting a task directly (without a pool) runs it on a dedicated worker thread, one per moroutine function.
const result = await add(3, 4); // runs on a dedicated worker for `add`
Task-Args#
Pass a task as an argument to another task. The result is resolved on the worker and cached, so it never crosses back to the main thread. This is useful for non-transferable context like a database connection.
// db.ts
import { DatabaseSync } from 'node:sqlite';
import { mo } from 'moroutine';
export const openDb = mo(import.meta, (filename: string): DatabaseSync => {
  return new DatabaseSync(filename);
});
export const exec = mo(import.meta, (db: DatabaseSync, sql: string): void => {
  db.exec(sql);
});
export const query = mo(import.meta, (db: DatabaseSync, sql: string): unknown[] => {
  return db.prepare(sql).all();
});
import { workers } from 'moroutine';
import { openDb, exec, query } from './db.ts';
const db = openDb(':memory:');
{
  using run = workers(1);
  await run(exec(db, `CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`));
  await run(exec(db, `INSERT INTO users (name) VALUES ('Alice')`));
  const rows = await run(query(db, 'SELECT * FROM users')); // [{ id: 1, name: 'Alice' }]
}
openDb() returns a Task<DatabaseSync>, and exec()/query() accept it in place of DatabaseSync. The database is opened once on the worker and reused for every subsequent call — the main thread never touches it.
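The resolve-and-cache behavior can be pictured as worker-side memoization keyed on the task object itself. A hypothetical sketch (`TaskLike` and `resolveArg` are invented names; the library's internals may differ):

```typescript
// Hypothetical worker-side resolution: the first time a task object arrives
// as an argument it is executed; every later use reads the cached result.
interface TaskLike<T> { fn: (...args: unknown[]) => T; args: unknown[] }

const cache = new WeakMap<TaskLike<unknown>, unknown>();
let executions = 0;

function resolveArg<T>(task: TaskLike<T>): T {
  if (!cache.has(task)) {
    executions++;
    cache.set(task, task.fn(...task.args));
  }
  return cache.get(task) as T;
}

// The same task object reused across calls runs exactly once.
const openConn: TaskLike<{ open: boolean }> = { fn: () => ({ open: true }), args: [] };
const first = resolveArg(openConn);
const second = resolveArg(openConn);
console.log(first === second, executions); // true 1
```

Because the cached value never leaves the worker, it can be something non-serializable, which is exactly what makes the SQLite example above work.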
Shared Memory#
Descriptors and shared()#
Shared-memory types are created with descriptor functions or the shared() allocator.
import { shared, int32, int64, int32atomic, bool, mutex, rwlock, string, bytes } from 'moroutine';
Primitives#
const counter = int32(); // standalone Int32
const flag = bool(); // standalone Bool
const big = int64(); // standalone Int64 (bigint)
Atomics#
Atomic variants use Atomics.* for thread-safe operations without a lock.
const counter = int32atomic();
counter.add(1); // atomic increment, returns previous value
counter.load(); // atomic read
Full atomic operations: load, store, add, sub, and, or, xor, exchange, compareExchange.
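These operations map directly onto the JavaScript Atomics built-ins over a SharedArrayBuffer. A minimal sketch of the equivalent raw calls, with no moroutine involved:

```typescript
// A 4-byte SharedArrayBuffer viewed as a single Int32 slot.
const sab = new SharedArrayBuffer(4);
const slot = new Int32Array(sab);

Atomics.store(slot, 0, 5);                // store
const prev = Atomics.add(slot, 0, 1);     // add returns the previous value
console.log(prev, Atomics.load(slot, 0)); // 5 6

// compareExchange writes only when the current value matches the expected one.
Atomics.compareExchange(slot, 0, 6, 42);  // matches: slot becomes 42
Atomics.compareExchange(slot, 0, 6, 99);  // no match: unchanged
console.log(Atomics.load(slot, 0));       // 42
```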
Structs#
Plain objects in shared() create structs backed by a single SharedArrayBuffer.
const point = shared({ x: int32, y: int32 });
point.load(); // { x: 0, y: 0 }
point.store({ x: 10, y: 20 });
point.fields.x.store(10); // direct field access
Structs nest:
const rect = shared({
  pos: { x: int32, y: int32 },
  size: { w: int32, h: int32 },
});
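The single-buffer backing can be pictured as fields mapped to fixed offsets in one typed-array view. A hand-rolled sketch of that layout idea (not the library's actual encoding):

```typescript
// Two Int32 fields over one SharedArrayBuffer: x at index 0, y at index 1.
const sab = new SharedArrayBuffer(8);
const view = new Int32Array(sab);
const point = {
  load: () => ({ x: view[0], y: view[1] }),
  store: (v: { x: number; y: number }) => { view[0] = v.x; view[1] = v.y; },
  fields: {
    x: { load: () => view[0], store: (n: number) => { view[0] = n; } },
    y: { load: () => view[1], store: (n: number) => { view[1] = n; } },
  },
};

point.store({ x: 10, y: 20 });
point.fields.x.store(99); // field access writes straight into the buffer
console.log(point.load()); // { x: 99, y: 20 }
```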
Tuples#
Arrays in shared() create fixed-length tuples.
const pair = shared([int32, bool]);
pair.load(); // [0, false]
pair.store([42, true]);
pair.elements[0].store(99);
Bytes and Strings#
const buf = bytes(32); // fixed 32-byte buffer
buf.store(new Uint8Array(32)); // exact length required
buf.load(); // Readonly<Uint8Array> view
buf.view[0] = 0xff; // direct mutable access
const name = string(64); // UTF-8, max 64 bytes
name.store('hello');
name.load(); // 'hello'
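A fixed-capacity UTF-8 string over shared memory can be hand-rolled with TextEncoder and TextDecoder. A sketch under an assumed layout (a 4-byte length prefix followed by the data; the library's actual format may differ):

```typescript
// Assumed layout: a 4-byte length prefix, then up to `capacity` UTF-8 bytes.
function sharedString(capacity: number) {
  const sab = new SharedArrayBuffer(4 + capacity);
  const len = new Uint32Array(sab, 0, 1);
  const data = new Uint8Array(sab, 4, capacity);
  return {
    store(value: string): void {
      const bytes = new TextEncoder().encode(value);
      if (bytes.length > capacity) throw new RangeError('string too long');
      data.set(bytes);
      len[0] = bytes.length;
    },
    load(): string {
      // slice copies out of the shared buffer before decoding
      return new TextDecoder().decode(data.slice(0, len[0]));
    },
  };
}

const greeting = sharedString(64);
greeting.store('hello');
console.log(greeting.load()); // hello
```

Note the capacity bounds bytes, not characters: multi-byte UTF-8 sequences consume more of the budget than their character count suggests.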
Value Shorthand#
Primitive values in a schema imply both the type and the initial value.
shared(0); // Int32 initialized to 0
shared(true); // Bool initialized to true
shared(0n); // Int64 initialized to 0n
shared({ x: 10, y: 20 }); // struct with Int32 fields
Locks#
Mutex#
const mu = mutex();
{
  using guard = await mu.lock();
  // exclusive access
  // auto-unlocks when guard is disposed
}
// or manually:
await mu.lock();
mu.unlock();
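At bottom, a shared-memory mutex is one Int32 flag flipped with compareExchange. A simplified single-threaded sketch of acquire and release (a real cross-thread lock would block with Atomics.wait and wake waiters with Atomics.notify):

```typescript
// One Int32 flag in shared memory: 0 = unlocked, 1 = locked.
const UNLOCKED = 0;
const LOCKED = 1;
const state = new Int32Array(new SharedArrayBuffer(4));

// Atomic across threads: succeeds only if the flag is currently UNLOCKED.
function tryLock(): boolean {
  return Atomics.compareExchange(state, 0, UNLOCKED, LOCKED) === UNLOCKED;
}
function unlock(): void {
  Atomics.store(state, 0, UNLOCKED);
  // a blocking mutex would also Atomics.notify(state, 0, 1) here
}

const first = tryLock();
const second = tryLock();
unlock();
const third = tryLock();
console.log(first, second, third); // true false true
```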
RwLock#
const rw = rwlock();
{
  using guard = await rw.readLock(); // multiple readers OK
}
{
  using guard = await rw.writeLock(); // exclusive access
}
Using with Workers#
Shared-memory types pass through postMessage automatically. They're reconstructed on the worker side with the same shared backing memory.
// update-position.ts
import { mo } from 'moroutine';
import type { Mutex, SharedStruct, Int32 } from 'moroutine';
type Position = SharedStruct<{ x: Int32; y: Int32 }>;
export const updatePosition = mo(
  import.meta,
  async (mu: Mutex, pos: Position, dx: number, dy: number): Promise<void> => {
    using guard = await mu.lock();
    const current = pos.load();
    pos.store({ x: current.x + dx, y: current.y + dy });
  },
);
// main.ts
import { workers, shared, int32, mutex } from 'moroutine';
import { updatePosition } from './update-position.ts';
const mu = mutex();
const pos = shared({ x: int32, y: int32 });
{
  using run = workers(4);
  await run([updatePosition(mu, pos, 1, 0), updatePosition(mu, pos, 0, 1)]);
}
console.log(pos.load()); // { x: 1, y: 1 }
Streaming#
Streaming Moroutines#
Wrap an async function* with mo() to create a streaming moroutine. Values are streamed between threads via MessageChannel with pause/resume backpressure.
// count.ts
import { mo } from 'moroutine';
export const countUp = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) {
    yield i;
  }
});
Iterate directly (dedicated worker) or dispatch via a pool:
import { workers } from 'moroutine';
import { countUp } from './count.ts';
// Dedicated worker
for await (const n of countUp(5)) {
  console.log(n); // 0, 1, 2, 3, 4
}
// Worker pool
{
  using run = workers(2);
  for await (const n of run(countUp(5))) {
    console.log(n); // 0, 1, 2, 3, 4
  }
}
channel() and Fan-out#
When you pass the same AsyncIterable or StreamTask argument to multiple tasks, each task gets its own copy of the data. Use channel() to share a single source across multiple workers — each item goes to exactly one consumer (work stealing).
import { workers, channel, mo } from 'moroutine';
const generate = mo(import.meta, async function* (n: number) {
  for (let i = 0; i < n; i++) yield i;
});
const process = mo(import.meta, async (input: AsyncIterable<number>): Promise<number[]> => {
  const items: number[] = [];
  for await (const n of input) items.push(n);
  return items;
});
const ch = channel(generate(100));
{
  using run = workers(4);
  const [a, b, c, d] = await run([process(ch), process(ch), process(ch), process(ch)]);
  // Items distributed across workers — no duplicates, no gaps
}
Without channel(), AsyncIterable and StreamTask arguments are auto-detected and streamed to a single consumer. channel() is only needed for fan-out.
Pipelines#
Chain streaming moroutines by passing one as an argument to the next. Each stage runs on its own dedicated worker.
const doubled = double(generate(5));
const squared = square(doubled);
for await (const n of squared) {
  console.log(n);
}
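Stripped of the worker threads, a pipeline is plain generator composition: each stage consumes the previous stage's iterator. A synchronous sketch of the same shape (the real stages are async generators, each running on its own dedicated worker):

```typescript
function* generate(n: number) {
  for (let i = 0; i < n; i++) yield i;
}
function* double(input: Iterable<number>) {
  for (const x of input) yield x * 2;
}
function* square(input: Iterable<number>) {
  for (const x of input) yield x * x;
}

// Stages compose like function calls; items flow through one at a time.
const out = [...square(double(generate(5)))];
console.log(out); // [ 0, 4, 16, 36, 64 ]
```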
Transfers#
Use transfer() for zero-copy movement of ArrayBuffer, TypedArray, MessagePort, or streams.
import { transfer } from 'moroutine';
const buf = new ArrayBuffer(1024);
await run(processData(transfer(buf)));
// buf is now detached (zero-length) — ownership moved to worker
Return values from workers are auto-transferred when possible.
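The detach behavior is observable with the platform's own transfer machinery. A small sketch using structuredClone's transfer list, the same mechanism postMessage uses:

```typescript
const buf = new ArrayBuffer(1024);
new Uint8Array(buf)[0] = 0xff;

// Transferring moves ownership: the clone keeps the data, the original detaches.
const moved = structuredClone(buf, { transfer: [buf] });
console.log(buf.byteLength, moved.byteLength); // 0 1024
console.log(new Uint8Array(moved)[0]); // 255
```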
Examples#
All examples require Node v24+ and can be run directly, e.g. node examples/primes/main.ts.
examples/primes -- CPU-bound prime checking on a dedicated worker
examples/non-blocking -- main thread stays responsive during heavy computation
examples/parallel-batch -- sequential vs parallel batch processing
examples/atomics -- shared atomic counter across workers
examples/shared-state -- mutex-protected shared struct
examples/multi-module -- moroutines from multiple modules on one worker
examples/transfer -- zero-copy buffer transfer to and from a worker
examples/sqlite -- shared SQLite database on a worker via task-arg caching
examples/pipeline -- streaming pipeline across dedicated workers
examples/channel-fanout -- fan-out a channel to multiple workers via work stealing
examples/benchmark -- roundtrip channel throughput with 1–N workers