// ⚡ Zero-dependency plcbundle library exclusively for Bun
1/// <reference lib="webworker" />
2
3import { search as jmespathSearch } from '@jmespath-community/jmespath';
4
/**
 * Job description posted to a scan worker.
 *
 * Bundles `start` through `end` (both inclusive) are read from
 * `${dir}NNNNNN.jsonl.zst`, where NNNNNN is the zero-padded bundle number.
 */
export interface WorkerTask {
  /** Path prefix that the zero-padded bundle file name is appended to. */
  dir: string;
  /** First bundle number to scan (inclusive). */
  start: number;
  /** Last bundle number to scan (inclusive). */
  end: number;
  /** User module exporting `detect` / `process` / `prepare` (non-query modes). */
  modulePath?: string;
  /** Expression evaluated against every operation in query mode. */
  expression?: string;
  /** Query mode only: use the built-in dotted/indexed path evaluator instead of JMESPath. */
  useSimple?: boolean;
  /** Replace console methods with no-ops inside the worker. */
  silent?: boolean;
  /** Accepted from callers; not read by the worker handler in this file. */
  flush?: boolean;
  /** Scan mode; the worker treats a missing value as `'detect'`. */
  mode?: 'detect' | 'process' | 'query';
}

/** Periodic progress report posted by the worker while it scans bundles. */
export interface WorkerProgress {
  /** Message discriminator. */
  type: 'progress';
  /** Bundle number being scanned when the report was produced. */
  currentBundle: number;
  /** Operations (non-blank lines) scanned so far by this worker. */
  totalOps: number;
  /** Cumulative size of the scanned lines, as summed by the worker. */
  totalBytes: number;
  /** Matches found so far by this worker. */
  matchCount: number;
}

/** A group of matches the worker posts at once to cut per-message overhead. */
export interface WorkerMatchBatch {
  /** Message discriminator. */
  type: 'match-batch';
  /** Matches in discovery order; each `result` is a query value or a detect record. */
  matches: { result: any }[];
}

/** Final summary posted once by the worker after all assigned bundles are scanned. */
export interface WorkerResult {
  /** Message discriminator. */
  type: 'result';
  /** Total operations (non-blank lines) scanned. */
  totalOps: number;
  /** Cumulative size of the scanned lines, as summed by the worker. */
  totalBytes: number;
  /** Total number of matches found. */
  matchCount: number;
  /** Return value of the user module's optional `prepare` export, or `null`. */
  data?: any;
}

/**
 * Worker entry point. Scans bundle files `start..end` (inclusive) under `dir`
 * and streams results back to the parent thread.
 *
 * Modes:
 * - `'query'`   – evaluates `expression` on every operation (the built-in
 *   dotted/indexed path evaluator when `useSimple` is set, JMESPath otherwise);
 *   every result other than `null`/`undefined` is a match.
 * - `'detect'`  – calls the module's `detect` export (or its default export)
 *   with `{ op }`; a returned non-empty `labels` array is a match.
 * - `'process'` – calls the module's `process` export (or its default export)
 *   with `{ op, position, bundle, line }` for every operation.
 *
 * Posted messages: `match-batch` (matches in groups of up to BATCH_SIZE),
 * `progress` (every 10 000 operations and after each completed bundle) and one
 * final `result` whose `data` is the awaited value of the module's optional
 * `prepare` export, or `null` when there is none or it threw.
 */
self.onmessage = async (event: MessageEvent<WorkerTask>) => {
  // `flush` is part of WorkerTask but is not consulted by this handler.
  // NOTE(review): confirm whether it was meant to force per-match flushing.
  const { dir, start, end, modulePath, expression, useSimple, silent, mode = 'detect' } = event.data;

  if (silent) silenceConsole();

  let mod: any = null;
  let userFn: any;
  let queryFn: ((op: any) => any) | null = null;

  if (mode === 'query') {
    queryFn = createQueryFn(expression!, useSimple);
  } else {
    // Detect or process mode: load the user module once; it is reused below
    // for the optional `prepare` hook instead of being imported a second time.
    mod = await import(modulePath!);
    userFn = mode === 'detect' ? (mod.detect || mod.default) : (mod.process || mod.default);
  }

  let totalOps = 0;
  let totalBytes = 0;
  let matchCount = 0;
  const BATCH_SIZE = 1000;
  let matchBatch: Array<{ result: any }> = [];
  // A single decoder serves every bundle; each non-streaming decode() is independent.
  const decoder = new TextDecoder();

  const flushBatch = () => {
    if (matchBatch.length === 0) return;
    const message: WorkerMatchBatch = { type: 'match-batch', matches: matchBatch };
    self.postMessage(message);
    matchBatch = [];
  };

  const addMatch = (result: any) => {
    matchCount++;
    matchBatch.push({ result });
    if (matchBatch.length >= BATCH_SIZE) flushBatch();
  };

  const postProgress = (currentBundle: number) => {
    const message: WorkerProgress = { type: 'progress', currentBundle, totalOps, totalBytes, matchCount };
    self.postMessage(message);
  };

  for (let bundleNum = start; bundleNum <= end; bundleNum++) {
    // Assumes `dir` already ends with a path separator — NOTE(review): confirm with caller.
    const bundlePath = `${dir}${bundleNum.toString().padStart(6, '0')}.jsonl.zst`;

    try {
      const compressed = await Bun.file(bundlePath).arrayBuffer();
      const text = decoder.decode(Bun.zstdDecompressSync(compressed));
      const lines = text.split('\n');

      for (let position = 0; position < lines.length; position++) {
        const line = lines[position];
        if (!line.trim()) continue;

        totalOps++;
        // `length` counts UTF-16 code units; it equals the byte size only for ASCII lines.
        totalBytes += line.length;
        const op = JSON.parse(line);

        if (mode === 'query') {
          try {
            const result = queryFn!(op);
            if (result !== null && result !== undefined) addMatch(result);
          } catch {
            // An expression that cannot be evaluated on this op is treated as a non-match.
          }
        } else if (mode === 'detect') {
          const labels = userFn({ op });
          if (labels && labels.length > 0) {
            addMatch({ bundle: bundleNum, position, cid: op.cid.slice(-4), labels });
          }
        } else {
          // process mode
          userFn({ op, position, bundle: bundleNum, line });
        }

        if (totalOps % 10000 === 0) postProgress(bundleNum);
      }

      postProgress(bundleNum);
    } catch (error) {
      // Best-effort: a missing/corrupt bundle, a malformed line or a throwing
      // user callback skips the rest of this bundle rather than the whole range.
      // Report it so the skip is visible (silenced when `silent` is set).
      console.error(`[worker] skipping rest of bundle ${bundlePath}:`, error);
    }
  }

  flushBatch();

  let prepareResult: any;
  if (mod && typeof mod.prepare === 'function') {
    try {
      prepareResult = await mod.prepare();
    } catch (error) {
      console.error('[worker] prepare() failed:', error);
    }
  }

  // `??` (not `||`) so falsy results such as 0, '' or false still reach the parent.
  const message: WorkerResult = { type: 'result', totalOps, totalBytes, matchCount, data: prepareResult ?? null };
  self.postMessage(message);
};

