+562
-253
cmd/plcbundle/commands/detector.go
+562
-253
cmd/plcbundle/commands/detector.go
···
7
7
"net/http"
8
8
_ "net/http/pprof"
9
9
"os"
10
+
"runtime"
10
11
"sort"
11
12
"strings"
12
-
13
-
flag "github.com/spf13/pflag"
13
+
"sync"
14
+
"sync/atomic"
14
15
15
16
"github.com/goccy/go-json"
17
+
"github.com/spf13/cobra"
18
+
"tangled.org/atscan.net/plcbundle/cmd/plcbundle/ui"
16
19
"tangled.org/atscan.net/plcbundle/detector"
17
20
"tangled.org/atscan.net/plcbundle/internal/plcclient"
18
21
)
19
22
20
-
// DetectorCommand handles the detector subcommand
21
-
func DetectorCommand(args []string) error {
22
-
if len(args) < 1 {
23
-
printDetectorUsage()
24
-
return fmt.Errorf("subcommand required")
25
-
}
23
+
func NewDetectorCommand() *cobra.Command {
24
+
cmd := &cobra.Command{
25
+
Use: "detector",
26
+
Aliases: []string{"detect"},
27
+
Short: "Detect and filter spam/invalid operations",
28
+
Long: `Detect and filter spam/invalid operations
26
29
27
-
subcommand := args[0]
30
+
Run detection algorithms on PLC operations to identify spam,
31
+
invalid handles, service abuse, and other problematic patterns.
28
32
29
-
switch subcommand {
30
-
case "list":
31
-
return detectorList(args[1:])
32
-
case "test":
33
-
return detectorTest(args[1:])
34
-
case "run":
35
-
return detectorRun(args[1:])
36
-
case "filter":
37
-
return detectorFilter(args[1:])
38
-
case "info":
39
-
return detectorInfo(args[1:])
40
-
default:
41
-
printDetectorUsage()
42
-
return fmt.Errorf("unknown detector subcommand: %s", subcommand)
43
-
}
44
-
}
33
+
Built-in detectors:
34
+
• invalid_handle - Invalid handle patterns (underscores, etc)
35
+
• aka_spam - Excessive/garbage alsoKnownAs entries
36
+
• spam_pds - Known spam PDS endpoints
37
+
• service_abuse - Abused service structures
38
+
• noop - Benchmark detector (returns no matches)
45
39
46
-
func printDetectorUsage() {
47
-
fmt.Printf(`Usage: plcbundle detector <command> [options]
48
-
49
-
Commands:
50
-
list List available detectors
51
-
test Test a detector on specific bundles
52
-
run Run detector and output CSV results
53
-
filter Filter JSONL operations from stdin
54
-
info Show detailed detector information
40
+
Custom detectors:
41
+
Load JavaScript detectors from .js files with a detect() function.`,
55
42
56
-
Examples:
43
+
Example: ` # List available detectors
57
44
plcbundle detector list
45
+
46
+
# Run detector on bundles
58
47
plcbundle detector run invalid_handle --bundles 1-100
48
+
49
+
# Run with parallel processing
50
+
plcbundle detector run invalid_handle --bundles 1-100 --workers 8
51
+
52
+
# Run custom detector script
59
53
plcbundle detector run ./my_detector.js --bundles 1-100
54
+
55
+
# Run multiple detectors
56
+
plcbundle detector run invalid_handle aka_spam --bundles 1-100
57
+
58
+
# Run all detectors
60
59
plcbundle detector run all --bundles 1-100
61
-
plcbundle backfill | plcbundle detector filter ./my_detector.js > clean.jsonl
62
-
`)
63
-
}
64
60
65
-
func detectorList(_ []string) error {
66
-
registry := detector.DefaultRegistry()
67
-
detectors := registry.List()
61
+
# Filter JSONL from stdin
62
+
cat ops.jsonl | plcbundle detector filter invalid_handle > clean.jsonl
68
63
69
-
sort.Slice(detectors, func(i, j int) bool {
70
-
return detectors[i].Name() < detectors[j].Name()
71
-
})
64
+
# Get detector info
65
+
plcbundle detector info invalid_handle`,
66
+
}
72
67
73
-
fmt.Printf("Available detectors:\n\n")
74
-
for _, d := range detectors {
75
-
fmt.Printf(" %-20s %s (v%s)\n", d.Name(), d.Description(), d.Version())
76
-
}
77
-
fmt.Printf("\nUse 'plcbundle detector info <name>' for details\n")
68
+
// Add subcommands
69
+
cmd.AddCommand(newDetectorListCommand())
70
+
cmd.AddCommand(newDetectorTestCommand())
71
+
cmd.AddCommand(newDetectorRunCommand())
72
+
cmd.AddCommand(newDetectorFilterCommand())
73
+
cmd.AddCommand(newDetectorInfoCommand())
78
74
79
-
return nil
75
+
return cmd
80
76
}
81
77
82
-
func detectorTest(args []string) error {
83
-
if len(args) < 1 {
84
-
return fmt.Errorf("usage: plcbundle detector test <detector-name> --bundle N")
85
-
}
78
+
// ============================================================================
79
+
// DETECTOR LIST
80
+
// ============================================================================
81
+
82
+
func newDetectorListCommand() *cobra.Command {
83
+
return &cobra.Command{
84
+
Use: "list",
85
+
Short: "List available detectors",
86
+
Long: `List all available built-in and loaded detectors`,
86
87
87
-
detectorName := args[0]
88
+
Example: ` # List all detectors
89
+
plcbundle detector list`,
88
90
89
-
fs := flag.NewFlagSet("detector test", flag.ExitOnError)
90
-
bundleNum := fs.Int("bundle", 0, "bundle number to test")
91
-
confidence := fs.Float64("confidence", 0.90, "minimum confidence threshold")
92
-
verbose := fs.Bool("v", false, "verbose output")
91
+
RunE: func(cmd *cobra.Command, args []string) error {
92
+
registry := detector.DefaultRegistry()
93
+
detectors := registry.List()
94
+
95
+
sort.Slice(detectors, func(i, j int) bool {
96
+
return detectors[i].Name() < detectors[j].Name()
97
+
})
98
+
99
+
fmt.Printf("Available detectors:\n\n")
100
+
for _, d := range detectors {
101
+
fmt.Printf(" %-20s %s (v%s)\n", d.Name(), d.Description(), d.Version())
102
+
}
103
+
fmt.Printf("\nUse 'plcbundle detector info <name>' for details\n")
93
104
94
-
if err := fs.Parse(args[1:]); err != nil {
95
-
return err
105
+
return nil
106
+
},
96
107
}
108
+
}
97
109
98
-
if *bundleNum == 0 {
99
-
return fmt.Errorf("--bundle required")
110
+
// ============================================================================
111
+
// DETECTOR TEST
112
+
// ============================================================================
113
+
114
+
func newDetectorTestCommand() *cobra.Command {
115
+
var (
116
+
bundleNum int
117
+
confidence float64
118
+
verbose bool
119
+
)
120
+
121
+
cmd := &cobra.Command{
122
+
Use: "test <detector-name>",
123
+
Short: "Test detector on specific bundle",
124
+
Long: `Test a detector on a specific bundle and show results`,
125
+
126
+
Example: ` # Test on bundle 42
127
+
plcbundle detector test invalid_handle --bundle 42
128
+
129
+
# Verbose output with samples
130
+
plcbundle detector test aka_spam --bundle 100 -v
131
+
132
+
# Custom confidence threshold
133
+
plcbundle detector test spam_pds --bundle 50 --confidence 0.85`,
134
+
135
+
Args: cobra.ExactArgs(1),
136
+
137
+
RunE: func(cmd *cobra.Command, args []string) error {
138
+
detectorName := args[0]
139
+
140
+
mgr, _, err := getManager(&ManagerOptions{Cmd: cmd})
141
+
if err != nil {
142
+
return err
143
+
}
144
+
defer mgr.Close()
145
+
146
+
ctx := context.Background()
147
+
bundle, err := mgr.LoadBundle(ctx, bundleNum)
148
+
if err != nil {
149
+
return fmt.Errorf("failed to load bundle: %w", err)
150
+
}
151
+
152
+
fmt.Printf("Testing detector '%s' on bundle %06d...\n", detectorName, bundleNum)
153
+
fmt.Printf("Min confidence: %.2f\n\n", confidence)
154
+
155
+
registry := detector.DefaultRegistry()
156
+
config := detector.DefaultConfig()
157
+
config.MinConfidence = confidence
158
+
159
+
runner := detector.NewRunner(registry, config, &commandLogger{})
160
+
results, err := runner.RunOnBundle(ctx, detectorName, bundle)
161
+
if err != nil {
162
+
return fmt.Errorf("detection failed: %w", err)
163
+
}
164
+
165
+
stats := detector.CalculateStats(results, len(bundle.Operations))
166
+
167
+
fmt.Printf("Results:\n")
168
+
fmt.Printf(" Total operations: %d\n", stats.TotalOperations)
169
+
fmt.Printf(" Matches found: %d (%.2f%%)\n\n", stats.MatchedCount, stats.MatchRate*100)
170
+
171
+
if len(stats.ByReason) > 0 {
172
+
fmt.Printf("Breakdown by reason:\n")
173
+
for reason, count := range stats.ByReason {
174
+
pct := float64(count) / float64(stats.MatchedCount) * 100
175
+
fmt.Printf(" %-25s %d (%.1f%%)\n", reason, count, pct)
176
+
}
177
+
fmt.Printf("\n")
178
+
}
179
+
180
+
if verbose && len(results) > 0 {
181
+
fmt.Printf("Sample matches (first 10):\n")
182
+
displayCount := min(10, len(results))
183
+
184
+
for i := 0; i < displayCount; i++ {
185
+
res := results[i]
186
+
globalPos := (res.BundleNumber * 10000) + res.Position
187
+
fmt.Printf(" %d. Position %d: %s\n", i+1, globalPos, res.DID)
188
+
fmt.Printf(" Reason: %s (confidence: %.2f)\n", res.Match.Reason, res.Match.Confidence)
189
+
if res.Match.Note != "" {
190
+
fmt.Printf(" Note: %s\n", res.Match.Note)
191
+
}
192
+
}
193
+
}
194
+
195
+
return nil
196
+
},
100
197
}
101
198
102
-
mgr, _, err := getManager(nil)
103
-
if err != nil {
104
-
return err
199
+
cmd.Flags().IntVar(&bundleNum, "bundle", 0, "Bundle number to test (required)")
200
+
cmd.Flags().Float64Var(&confidence, "confidence", 0.90, "Minimum confidence threshold")
201
+
cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Show sample matches")
202
+
203
+
cmd.MarkFlagRequired("bundle")
204
+
205
+
return cmd
206
+
}
207
+
208
+
// ============================================================================
209
+
// DETECTOR RUN (with parallel processing)
210
+
// ============================================================================
211
+
212
+
func newDetectorRunCommand() *cobra.Command {
213
+
var (
214
+
bundleRange string
215
+
confidence float64
216
+
pprofPort string
217
+
workers int
218
+
noProgress bool
219
+
)
220
+
221
+
cmd := &cobra.Command{
222
+
Use: "run <detector1|script.js> [detector2...] [flags]",
223
+
Short: "Run detector(s) and output CSV results",
224
+
Long: `Run one or more detectors on bundles and output results as CSV
225
+
226
+
Output format uses global position: (bundleNumber × 10,000) + position
227
+
Example: 88410345 = bundle 8841, position 345
228
+
229
+
Supports parallel processing across multiple workers for better performance.
230
+
231
+
Detectors can be:
232
+
• Built-in detector names (invalid_handle, aka_spam, etc)
233
+
• Path to JavaScript detector file (./my_detector.js)
234
+
• Special keyword 'all' to run all built-in detectors`,
235
+
236
+
Example: ` # Run single detector
237
+
plcbundle detector run invalid_handle --bundles 1-100
238
+
239
+
# Run with 8 parallel workers (faster)
240
+
plcbundle detector run invalid_handle --bundles 1-1000 --workers 8
241
+
242
+
# Run multiple detectors in parallel
243
+
plcbundle detector run invalid_handle aka_spam --bundles 1-100 -w 4
244
+
245
+
# Run custom script
246
+
plcbundle detector run ./my_detector.js --bundles 1-100
247
+
248
+
# Run all built-in detectors
249
+
plcbundle detector run all --bundles 1-100 --workers 8
250
+
251
+
# Save results to file
252
+
plcbundle detector run all --bundles 1-100 -w 8 > results.csv
253
+
254
+
# Disable progress bar (for scripting)
255
+
plcbundle detector run spam --bundles 1-100 --no-progress
256
+
257
+
# Enable profiling
258
+
plcbundle detector run all --bundles 1-100 --pprof :6060`,
259
+
260
+
Args: cobra.MinimumNArgs(1),
261
+
262
+
RunE: func(cmd *cobra.Command, args []string) error {
263
+
detectorNames := args
264
+
265
+
// Start pprof if requested
266
+
if pprofPort != "" {
267
+
go func() {
268
+
fmt.Fprintf(os.Stderr, "pprof server starting on http://localhost%s/debug/pprof/\n", pprofPort)
269
+
http.ListenAndServe(pprofPort, nil)
270
+
}()
271
+
}
272
+
273
+
// Auto-detect workers if not specified
274
+
if workers <= 0 {
275
+
workers = runtime.NumCPU()
276
+
if workers < 1 {
277
+
workers = 1
278
+
}
279
+
}
280
+
281
+
mgr, _, err := getManager(&ManagerOptions{Cmd: cmd})
282
+
if err != nil {
283
+
return err
284
+
}
285
+
defer mgr.Close()
286
+
287
+
// Determine range
288
+
var start, end int
289
+
if bundleRange == "" {
290
+
index := mgr.GetIndex()
291
+
bundles := index.GetBundles()
292
+
if len(bundles) == 0 {
293
+
return fmt.Errorf("no bundles available")
294
+
}
295
+
start = bundles[0].BundleNumber
296
+
end = bundles[len(bundles)-1].BundleNumber
297
+
fmt.Fprintf(os.Stderr, "Using all bundles: %d-%d\n", start, end)
298
+
} else {
299
+
start, end, err = parseBundleRange(bundleRange)
300
+
if err != nil {
301
+
return err
302
+
}
303
+
}
304
+
305
+
// Load detectors
306
+
setup, err := parseAndLoadDetectors(detectorNames, confidence)
307
+
if err != nil {
308
+
return err
309
+
}
310
+
defer setup.cleanup()
311
+
312
+
fmt.Fprintf(os.Stderr, "Running %d detector(s) on bundles %d-%d (%d workers)\n",
313
+
len(setup.detectors), start, end, workers)
314
+
fmt.Fprintf(os.Stderr, "Min confidence: %.2f\n\n", confidence)
315
+
316
+
return runDetectionParallel(cmd.Context(), mgr, setup, start, end, workers, !noProgress)
317
+
},
105
318
}
106
-
defer mgr.Close()
107
319
108
-
ctx := context.Background()
109
-
bundle, err := mgr.LoadBundle(ctx, *bundleNum)
110
-
if err != nil {
111
-
return fmt.Errorf("error loading bundle: %w", err)
320
+
cmd.Flags().StringVar(&bundleRange, "bundles", "", "Bundle range (e.g., '1-100', default: all)")
321
+
cmd.Flags().Float64Var(&confidence, "confidence", 0.90, "Minimum confidence threshold")
322
+
cmd.Flags().IntVarP(&workers, "workers", "w", 0, "Number of parallel workers (0 = auto-detect CPU count)")
323
+
cmd.Flags().BoolVar(&noProgress, "no-progress", false, "Disable progress bar")
324
+
cmd.Flags().StringVar(&pprofPort, "pprof", "", "Enable pprof on port (e.g., ':6060')")
325
+
326
+
return cmd
327
+
}
328
+
329
+
// ============================================================================
330
+
// DETECTOR FILTER
331
+
// ============================================================================
332
+
333
+
func newDetectorFilterCommand() *cobra.Command {
334
+
var confidence float64
335
+
336
+
cmd := &cobra.Command{
337
+
Use: "filter <detector1|script.js> [detector2...]",
338
+
Short: "Filter JSONL operations from stdin",
339
+
Long: `Filter JSONL operations from stdin using detectors
340
+
341
+
Reads operations from stdin, runs detectors, and outputs only
342
+
operations that DO NOT match (clean operations).
343
+
344
+
Perfect for cleaning datasets or pre-processing.`,
345
+
346
+
Example: ` # Filter with built-in detector
347
+
cat ops.jsonl | plcbundle detector filter invalid_handle > clean.jsonl
348
+
349
+
# Filter with custom script
350
+
plcbundle export --all | plcbundle detector filter ./spam.js > clean.jsonl
351
+
352
+
# Chain multiple detectors
353
+
cat ops.jsonl | plcbundle detector filter invalid_handle aka_spam > clean.jsonl
354
+
355
+
# Custom confidence
356
+
cat ops.jsonl | plcbundle detector filter spam_pds --confidence 0.95 > clean.jsonl`,
357
+
358
+
Args: cobra.MinimumNArgs(1),
359
+
360
+
RunE: func(cmd *cobra.Command, args []string) error {
361
+
detectorNames := args
362
+
363
+
setup, err := parseAndLoadDetectors(detectorNames, confidence)
364
+
if err != nil {
365
+
return err
366
+
}
367
+
defer setup.cleanup()
368
+
369
+
fmt.Fprintf(os.Stderr, "Filtering with %d detector(s), min confidence: %.2f\n\n", len(setup.detectors), confidence)
370
+
371
+
return filterFromStdin(cmd.Context(), setup)
372
+
},
112
373
}
113
374
114
-
fmt.Printf("Testing detector '%s' on bundle %06d...\n", detectorName, *bundleNum)
115
-
fmt.Printf("Min confidence: %.2f\n\n", *confidence)
375
+
cmd.Flags().Float64Var(&confidence, "confidence", 0.90, "Minimum confidence threshold")
116
376
117
-
registry := detector.DefaultRegistry()
118
-
config := detector.DefaultConfig()
119
-
config.MinConfidence = *confidence
377
+
return cmd
378
+
}
120
379
121
-
runner := detector.NewRunner(registry, config, &defaultLogger{})
122
-
results, err := runner.RunOnBundle(ctx, detectorName, bundle)
123
-
if err != nil {
124
-
return fmt.Errorf("detection failed: %w", err)
125
-
}
380
+
// ============================================================================
381
+
// DETECTOR INFO
382
+
// ============================================================================
126
383
127
-
stats := detector.CalculateStats(results, len(bundle.Operations))
384
+
func newDetectorInfoCommand() *cobra.Command {
385
+
return &cobra.Command{
386
+
Use: "info <detector-name>",
387
+
Short: "Show detailed detector information",
388
+
Long: `Show detailed information about a specific detector`,
128
389
129
-
fmt.Printf("Results:\n")
130
-
fmt.Printf(" Total operations: %d\n", stats.TotalOperations)
131
-
fmt.Printf(" Matches found: %d (%.2f%%)\n\n", stats.MatchedCount, stats.MatchRate*100)
390
+
Example: ` # Show detector info
391
+
plcbundle detector info invalid_handle
392
+
plcbundle detector info aka_spam`,
132
393
133
-
if len(stats.ByReason) > 0 {
134
-
fmt.Printf("Breakdown by reason:\n")
135
-
for reason, count := range stats.ByReason {
136
-
pct := float64(count) / float64(stats.MatchedCount) * 100
137
-
fmt.Printf(" %-25s %d (%.1f%%)\n", reason, count, pct)
138
-
}
139
-
fmt.Printf("\n")
140
-
}
394
+
Args: cobra.ExactArgs(1),
141
395
142
-
if *verbose && len(results) > 0 {
143
-
fmt.Printf("Sample matches (first 10):\n")
144
-
displayCount := 10
145
-
if len(results) < displayCount {
146
-
displayCount = len(results)
147
-
}
396
+
RunE: func(cmd *cobra.Command, args []string) error {
397
+
detectorName := args[0]
148
398
149
-
for i := 0; i < displayCount; i++ {
150
-
res := results[i]
151
-
fmt.Printf(" %d. Position %d: %s\n", i+1, res.Position, res.DID)
152
-
fmt.Printf(" Reason: %s (confidence: %.2f)\n", res.Match.Reason, res.Match.Confidence)
153
-
if res.Match.Note != "" {
154
-
fmt.Printf(" Note: %s\n", res.Match.Note)
399
+
registry := detector.DefaultRegistry()
400
+
d, err := registry.Get(detectorName)
401
+
if err != nil {
402
+
return err
155
403
}
156
-
}
404
+
405
+
fmt.Printf("Detector: %s\n", d.Name())
406
+
fmt.Printf("Version: %s\n", d.Version())
407
+
fmt.Printf("Description: %s\n\n", d.Description())
408
+
409
+
fmt.Printf("Usage examples:\n")
410
+
fmt.Printf(" # Test on single bundle\n")
411
+
fmt.Printf(" plcbundle detector test %s --bundle 42\n\n", d.Name())
412
+
fmt.Printf(" # Run on range and save\n")
413
+
fmt.Printf(" plcbundle detector run %s --bundles 1-100 > results.csv\n\n", d.Name())
414
+
fmt.Printf(" # Filter JSONL stream\n")
415
+
fmt.Printf(" cat ops.jsonl | plcbundle detector filter %s > clean.jsonl\n\n", d.Name())
416
+
417
+
return nil
418
+
},
157
419
}
420
+
}
421
+
422
+
// ============================================================================
423
+
// PARALLEL DETECTION IMPLEMENTATION
424
+
// ============================================================================
158
425
159
-
return nil
426
+
type detectionMatch struct {
427
+
globalPos int
428
+
cid string
429
+
size int
430
+
confidence float64
431
+
labels []string
160
432
}
161
433
162
-
func detectorRun(args []string) error {
163
-
if len(args) < 1 {
164
-
return fmt.Errorf("usage: plcbundle detector run <detector1|script.js> [detector2...] [--bundles 1-100]")
165
-
}
434
+
// detectionResult holds results from processing a bundle
435
+
type detectionResult struct {
436
+
bundleNum int
437
+
matches []detectionMatch
438
+
totalOps int
439
+
totalBytes int64
440
+
matchedBytes int64
441
+
err error
442
+
}
166
443
167
-
var detectorNames []string
168
-
var flagArgs []string
169
-
for i := 0; i < len(args); i++ {
170
-
if strings.HasPrefix(args[i], "-") {
171
-
flagArgs = args[i:]
172
-
break
173
-
}
174
-
detectorNames = append(detectorNames, args[i])
175
-
}
444
+
func runDetectionParallel(ctx context.Context, mgr BundleManager, setup *detectorSetup, start, end int, workers int, showProgress bool) error {
445
+
totalBundles := end - start + 1
176
446
177
-
if len(detectorNames) == 0 {
178
-
return fmt.Errorf("at least one detector name required")
447
+
// ✨ FIX: Don't create more workers than bundles
448
+
if workers > totalBundles {
449
+
workers = totalBundles
179
450
}
180
451
181
-
fs := flag.NewFlagSet("detector run", flag.ExitOnError)
182
-
bundleRange := fs.String("bundles", "", "bundle range (default: all)")
183
-
confidence := fs.Float64("confidence", 0.90, "minimum confidence")
184
-
pprofPort := fs.String("pprof", "", "enable pprof on port (e.g., :6060)")
452
+
// ✨ FIX: Use unbuffered channels to avoid blocking issues
453
+
jobs := make(chan int, workers*2) // Small buffer for job numbers only
454
+
results := make(chan detectionResult, workers*2)
185
455
186
-
if err := fs.Parse(flagArgs); err != nil {
187
-
return err
456
+
// Shared counters
457
+
var (
458
+
totalOps int64
459
+
matchCount int64
460
+
totalBytes int64
461
+
matchedBytes int64
462
+
)
463
+
464
+
// Progress tracking
465
+
var progress *ui.ProgressBar
466
+
if showProgress {
467
+
progress = ui.NewProgressBar(totalBundles)
188
468
}
189
469
190
-
// Start pprof if requested
191
-
if *pprofPort != "" {
192
-
go func() {
193
-
fmt.Fprintf(os.Stderr, "pprof server starting on http://localhost%s/debug/pprof/\n", *pprofPort)
194
-
http.ListenAndServe(*pprofPort, nil)
195
-
}()
196
-
}
470
+
// CSV header
471
+
fmt.Println("position,cid,size,confidence,labels")
472
+
473
+
// Start workers BEFORE sending jobs
474
+
var wg sync.WaitGroup
475
+
for w := 0; w < workers; w++ {
476
+
wg.Add(1)
477
+
go func(workerID int) {
478
+
defer wg.Done()
197
479
198
-
mgr, _, err := getManager(nil)
199
-
if err != nil {
200
-
return err
480
+
for bundleNum := range jobs {
481
+
select {
482
+
case <-ctx.Done():
483
+
return
484
+
default:
485
+
}
486
+
487
+
// Process bundle
488
+
res := processBundleForDetection(ctx, mgr, setup, bundleNum)
489
+
490
+
// Send result (may block if collector is slow, but that's OK)
491
+
select {
492
+
case results <- res:
493
+
case <-ctx.Done():
494
+
return
495
+
}
496
+
}
497
+
}(w)
201
498
}
202
-
defer mgr.Close()
203
499
204
-
// Determine range
205
-
var start, end int
206
-
if *bundleRange == "" {
207
-
index := mgr.GetIndex()
208
-
bundles := index.GetBundles()
209
-
if len(bundles) == 0 {
210
-
return fmt.Errorf("no bundles available")
500
+
// Send jobs in separate goroutine
501
+
go func() {
502
+
defer close(jobs)
503
+
for bundleNum := start; bundleNum <= end; bundleNum++ {
504
+
select {
505
+
case jobs <- bundleNum:
506
+
case <-ctx.Done():
507
+
return
508
+
}
211
509
}
212
-
start = bundles[0].BundleNumber
213
-
end = bundles[len(bundles)-1].BundleNumber
214
-
fmt.Fprintf(os.Stderr, "Using all bundles: %d-%d\n", start, end)
215
-
} else {
216
-
start, end, err = parseBundleRange(*bundleRange)
217
-
if err != nil {
218
-
return err
219
-
}
220
-
}
510
+
}()
511
+
512
+
// Collect results in separate goroutine
513
+
done := make(chan struct{})
514
+
go func() {
515
+
defer close(done)
516
+
wg.Wait()
517
+
close(results)
518
+
}()
221
519
222
-
// Load detectors
223
-
setup, err := parseAndLoadDetectors(detectorNames, *confidence)
224
-
if err != nil {
225
-
return err
226
-
}
227
-
defer setup.cleanup()
520
+
// Process results in main goroutine
521
+
processed := 0
522
+
allMatches := make([]detectionMatch, 0, 1000)
228
523
229
-
fmt.Fprintf(os.Stderr, "Running %d detector(s) on bundles %d-%d\n", len(setup.detectors), start, end)
230
-
fmt.Fprintf(os.Stderr, "Min confidence: %.2f\n\n", *confidence)
524
+
for res := range results {
525
+
processed++
231
526
232
-
ctx := context.Background()
233
-
fmt.Println("bundle,position,cid,size,confidence,labels")
527
+
if res.err != nil {
528
+
if res.err != context.Canceled {
529
+
fmt.Fprintf(os.Stderr, "\nWarning: bundle %06d failed: %v\n", res.bundleNum, res.err)
530
+
}
531
+
} else {
532
+
// Update counters
533
+
atomic.AddInt64(&totalOps, int64(res.totalOps))
534
+
atomic.AddInt64(&matchCount, int64(len(res.matches)))
535
+
atomic.AddInt64(&totalBytes, res.totalBytes)
536
+
atomic.AddInt64(&matchedBytes, res.matchedBytes)
234
537
235
-
totalOps, matchCount := 0, 0
236
-
totalBytes, matchedBytes := int64(0), int64(0)
538
+
// Collect matches
539
+
allMatches = append(allMatches, res.matches...)
237
540
238
-
for bundleNum := start; bundleNum <= end; bundleNum++ {
239
-
bundle, err := mgr.LoadBundle(ctx, bundleNum)
240
-
if err != nil {
241
-
continue
541
+
// ✨ FIX: Output immediately (don't buffer too much)
542
+
if len(allMatches) >= 500 {
543
+
for _, match := range allMatches {
544
+
fmt.Printf("%d,%s,%d,%.2f,%s\n",
545
+
match.globalPos, match.cid, match.size,
546
+
match.confidence, strings.Join(match.labels, ";"))
547
+
}
548
+
allMatches = allMatches[:0]
549
+
}
242
550
}
243
551
244
-
totalOps += len(bundle.Operations)
552
+
if progress != nil {
553
+
progress.Set(processed)
554
+
}
555
+
}
245
556
246
-
for position, op := range bundle.Operations {
247
-
opSize := len(op.RawJSON)
248
-
if opSize == 0 {
249
-
data, _ := json.Marshal(op)
250
-
opSize = len(data)
251
-
}
252
-
totalBytes += int64(opSize)
557
+
// Flush remaining matches
558
+
for _, match := range allMatches {
559
+
fmt.Printf("%d,%s,%d,%.2f,%s\n",
560
+
match.globalPos, match.cid, match.size,
561
+
match.confidence, strings.Join(match.labels, ";"))
562
+
}
253
563
254
-
labels, conf := detectOperation(ctx, setup.detectors, op, setup.confidence)
564
+
if progress != nil {
565
+
progress.Finish()
566
+
}
255
567
256
-
if len(labels) > 0 {
257
-
matchCount++
258
-
matchedBytes += int64(opSize)
568
+
// Wait for cleanup
569
+
<-done
259
570
260
-
cidShort := op.CID
261
-
if len(cidShort) > 4 {
262
-
cidShort = cidShort[len(cidShort)-4:]
263
-
}
571
+
// Summary
572
+
finalTotalOps := atomic.LoadInt64(&totalOps)
573
+
finalMatchCount := atomic.LoadInt64(&matchCount)
574
+
finalTotalBytes := atomic.LoadInt64(&totalBytes)
575
+
finalMatchedBytes := atomic.LoadInt64(&matchedBytes)
264
576
265
-
fmt.Printf("%d,%d,%s,%d,%.2f,%s\n",
266
-
bundleNum, position, cidShort, opSize, conf, strings.Join(labels, ";"))
267
-
}
268
-
}
577
+
if finalTotalOps == 0 {
578
+
fmt.Fprintf(os.Stderr, "\n⚠️ No operations processed\n")
579
+
return nil
269
580
}
270
581
271
582
fmt.Fprintf(os.Stderr, "\n✓ Detection complete\n")
272
-
fmt.Fprintf(os.Stderr, " Total operations: %d\n", totalOps)
273
-
fmt.Fprintf(os.Stderr, " Matches found: %d (%.2f%%)\n", matchCount, float64(matchCount)/float64(totalOps)*100)
583
+
fmt.Fprintf(os.Stderr, " Total operations: %d\n", finalTotalOps)
584
+
fmt.Fprintf(os.Stderr, " Matches found: %d (%.2f%%)\n",
585
+
finalMatchCount, float64(finalMatchCount)/float64(finalTotalOps)*100)
586
+
fmt.Fprintf(os.Stderr, " Total size: %s\n", formatBytes(finalTotalBytes))
587
+
fmt.Fprintf(os.Stderr, " Matched size: %s (%.2f%%)\n",
588
+
formatBytes(finalMatchedBytes),
589
+
float64(finalMatchedBytes)/float64(finalTotalBytes)*100)
274
590
275
591
return nil
276
592
}
277
593
278
-
func detectorFilter(args []string) error {
279
-
if len(args) < 1 {
280
-
return fmt.Errorf("usage: plcbundle detector filter <detector1|script.js> [detector2...] [--confidence 0.9]")
594
+
func processBundleForDetection(ctx context.Context, mgr BundleManager, setup *detectorSetup, bundleNum int) detectionResult {
595
+
res := detectionResult{bundleNum: bundleNum}
596
+
597
+
bundle, err := mgr.LoadBundle(ctx, bundleNum)
598
+
if err != nil {
599
+
res.err = err
600
+
return res
281
601
}
282
602
283
-
var detectorNames []string
284
-
var flagArgs []string
285
-
for i := 0; i < len(args); i++ {
286
-
if strings.HasPrefix(args[i], "-") {
287
-
flagArgs = args[i:]
288
-
break
603
+
res.totalOps = len(bundle.Operations)
604
+
matches := make([]detectionMatch, 0)
605
+
606
+
for position, op := range bundle.Operations {
607
+
// Calculate global position
608
+
globalPos := (bundleNum * 10000) + position
609
+
610
+
opSize := len(op.RawJSON)
611
+
if opSize == 0 {
612
+
data, _ := json.Marshal(op)
613
+
opSize = len(data)
289
614
}
290
-
detectorNames = append(detectorNames, args[i])
291
-
}
615
+
res.totalBytes += int64(opSize)
616
+
617
+
labels, conf := detectOperation(ctx, setup.detectors, op, setup.confidence)
292
618
293
-
if len(detectorNames) == 0 {
294
-
return fmt.Errorf("at least one detector name required")
295
-
}
619
+
if len(labels) > 0 {
620
+
res.matchedBytes += int64(opSize)
296
621
297
-
fs := flag.NewFlagSet("detector filter", flag.ExitOnError)
298
-
confidence := fs.Float64("confidence", 0.90, "minimum confidence")
622
+
cidShort := op.CID
623
+
if len(cidShort) > 4 {
624
+
cidShort = cidShort[len(cidShort)-4:]
625
+
}
299
626
300
-
if err := fs.Parse(flagArgs); err != nil {
301
-
return err
627
+
matches = append(matches, detectionMatch{
628
+
globalPos: globalPos,
629
+
cid: cidShort,
630
+
size: opSize,
631
+
confidence: conf,
632
+
labels: labels,
633
+
})
634
+
}
302
635
}
303
636
304
-
setup, err := parseAndLoadDetectors(detectorNames, *confidence)
305
-
if err != nil {
306
-
return err
307
-
}
308
-
defer setup.cleanup()
637
+
res.matches = matches
638
+
return res
639
+
}
309
640
310
-
fmt.Fprintf(os.Stderr, "Filtering with %d detector(s), min confidence: %.2f\n\n", len(setup.detectors), *confidence)
641
+
// ============================================================================
642
+
// FILTER FROM STDIN
643
+
// ============================================================================
311
644
312
-
ctx := context.Background()
645
+
func filterFromStdin(ctx context.Context, setup *detectorSetup) error {
313
646
scanner := bufio.NewScanner(os.Stdin)
314
647
buf := make([]byte, 0, 64*1024)
315
648
scanner.Buffer(buf, 1024*1024)
···
334
667
labels, _ := detectOperation(ctx, setup.detectors, op, setup.confidence)
335
668
336
669
if len(labels) == 0 {
670
+
// Clean - output to stdout
337
671
cleanCount++
338
672
fmt.Println(string(line))
339
673
} else {
674
+
// Matched - filter out
340
675
filteredCount++
341
676
filteredBytes += int64(len(line))
342
677
}
···
355
690
return nil
356
691
}
357
692
358
-
func detectorInfo(args []string) error {
359
-
if len(args) < 1 {
360
-
return fmt.Errorf("usage: plcbundle detector info <name>")
361
-
}
362
-
363
-
detectorName := args[0]
364
-
365
-
registry := detector.DefaultRegistry()
366
-
d, err := registry.Get(detectorName)
367
-
if err != nil {
368
-
return err
369
-
}
370
-
371
-
fmt.Printf("Detector: %s\n", d.Name())
372
-
fmt.Printf("Version: %s\n", d.Version())
373
-
fmt.Printf("Description: %s\n\n", d.Description())
374
-
375
-
fmt.Printf("Usage examples:\n")
376
-
fmt.Printf(" # Test on single bundle\n")
377
-
fmt.Printf(" plcbundle detector test %s --bundle 42\n\n", d.Name())
378
-
fmt.Printf(" # Run on range and save\n")
379
-
fmt.Printf(" plcbundle detector run %s --bundles 1-100 > results.csv\n\n", d.Name())
380
-
381
-
return nil
382
-
}
383
-
384
-
// Helper functions
693
+
// ============================================================================
694
+
// SHARED HELPERS
695
+
// ============================================================================
385
696
386
697
type detectorSetup struct {
387
698
detectors []detector.Detector
···
398
709
func parseAndLoadDetectors(detectorNames []string, confidence float64) (*detectorSetup, error) {
399
710
registry := detector.DefaultRegistry()
400
711
712
+
// Handle "all" keyword
401
713
if len(detectorNames) == 1 && detectorNames[0] == "all" {
402
714
detectorNames = registry.Names()
403
715
fmt.Fprintf(os.Stderr, "Using all detectors: %s\n", strings.Join(detectorNames, ", "))
···
410
722
}
411
723
412
724
for _, name := range detectorNames {
725
+
// JavaScript detector
413
726
if strings.HasSuffix(name, ".js") {
414
727
sd, err := detector.NewScriptDetector(name)
415
728
if err != nil {
416
729
setup.cleanup()
417
-
return nil, fmt.Errorf("error loading script %s: %w", name, err)
730
+
return nil, fmt.Errorf("failed to load script %s: %w", name, err)
418
731
}
419
732
setup.scriptDetectors = append(setup.scriptDetectors, sd)
420
733
registry.Register(sd)
421
734
setup.detectors = append(setup.detectors, sd)
422
-
fmt.Fprintf(os.Stderr, "✓ Started detector server: %s\n", sd.Name())
735
+
fmt.Fprintf(os.Stderr, "✓ Loaded detector: %s\n", sd.Name())
423
736
} else {
737
+
// Built-in detector
424
738
d, err := registry.Get(name)
425
739
if err != nil {
426
740
setup.cleanup()
···
449
763
continue
450
764
}
451
765
766
+
// Extract labels
452
767
var labels []string
453
768
if labelList, ok := match.Metadata["labels"].([]string); ok {
454
769
labels = labelList
···
472
787
473
788
return matchedLabels, maxConfidence
474
789
}
475
-
476
-
type defaultLogger struct{}
477
-
478
-
func (d *defaultLogger) Printf(format string, v ...interface{}) {
479
-
fmt.Fprintf(os.Stderr, format+"\n", v...)
480
-
}
-166
cmd/plcbundle/commands/rebuild.go
-166
cmd/plcbundle/commands/rebuild.go
···
1
-
package commands
2
-
3
-
import (
4
-
"context"
5
-
"fmt"
6
-
"os"
7
-
"path/filepath"
8
-
"runtime"
9
-
"time"
10
-
11
-
flag "github.com/spf13/pflag"
12
-
13
-
"tangled.org/atscan.net/plcbundle/bundle"
14
-
"tangled.org/atscan.net/plcbundle/cmd/plcbundle/ui"
15
-
"tangled.org/atscan.net/plcbundle/internal/bundleindex"
16
-
)
17
-
18
-
// RebuildCommand handles the rebuild subcommand
19
-
func RebuildCommand(args []string) error {
20
-
fs := flag.NewFlagSet("rebuild", flag.ExitOnError)
21
-
verbose := fs.Bool("v", false, "verbose output")
22
-
workers := fs.Int("workers", 0, "number of parallel workers (0 = CPU count)")
23
-
noProgress := fs.Bool("no-progress", false, "disable progress bar")
24
-
25
-
if err := fs.Parse(args); err != nil {
26
-
return err
27
-
}
28
-
29
-
// Auto-detect CPU count
30
-
if *workers == 0 {
31
-
*workers = runtime.NumCPU()
32
-
}
33
-
34
-
// Get working directory
35
-
dir, err := os.Getwd()
36
-
if err != nil {
37
-
return err
38
-
}
39
-
40
-
if err := os.MkdirAll(dir, 0755); err != nil {
41
-
return err
42
-
}
43
-
44
-
// Create manager WITHOUT auto-rebuild
45
-
config := bundle.DefaultConfig(dir)
46
-
config.AutoRebuild = false
47
-
config.RebuildWorkers = *workers
48
-
49
-
mgr, err := bundle.NewManager(config, nil)
50
-
if err != nil {
51
-
return err
52
-
}
53
-
defer mgr.Close()
54
-
55
-
fmt.Printf("Rebuilding index from: %s\n", dir)
56
-
fmt.Printf("Using %d workers\n", *workers)
57
-
58
-
// Find all bundle files
59
-
files, err := filepath.Glob(filepath.Join(dir, "*.jsonl.zst"))
60
-
if err != nil {
61
-
return fmt.Errorf("error scanning directory: %w", err)
62
-
}
63
-
64
-
// Filter out hidden/temp files
65
-
files = filterBundleFiles(files)
66
-
67
-
if len(files) == 0 {
68
-
fmt.Println("No bundle files found")
69
-
return nil
70
-
}
71
-
72
-
fmt.Printf("Found %d bundle files\n\n", len(files))
73
-
74
-
start := time.Now()
75
-
76
-
// Create progress bar
77
-
var progress *ui.ProgressBar
78
-
var progressCallback func(int, int, int64)
79
-
80
-
if !*noProgress {
81
-
fmt.Println("Processing bundles:")
82
-
progress = ui.NewProgressBar(len(files))
83
-
84
-
progressCallback = func(current, total int, bytesProcessed int64) {
85
-
progress.SetWithBytes(current, bytesProcessed)
86
-
}
87
-
}
88
-
89
-
// Use parallel scan
90
-
result, err := mgr.ScanDirectoryParallel(*workers, progressCallback)
91
-
92
-
if err != nil {
93
-
if progress != nil {
94
-
progress.Finish()
95
-
}
96
-
return fmt.Errorf("rebuild failed: %w", err)
97
-
}
98
-
99
-
if progress != nil {
100
-
progress.Finish()
101
-
}
102
-
103
-
elapsed := time.Since(start)
104
-
105
-
fmt.Printf("\n✓ Index rebuilt in %s\n", elapsed.Round(time.Millisecond))
106
-
fmt.Printf(" Total bundles: %d\n", result.BundleCount)
107
-
fmt.Printf(" Compressed size: %s\n", formatBytes(result.TotalSize))
108
-
fmt.Printf(" Uncompressed size: %s\n", formatBytes(result.TotalUncompressed))
109
-
110
-
if result.TotalUncompressed > 0 {
111
-
ratio := float64(result.TotalUncompressed) / float64(result.TotalSize)
112
-
fmt.Printf(" Compression ratio: %.2fx\n", ratio)
113
-
}
114
-
115
-
fmt.Printf(" Average speed: %.1f bundles/sec\n", float64(result.BundleCount)/elapsed.Seconds())
116
-
117
-
if elapsed.Seconds() > 0 {
118
-
compressedThroughput := float64(result.TotalSize) / elapsed.Seconds() / (1000 * 1000)
119
-
uncompressedThroughput := float64(result.TotalUncompressed) / elapsed.Seconds() / (1000 * 1000)
120
-
fmt.Printf(" Throughput (compressed): %.1f MB/s\n", compressedThroughput)
121
-
fmt.Printf(" Throughput (uncompressed): %.1f MB/s\n", uncompressedThroughput)
122
-
}
123
-
124
-
fmt.Printf(" Index file: %s\n", filepath.Join(dir, bundleindex.INDEX_FILE))
125
-
126
-
if len(result.MissingGaps) > 0 {
127
-
fmt.Printf(" ⚠️ Missing gaps: %d bundles\n", len(result.MissingGaps))
128
-
}
129
-
130
-
// Verify chain if verbose
131
-
if *verbose {
132
-
fmt.Printf("\nVerifying chain integrity...\n")
133
-
134
-
ctx := context.Background()
135
-
verifyResult, err := mgr.VerifyChain(ctx)
136
-
if err != nil {
137
-
fmt.Printf(" ⚠️ Verification error: %v\n", err)
138
-
} else if verifyResult.Valid {
139
-
fmt.Printf(" ✓ Chain is valid (%d bundles verified)\n", len(verifyResult.VerifiedBundles))
140
-
141
-
// Show head hash
142
-
index := mgr.GetIndex()
143
-
if lastMeta := index.GetLastBundle(); lastMeta != nil {
144
-
fmt.Printf(" Chain head: %s...\n", lastMeta.Hash[:16])
145
-
}
146
-
} else {
147
-
fmt.Printf(" ✗ Chain verification failed\n")
148
-
fmt.Printf(" Broken at: bundle %06d\n", verifyResult.BrokenAt)
149
-
fmt.Printf(" Error: %s\n", verifyResult.Error)
150
-
}
151
-
}
152
-
153
-
return nil
154
-
}
155
-
156
-
func filterBundleFiles(files []string) []string {
157
-
filtered := make([]string, 0, len(files))
158
-
for _, file := range files {
159
-
basename := filepath.Base(file)
160
-
if len(basename) > 0 && (basename[0] == '.' || basename[0] == '_') {
161
-
continue
162
-
}
163
-
filtered = append(filtered, file)
164
-
}
165
-
return filtered
166
-
}
+2
-2
cmd/plcbundle/main.go
+2
-2
cmd/plcbundle/main.go
···
63
63
cmd.AddCommand(commands.NewDIDCommand())
64
64
cmd.AddCommand(commands.NewIndexCommand())
65
65
cmd.AddCommand(commands.NewMempoolCommand())
66
-
/*cmd.AddCommand(commands.NewDetectorCommand())
66
+
cmd.AddCommand(commands.NewDetectorCommand())
67
67
68
68
// Monitoring & maintenance
69
-
cmd.AddCommand(commands.NewWatchCommand())
69
+
/*cmd.AddCommand(commands.NewWatchCommand())
70
70
cmd.AddCommand(commands.NewHealCommand())
71
71
cmd.AddCommand(commands.NewCleanCommand())*/
72
72
+6
detector/script.go
+6
detector/script.go
···
10
10
"os/exec"
11
11
"path/filepath"
12
12
"strings"
13
+
"sync"
13
14
"time"
14
15
15
16
"github.com/goccy/go-json"
···
25
26
conn net.Conn
26
27
writer *bufio.Writer
27
28
reader *bufio.Reader
29
+
mu sync.Mutex
28
30
}
29
31
30
32
func NewScriptDetector(scriptPath string) (*ScriptDetector, error) {
···
143
145
if d.conn == nil {
144
146
return nil, fmt.Errorf("not connected to server")
145
147
}
148
+
149
+
// ✨ LOCK for entire socket communication
150
+
d.mu.Lock()
151
+
defer d.mu.Unlock()
146
152
147
153
// Use RawJSON directly
148
154
data := op.RawJSON
+4
-2
go.mod
+4
-2
go.mod
···
11
11
12
12
require (
13
13
github.com/spf13/cobra v1.10.1
14
-
github.com/spf13/pflag v1.0.10
15
14
golang.org/x/term v0.36.0
16
15
)
17
16
18
-
require github.com/inconshreveable/mousetrap v1.1.0 // indirect
17
+
require (
18
+
github.com/inconshreveable/mousetrap v1.1.0 // indirect
19
+
github.com/spf13/pflag v1.0.9 // indirect
20
+
)
+1
-2
go.sum
+1
-2
go.sum
···
10
10
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
11
11
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
12
12
github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
13
+
github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
13
14
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
14
-
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
15
-
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
16
15
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
17
16
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
18
17
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=