+28
cmd/plcbundle/commands/op.go
+28
cmd/plcbundle/commands/op.go
···
343
343
// findOperationByCID searches for an operation by CID
344
344
func findOperationByCID(mgr BundleManager, cid string) error {
345
345
ctx := context.Background()
346
+
347
+
// ✨ CHECK MEMPOOL FIRST (most recent data)
348
+
fmt.Fprintf(os.Stderr, "Checking mempool...\n")
349
+
mempoolOps, err := mgr.GetMempoolOperations()
350
+
if err == nil && len(mempoolOps) > 0 {
351
+
for pos, op := range mempoolOps {
352
+
if op.CID == cid {
353
+
fmt.Printf("Found in mempool: position %d\n\n", pos)
354
+
fmt.Printf(" DID: %s\n", op.DID)
355
+
fmt.Printf(" Created: %s\n", op.CreatedAt.Format("2006-01-02 15:04:05"))
356
+
357
+
if op.IsNullified() {
358
+
fmt.Printf(" Status: ✗ Nullified")
359
+
if nullCID := op.GetNullifyingCID(); nullCID != "" {
360
+
fmt.Printf(" by %s", nullCID)
361
+
}
362
+
fmt.Printf("\n")
363
+
} else {
364
+
fmt.Printf(" Status: ✓ Active\n")
365
+
}
366
+
367
+
return nil
368
+
}
369
+
}
370
+
}
371
+
372
+
// Search bundles
346
373
index := mgr.GetIndex()
347
374
bundles := index.GetBundles()
348
375
···
390
417
}
391
418
392
419
fmt.Fprintf(os.Stderr, "\nCID not found: %s\n", cid)
420
+
fmt.Fprintf(os.Stderr, "(Searched %d bundles + mempool)\n", len(bundles))
393
421
return fmt.Errorf("CID not found")
394
422
}
+55
-24
internal/sync/fetcher.go
+55
-24
internal/sync/fetcher.go
···
42
42
prevBoundaryCIDs map[string]bool,
43
43
target int,
44
44
quiet bool,
45
-
mempool MempoolInterface, // NEW: pass mempool directly
45
+
mempool MempoolInterface,
46
46
totalFetchesSoFar int,
47
47
) ([]plcclient.PLCOperation, int, error) {
48
48
···
112
112
originalBatchSize := len(batch)
113
113
totalReceived += originalBatchSize
114
114
115
-
// Deduplicate
115
+
// ✨ FIX: Collect new ops first (don't mark as seen yet)
116
116
beforeDedup := len(allNewOps)
117
+
var batchNewOps []plcclient.PLCOperation
118
+
117
119
for _, op := range batch {
118
120
if !seenCIDs[op.CID] {
119
-
seenCIDs[op.CID] = true
120
-
allNewOps = append(allNewOps, op)
121
+
// Collect but don't mark as seen yet
122
+
batchNewOps = append(batchNewOps, op)
121
123
}
122
124
}
123
125
124
-
uniqueAdded := len(allNewOps) - beforeDedup
125
-
dupesFiltered := originalBatchSize - uniqueAdded
126
+
uniqueInBatch := len(batchNewOps)
127
+
dupesFiltered := originalBatchSize - uniqueInBatch
126
128
totalDupes += dupesFiltered
127
129
128
-
// Show fetch result with running totals
129
-
if !quiet {
130
-
opsPerSec := float64(originalBatchSize) / fetchDuration.Seconds()
130
+
// ✨ FIX: Try to add to mempool BEFORE marking as seen
131
+
if uniqueInBatch > 0 && mempool != nil {
132
+
added, addErr := mempool.Add(batchNewOps)
133
+
134
+
if addErr != nil {
135
+
// ✨ CRITICAL: Add failed - don't mark as seen!
136
+
if !quiet {
137
+
f.logger.Printf(" ❌ Mempool add failed: %v", addErr)
138
+
f.logger.Printf(" → %d operations NOT marked as seen (will retry on next sync)", uniqueInBatch)
139
+
}
140
+
141
+
// Save what we successfully added so far
142
+
mempool.Save()
143
+
return allNewOps, fetchesMade, fmt.Errorf("mempool add failed for %d operations: %w", uniqueInBatch, addErr)
144
+
}
145
+
146
+
// ✨ SUCCESS: Add succeeded, NOW mark as seen and add to result
147
+
if added != uniqueInBatch {
148
+
// Mempool deduplicated some - this is OK, just log
149
+
if !quiet {
150
+
f.logger.Printf(" ℹ️ Mempool deduplicated %d operations (already present)", uniqueInBatch-added)
151
+
}
152
+
}
131
153
132
-
if dupesFiltered > 0 {
133
-
f.logger.Printf(" → +%d unique (%d dupes) in %s • Running: %d/%d (%.0f ops/sec)",
134
-
uniqueAdded, dupesFiltered, fetchDuration, len(allNewOps), target, opsPerSec)
135
-
} else {
136
-
f.logger.Printf(" → +%d unique in %s • Running: %d/%d (%.0f ops/sec)",
137
-
uniqueAdded, fetchDuration, len(allNewOps), target, opsPerSec)
154
+
// Mark successfully added ops as seen
155
+
for _, op := range batchNewOps {
156
+
seenCIDs[op.CID] = true
138
157
}
139
-
}
158
+
allNewOps = append(allNewOps, batchNewOps...)
140
159
141
-
// ✨ ADD TO MEMPOOL AND SAVE after each fetch
142
-
if uniqueAdded > 0 && mempool != nil {
143
-
added, addErr := mempool.Add(allNewOps[beforeDedup:])
144
-
if addErr != nil {
145
-
// Force save before returning error
146
-
mempool.Save()
147
-
return allNewOps, fetchesMade, fmt.Errorf("mempool add failed: %w", addErr)
160
+
uniqueAdded := len(allNewOps) - beforeDedup
161
+
162
+
// Show fetch result with running totals
163
+
if !quiet {
164
+
opsPerSec := float64(originalBatchSize) / fetchDuration.Seconds()
165
+
166
+
if dupesFiltered > 0 {
167
+
f.logger.Printf(" → +%d unique (%d dupes) in %s • Running: %d/%d (%.0f ops/sec)",
168
+
uniqueAdded, dupesFiltered, fetchDuration, len(allNewOps), target, opsPerSec)
169
+
} else {
170
+
f.logger.Printf(" → +%d unique in %s • Running: %d/%d (%.0f ops/sec)",
171
+
uniqueAdded, fetchDuration, len(allNewOps), target, opsPerSec)
172
+
}
148
173
}
149
174
150
175
// ✨ Save only if threshold met
···
157
182
f.logger.Printf(" Added to mempool: %d ops (total: %d, cursor: %s)",
158
183
added, mempool.Count(), cursor[:19])
159
184
}
185
+
} else if uniqueInBatch > 0 {
186
+
// No mempool provided - just collect
187
+
for _, op := range batchNewOps {
188
+
seenCIDs[op.CID] = true
189
+
}
190
+
allNewOps = append(allNewOps, batchNewOps...)
160
191
}
161
192
162
-
// Update cursor
193
+
// Update cursor for next fetch
163
194
if len(batch) > 0 {
164
195
currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
165
196
}