/**
 * Replaces `globalThis.console` with a proxy whose every method is a no-op.
 *
 * A proxy (rather than a hand-picked object literal) keeps less common methods
 * such as `console.time` or `console.table` callable, so user modules that use
 * them do not throw while the worker runs silently.
 */
function silenceConsole(): void {
  const noop = () => {};
  globalThis.console = new Proxy(globalThis.console, {
    get(target, key) {
      const value = Reflect.get(target, key);
      return typeof value === 'function' ? noop : value;
    },
  });
}

/**
 * Builds the per-operation evaluator used in query mode.
 *
 * With `useSimple`, `expression` is compiled once by `compileSimplePath`; a
 * lone property name short-circuits to a plain property read. Otherwise each
 * call evaluates the expression with JMESPath.
 */
function createQueryFn(expression: string, useSimple: boolean | undefined): (op: any) => any {
  if (!useSimple) {
    return (op) => jmespathSearch(op, expression);
  }

  const compiled = compileSimplePath(expression);
  const [first] = compiled.segments;

  // Ultra-fast path for single property
  if (compiled.segments.length === 1 && first?.type === 'property') {
    const prop = first.value as string;
    return (op) => op[prop];
  }

  return (op) => querySimplePath(op, compiled);
}

/**
 * Pre-tokenized form of a simple path such as `a.b[0]`: property names and
 * array indices, in evaluation order.
 */
interface SimplePath {
  segments: { type: 'property' | 'index'; value: string | number }[];
}

/**
 * Tokenizes a simple path expression (`a.b[2].c`) into property and index
 * segments.
 *
 * Dots separate property names, and empty names are skipped. The text between
 * `[` and the next `]` (or the end of input when unterminated) becomes an index
 * via `parseInt`, so a non-numeric index yields `NaN`.
 */
function compileSimplePath(expression: string): SimplePath {
  const segments: SimplePath['segments'] = [];
  let token = '';

  // Emit the pending property name (if any) and start a new one.
  const pushProperty = (): void => {
    if (token !== '') segments.push({ type: 'property', value: token });
    token = '';
  };

  let cursor = 0;
  while (cursor < expression.length) {
    const ch = expression[cursor];

    if (ch === '.') {
      pushProperty();
      cursor += 1;
      continue;
    }

    if (ch === '[') {
      pushProperty();
      const close = expression.indexOf(']', cursor + 1);
      const stop = close === -1 ? expression.length : close;
      segments.push({ type: 'index', value: Number.parseInt(expression.slice(cursor + 1, stop)) });
      cursor = stop + 1; // step past the closing bracket
      continue;
    }

    token += ch;
    cursor += 1;
  }

  pushProperty();
  return { segments };
}

/**
 * Evaluates a path produced by `compileSimplePath` against `obj`.
 *
 * Property segments read `value[name]`. Index segments apply only to arrays,
 * and negative indices count back from the end (`-1` is the last element),
 * matching JMESPath index expressions used by the non-simple query path.
 *
 * @returns the value at the path; `null` when an intermediate value is
 *   `null`/`undefined` or an index segment meets a non-array; `undefined` when
 *   the final property or index is absent.
 */
function querySimplePath(obj: any, compiled: SimplePath): any {
  let current = obj;
  for (const segment of compiled.segments) {
    if (current == null) return null;

    if (segment.type === 'property') {
      current = current[segment.value];
    } else if (Array.isArray(current)) {
      const index = segment.value as number;
      // Previously `[-1]` always yielded undefined; resolve it from the end like JMESPath.
      current = current[index < 0 ? current.length + index : index];
    } else {
      return null;
    }
  }
  return current;
}
212}