A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
at did-resolver 216 lines 4.8 kB view raw
1// detector/runner.go 2package detector 3 4import ( 5 "context" 6 "fmt" 7 "sync" 8 "time" 9 10 "tangled.org/atscan.net/plcbundle/bundle" 11 "tangled.org/atscan.net/plcbundle/plc" 12) 13 14// Runner executes detectors against operations 15type Runner struct { 16 registry *Registry 17 config *Config 18 logger Logger 19} 20 21type Logger interface { 22 Printf(format string, v ...interface{}) 23} 24 25// NewRunner creates a new detector runner 26func NewRunner(registry *Registry, config *Config, logger Logger) *Runner { 27 if config == nil { 28 config = DefaultConfig() 29 } 30 return &Runner{ 31 registry: registry, 32 config: config, 33 logger: logger, 34 } 35} 36 37// RunOnBundle runs detector(s) on all operations in a bundle 38func (r *Runner) RunOnBundle(ctx context.Context, detectorName string, b *bundle.Bundle) ([]*Result, error) { 39 detector, err := r.registry.Get(detectorName) 40 if err != nil { 41 return nil, err 42 } 43 44 var results []*Result 45 46 if r.config.Parallel { 47 results = r.runParallel(ctx, detector, b) 48 } else { 49 results = r.runSequential(ctx, detector, b) 50 } 51 52 // Filter by minimum confidence 53 filtered := make([]*Result, 0) 54 for _, res := range results { 55 if res.Match != nil && res.Match.Confidence >= r.config.MinConfidence { 56 filtered = append(filtered, res) 57 } 58 } 59 60 return filtered, nil 61} 62 63func (r *Runner) runSequential(ctx context.Context, detector Detector, b *bundle.Bundle) []*Result { 64 results := make([]*Result, 0) 65 66 for pos, op := range b.Operations { 67 select { 68 case <-ctx.Done(): 69 return results 70 default: 71 } 72 73 result := r.detectOne(ctx, detector, b.BundleNumber, pos, op) 74 if result.Match != nil || result.Error != nil { 75 results = append(results, result) 76 } 77 } 78 79 return results 80} 81 82func (r *Runner) runParallel(ctx context.Context, detector Detector, b *bundle.Bundle) []*Result { 83 type job struct { 84 pos int 85 op plc.PLCOperation 86 } 87 88 jobs := make(chan job, len(b.Operations)) 89 resultsChan := make(chan *Result, len(b.Operations)) 90 91 // Start workers 92 var wg sync.WaitGroup 93 for i := 0; i < r.config.Workers; i++ { 94 wg.Add(1) 95 go func() { 96 defer wg.Done() 97 for j := range jobs { 98 select { 99 case <-ctx.Done(): 100 return 101 default: 102 } 103 104 result := r.detectOne(ctx, detector, b.BundleNumber, j.pos, j.op) 105 if result.Match != nil || result.Error != nil { 106 resultsChan <- result 107 } 108 } 109 }() 110 } 111 112 // Send jobs 113 for pos, op := range b.Operations { 114 jobs <- job{pos: pos, op: op} 115 } 116 close(jobs) 117 118 // Wait for completion 119 go func() { 120 wg.Wait() 121 close(resultsChan) 122 }() 123 124 // Collect results 125 results := make([]*Result, 0) 126 for result := range resultsChan { 127 results = append(results, result) 128 } 129 130 return results 131} 132 133func (r *Runner) detectOne(ctx context.Context, detector Detector, bundleNum, pos int, op plc.PLCOperation) *Result { 134 // Create timeout context 135 detectCtx, cancel := context.WithTimeout(ctx, r.config.Timeout) 136 defer cancel() 137 138 result := &Result{ 139 BundleNumber: bundleNum, 140 Position: pos, 141 DID: op.DID, 142 CID: op.CID, // ← Add this 143 DetectorName: detector.Name(), 144 DetectedAt: time.Now(), 145 } 146 147 match, err := detector.Detect(detectCtx, op) 148 result.Match = match 149 result.Error = err 150 151 return result 152} 153 154// RunMultipleDetectors runs multiple detectors on a bundle 155func (r *Runner) RunMultipleDetectors(ctx context.Context, detectorNames []string, b *bundle.Bundle) (map[string][]*Result, error) { 156 allResults := make(map[string][]*Result) 157 158 for _, name := range detectorNames { 159 results, err := r.RunOnBundle(ctx, name, b) 160 if err != nil { 161 return nil, fmt.Errorf("detector %s failed: %w", name, err) 162 } 163 allResults[name] = results 164 } 165 166 return allResults, nil 167} 168 169// Stats represents detection statistics 170type Stats struct { 171 TotalOperations int 172 MatchedCount int 173 MatchRate float64 174 ByReason map[string]int 175 ByCategory map[string]int 176 ByConfidence map[string]int // 0.9-1.0, 0.8-0.9, etc. 177} 178 179// CalculateStats computes statistics from results 180func CalculateStats(results []*Result, totalOps int) *Stats { 181 stats := &Stats{ 182 TotalOperations: totalOps, 183 MatchedCount: len(results), 184 ByReason: make(map[string]int), 185 ByCategory: make(map[string]int), 186 ByConfidence: make(map[string]int), 187 } 188 189 if totalOps > 0 { 190 stats.MatchRate = float64(len(results)) / float64(totalOps) 191 } 192 193 for _, res := range results { 194 if res.Match == nil { 195 continue 196 } 197 198 stats.ByReason[res.Match.Reason]++ 199 stats.ByCategory[res.Match.Category]++ 200 201 // Confidence buckets 202 conf := res.Match.Confidence 203 switch { 204 case conf >= 0.95: 205 stats.ByConfidence["0.95-1.00"]++ 206 case conf >= 0.90: 207 stats.ByConfidence["0.90-0.95"]++ 208 case conf >= 0.85: 209 stats.ByConfidence["0.85-0.90"]++ 210 default: 211 stats.ByConfidence["0.00-0.85"]++ 212 } 213 } 214 215 return stats 216}