package plc

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/atscan/atscand/internal/config"
	"github.com/atscan/atscand/internal/log"
	"github.com/atscan/atscand/internal/storage"
)

// Scanner walks the PLC directory export bundle by bundle and records newly
// discovered service endpoints.
type Scanner struct {
	bundleManager *BundleManager
	db            storage.Database
	config        config.PLCConfig
}

// NewScanner builds a Scanner that reuses the shared BundleManager and database handle.
func NewScanner(db storage.Database, cfg config.PLCConfig, bundleManager *BundleManager) *Scanner {
	log.Verbose("NewScanner: IndexDIDs config = %v", cfg.IndexDIDs)

	return &Scanner{
		bundleManager: bundleManager, // Use provided instance
		db:            db,
		config:        cfg,
	}
}

func (s *Scanner) Close() {
	// Don't close bundleManager here - it's shared
}

// Scan pulls bundles from the PLC directory until it catches up, extracting
// endpoints from each batch of operations and advancing the scan cursor.
func (s *Scanner) Scan(ctx context.Context) error {
	log.Info("Starting PLC directory scan...")

	cursor, err := s.db.GetScanCursor(ctx, "plc_directory")
	if err != nil {
		return fmt.Errorf("failed to get scan cursor: %w", err)
	}

	metrics := newMetrics(cursor.LastBundleNumber + 1)

	// Main processing loop
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		// Fetch and save bundle (library handles mempool internally)
		bundle, err := s.bundleManager.FetchAndSaveBundle(ctx)
		if err != nil {
			if isInsufficientOpsError(err) {
				// Show mempool status
				stats := s.bundleManager.libManager.GetMempoolStats()
				mempoolCount, _ := stats["count"].(int) // reads as zero if the count is missing or mistyped

				if mempoolCount > 0 {
					log.Info("→ Waiting for more operations (mempool has %d/%d ops)",
						mempoolCount, BUNDLE_SIZE)
				} else {
					log.Info("→ Caught up! No operations available")
				}
				break
			}

			if strings.Contains(err.Error(), "rate limited") {
				log.Info("⚠ Rate limited, pausing for 5 minutes...")
				time.Sleep(5 * time.Minute)
				continue
			}

			return fmt.Errorf("failed to fetch bundle: %w", err)
		}

		// Process operations for endpoint discovery
		counts, err := s.processBatch(ctx, bundle.Operations)
		if err != nil {
			log.Error("Failed to process batch: %v", err)
			// Continue anyway
		}

		// Update metrics
		s.mergeCounts(metrics.endpointCounts, counts)
		metrics.totalProcessed += int64(len(bundle.Operations))
		metrics.newEndpoints += sumCounts(counts)
		metrics.currentBundle = bundle.BundleNumber

		log.Info("✓ Processed bundle %06d: %d operations, %d new endpoints",
			bundle.BundleNumber, len(bundle.Operations), sumCounts(counts))

		// Update cursor
		if err := s.updateCursorForBundle(ctx, bundle.BundleNumber, metrics.totalProcessed); err != nil {
			log.Error("Warning: failed to update cursor: %v", err)
		}
	}

	// Show final mempool status
	stats := s.bundleManager.libManager.GetMempoolStats()
	if count, ok := stats["count"].(int); ok && count > 0 {
		log.Info("Mempool contains %d operations (%.1f%% of next bundle)",
			count, float64(count)/float64(BUNDLE_SIZE)*100)
	}

	metrics.logSummary()
	return nil
}

