+12
examples/flush-pds.ts
+12
examples/flush-pds.ts
···
1
+
// flushes all pds endpoints
2
+
3
+
const unique: Array<string> = []
4
+
5
+
export function process({ op }: { op: any }) {
6
+
7
+
const endpoint = op.operation.services?.atproto_pds?.endpoint
8
+
if (!unique.includes(endpoint)) {
9
+
console.log(endpoint)
10
+
unique.push(endpoint)
11
+
}
12
+
}
+1
-1
src/cli.ts
+1
-1
src/cli.ts
···
2
2
3
3
import { clone } from './cmds/clone';
4
4
import { detect } from './cmds/detect';
5
-
import { process as processCmd } from './cmds/process';
5
+
import { processCmd } from './cmds/process';
6
6
import { info } from './cmds/info';
7
7
import { verify } from './cmds/verify';
8
8
import { exportCmd } from './cmds/export';
+5
-3
src/cmds/common.ts
+5
-3
src/cmds/common.ts
···
11
11
silent: boolean;
12
12
flush?: boolean;
13
13
mode: 'detect' | 'process';
14
+
noProgress?: boolean;
14
15
onMatch?: (match: any, matchCount: number, matchedBytes: number) => void;
15
16
}
16
17
···
18
19
* Common processing logic for both detect and process commands
19
20
*/
20
21
export async function processOperations(options: ProcessingOptions) {
21
-
const { dir, start, end, modulePath, threads, silent, flush, mode, onMatch } = options;
22
+
const { dir, start, end, modulePath, threads, silent, flush, mode, noProgress, onMatch } = options;
22
23
23
24
const bundle = new PLCBundle(dir);
24
25
···
58
59
threads,
59
60
silent,
60
61
flush,
61
-
onProgress: (progressStats: ProcessStats) => {
62
+
onProgress: noProgress ? undefined : (progressStats: ProcessStats) => { // Check noProgress
62
63
const elapsed = (Date.now() - startTime) / 1000;
63
64
const opsPerSec = (progressStats.totalOps / elapsed).toFixed(0);
64
65
const mbPerSec = (progressStats.totalBytes / elapsed / 1e6).toFixed(1);
···
129
130
}
130
131
},
131
132
{
132
-
onProgress: (progressStats: ProcessStats) => {
133
+
onProgress: noProgress ? undefined : (progressStats: ProcessStats) => { // Check noProgress
133
134
const elapsed = (Date.now() - startTime) / 1000;
134
135
const opsPerSec = (progressStats.totalOps / elapsed).toFixed(0);
135
136
const mbPerSec = (progressStats.totalBytes / elapsed / 1e6).toFixed(1);
···
182
183
silent: { type: 'boolean', default: false },
183
184
s: { type: 'boolean', default: false },
184
185
flush: { type: 'boolean', default: false },
186
+
'no-progress': { type: 'boolean', default: false }, // Add this
185
187
},
186
188
strict: false,
187
189
allowPositionals: true,
+4
-1
src/cmds/detect.ts
+4
-1
src/cmds/detect.ts
···
18
18
--threads <num> Number of worker threads (default: 1)
19
19
--flush Output matches immediately (unsorted)
20
20
-s, --silent Suppress all console output from detect script
21
+
--no-progress Disable progress output (default: false)
21
22
22
23
EXAMPLES:
23
24
plcbundle-bun detect ./detect.ts
24
25
plcbundle-bun detect ./detect.ts --bundles 1-50 --threads 4
25
-
plcbundle-bun detect ./detect.ts --flush --silent
26
+
plcbundle-bun detect ./detect.ts --flush --silent --no-progress
26
27
`);
27
28
return;
28
29
}
···
40
41
const threads = parseInt((values.threads as string) || '1');
41
42
const silent = Boolean(values.silent || values.s);
42
43
const flush = Boolean(values.flush);
44
+
const noProgress = Boolean(values['no-progress']); // Add this
43
45
44
46
const bundle = new PLCBundle(dir);
45
47
const { start, end } = await parseBundleSelection(values, bundle);
···
53
55
threads,
54
56
silent,
55
57
flush,
58
+
noProgress, // Pass it
56
59
mode: 'detect',
57
60
onMatch: (match) => {
58
61
console.log(`${match.bundle},${match.position},${match.cid},${match.size},0.95,${match.labels.join(';')}`);
+6
-1
src/cmds/process.ts
+6
-1
src/cmds/process.ts
···
1
+
import { exit } from 'process';
1
2
import { parseProcessArgs, parseBundleSelection, processOperations } from './common';
2
3
import { PLCBundle } from '../plcbundle';
3
4
4
-
export async function process(args: string[]) {
5
+
export async function processCmd(args: string[]) {
5
6
if (args.includes('-h') || args.includes('--help')) {
6
7
console.log(`
7
8
process - Process operations with a custom function
···
17
18
--bundles <spec> Bundle selection: number (42) or range (1-50)
18
19
--threads <num> Number of worker threads (default: 1)
19
20
-s, --silent Suppress all console output from process script
21
+
--no-progress Disable progress output (default: false)
20
22
21
23
EXAMPLES:
22
24
plcbundle-bun process ./my-processor.ts
23
25
plcbundle-bun process ./my-processor.ts --bundles 1-50 --threads 4
26
+
plcbundle-bun process ./my-processor.ts --no-progress
24
27
25
28
PROCESS FUNCTION:
26
29
export function process({ op, position, bundle, line }) {
···
42
45
const dir = (values.dir as string) || './';
43
46
const threads = parseInt((values.threads as string) || '1');
44
47
const silent = Boolean(values.silent || values.s);
48
+
const noProgress = Boolean(values['no-progress']); // Add this
45
49
46
50
const bundle = new PLCBundle(dir);
47
51
const { start, end } = await parseBundleSelection(values, bundle);
···
54
58
modulePath: resolvedPath,
55
59
threads,
56
60
silent,
61
+
noProgress, // Pass it
57
62
mode: 'process',
58
63
});
59
64
}
+39
-7
src/plcbundle.ts
+39
-7
src/plcbundle.ts
···
307
307
throw new Error('Multi-threading requires module path. Use: processBundles(start, end, { module: "./detect.ts", threads: 4 })');
308
308
}
309
309
310
+
// Determine mode based on what function is exported
311
+
let mode: 'detect' | 'process' = 'detect';
312
+
if (module) {
313
+
try {
314
+
const mod = await import(module);
315
+
// If module has 'process' function, use process mode
316
+
if (mod.process) {
317
+
mode = 'process';
318
+
} else if (mod.detect) {
319
+
mode = 'detect';
320
+
}
321
+
} catch (e) {
322
+
// Default to detect
323
+
}
324
+
}
325
+
310
326
// Use workers for multi-threading with module
311
327
if (threads > 1 && module) {
312
-
return await this.processBundlesWorkers(start, end, module, threads, silent, flush, onProgress, onMatch);
328
+
return await this.processBundlesWorkers(
329
+
start,
330
+
end,
331
+
module,
332
+
threads,
333
+
silent,
334
+
flush,
335
+
mode, // Pass mode
336
+
onProgress,
337
+
onMatch
338
+
);
313
339
}
314
340
315
341
// Load module if provided but single-threaded
316
342
if (module && !callback) {
317
-
const resolvedPath = Bun.resolveSync(module, process.cwd());
343
+
const resolvedPath = module;
318
344
const mod = await import(resolvedPath);
319
-
const detectFn = mod.detect || mod.default;
345
+
const userFn = mode === 'detect' ? (mod.detect || mod.default) : (mod.process || mod.default);
320
346
321
-
callback = (op) => {
322
-
detectFn({ op });
347
+
callback = (op, position, bundleNum, line) => {
348
+
if (mode === 'detect') {
349
+
userFn({ op });
350
+
} else {
351
+
userFn({ op, position, bundle: bundleNum, line });
352
+
}
323
353
};
324
354
}
325
355
···
341
371
threads: number,
342
372
silent: boolean,
343
373
flush: boolean,
374
+
mode: 'detect' | 'process', // Add mode parameter
344
375
onProgress?: (stats: ProcessStats) => void,
345
376
onMatch?: (match: any) => void
346
377
): Promise<ProcessStats & { matches?: any[] }> {
···
406
437
modulePath,
407
438
silent,
408
439
flush,
440
+
mode, // Pass mode to worker
409
441
});
410
442
}
411
443
···
428
460
}
429
461
430
462
// Sort matches if not flushed
431
-
if (!flush) {
463
+
if (!flush && mode === 'detect') {
432
464
allMatches.sort((a, b) => {
433
465
if (a.bundle !== b.bundle) return a.bundle - b.bundle;
434
466
return a.position - b.position;
···
440
472
matchCount: 0,
441
473
totalBytes,
442
474
matchedBytes: 0,
443
-
matches: flush ? undefined : allMatches,
475
+
matches: flush || mode === 'process' ? undefined : allMatches,
444
476
};
445
477
}
446
478
+32
-22
src/worker.ts
+32
-22
src/worker.ts
···
7
7
modulePath: string;
8
8
silent?: boolean;
9
9
flush?: boolean;
10
+
mode?: 'detect' | 'process'; // Add mode parameter
10
11
}
11
12
12
13
export interface WorkerProgress {
···
38
39
}
39
40
40
41
self.onmessage = async (event: MessageEvent<WorkerTask>) => {
41
-
const { dir, start, end, modulePath, silent, flush } = event.data;
42
+
const { dir, start, end, modulePath, silent, flush, mode = 'detect' } = event.data;
42
43
43
44
// Override console if silent
44
45
if (silent) {
···
52
53
} as any;
53
54
}
54
55
55
-
// Load detect function
56
+
// Load the appropriate function based on mode
56
57
const mod = await import(modulePath);
57
-
const detectFn = mod.detect || mod.default;
58
+
const userFn = mode === 'detect'
59
+
? (mod.detect || mod.default)
60
+
: (mod.process || mod.default);
58
61
59
62
let totalOps = 0;
60
63
let totalBytes = 0;
···
78
81
totalBytes += line.length;
79
82
80
83
const op = JSON.parse(line);
81
-
const labels = detectFn({ op });
82
84
83
-
if (labels && labels.length > 0) {
84
-
const match = {
85
-
bundle: bundleNum,
86
-
position,
87
-
cid: op.cid.slice(-4),
88
-
size: line.length,
89
-
labels,
90
-
};
85
+
if (mode === 'detect') {
86
+
// Detection mode - look for labels
87
+
const labels = userFn({ op });
91
88
92
-
if (flush) {
93
-
// Send match immediately
94
-
self.postMessage({
95
-
type: 'match',
96
-
...match,
97
-
} as WorkerMatch);
98
-
} else {
99
-
// Buffer matches
100
-
matches.push(match);
89
+
if (labels && labels.length > 0) {
90
+
const match = {
91
+
bundle: bundleNum,
92
+
position,
93
+
cid: op.cid.slice(-4),
94
+
size: line.length,
95
+
labels,
96
+
};
97
+
98
+
if (flush) {
99
+
// Send match immediately
100
+
self.postMessage({
101
+
type: 'match',
102
+
...match,
103
+
} as WorkerMatch);
104
+
} else {
105
+
// Buffer matches
106
+
matches.push(match);
107
+
}
101
108
}
109
+
} else {
110
+
// Process mode - just call the function
111
+
userFn({ op, position, bundle: bundleNum, line });
102
112
}
103
113
104
114
if (totalOps % 10000 === 0) {
···
121
131
totalBytes,
122
132
matches: flush ? [] : matches,
123
133
} as WorkerResult);
124
-
};
134
+
};
+8
-9
tests/commands.test.ts
+8
-9
tests/commands.test.ts
···
1
1
import { describe, test, expect, beforeEach } from 'bun:test';
2
2
import { PLCBundle } from '../src/plcbundle';
3
3
import { TEMP_DIR, createMockIndex, createMockOperations } from './setup';
4
-
import { resolve } from 'path';
5
4
6
5
describe('CLI Commands', () => {
7
6
let detectModulePath: string;
···
20
19
await Bun.write(bundle.getBundlePath(i), compressed);
21
20
}
22
21
23
-
// Create test modules with Bun.resolveSync
24
-
detectModulePath = Bun.resolveSync(`${TEMP_DIR}/test-detect.ts`, process.cwd());
22
+
// Create test modules - use absolute paths
23
+
detectModulePath = `${process.cwd()}/${TEMP_DIR}/test-detect.ts`;
25
24
await Bun.write(detectModulePath, `
26
-
export function detect({ op }) {
25
+
export function detect({ op }) {
27
26
return op.did.includes('test') ? ['test'] : [];
28
-
}
27
+
}
29
28
`);
30
29
31
-
processModulePath = Bun.resolveSync(`${TEMP_DIR}/test-process.ts`, process.cwd());
30
+
processModulePath = `${process.cwd()}/${TEMP_DIR}/test-process.ts`;
32
31
await Bun.write(processModulePath, `
33
-
let count = 0;
34
-
export function process({ op }) {
32
+
let count = 0;
33
+
export function process({ op }) {
35
34
count++;
36
-
}
35
+
}
37
36
`);
38
37
});
39
38
+2
-2
tests/multithread.test.ts
+2
-2
tests/multithread.test.ts
···
20
20
await Bun.write(bundle.getBundlePath(i), compressed);
21
21
}
22
22
23
-
// Create test module with Bun.resolveSync
24
-
modulePath = Bun.resolveSync(`${TEMP_DIR}/test-module.ts`, process.cwd());
23
+
// Create test module - use absolute path
24
+
modulePath = `${process.cwd()}/${TEMP_DIR}/test-module.ts`;
25
25
await Bun.write(modulePath, `
26
26
export function detect({ op }) {
27
27
return op.did.length > 10 ? ['long-did'] : [];
+3
-3
tests/processing.test.ts
+3
-3
tests/processing.test.ts
···
196
196
197
197
describe('processBundles with module path', () => {
198
198
test('loads module and calls function', async () => {
199
-
// Create a test module
200
-
const testModulePath = Bun.resolveSync(`${TEMP_DIR}/test-module.ts`, process.cwd());
199
+
// Create a test module with absolute path
200
+
const testModulePath = `${process.cwd()}/${TEMP_DIR}/test-module.ts`;
201
201
await Bun.write(testModulePath, `
202
202
export function detect({ op }) {
203
203
return op.did.startsWith('did:plc:') ? ['test'] : [];
···
212
212
});
213
213
214
214
test('supports silent mode', async () => {
215
-
// Create absolute path directly (file doesn't exist yet to resolve)
215
+
// Create absolute path directly
216
216
const testModulePath = `${process.cwd()}/${TEMP_DIR}/noisy-module.ts`;
217
217
await Bun.write(testModulePath, `
218
218
export function detect({ op }) {