// ⚡ Zero-dependency plcbundle library exclusively for Bun
// Source: main branch (5.8 kB)
/// <reference lib="webworker" />

import { search as jmespathSearch } from '@jmespath-community/jmespath';

/**
 * Job posted to this worker: scan bundle files numbered `start`..`end`
 * (inclusive) under `dir`, and either run a user module over every operation
 * (`detect` / `process`) or evaluate a query expression against it (`query`).
 */
export interface WorkerTask {
  /** Path prefix; the zero-padded bundle file name is appended to it verbatim. */
  dir: string;
  /** First bundle number to scan (inclusive). */
  start: number;
  /** Last bundle number to scan (inclusive). */
  end: number;
  /** Module imported in `detect` / `process` mode; its optional `prepare()` supplies `WorkerResult.data`. */
  modulePath?: string;
  /** Expression evaluated against each operation in `query` mode. */
  expression?: string;
  /** In `query` mode, use the built-in dotted/indexed path evaluator instead of JMESPath. */
  useSimple?: boolean;
  /** When true, every `console` method inside this worker becomes a no-op. */
  silent?: boolean;
  /** Accepted for protocol compatibility; not read by this worker. */
  flush?: boolean;
  /** Processing mode; defaults to `'detect'`. */
  mode?: 'detect' | 'process' | 'query';
}

/** Progress snapshot, posted every PROGRESS_INTERVAL operations and after each readable bundle. */
export interface WorkerProgress {
  type: 'progress';
  currentBundle: number;
  totalOps: number;
  totalBytes: number;
  matchCount: number;
}

/** Up to MATCH_BATCH_SIZE matches collected since the previous batch was posted. */
export interface WorkerMatchBatch {
  type: 'match-batch';
  matches: Array<{ result: any }>;
}

/** Final message of a task, posted once after all bundles have been scanned. */
export interface WorkerResult {
  type: 'result';
  totalOps: number;
  totalBytes: number;
  matchCount: number;
  /** Return value of the user module's `prepare()`, or null when it is absent or threw. */
  data?: any;
}

type WorkerMessage = WorkerProgress | WorkerMatchBatch | WorkerResult;

/** Matches are buffered and posted in batches of this size to limit message overhead. */
const MATCH_BATCH_SIZE = 1000;
/** A progress message is posted whenever totalOps reaches a multiple of this value. */
const PROGRESS_INTERVAL = 10000;

/**
 * Entry point: runs one WorkerTask and reports back with `match-batch`,
 * `progress` and a final `result` message.
 *
 * Bundles that cannot be read or decompressed are skipped. Inside a bundle,
 * errors are isolated per operation, so a malformed line or a throwing
 * callback/query skips only that operation.
 *
 * @throws TypeError when a detect/process module exports no usable callback.
 */
self.onmessage = async (event: MessageEvent<WorkerTask>) => {
  const { dir, start, end, modulePath, expression, useSimple, silent, mode = 'detect' } = event.data;

  if (silent) silenceConsole();

  let totalOps = 0;
  let totalBytes = 0;
  let matchCount = 0;
  let matchBatch: Array<{ result: any }> = [];

  const flushBatch = (): void => {
    if (matchBatch.length === 0) return;
    post({ type: 'match-batch', matches: matchBatch });
    matchBatch = [];
  };

  const addMatch = (result: any): void => {
    matchCount++;
    matchBatch.push({ result });
    if (matchBatch.length >= MATCH_BATCH_SIZE) flushBatch();
  };

  const reportProgress = (currentBundle: number): void => {
    post({ type: 'progress', currentBundle, totalOps, totalBytes, matchCount });
  };

  // Resolve the per-operation handler once, so the hot loop does not branch on mode.
  // The user module object is kept so its optional prepare() hook can be called at the end.
  let mod: any = null;
  let handleOp: (op: any, position: number, bundle: number, line: string) => void;

  if (mode === 'query') {
    const queryFn = buildQueryFn(expression!, useSimple);
    handleOp = (op) => {
      const result = queryFn(op);
      if (result !== null && result !== undefined) addMatch(result);
    };
  } else {
    mod = await import(modulePath!);
    const hook = mode === 'detect' ? 'detect' : 'process';
    const userFn = mod[hook] || mod.default;
    if (typeof userFn !== 'function') {
      // Fail fast instead of throwing (and swallowing) a TypeError on every operation.
      throw new TypeError(`Module ${modulePath} exports no ${hook}() or default function`);
    }
    handleOp =
      mode === 'detect'
        ? (op, position, bundle) => {
            const labels = userFn({ op });
            if (labels && labels.length > 0) {
              addMatch({ bundle, position, cid: op.cid.slice(-4), labels });
            }
          }
        : (op, position, bundle, line) => {
            userFn({ op, position, bundle, line });
          };
  }

  // One decoder for the whole task; decode() without { stream: true } keeps no state between calls.
  const decoder = new TextDecoder();

  for (let bundleNum = start; bundleNum <= end; bundleNum++) {
    const bundlePath = `${dir}${bundleNum.toString().padStart(6, '0')}.jsonl.zst`;

    let lines: string[];
    try {
      const compressed = await Bun.file(bundlePath).arrayBuffer();
      lines = decoder.decode(Bun.zstdDecompressSync(compressed)).split('\n');
    } catch {
      // Unreadable or undecodable bundle: skip it without a progress message.
      continue;
    }

    for (let position = 0; position < lines.length; position++) {
      const line = lines[position];
      if (!line.trim()) continue;

      totalOps++;
      // NOTE: string length (UTF-16 code units); equals the encoded byte count only for ASCII lines.
      totalBytes += line.length;

      try {
        handleOp(JSON.parse(line), position, bundleNum, line);
      } catch {
        // Best-effort: skip only this operation (bad JSON, missing field, throwing callback/query).
      }

      if (totalOps % PROGRESS_INTERVAL === 0) reportProgress(bundleNum);
    }

    reportProgress(bundleNum);
  }

  flushBatch();

  let prepareResult: any;
  if (mod !== null && typeof mod.prepare === 'function') {
    try {
      prepareResult = await mod.prepare();
    } catch {
      // Best-effort: a failing prepare() yields data: null.
    }
  }

  // `??` rather than `||` so falsy results such as 0, '' or false are delivered intact.
  post({ type: 'result', totalOps, totalBytes, matchCount, data: prepareResult ?? null });
};

