package plc

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/atscan/atscand/internal/config"
	"github.com/atscan/atscand/internal/log"
	"github.com/atscan/atscand/internal/storage"
)

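// Scanner walks the PLC directory bundle by bundle and records newly
// discovered service endpoints (PDS and labeler) in the database.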
type Scanner struct {
	bundleManager *BundleManager
	db            storage.Database
	config        config.PLCConfig
}

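// NewScanner returns a Scanner that reuses the provided shared BundleManager.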
func NewScanner(db storage.Database, cfg config.PLCConfig, bundleManager *BundleManager) *Scanner {
	log.Verbose("NewScanner: IndexDIDs config = %v", cfg.IndexDIDs)

	return &Scanner{
		bundleManager: bundleManager, // Use provided instance
		db:            db,
		config:        cfg,
	}
}

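// Close is intentionally a no-op: the BundleManager is shared and must not be
// closed here.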
func (s *Scanner) Close() {
	// Don't close bundleManager here - it's shared
}

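// Scan repeatedly fetches PLC bundles, extracts service endpoints from their
// operations, and advances the scan cursor until no full bundle is available.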
func (s *Scanner) Scan(ctx context.Context) error {
	log.Info("Starting PLC directory scan...")

	cursor, err := s.db.GetScanCursor(ctx, "plc_directory")
	if err != nil {
		return fmt.Errorf("failed to get scan cursor: %w", err)
	}

	metrics := newMetrics(cursor.LastBundleNumber + 1)

	// Main processing loop
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		// Fetch and save bundle (library handles mempool internally)
		bundle, err := s.bundleManager.FetchAndSaveBundle(ctx)
		if err != nil {
			if isInsufficientOpsError(err) {
				// Show mempool status
				stats := s.bundleManager.libManager.GetMempoolStats()
				mempoolCount, _ := stats["count"].(int) // guard the type assertion so an unexpected value cannot panic

				if mempoolCount > 0 {
					log.Info("→ Waiting for more operations (mempool has %d/%d ops)",
						mempoolCount, BUNDLE_SIZE)
				} else {
					log.Info("→ Caught up! No operations available")
				}
				break
			}

			if strings.Contains(err.Error(), "rate limited") {
				log.Info("⚠ Rate limited, pausing for 5 minutes...")
				time.Sleep(5 * time.Minute)
				continue
			}

			return fmt.Errorf("failed to fetch bundle: %w", err)
		}

		// Process operations for endpoint discovery
		counts, err := s.processBatch(ctx, bundle.Operations)
		if err != nil {
			log.Error("Failed to process batch: %v", err)
			// Continue anyway
		}

		// Update metrics
		s.mergeCounts(metrics.endpointCounts, counts)
		metrics.totalProcessed += int64(len(bundle.Operations))
		metrics.newEndpoints += sumCounts(counts)
		metrics.currentBundle = bundle.BundleNumber

		log.Info("✓ Processed bundle %06d: %d operations, %d new endpoints",
			bundle.BundleNumber, len(bundle.Operations), sumCounts(counts))

		// Update cursor
		if err := s.updateCursorForBundle(ctx, bundle.BundleNumber, metrics.totalProcessed); err != nil {
			log.Error("Failed to update cursor: %v", err)
		}
	}

	// Show final mempool status
	stats := s.bundleManager.libManager.GetMempoolStats()
	if count, ok := stats["count"].(int); ok && count > 0 {
		log.Info("Mempool contains %d operations (%.1f%% of next bundle)",
			count, float64(count)/float64(BUNDLE_SIZE)*100)
	}

	metrics.logSummary()
	return nil
}

// processBatch extracts service endpoints from a bundle's operations,
// de-duplicates them within the batch, and stores any that are not yet known.
func (s *Scanner) processBatch(ctx context.Context, ops []PLCOperation) (map[string]int64, error) {
	counts := make(map[string]int64)
	seen := make(map[string]*PLCOperation)

	// Collect unique endpoints
	for i := range ops {
		op := &ops[i]

		if op.IsNullified() {
			continue
		}

		for _, ep := range s.extractEndpointsFromOperation(*op) {
			key := fmt.Sprintf("%s:%s", ep.Type, ep.Endpoint)
			if _, exists := seen[key]; !exists {
				seen[key] = op
			}
		}
	}

	// Store new endpoints
	for key, firstOp := range seen {
		parts := strings.SplitN(key, ":", 2)
		epType, endpoint := parts[0], parts[1]

		exists, err := s.db.EndpointExists(ctx, endpoint, epType)
		if err != nil || exists {
			continue
		}

		if err := s.storeEndpoint(ctx, epType, endpoint, firstOp.CreatedAt); err != nil {
			log.Error("Error storing %s endpoint %s: %v", epType, endpoint, err)
			continue
		}

		log.Info("✓ Discovered new %s endpoint: %s", epType, endpoint)
		counts[epType]++
	}

	return counts, nil
}

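// extractEndpointsFromOperation reads the operation's "services" map and
// returns any PDS and labeler endpoints it declares.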
func (s *Scanner) extractEndpointsFromOperation(op PLCOperation) []EndpointInfo {
	var endpoints []EndpointInfo

	services, ok := op.Operation["services"].(map[string]interface{})
	if !ok {
		return endpoints
	}

	// Extract PDS
	if ep := s.extractServiceEndpoint(services, "atproto_pds", "AtprotoPersonalDataServer", "pds"); ep != nil {
		endpoints = append(endpoints, *ep)
	}

	// Extract Labeler
	if ep := s.extractServiceEndpoint(services, "atproto_labeler", "AtprotoLabeler", "labeler"); ep != nil {
		endpoints = append(endpoints, *ep)
	}

	return endpoints
}

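// extractServiceEndpoint returns an EndpointInfo for serviceKey when the
// service entry carries both an endpoint and the expected type; otherwise nil.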
func (s *Scanner) extractServiceEndpoint(services map[string]interface{}, serviceKey, expectedType, resultType string) *EndpointInfo {
	svc, ok := services[serviceKey].(map[string]interface{})
	if !ok {
		return nil
	}

	endpoint, hasEndpoint := svc["endpoint"].(string)
	svcType, hasType := svc["type"].(string)

	if hasEndpoint && hasType && svcType == expectedType {
		return &EndpointInfo{
			Type:     resultType,
			Endpoint: endpoint,
		}
	}

	return nil
}

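// storeEndpoint validates the endpoint string and upserts it with an unknown
// status and a zero last-checked time.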
func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error {
	valid := validateEndpoint(endpoint)
	return s.db.UpsertEndpoint(ctx, &storage.Endpoint{
		EndpointType: epType,
		Endpoint:     endpoint,
		DiscoveredAt: discoveredAt,
		LastChecked:  time.Time{},
		Status:       storage.EndpointStatusUnknown,
		Valid:        valid,
	})
}

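// updateCursorForBundle persists the scan position so a later run resumes
// after the last processed bundle.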
func (s *Scanner) updateCursorForBundle(ctx context.Context, bundle int, totalProcessed int64) error {
	return s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
		Source:           "plc_directory",
		LastBundleNumber: bundle,
		LastScanTime:     time.Now().UTC(),
		RecordsProcessed: totalProcessed,
	})
}

// Helper functions

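// mergeCounts adds the per-type endpoint counts from src into dest.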
func (s *Scanner) mergeCounts(dest, src map[string]int64) {
	for k, v := range src {
		dest[k] += v
	}
}

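// sumCounts returns the total number of endpoints across all types.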
func sumCounts(counts map[string]int64) int64 {
	total := int64(0)
	for _, v := range counts {
		total += v
	}
	return total
}

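// isInsufficientOpsError reports whether err means there are not yet enough
// operations available to form a full bundle.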
func isInsufficientOpsError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "insufficient operations")
}

// ScanMetrics tracks scan progress
type ScanMetrics struct {
	totalProcessed int64
	newEndpoints   int64
	endpointCounts map[string]int64
	currentBundle  int
	startTime      time.Time
}

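// newMetrics initializes ScanMetrics for a scan starting at startBundle.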
func newMetrics(startBundle int) *ScanMetrics {
	return &ScanMetrics{
		endpointCounts: make(map[string]int64),
		currentBundle:  startBundle,
		startTime:      time.Now(),
	}
}

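// logSummary logs totals for the completed scan.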
func (m *ScanMetrics) logSummary() {
	// Both original branches printed identical output when newEndpoints was 0,
	// so a single call is equivalent.
	log.Info("PLC scan completed: %d operations processed, %d new endpoints in %v",
		m.totalProcessed, m.newEndpoints, time.Since(m.startTime))
}
257}