update

Changed files
+66 -29
internal
+66 -29
internal/plc/scanner.go
··· 69 69 } 70 70 71 71 totalProcessed := int64(0) 72 - newPDSCount := int64(0) 72 + newEndpointCounts := make(map[string]int64) // ✅ Changed from newPDSCount 73 73 74 74 // ✅ CHECK MEMPOOL FIRST - if it has data, continue filling it instead of fetching new bundle 75 75 mempoolCount, err := s.db.GetMempoolCount(ctx) ··· 81 81 log.Info("→ Mempool has %d operations, continuing to fill it before fetching new bundles", mempoolCount) 82 82 83 83 // Fill mempool until we have 10,000 84 - if err := s.fillMempoolToSize(ctx, &newPDSCount, &totalProcessed); err != nil { 84 + if err := s.fillMempoolToSize(ctx, newEndpointCounts, &totalProcessed); err != nil { 85 85 log.Error("Error filling mempool: %v", err) 86 86 return err 87 87 } 88 88 89 89 // Try to create bundles from mempool 90 - if err := s.processMempoolRecursive(ctx, &newPDSCount, &currentBundle, &totalProcessed); err != nil { 90 + if err := s.processMempoolRecursive(ctx, newEndpointCounts, &currentBundle, &totalProcessed); err != nil { 91 91 log.Error("Error processing mempool: %v", err) 92 92 } 93 93 94 - log.Info("PLC scan completed: %d operations, %d new PDS servers in %v", 95 - totalProcessed, newPDSCount, time.Since(startTime)) 94 + endpointSummary := formatEndpointCounts(newEndpointCounts) 95 + log.Info("PLC scan completed: %d operations, %s in %v", 96 + totalProcessed, endpointSummary, time.Since(startTime)) 96 97 return nil 97 98 } 98 99 ··· 122 123 if currentBundle > 1 { 123 124 log.Info("→ Reached end of available data") 124 125 // Try mempool processing 125 - if err := s.processMempoolRecursive(ctx, &newPDSCount, &currentBundle, &totalProcessed); err != nil { 126 + if err := s.processMempoolRecursive(ctx, newEndpointCounts, &currentBundle, &totalProcessed); err != nil { 126 127 log.Error("Error processing mempool: %v", err) 127 128 } 128 129 } ··· 131 132 132 133 if isComplete { 133 134 // Complete bundle 134 - batchNewPDS, err := s.processBatch(ctx, operations) 135 + batchCounts, err := s.processBatch(ctx, operations) 135 136 if err != nil { 136 137 log.Error("Error processing bundle: %v", err) 137 138 } 138 - 139 - newPDSCount += batchNewPDS 139 + for typ, count := range batchCounts { 140 + newEndpointCounts[typ] += count 141 + } 140 142 totalProcessed += int64(len(operations)) 141 143 142 - log.Verbose("✓ Processed bundle %06d: %d operations (after dedup), %d new PDS", 143 - currentBundle, len(operations), batchNewPDS) 144 + // Calculate total for this batch for logging 145 + batchTotal := int64(0) 146 + for _, count := range batchCounts { 147 + batchTotal += count 148 + } 149 + log.Verbose("✓ Processed bundle %06d: %d operations (after dedup), %d new endpoints", 150 + currentBundle, len(operations), batchTotal) 144 151 145 152 // Update cursor 146 153 if err := s.db.UpdateScanCursor(ctx, &storage.ScanCursor{ ··· 157 164 // Incomplete bundle - we've reached the end of available data 158 165 log.Info("→ Bundle %06d incomplete (%d ops), adding to mempool", currentBundle, len(operations)) 159 166 160 - if err := s.addToMempool(ctx, operations); err != nil { 167 + if err := s.addToMempool(ctx, operations, newEndpointCounts); err != nil { 161 168 log.Error("Error adding to mempool: %v", err) 162 169 } 163 170 164 171 // ✅ Now fill mempool to 10,000 165 - if err := s.fillMempoolToSize(ctx, &newPDSCount, &totalProcessed); err != nil { 172 + if err := s.fillMempoolToSize(ctx, newEndpointCounts, &totalProcessed); err != nil { 166 173 log.Error("Error filling mempool: %v", err) 167 174 } 168 175 169 176 // Process mempool 170 - if err := s.processMempoolRecursive(ctx, &newPDSCount, &currentBundle, &totalProcessed); err != nil { 177 + if err := s.processMempoolRecursive(ctx, newEndpointCounts, &currentBundle, &totalProcessed); err != nil { 171 178 log.Error("Error processing mempool: %v", err) 172 179 } 173 180 ··· 175 182 } 176 183 } 177 184 178 - log.Info("PLC scan completed: %d operations, %d new PDS servers in %v", 179 - totalProcessed, newPDSCount, time.Since(startTime)) 185 + endpointSummary := formatEndpointCounts(newEndpointCounts) 186 + log.Info("PLC scan completed: %d operations, %s in %v", 187 + totalProcessed, endpointSummary, time.Since(startTime)) 180 188 181 189 return nil 182 190 } 183 191 184 - func (s *Scanner) fillMempoolToSize(ctx context.Context, newPDSCount *int64, totalProcessed *int64) error { 192 + func (s *Scanner) fillMempoolToSize(ctx context.Context, newEndpointCounts map[string]int64, totalProcessed *int64) error { 185 193 const fetchLimit = 1000 // PLC directory limit 186 194 187 195 for { ··· 228 236 } 229 237 230 238 // Add to mempool (with duplicate checking) 231 - if err := s.addToMempool(ctx, operations); err != nil { 239 + if err := s.addToMempool(ctx, operations, newEndpointCounts); err != nil { 232 240 return err 233 241 } 234 242 ··· 260 268 } 261 269 } 262 270 263 - // addToMempool adds operations to mempool and processes them for PDS discovery 264 - func (s *Scanner) addToMempool(ctx context.Context, operations []PLCOperation) error { 271 + // addToMempool adds operations to mempool and processes them for endpoint discovery 272 + func (s *Scanner) addToMempool(ctx context.Context, operations []PLCOperation, newEndpointCounts map[string]int64) error { 265 273 mempoolOps := make([]storage.MempoolOperation, len(operations)) 266 274 267 275 for i, op := range operations { ··· 279 287 return err 280 288 } 281 289 282 - // Process for PDS discovery immediately 283 - _, err := s.processBatch(ctx, operations) 290 + // Process for endpoint discovery immediately 291 + batchCounts, err := s.processBatch(ctx, operations) 292 + for typ, count := range batchCounts { 293 + newEndpointCounts[typ] += count 294 + } 284 295 return err 285 296 } 286 297 287 298 // processMempoolRecursive checks mempool and creates bundles when >= 1000 ops 288 - func (s *Scanner) processMempoolRecursive(ctx context.Context, newPDSCount *int64, currentBundle *int, totalProcessed *int64) error { 299 + func (s *Scanner) processMempoolRecursive(ctx context.Context, newEndpointCounts map[string]int64, currentBundle *int, totalProcessed *int64) error { 289 300 for { 290 301 // Check mempool size 291 302 count, err := s.db.GetMempoolCount(ctx) ··· 353 364 } 354 365 355 366 // Process for PDS 356 - batchNewPDS, _ := s.processBatch(ctx, operations) 357 - *newPDSCount += batchNewPDS 367 + batchCounts, _ := s.processBatch(ctx, operations) 368 + for typ, count := range batchCounts { 369 + newEndpointCounts[typ] += count 370 + } 358 371 359 372 *currentBundle = bundleNum 360 373 ··· 373 386 } 374 387 375 388 // processBatch processes operations for PDS discovery 376 - func (s *Scanner) processBatch(ctx context.Context, operations []PLCOperation) (int64, error) { 377 - newEndpointCount := int64(0) 389 + func (s *Scanner) processBatch(ctx context.Context, operations []PLCOperation) (map[string]int64, error) { 390 + endpointCounts := make(map[string]int64) // Track by type 378 391 seenInBatch := make(map[string]*PLCOperation) // key: "type:endpoint" 379 392 380 393 for _, op := range operations { ··· 413 426 } 414 427 415 428 log.Info("✓ Discovered new %s endpoint: %s", endpointType, stripansi.Strip(endpoint)) 416 - newEndpointCount++ 429 + endpointCounts[endpointType]++ 417 430 } 418 431 419 - return newEndpointCount, nil 432 + return endpointCounts, nil 420 433 } 421 434 422 435 // extractEndpointsFromOperation extracts ALL service endpoints ··· 461 474 func contains(s, substr string) bool { 462 475 return len(s) >= len(substr) && s[:len(substr)] == substr 463 476 } 477 + 478 + func formatEndpointCounts(counts map[string]int64) string { 479 + if len(counts) == 0 { 480 + return "0 new endpoints" 481 + } 482 + 483 + total := int64(0) 484 + for _, count := range counts { 485 + total += count 486 + } 487 + 488 + if len(counts) == 1 { 489 + for typ, count := range counts { 490 + return fmt.Sprintf("%d new %s endpoint(s)", count, typ) 491 + } 492 + } 493 + 494 + // Multiple types 495 + parts := make([]string, 0, len(counts)) 496 + for typ, count := range counts { 497 + parts = append(parts, fmt.Sprintf("%d %s", count, typ)) 498 + } 499 + return fmt.Sprintf("%d new endpoints (%s)", total, strings.Join(parts, ", ")) 500 + }