/** Posts a typed protocol message via self.postMessage. */
function post(message: WorkerMessage): void {
  self.postMessage(message);
}

/** Replaces the worker's console with an object whose methods do nothing. */
function silenceConsole(): void {
  const noop = (): void => {};
  globalThis.console = {
    log: noop,
    error: noop,
    warn: noop,
    info: noop,
    debug: noop,
    trace: noop,
  } as any;
}

/**
 * Builds the per-operation evaluator for `query` mode.
 * With `useSimple`, the expression is compiled once by compileSimplePath, and a
 * path of a single plain property becomes a direct property read. Otherwise
 * every call runs JMESPath `search` on the operation.
 */
function buildQueryFn(expression: string, useSimple: boolean | undefined): (op: any) => any {
  if (!useSimple) {
    return (op) => jmespathSearch(op, expression);
  }
  const compiled = compileSimplePath(expression);
  const [only] = compiled.segments;
  if (compiled.segments.length === 1 && only?.type === 'property') {
    const prop = only.value as string;
    return (op) => op[prop];
  }
  return (op) => querySimplePath(op, compiled);
}

/** A pre-parsed simple path: a sequence of property-name and array-index steps. */
interface SimplePath {
  segments: Array<{ type: 'property' | 'index'; value: string | number }>;
}

/**
 * Parses a dotted/indexed path such as `a.b[0].c` into segments.
 * Property names are the text between `.` / `[` separators (empty names are
 * dropped). The text between `[` and the next `]` is parsed as a base-10
 * integer; non-numeric text yields NaN, which never matches an element.
 */
function compileSimplePath(expression: string): SimplePath {
  const segments: Array<{ type: 'property' | 'index'; value: string | number }> = [];
  let current = '';
  let i = 0;

  while (i < expression.length) {
    const char = expression[i];
    if (char === '.') {
      if (current) segments.push({ type: 'property', value: current });
      current = '';
      i++;
    } else if (char === '[') {
      if (current) segments.push({ type: 'property', value: current });
      current = '';
      i++;
      let index = '';
      while (i < expression.length && expression[i] !== ']') {
        index += expression[i];
        i++;
      }
      segments.push({ type: 'index', value: parseInt(index, 10) });
      i++; // skip the closing ']'
    } else {
      current += char;
      i++;
    }
  }
  if (current) segments.push({ type: 'property', value: current });
  return { segments };
}

/**
 * Evaluates a compiled path against `obj`.
 * Returns null as soon as an intermediate value is null/undefined, or when an
 * index step meets a non-array. Otherwise returns the value reached, which is
 * undefined when the final property/index is missing.
 */
function querySimplePath(obj: any, compiled: SimplePath): any {
  let current = obj;
  for (const segment of compiled.segments) {
    if (current == null) return null;
    if (segment.type === 'property') {
      current = current[segment.value];
    } else if (Array.isArray(current)) {
      current = current[segment.value as number];
    } else {
      return null;
    }
  }
  return current;
}