+340
-288
internal/plc/scanner.go
+340
-288
internal/plc/scanner.go
···
41
41
}
42
42
}
43
43
44
+
// ScanMetrics tracks scan progress
45
+
type ScanMetrics struct {
46
+
totalProcessed int64
47
+
endpointCounts map[string]int64
48
+
currentBundle int
49
+
startTime time.Time
50
+
}
51
+
52
+
func newMetrics(startBundle int) *ScanMetrics {
53
+
return &ScanMetrics{
54
+
endpointCounts: make(map[string]int64),
55
+
currentBundle: startBundle,
56
+
startTime: time.Now(),
57
+
}
58
+
}
59
+
60
+
func (m *ScanMetrics) logSummary() {
61
+
summary := formatEndpointCounts(m.endpointCounts)
62
+
log.Info("PLC scan completed: %d operations, %s in %v",
63
+
m.totalProcessed, summary, time.Since(m.startTime))
64
+
}
65
+
44
66
func (s *Scanner) Scan(ctx context.Context) error {
45
-
startTime := time.Now()
46
67
log.Info("Starting PLC directory scan...")
47
68
log.Info("⚠ Note: PLC directory has rate limit of 500 requests per 5 minutes")
48
69
···
51
72
return fmt.Errorf("failed to get scan cursor: %w", err)
52
73
}
53
74
54
-
currentBundle := cursor.LastBundleNumber
55
-
if currentBundle == 0 {
56
-
currentBundle = 1
57
-
} else {
58
-
currentBundle++
59
-
}
60
-
61
-
log.Info("Starting from bundle %06d", currentBundle)
75
+
startBundle := s.calculateStartBundle(cursor.LastBundleNumber)
76
+
metrics := newMetrics(startBundle)
62
77
63
-
// Ensure bundle continuity (all previous bundles exist)
64
-
if currentBundle > 1 {
65
-
log.Info("Checking bundle continuity...")
66
-
if err := s.bundleManager.EnsureBundleContinuity(ctx, currentBundle); err != nil {
67
-
return fmt.Errorf("bundle continuity check failed: %w", err)
78
+
if startBundle > 1 {
79
+
if err := s.ensureContinuity(ctx, startBundle); err != nil {
80
+
return err
68
81
}
69
82
}
70
83
71
-
totalProcessed := int64(0)
72
-
newEndpointCounts := make(map[string]int64) // ✅ Changed from newPDSCount
73
-
74
-
// ✅ CHECK MEMPOOL FIRST - if it has data, continue filling it instead of fetching new bundle
75
-
mempoolCount, err := s.db.GetMempoolCount(ctx)
76
-
if err != nil {
77
-
return err
84
+
// Handle existing mempool first
85
+
if hasMempool, _ := s.hasSufficientMempool(ctx); hasMempool {
86
+
return s.handleMempoolOnly(ctx, metrics, cursor)
78
87
}
79
88
80
-
if mempoolCount > 0 {
81
-
log.Info("→ Mempool has %d operations, continuing to fill it before fetching new bundles", mempoolCount)
82
-
83
-
// Fill mempool until we have 10,000
84
-
if err := s.fillMempoolToSize(ctx, newEndpointCounts, &totalProcessed); err != nil {
85
-
log.Error("Error filling mempool: %v", err)
89
+
// Process bundles until incomplete or error
90
+
for {
91
+
if err := ctx.Err(); err != nil {
86
92
return err
87
93
}
88
94
89
-
// Try to create bundles from mempool
90
-
if err := s.processMempoolRecursive(ctx, newEndpointCounts, ¤tBundle, &totalProcessed); err != nil {
91
-
log.Error("Error processing mempool: %v", err)
95
+
if err := s.processSingleBundle(ctx, metrics); err != nil {
96
+
if s.shouldRetry(err) {
97
+
continue
98
+
}
99
+
break
92
100
}
93
101
94
-
endpointSummary := formatEndpointCounts(newEndpointCounts)
95
-
log.Info("PLC scan completed: %d operations, %s in %v",
96
-
totalProcessed, endpointSummary, time.Since(startTime))
97
-
return nil
102
+
if err := s.updateCursor(ctx, cursor, metrics); err != nil {
103
+
log.Error("Warning: failed to update cursor: %v", err)
104
+
}
98
105
}
99
106
100
-
// Process bundles sequentially (normal flow when mempool is empty)
101
-
for {
102
-
select {
103
-
case <-ctx.Done():
104
-
return ctx.Err()
105
-
default:
106
-
}
107
+
// Try to finalize mempool
108
+
s.finalizeMempool(ctx, metrics)
107
109
108
-
log.Verbose("→ Processing bundle %06d...", currentBundle)
110
+
metrics.logSummary()
111
+
return nil
112
+
}
109
113
110
-
// Load bundle (returns operations, isComplete flag, and error)
111
-
operations, isComplete, err := s.bundleManager.LoadBundle(ctx, currentBundle, s.client)
112
-
if err != nil {
113
-
log.Error("Failed to load bundle %06d: %v", currentBundle, err)
114
+
func (s *Scanner) calculateStartBundle(lastBundle int) int {
115
+
if lastBundle == 0 {
116
+
return 1
117
+
}
118
+
return lastBundle + 1
119
+
}
114
120
115
-
// If rate limited, wait and retry
116
-
if contains(err.Error(), "rate limited") {
117
-
log.Info("⚠ Rate limit hit, pausing for 5 minutes...")
118
-
time.Sleep(5 * time.Minute)
119
-
continue
120
-
}
121
+
func (s *Scanner) ensureContinuity(ctx context.Context, bundle int) error {
122
+
log.Info("Checking bundle continuity...")
123
+
if err := s.bundleManager.EnsureBundleContinuity(ctx, bundle); err != nil {
124
+
return fmt.Errorf("bundle continuity check failed: %w", err)
125
+
}
126
+
return nil
127
+
}
121
128
122
-
// Check if this is just end of data
123
-
if currentBundle > 1 {
124
-
log.Info("→ Reached end of available data")
125
-
// Try mempool processing
126
-
if err := s.processMempoolRecursive(ctx, newEndpointCounts, ¤tBundle, &totalProcessed); err != nil {
127
-
log.Error("Error processing mempool: %v", err)
128
-
}
129
-
}
130
-
break
131
-
}
129
+
func (s *Scanner) hasSufficientMempool(ctx context.Context) (bool, error) {
130
+
count, err := s.db.GetMempoolCount(ctx)
131
+
if err != nil {
132
+
return false, err
133
+
}
134
+
return count > 0, nil
135
+
}
132
136
133
-
if isComplete {
134
-
// Complete bundle
135
-
batchCounts, err := s.processBatch(ctx, operations)
136
-
if err != nil {
137
-
log.Error("Error processing bundle: %v", err)
138
-
}
139
-
for typ, count := range batchCounts {
140
-
newEndpointCounts[typ] += count
141
-
}
142
-
totalProcessed += int64(len(operations))
137
+
func (s *Scanner) handleMempoolOnly(ctx context.Context, m *ScanMetrics, cursor *storage.ScanCursor) error {
138
+
count, _ := s.db.GetMempoolCount(ctx)
139
+
log.Info("→ Mempool has %d operations, continuing to fill it before fetching new bundles", count)
143
140
144
-
// Calculate total for this batch for logging
145
-
batchTotal := int64(0)
146
-
for _, count := range batchCounts {
147
-
batchTotal += count
148
-
}
149
-
log.Verbose("✓ Processed bundle %06d: %d operations (after dedup), %d new endpoints",
150
-
currentBundle, len(operations), batchTotal)
141
+
if err := s.fillMempool(ctx, m); err != nil {
142
+
return err
143
+
}
151
144
152
-
// Update cursor
153
-
if err := s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
154
-
Source: "plc_directory",
155
-
LastBundleNumber: currentBundle,
156
-
LastScanTime: time.Now(),
157
-
RecordsProcessed: cursor.RecordsProcessed + totalProcessed,
158
-
}); err != nil {
159
-
log.Error("Warning: failed to update cursor: %v", err)
160
-
}
145
+
if err := s.processMempool(ctx, m); err != nil {
146
+
log.Error("Error processing mempool: %v", err)
147
+
}
161
148
162
-
currentBundle++
163
-
} else {
164
-
// Incomplete bundle - we've reached the end of available data
165
-
log.Info("→ Bundle %06d incomplete (%d ops), adding to mempool", currentBundle, len(operations))
149
+
m.logSummary()
150
+
return nil
151
+
}
166
152
167
-
if err := s.addToMempool(ctx, operations, newEndpointCounts); err != nil {
168
-
log.Error("Error adding to mempool: %v", err)
169
-
}
153
+
func (s *Scanner) processSingleBundle(ctx context.Context, m *ScanMetrics) error {
154
+
log.Verbose("→ Processing bundle %06d...", m.currentBundle)
170
155
171
-
// ✅ Now fill mempool to 10,000
172
-
if err := s.fillMempoolToSize(ctx, newEndpointCounts, &totalProcessed); err != nil {
173
-
log.Error("Error filling mempool: %v", err)
174
-
}
156
+
ops, isComplete, err := s.bundleManager.LoadBundle(ctx, m.currentBundle, s.client)
157
+
if err != nil {
158
+
return s.handleBundleError(err, m)
159
+
}
160
+
161
+
if isComplete {
162
+
return s.handleCompleteBundle(ctx, ops, m)
163
+
}
164
+
return s.handleIncompleteBundle(ctx, ops, m)
165
+
}
166
+
167
+
func (s *Scanner) handleBundleError(err error, m *ScanMetrics) error {
168
+
log.Error("Failed to load bundle %06d: %v", m.currentBundle, err)
169
+
170
+
if strings.Contains(err.Error(), "rate limited") {
171
+
log.Info("⚠ Rate limit hit, pausing for 5 minutes...")
172
+
time.Sleep(5 * time.Minute)
173
+
return fmt.Errorf("retry")
174
+
}
175
+
176
+
if m.currentBundle > 1 {
177
+
log.Info("→ Reached end of available data")
178
+
}
179
+
return err
180
+
}
175
181
176
-
// Process mempool
177
-
if err := s.processMempoolRecursive(ctx, newEndpointCounts, ¤tBundle, &totalProcessed); err != nil {
178
-
log.Error("Error processing mempool: %v", err)
179
-
}
182
+
func (s *Scanner) shouldRetry(err error) bool {
183
+
return err != nil && err.Error() == "retry"
184
+
}
180
185
181
-
break // End of scan
182
-
}
186
+
func (s *Scanner) handleCompleteBundle(ctx context.Context, ops []PLCOperation, m *ScanMetrics) error {
187
+
counts, err := s.processBatch(ctx, ops)
188
+
if err != nil {
189
+
return err
183
190
}
184
191
185
-
endpointSummary := formatEndpointCounts(newEndpointCounts)
186
-
log.Info("PLC scan completed: %d operations, %s in %v",
187
-
totalProcessed, endpointSummary, time.Since(startTime))
192
+
s.mergeCounts(m.endpointCounts, counts)
193
+
m.totalProcessed += int64(len(ops))
188
194
195
+
batchTotal := sumCounts(counts)
196
+
log.Verbose("✓ Processed bundle %06d: %d operations (after dedup), %d new endpoints",
197
+
m.currentBundle, len(ops), batchTotal)
198
+
199
+
m.currentBundle++
189
200
return nil
190
201
}
191
202
192
-
func (s *Scanner) fillMempoolToSize(ctx context.Context, newEndpointCounts map[string]int64, totalProcessed *int64) error {
193
-
const fetchLimit = 1000 // PLC directory limit
203
+
func (s *Scanner) handleIncompleteBundle(ctx context.Context, ops []PLCOperation, m *ScanMetrics) error {
204
+
log.Info("→ Bundle %06d incomplete (%d ops), adding to mempool", m.currentBundle, len(ops))
205
+
206
+
if err := s.addToMempool(ctx, ops, m.endpointCounts); err != nil {
207
+
return err
208
+
}
209
+
210
+
s.finalizeMempool(ctx, m)
211
+
return fmt.Errorf("incomplete") // Signal end of processing
212
+
}
213
+
214
+
func (s *Scanner) finalizeMempool(ctx context.Context, m *ScanMetrics) {
215
+
if err := s.fillMempool(ctx, m); err != nil {
216
+
log.Error("Error filling mempool: %v", err)
217
+
}
218
+
if err := s.processMempool(ctx, m); err != nil {
219
+
log.Error("Error processing mempool: %v", err)
220
+
}
221
+
}
222
+
223
+
func (s *Scanner) fillMempool(ctx context.Context, m *ScanMetrics) error {
224
+
const fetchLimit = 1000
194
225
195
226
for {
196
-
countBefore, err := s.db.GetMempoolCount(ctx)
227
+
count, err := s.db.GetMempoolCount(ctx)
197
228
if err != nil {
198
229
return err
199
230
}
200
231
201
-
if countBefore >= BUNDLE_SIZE {
202
-
log.Info("✓ Mempool filled to %d operations (target: %d)", countBefore, BUNDLE_SIZE)
232
+
if count >= BUNDLE_SIZE {
233
+
log.Info("✓ Mempool filled to %d operations (target: %d)", count, BUNDLE_SIZE)
203
234
return nil
204
235
}
205
236
206
-
log.Info("→ Mempool has %d/%d operations, fetching more from PLC directory...", countBefore, BUNDLE_SIZE)
237
+
log.Info("→ Mempool has %d/%d operations, fetching more from PLC directory...", count, BUNDLE_SIZE)
207
238
208
-
// ✅ Get just the last operation (much faster!)
209
-
lastOp, err := s.db.GetLastMempoolOperation(ctx)
239
+
// ✅ Fix: Don't capture unused 'ops' variable
240
+
shouldContinue, err := s.fetchNextBatch(ctx, fetchLimit, m)
210
241
if err != nil {
211
242
return err
212
243
}
213
244
214
-
var afterTimestamp string
215
-
if lastOp != nil {
216
-
afterTimestamp = lastOp.CreatedAt.Format(time.RFC3339Nano)
217
-
log.Verbose(" Using cursor: %s", afterTimestamp)
245
+
if !shouldContinue {
246
+
finalCount, _ := s.db.GetMempoolCount(ctx)
247
+
log.Info("→ Stopping fill, mempool has %d/%d operations", finalCount, BUNDLE_SIZE)
248
+
return nil
218
249
}
250
+
}
251
+
}
219
252
220
-
// ✅ Always fetch 1000 (PLC limit)
221
-
operations, err := s.client.Export(ctx, ExportOptions{
222
-
Count: fetchLimit,
223
-
After: afterTimestamp,
224
-
})
225
-
if err != nil {
226
-
return fmt.Errorf("failed to fetch from PLC: %w", err)
227
-
}
253
+
func (s *Scanner) fetchNextBatch(ctx context.Context, limit int, m *ScanMetrics) (bool, error) {
254
+
lastOp, err := s.db.GetLastMempoolOperation(ctx)
255
+
if err != nil {
256
+
return false, err
257
+
}
228
258
229
-
fetchedCount := len(operations)
230
-
log.Verbose(" Fetched %d operations from PLC", fetchedCount)
259
+
var after string
260
+
if lastOp != nil {
261
+
after = lastOp.CreatedAt.Format(time.RFC3339Nano)
262
+
log.Verbose(" Using cursor: %s", after)
263
+
}
231
264
232
-
// ✅ No data at all - we're done
233
-
if fetchedCount == 0 {
234
-
log.Info("→ No more data available from PLC directory (mempool has %d/%d)", countBefore, BUNDLE_SIZE)
235
-
return nil
236
-
}
265
+
ops, err := s.client.Export(ctx, ExportOptions{Count: limit, After: after})
266
+
if err != nil {
267
+
return false, fmt.Errorf("failed to fetch from PLC: %w", err)
268
+
}
237
269
238
-
// Add to mempool (with duplicate checking)
239
-
if err := s.addToMempool(ctx, operations, newEndpointCounts); err != nil {
240
-
return err
241
-
}
270
+
fetchedCount := len(ops)
271
+
log.Verbose(" Fetched %d operations from PLC", fetchedCount)
242
272
243
-
*totalProcessed += int64(fetchedCount)
273
+
if fetchedCount == 0 {
274
+
count, _ := s.db.GetMempoolCount(ctx)
275
+
log.Info("→ No more data available from PLC directory (mempool has %d/%d)", count, BUNDLE_SIZE)
276
+
return false, nil
277
+
}
244
278
245
-
// Check if mempool actually grew
246
-
countAfter, err := s.db.GetMempoolCount(ctx)
247
-
if err != nil {
248
-
return err
249
-
}
279
+
// ✅ Fix: Handle errors from GetMempoolCount
280
+
beforeCount, err := s.db.GetMempoolCount(ctx)
281
+
if err != nil {
282
+
return false, err
283
+
}
250
284
251
-
newOpsAdded := countAfter - countBefore
252
-
duplicateCount := fetchedCount - newOpsAdded
285
+
if err := s.addToMempool(ctx, ops, m.endpointCounts); err != nil {
286
+
return false, err
287
+
}
253
288
254
-
log.Verbose(" Added %d new unique operations to mempool (%d were duplicates)",
255
-
newOpsAdded, duplicateCount)
289
+
afterCount, err := s.db.GetMempoolCount(ctx)
290
+
if err != nil {
291
+
return false, err
292
+
}
256
293
257
-
// ✅ KEY LOGIC: Only repeat if we got a FULL batch (1000)
258
-
// If < 1000, it means we've caught up to the latest data
259
-
if fetchedCount < fetchLimit {
260
-
log.Info("→ Received incomplete batch (%d/%d), caught up to latest data",
261
-
fetchedCount, fetchLimit)
262
-
log.Info("→ Stopping fill, mempool has %d/%d operations", countAfter, BUNDLE_SIZE)
263
-
return nil
264
-
}
294
+
m.totalProcessed += int64(fetchedCount)
295
+
log.Verbose(" Added %d new unique operations to mempool (%d were duplicates)",
296
+
afterCount-beforeCount, fetchedCount-(afterCount-beforeCount))
265
297
266
-
// Got full batch (1000), might be more data - continue loop
267
-
log.Verbose(" Received full batch (%d), checking for more data...", fetchLimit)
298
+
// Continue only if got full batch
299
+
shouldContinue := fetchedCount >= limit
300
+
if !shouldContinue {
301
+
log.Info("→ Received incomplete batch (%d/%d), caught up to latest data", fetchedCount, limit)
268
302
}
303
+
304
+
return shouldContinue, nil
269
305
}
270
306
271
-
// addToMempool adds operations to mempool and processes them for endpoint discovery
272
-
func (s *Scanner) addToMempool(ctx context.Context, operations []PLCOperation, newEndpointCounts map[string]int64) error {
273
-
mempoolOps := make([]storage.MempoolOperation, len(operations))
274
-
275
-
for i, op := range operations {
276
-
// ✅ Store the original RawJSON directly
307
+
func (s *Scanner) addToMempool(ctx context.Context, ops []PLCOperation, counts map[string]int64) error {
308
+
mempoolOps := make([]storage.MempoolOperation, len(ops))
309
+
for i, op := range ops {
277
310
mempoolOps[i] = storage.MempoolOperation{
278
311
DID: op.DID,
279
-
Operation: string(op.RawJSON), // ✅ Use RawJSON instead of Marshal
312
+
Operation: string(op.RawJSON),
280
313
CID: op.CID,
281
314
CreatedAt: op.CreatedAt,
282
315
}
283
316
}
284
317
285
-
// Add to mempool
286
318
if err := s.db.AddToMempool(ctx, mempoolOps); err != nil {
287
319
return err
288
320
}
289
321
290
-
// Process for endpoint discovery immediately
291
-
batchCounts, err := s.processBatch(ctx, operations)
292
-
for typ, count := range batchCounts {
293
-
newEndpointCounts[typ] += count
294
-
}
322
+
// Process for endpoint discovery
323
+
batchCounts, err := s.processBatch(ctx, ops)
324
+
s.mergeCounts(counts, batchCounts)
295
325
return err
296
326
}
297
327
298
-
// processMempoolRecursive checks mempool and creates bundles when >= 1000 ops
299
-
func (s *Scanner) processMempoolRecursive(ctx context.Context, newEndpointCounts map[string]int64, currentBundle *int, totalProcessed *int64) error {
328
+
func (s *Scanner) processMempool(ctx context.Context, m *ScanMetrics) error {
300
329
for {
301
-
// Check mempool size
302
330
count, err := s.db.GetMempoolCount(ctx)
303
331
if err != nil {
304
332
return err
···
308
336
309
337
if count < BUNDLE_SIZE {
310
338
log.Info("Mempool has %d/%d operations, cannot create bundle yet", count, BUNDLE_SIZE)
311
-
break
339
+
return nil
312
340
}
313
341
314
342
log.Info("→ Creating bundle from mempool (%d operations available)...", count)
315
343
316
-
// Get first BUNDLE_SIZE operations ordered by timestamp
317
-
mempoolOps, err := s.db.GetMempoolOperations(ctx, BUNDLE_SIZE)
344
+
bundleNum, ops, err := s.createBundleFromMempool(ctx)
318
345
if err != nil {
319
346
return err
320
347
}
321
348
322
-
// Convert to PLCOperations and track IDs
323
-
operations := make([]PLCOperation, 0, BUNDLE_SIZE)
324
-
mempoolIDs := make([]int64, 0, BUNDLE_SIZE)
325
-
seenCIDs := make(map[string]bool)
349
+
// Process and update metrics
350
+
counts, _ := s.processBatch(ctx, ops)
351
+
s.mergeCounts(m.endpointCounts, counts)
352
+
m.currentBundle = bundleNum
326
353
327
-
for _, mop := range mempoolOps {
328
-
// ✅ Skip duplicates (shouldn't happen but safety check)
329
-
if seenCIDs[mop.CID] {
330
-
mempoolIDs = append(mempoolIDs, mop.ID) // Still delete it
331
-
continue
332
-
}
333
-
seenCIDs[mop.CID] = true
354
+
if err := s.updateCursorForBundle(ctx, bundleNum, m.totalProcessed); err != nil {
355
+
log.Error("Warning: failed to update cursor: %v", err)
356
+
}
334
357
335
-
var op PLCOperation
336
-
json.Unmarshal([]byte(mop.Operation), &op)
358
+
log.Info("✓ Created bundle %06d from mempool", bundleNum)
359
+
}
360
+
}
337
361
338
-
// ✅ Restore RawJSON from database
339
-
op.RawJSON = []byte(mop.Operation)
362
+
func (s *Scanner) createBundleFromMempool(ctx context.Context) (int, []PLCOperation, error) {
363
+
mempoolOps, err := s.db.GetMempoolOperations(ctx, BUNDLE_SIZE)
364
+
if err != nil {
365
+
return 0, nil, err
366
+
}
340
367
341
-
operations = append(operations, op)
342
-
mempoolIDs = append(mempoolIDs, mop.ID)
368
+
ops, ids := s.deduplicateMempool(mempoolOps)
369
+
if len(ops) < BUNDLE_SIZE {
370
+
return 0, nil, fmt.Errorf("only got %d unique operations from mempool, need %d", len(ops), BUNDLE_SIZE)
371
+
}
343
372
344
-
if len(operations) >= BUNDLE_SIZE {
345
-
break
346
-
}
347
-
}
373
+
bundleNum, err := s.bundleManager.CreateBundleFromMempool(ctx, ops)
374
+
if err != nil {
375
+
return 0, nil, err
376
+
}
348
377
349
-
// Final check
350
-
if len(operations) < BUNDLE_SIZE {
351
-
log.Error("⚠ Only got %d unique operations from mempool, need %d", len(operations), BUNDLE_SIZE)
352
-
break
353
-
}
378
+
if err := s.db.DeleteFromMempool(ctx, ids[:len(ops)]); err != nil {
379
+
return 0, nil, err
380
+
}
354
381
355
-
// Create bundle from these operations
356
-
bundleNum, err := s.bundleManager.CreateBundleFromMempool(ctx, operations)
357
-
if err != nil {
358
-
return err
359
-
}
382
+
return bundleNum, ops, nil
383
+
}
360
384
361
-
// Remove from mempool (only what we used)
362
-
if err := s.db.DeleteFromMempool(ctx, mempoolIDs[:len(operations)]); err != nil {
363
-
return err
364
-
}
385
+
func (s *Scanner) deduplicateMempool(mempoolOps []storage.MempoolOperation) ([]PLCOperation, []int64) {
386
+
ops := make([]PLCOperation, 0, BUNDLE_SIZE)
387
+
ids := make([]int64, 0, BUNDLE_SIZE)
388
+
seenCIDs := make(map[string]bool)
365
389
366
-
// Process for PDS
367
-
batchCounts, _ := s.processBatch(ctx, operations)
368
-
for typ, count := range batchCounts {
369
-
newEndpointCounts[typ] += count
390
+
for _, mop := range mempoolOps {
391
+
if seenCIDs[mop.CID] {
392
+
ids = append(ids, mop.ID)
393
+
continue
370
394
}
395
+
seenCIDs[mop.CID] = true
371
396
372
-
*currentBundle = bundleNum
397
+
var op PLCOperation
398
+
json.Unmarshal([]byte(mop.Operation), &op)
399
+
op.RawJSON = []byte(mop.Operation)
373
400
374
-
// Update cursor
375
-
s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
376
-
Source: "plc_directory",
377
-
LastBundleNumber: bundleNum,
378
-
LastScanTime: time.Now(),
379
-
RecordsProcessed: *totalProcessed,
380
-
})
401
+
ops = append(ops, op)
402
+
ids = append(ids, mop.ID)
381
403
382
-
log.Info("✓ Created bundle %06d from mempool", bundleNum)
404
+
if len(ops) >= BUNDLE_SIZE {
405
+
break
406
+
}
383
407
}
384
408
385
-
return nil
409
+
return ops, ids
386
410
}
387
411
388
-
// processBatch processes operations for PDS discovery
389
-
func (s *Scanner) processBatch(ctx context.Context, operations []PLCOperation) (map[string]int64, error) {
390
-
endpointCounts := make(map[string]int64) // Track by type
391
-
seenInBatch := make(map[string]*PLCOperation) // key: "type:endpoint"
412
+
func (s *Scanner) processBatch(ctx context.Context, ops []PLCOperation) (map[string]int64, error) {
413
+
counts := make(map[string]int64)
414
+
seen := make(map[string]*PLCOperation)
392
415
393
-
for _, op := range operations {
416
+
// Collect unique endpoints
417
+
for _, op := range ops {
394
418
if op.IsNullified() {
395
419
continue
396
420
}
397
-
398
-
endpoints := s.extractEndpointsFromOperation(op)
399
-
for _, ep := range endpoints {
421
+
for _, ep := range s.extractEndpointsFromOperation(op) {
400
422
key := fmt.Sprintf("%s:%s", ep.Type, ep.Endpoint)
401
-
if _, seen := seenInBatch[key]; !seen {
402
-
seenInBatch[key] = &op
423
+
if _, exists := seen[key]; !exists {
424
+
seen[key] = &op
403
425
}
404
426
}
405
427
}
406
428
407
-
for key, firstOp := range seenInBatch {
429
+
// Store new endpoints
430
+
for key, firstOp := range seen {
408
431
parts := strings.SplitN(key, ":", 2)
409
-
endpointType := parts[0]
410
-
endpoint := parts[1]
432
+
epType, endpoint := parts[0], parts[1]
411
433
412
-
exists, err := s.db.EndpointExists(ctx, endpoint, endpointType)
434
+
exists, err := s.db.EndpointExists(ctx, endpoint, epType)
413
435
if err != nil || exists {
414
436
continue
415
437
}
416
438
417
-
if err := s.db.UpsertEndpoint(ctx, &storage.Endpoint{
418
-
EndpointType: endpointType,
419
-
Endpoint: endpoint,
420
-
DiscoveredAt: firstOp.CreatedAt,
421
-
LastChecked: time.Time{},
422
-
Status: storage.EndpointStatusUnknown,
423
-
}); err != nil {
424
-
log.Error("Error storing %s endpoint %s: %v", endpointType, stripansi.Strip(endpoint), err)
439
+
if err := s.storeEndpoint(ctx, epType, endpoint, firstOp.CreatedAt); err != nil {
440
+
log.Error("Error storing %s endpoint %s: %v", epType, stripansi.Strip(endpoint), err)
425
441
continue
426
442
}
427
443
428
-
log.Info("✓ Discovered new %s endpoint: %s", endpointType, stripansi.Strip(endpoint))
429
-
endpointCounts[endpointType]++
444
+
log.Info("✓ Discovered new %s endpoint: %s", epType, stripansi.Strip(endpoint))
445
+
counts[epType]++
430
446
}
431
447
432
-
return endpointCounts, nil
448
+
return counts, nil
433
449
}
434
450
435
-
// extractEndpointsFromOperation extracts ALL service endpoints
451
+
func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error {
452
+
return s.db.UpsertEndpoint(ctx, &storage.Endpoint{
453
+
EndpointType: epType,
454
+
Endpoint: endpoint,
455
+
DiscoveredAt: discoveredAt,
456
+
LastChecked: time.Time{},
457
+
Status: storage.EndpointStatusUnknown,
458
+
})
459
+
}
460
+
436
461
func (s *Scanner) extractEndpointsFromOperation(op PLCOperation) []EndpointInfo {
437
462
var endpoints []EndpointInfo
438
463
439
-
if services, ok := op.Operation["services"].(map[string]interface{}); ok {
440
-
// Extract PDS
441
-
if atprotoPDS, ok := services["atproto_pds"].(map[string]interface{}); ok {
442
-
if endpoint, ok := atprotoPDS["endpoint"].(string); ok {
443
-
if svcType, ok := atprotoPDS["type"].(string); ok {
444
-
if svcType == "AtprotoPersonalDataServer" {
445
-
endpoints = append(endpoints, EndpointInfo{
446
-
Type: "pds",
447
-
Endpoint: endpoint,
448
-
})
449
-
}
450
-
}
451
-
}
452
-
}
464
+
services, ok := op.Operation["services"].(map[string]interface{})
465
+
if !ok {
466
+
return endpoints
467
+
}
453
468
454
-
// Extract Labeler
455
-
if atprotoLabeler, ok := services["atproto_labeler"].(map[string]interface{}); ok {
456
-
if endpoint, ok := atprotoLabeler["endpoint"].(string); ok {
457
-
if svcType, ok := atprotoLabeler["type"].(string); ok {
458
-
if svcType == "AtprotoLabeler" {
459
-
endpoints = append(endpoints, EndpointInfo{
460
-
Type: "labeler",
461
-
Endpoint: endpoint,
462
-
})
463
-
}
464
-
}
465
-
}
466
-
}
469
+
// Extract PDS
470
+
if ep := s.extractServiceEndpoint(services, "atproto_pds", "AtprotoPersonalDataServer", "pds"); ep != nil {
471
+
endpoints = append(endpoints, *ep)
472
+
}
467
473
468
-
// Add more service types as needed...
474
+
// Extract Labeler
475
+
if ep := s.extractServiceEndpoint(services, "atproto_labeler", "AtprotoLabeler", "labeler"); ep != nil {
476
+
endpoints = append(endpoints, *ep)
469
477
}
470
478
471
479
return endpoints
472
480
}
473
481
474
-
func contains(s, substr string) bool {
475
-
return len(s) >= len(substr) && s[:len(substr)] == substr
482
+
func (s *Scanner) extractServiceEndpoint(services map[string]interface{}, serviceKey, expectedType, resultType string) *EndpointInfo {
483
+
svc, ok := services[serviceKey].(map[string]interface{})
484
+
if !ok {
485
+
return nil
486
+
}
487
+
488
+
endpoint, hasEndpoint := svc["endpoint"].(string)
489
+
svcType, hasType := svc["type"].(string)
490
+
491
+
if hasEndpoint && hasType && svcType == expectedType {
492
+
return &EndpointInfo{
493
+
Type: resultType,
494
+
Endpoint: endpoint,
495
+
}
496
+
}
497
+
498
+
return nil
499
+
}
500
+
501
+
func (s *Scanner) updateCursor(ctx context.Context, cursor *storage.ScanCursor, m *ScanMetrics) error {
502
+
return s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
503
+
Source: "plc_directory",
504
+
LastBundleNumber: m.currentBundle - 1,
505
+
LastScanTime: time.Now(),
506
+
RecordsProcessed: cursor.RecordsProcessed + m.totalProcessed,
507
+
})
508
+
}
509
+
510
+
func (s *Scanner) updateCursorForBundle(ctx context.Context, bundle int, totalProcessed int64) error {
511
+
return s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
512
+
Source: "plc_directory",
513
+
LastBundleNumber: bundle,
514
+
LastScanTime: time.Now(),
515
+
RecordsProcessed: totalProcessed,
516
+
})
517
+
}
518
+
519
+
// Helper functions
520
+
func (s *Scanner) mergeCounts(dest, src map[string]int64) {
521
+
for k, v := range src {
522
+
dest[k] += v
523
+
}
524
+
}
525
+
526
+
// sumCounts returns the total across all per-type endpoint counts.
func sumCounts(counts map[string]int64) int64 {
	var total int64
	for _, n := range counts {
		total += n
	}
	return total
}
477
533
478
534
func formatEndpointCounts(counts map[string]int64) string {
···
480
536
return "0 new endpoints"
481
537
}
482
538
483
-
total := int64(0)
484
-
for _, count := range counts {
485
-
total += count
486
-
}
539
+
total := sumCounts(counts)
487
540
488
541
if len(counts) == 1 {
489
542
for typ, count := range counts {
···
491
544
}
492
545
}
493
546
494
-
// Multiple types
495
547
parts := make([]string, 0, len(counts))
496
548
for typ, count := range counts {
497
549
parts = append(parts, fmt.Sprintf("%d %s", count, typ))