A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
1// detector/runner.go
2package detector
3
4import (
5 "context"
6 "fmt"
7 "sync"
8 "time"
9
10 "tangled.org/atscan.net/plcbundle/bundle"
11 "tangled.org/atscan.net/plcbundle/plc"
12)
13
// Runner executes detectors against the operations of a bundle.
//
// Detectors are resolved by name through the registry. The config is read
// for the execution mode (Parallel), the worker count (Workers), the
// per-operation Timeout and the MinConfidence threshold applied to matches.
type Runner struct {
	registry *Registry // detector lookup; queried with Get(name)
	config   *Config   // execution settings; NewRunner substitutes DefaultConfig() for nil
	logger   Logger    // Printf-style log sink exactly as passed to NewRunner (not defaulted there)
}
20
// Logger is the minimal logging interface accepted by NewRunner.
//
// It asks for a single Printf method whose signature matches the standard
// library's (*log.Logger).Printf, so a *log.Logger can be passed directly.
type Logger interface {
	// Printf is expected to format v according to format, in the manner of
	// fmt.Printf, and emit the resulting message.
	Printf(format string, v ...interface{})
}
24
25// NewRunner creates a new detector runner
26func NewRunner(registry *Registry, config *Config, logger Logger) *Runner {
27 if config == nil {
28 config = DefaultConfig()
29 }
30 return &Runner{
31 registry: registry,
32 config: config,
33 logger: logger,
34 }
35}
36
37// RunOnBundle runs detector(s) on all operations in a bundle
38func (r *Runner) RunOnBundle(ctx context.Context, detectorName string, b *bundle.Bundle) ([]*Result, error) {
39 detector, err := r.registry.Get(detectorName)
40 if err != nil {
41 return nil, err
42 }
43
44 var results []*Result
45
46 if r.config.Parallel {
47 results = r.runParallel(ctx, detector, b)
48 } else {
49 results = r.runSequential(ctx, detector, b)
50 }
51
52 // Filter by minimum confidence
53 filtered := make([]*Result, 0)
54 for _, res := range results {
55 if res.Match != nil && res.Match.Confidence >= r.config.MinConfidence {
56 filtered = append(filtered, res)
57 }
58 }
59
60 return filtered, nil
61}
62
63func (r *Runner) runSequential(ctx context.Context, detector Detector, b *bundle.Bundle) []*Result {
64 results := make([]*Result, 0)
65
66 for pos, op := range b.Operations {
67 select {
68 case <-ctx.Done():
69 return results
70 default:
71 }
72
73 result := r.detectOne(ctx, detector, b.BundleNumber, pos, op)
74 if result.Match != nil || result.Error != nil {
75 results = append(results, result)
76 }
77 }
78
79 return results
80}
81
82func (r *Runner) runParallel(ctx context.Context, detector Detector, b *bundle.Bundle) []*Result {
83 type job struct {
84 pos int
85 op plc.PLCOperation
86 }
87
88 jobs := make(chan job, len(b.Operations))
89 resultsChan := make(chan *Result, len(b.Operations))
90
91 // Start workers
92 var wg sync.WaitGroup
93 for i := 0; i < r.config.Workers; i++ {
94 wg.Add(1)
95 go func() {
96 defer wg.Done()
97 for j := range jobs {
98 select {
99 case <-ctx.Done():
100 return
101 default:
102 }
103
104 result := r.detectOne(ctx, detector, b.BundleNumber, j.pos, j.op)
105 if result.Match != nil || result.Error != nil {
106 resultsChan <- result
107 }
108 }
109 }()
110 }
111
112 // Send jobs
113 for pos, op := range b.Operations {
114 jobs <- job{pos: pos, op: op}
115 }
116 close(jobs)
117
118 // Wait for completion
119 go func() {
120 wg.Wait()
121 close(resultsChan)
122 }()
123
124 // Collect results
125 results := make([]*Result, 0)
126 for result := range resultsChan {
127 results = append(results, result)
128 }
129
130 return results
131}
132
133func (r *Runner) detectOne(ctx context.Context, detector Detector, bundleNum, pos int, op plc.PLCOperation) *Result {
134 // Create timeout context
135 detectCtx, cancel := context.WithTimeout(ctx, r.config.Timeout)
136 defer cancel()
137
138 result := &Result{
139 BundleNumber: bundleNum,
140 Position: pos,
141 DID: op.DID,
142 CID: op.CID, // ← Add this
143 DetectorName: detector.Name(),
144 DetectedAt: time.Now(),
145 }
146
147 match, err := detector.Detect(detectCtx, op)
148 result.Match = match
149 result.Error = err
150
151 return result
152}
153
154// RunMultipleDetectors runs multiple detectors on a bundle
155func (r *Runner) RunMultipleDetectors(ctx context.Context, detectorNames []string, b *bundle.Bundle) (map[string][]*Result, error) {
156 allResults := make(map[string][]*Result)
157
158 for _, name := range detectorNames {
159 results, err := r.RunOnBundle(ctx, name, b)
160 if err != nil {
161 return nil, fmt.Errorf("detector %s failed: %w", name, err)
162 }
163 allResults[name] = results
164 }
165
166 return allResults, nil
167}
168
// Stats represents detection statistics aggregated over a set of results,
// as produced by CalculateStats.
type Stats struct {
	TotalOperations int            // number of operations examined, as supplied by the caller
	MatchedCount    int            // number of results counted as matches
	MatchRate       float64        // MatchedCount / TotalOperations; left at 0 when TotalOperations is not positive
	ByReason        map[string]int // match count keyed by Match.Reason
	ByCategory      map[string]int // match count keyed by Match.Category
	ByConfidence    map[string]int // match count per confidence bucket: "0.95-1.00", "0.90-0.95", "0.85-0.90", "0.00-0.85"
}
178
179// CalculateStats computes statistics from results
180func CalculateStats(results []*Result, totalOps int) *Stats {
181 stats := &Stats{
182 TotalOperations: totalOps,
183 MatchedCount: len(results),
184 ByReason: make(map[string]int),
185 ByCategory: make(map[string]int),
186 ByConfidence: make(map[string]int),
187 }
188
189 if totalOps > 0 {
190 stats.MatchRate = float64(len(results)) / float64(totalOps)
191 }
192
193 for _, res := range results {
194 if res.Match == nil {
195 continue
196 }
197
198 stats.ByReason[res.Match.Reason]++
199 stats.ByCategory[res.Match.Category]++
200
201 // Confidence buckets
202 conf := res.Match.Confidence
203 switch {
204 case conf >= 0.95:
205 stats.ByConfidence["0.95-1.00"]++
206 case conf >= 0.90:
207 stats.ByConfidence["0.90-0.95"]++
208 case conf >= 0.85:
209 stats.ByConfidence["0.85-0.90"]++
210 default:
211 stats.ByConfidence["0.00-0.85"]++
212 }
213 }
214
215 return stats
216}