// processBatch extracts endpoints from operations
func (s *Scanner) processBatch(ctx context.Context, ops []PLCOperation) (map[string]int64, error) {
	counts := make(map[string]int64)
	seen := make(map[string]*PLCOperation)

	// Collect unique endpoints
	for i := range ops {
		op := &ops[i]

		if op.IsNullified() {
			continue
		}

		for _, ep := range s.extractEndpointsFromOperation(*op) {
			key := fmt.Sprintf("%s:%s", ep.Type, ep.Endpoint)
			if _, exists := seen[key]; !exists {
				seen[key] = op
			}
		}
	}

	// Store new endpoints
	for key, firstOp := range seen {
		parts := strings.SplitN(key, ":", 2)
		epType, endpoint := parts[0], parts[1]

		exists, err := s.db.EndpointExists(ctx, endpoint, epType)
		if err != nil || exists {
			continue
		}

		if err := s.storeEndpoint(ctx, epType, endpoint, firstOp.CreatedAt); err != nil {
			log.Error("Error storing %s endpoint %s: %v", epType, endpoint, err)
			continue
		}

		log.Info("✓ Discovered new %s endpoint: %s", epType, endpoint)
		counts[epType]++
	}

	return counts, nil
}

// extractEndpointsFromOperation pulls the PDS and labeler service endpoints
// out of a PLC operation's "services" map, if present.
func (s *Scanner) extractEndpointsFromOperation(op PLCOperation) []EndpointInfo {
	var endpoints []EndpointInfo

	services, ok := op.Operation["services"].(map[string]interface{})
	if !ok {
		return endpoints
	}

	// Extract PDS
	if ep := s.extractServiceEndpoint(services, "atproto_pds", "AtprotoPersonalDataServer", "pds"); ep != nil {
		endpoints = append(endpoints, *ep)
	}

	// Extract Labeler
	if ep := s.extractServiceEndpoint(services, "atproto_labeler", "AtprotoLabeler", "labeler"); ep != nil {
		endpoints = append(endpoints, *ep)
	}

	return endpoints
}

// extractServiceEndpoint returns the endpoint for serviceKey when its declared
// type matches expectedType, or nil otherwise.
func (s *Scanner) extractServiceEndpoint(services map[string]interface{}, serviceKey, expectedType, resultType string) *EndpointInfo {
	svc, ok := services[serviceKey].(map[string]interface{})
	if !ok {
		return nil
	}

	endpoint, hasEndpoint := svc["endpoint"].(string)
	svcType, hasType := svc["type"].(string)

	if hasEndpoint && hasType && svcType == expectedType {
		return &EndpointInfo{
			Type:     resultType,
			Endpoint: endpoint,
		}
	}

	return nil
}

// storeEndpoint validates and upserts a newly discovered endpoint.
func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error {
	valid := validateEndpoint(endpoint)
	return s.db.UpsertEndpoint(ctx, &storage.Endpoint{
		EndpointType: epType,
		Endpoint:     endpoint,
		DiscoveredAt: discoveredAt,
		LastChecked:  time.Time{},
		Status:       storage.EndpointStatusUnknown,
		Valid:        valid,
	})
}

// updateCursorForBundle persists scan progress so a restart resumes after the given bundle.
func (s *Scanner) updateCursorForBundle(ctx context.Context, bundle int, totalProcessed int64) error {
	return s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
		Source:           "plc_directory",
		LastBundleNumber: bundle,
		LastScanTime:     time.Now().UTC(),
		RecordsProcessed: totalProcessed,
	})
}

// Helper functions
func (s *Scanner) mergeCounts(dest, src map[string]int64) {
	for k, v := range src {
		dest[k] += v
	}
}

func sumCounts(counts map[string]int64) int64 {
	total := int64(0)
	for _, v := range counts {
		total += v
	}
	return total
}

func isInsufficientOpsError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "insufficient operations")
}

// ScanMetrics tracks scan progress
type ScanMetrics struct {
	totalProcessed int64
	newEndpoints   int64
	endpointCounts map[string]int64
	currentBundle  int
	startTime      time.Time
}

func newMetrics(startBundle int) *ScanMetrics {
	return &ScanMetrics{
		endpointCounts: make(map[string]int64),
		currentBundle:  startBundle,
		startTime:      time.Now(),
	}
}

func (m *ScanMetrics) logSummary() {
	if m.newEndpoints > 0 {
		log.Info("PLC scan completed: %d operations processed, %d new endpoints in %v",
			m.totalProcessed, m.newEndpoints, time.Since(m.startTime))
	} else {
		log.Info("PLC scan completed: %d operations processed, 0 new endpoints in %v",
			m.totalProcessed, time.Since(m.startTime))
	}
}