bundle/manager.go  (+18 -4)
@@ -1134,24 +1134,38 @@
 		afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
 		prevBundleHash = lastBundle.Hash
 
+		// ALWAYS get boundaries from last bundle initially
 		prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
 		if err == nil {
 			_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
 			if !quiet {
-				m.logger.Printf("Previous bundle %06d has %d boundary CIDs at %s",
-					lastBundle.BundleNumber, len(prevBoundaryCIDs), lastBundle.EndTime.Format(time.RFC3339))
+				m.logger.Printf("Loaded %d boundary CIDs from bundle %06d (at %s)",
+					len(prevBoundaryCIDs), lastBundle.BundleNumber,
+					lastBundle.EndTime.Format(time.RFC3339)[:19])
 			}
 		}
 	}
 
-	// Use mempool's last time if available
+	// If mempool has operations, update cursor but KEEP boundaries from bundle
+	// (mempool operations already had boundary dedup applied when they were added)
 	if m.mempool.Count() > 0 {
 		mempoolLastTime := m.mempool.GetLastTime()
 		if mempoolLastTime != "" {
 			if !quiet {
-				m.logger.Printf("Mempool has %d ops, last at %s", m.mempool.Count(), mempoolLastTime)
+				m.logger.Printf("Mempool has %d ops, resuming from %s",
+					m.mempool.Count(), mempoolLastTime[:19])
 			}
 			afterTime = mempoolLastTime
+
+			// Calculate boundaries from MEMPOOL for next fetch
+			mempoolOps, _ := m.GetMempoolOperations()
+			if len(mempoolOps) > 0 {
+				_, mempoolBoundaries := m.operations.GetBoundaryCIDs(mempoolOps)
+				prevBoundaryCIDs = mempoolBoundaries
+				if !quiet {
+					m.logger.Printf("Using %d boundary CIDs from mempool", len(prevBoundaryCIDs))
+				}
+			}
 		}
 	}
 
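Context for the change above: the resume cursor is a timestamp, and several PLC operations can share the exact same createdAt, so a plain "fetch after time T" cursor either re-downloads the operations at T or risks skipping their same-timestamp siblings. Carrying the set of CIDs that sit at the boundary timestamp lets the next fetch skip exact duplicates without losing anything. The sketch below is only an illustration of what GetBoundaryCIDs plausibly computes (newest createdAt in the slice plus the CIDs that share it); the real helper lives in the operations package and may differ, and the local op type and sample values are made up.

package main

import (
	"fmt"
	"time"
)

// op mirrors only the PLCOperation fields the boundary logic needs (illustration only).
type op struct {
	CID       string
	CreatedAt time.Time
}

// boundaryCIDs is a hypothetical stand-in for operations.GetBoundaryCIDs:
// assumed behaviour is "return the newest createdAt in the slice and the
// set of CIDs that share that exact timestamp".
func boundaryCIDs(ops []op) (time.Time, map[string]bool) {
	var boundary time.Time
	for _, o := range ops {
		if o.CreatedAt.After(boundary) {
			boundary = o.CreatedAt
		}
	}
	cids := make(map[string]bool)
	for _, o := range ops {
		if o.CreatedAt.Equal(boundary) {
			cids[o.CID] = true
		}
	}
	return boundary, cids
}

func main() {
	t := time.Date(2024, 5, 1, 12, 0, 0, 123456789, time.UTC) // made-up timestamp
	ops := []op{
		{CID: "cid-a", CreatedAt: t.Add(-time.Second)},
		{CID: "cid-b", CreatedAt: t}, // two operations share the boundary timestamp
		{CID: "cid-c", CreatedAt: t},
	}
	boundary, cids := boundaryCIDs(ops)
	fmt.Println(boundary.Format(time.RFC3339Nano), len(cids)) // 2024-05-01T12:00:00.123456789Z 2
}

Under that reading, resuming from a bundle means seeding prevBoundaryCIDs from the last persisted operations, and the mempool branch above simply recomputes the same thing from the in-memory tail while keeping the cursor at the mempool's last time.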
internal/sync/fetcher.go  (+56 -41)
@@ -48,13 +48,19 @@
 
 	seenCIDs := make(map[string]bool)
 
-	// Mark previous boundary CIDs as seen
-	for cid := range prevBoundaryCIDs {
+	// ✅ Initialize current boundaries from previous bundle (or empty if first fetch)
+	currentBoundaryCIDs := prevBoundaryCIDs
+	if currentBoundaryCIDs == nil {
+		currentBoundaryCIDs = make(map[string]bool)
+	}
+
+	// Mark boundary CIDs as seen to prevent re-inclusion
+	for cid := range currentBoundaryCIDs {
 		seenCIDs[cid] = true
 	}
 
-	if !quiet && len(prevBoundaryCIDs) > 0 {
-		f.logger.Printf(" Tracking %d boundary CIDs from previous bundle", len(prevBoundaryCIDs))
+	if !quiet && len(currentBoundaryCIDs) > 0 {
+		f.logger.Printf(" Starting with %d boundary CIDs from previous bundle", len(currentBoundaryCIDs))
 	}
 
 	currentAfter := afterTime
@@ -112,46 +118,48 @@
 		originalBatchSize := len(batch)
 		totalReceived += originalBatchSize
 
-		// ✨ FIX: Collect new ops first (don't mark as seen yet)
+		// ✅ CRITICAL: Strip boundary duplicates using current boundaries
+		batch = f.operations.StripBoundaryDuplicates(
+			batch,
+			currentAfter,
+			currentBoundaryCIDs,
+		)
+
+		afterStripSize := len(batch)
+		strippedCount := originalBatchSize - afterStripSize
+
+		if !quiet && strippedCount > 0 {
+			f.logger.Printf(" Stripped %d boundary duplicates from fetch", strippedCount)
+		}
+
+		// Collect new ops (not in seenCIDs)
 		beforeDedup := len(allNewOps)
 		var batchNewOps []plcclient.PLCOperation
 
 		for _, op := range batch {
 			if !seenCIDs[op.CID] {
-				// Collect but don't mark as seen yet
 				batchNewOps = append(batchNewOps, op)
 			}
 		}
 
 		uniqueInBatch := len(batchNewOps)
-		dupesFiltered := originalBatchSize - uniqueInBatch
-		totalDupes += dupesFiltered
+		dupesFiltered := afterStripSize - uniqueInBatch
+		totalDupes += dupesFiltered + strippedCount
 
-		// ✨ FIX: Try to add to mempool BEFORE marking as seen
+		// Try to add to mempool
 		if uniqueInBatch > 0 && mempool != nil {
-			added, addErr := mempool.Add(batchNewOps)
+			_, addErr := mempool.Add(batchNewOps)
 
 			if addErr != nil {
-				// ✨ CRITICAL: Add failed - don't mark as seen!
+				// Add failed - don't mark as seen
 				if !quiet {
 					f.logger.Printf(" ❌ Mempool add failed: %v", addErr)
-					f.logger.Printf(" → %d operations NOT marked as seen (will retry on next sync)", uniqueInBatch)
 				}
-
-				// Save what we successfully added so far
 				mempool.Save()
-				return allNewOps, fetchesMade, fmt.Errorf("mempool add failed for %d operations: %w", uniqueInBatch, addErr)
+				return allNewOps, fetchesMade, fmt.Errorf("mempool add failed: %w", addErr)
 			}
 
-			// ✨ SUCCESS: Add succeeded, NOW mark as seen and add to result
-			if added != uniqueInBatch {
-				// Mempool deduplicated some - this is OK, just log
-				if !quiet {
-					f.logger.Printf(" ℹ️ Mempool deduplicated %d operations (already present)", uniqueInBatch-added)
-				}
-			}
-
-			// Mark successfully added ops as seen
+			// Success - mark as seen
 			for _, op := range batchNewOps {
 				seenCIDs[op.CID] = true
 			}
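The StripBoundaryDuplicates call above is what actually removes re-fetched boundary operations before the dedup accounting. Its exact behaviour is not shown in this diff; a plausible reading, sketched below with the same made-up op type and time import as the earlier example, is "drop any operation whose CID is in the boundary set and whose createdAt still matches the cursor string, keep everything else in order". Treat this as an assumption, not the real implementation.

// stripBoundaryDuplicates is a hypothetical stand-in for
// operations.StripBoundaryDuplicates(batch, currentAfter, currentBoundaryCIDs).
// Assumption: an op is a boundary duplicate when its CID was already seen at
// the boundary and its timestamp still equals the cursor.
func stripBoundaryDuplicates(batch []op, after string, boundary map[string]bool) []op {
	out := make([]op, 0, len(batch))
	for _, o := range batch {
		if boundary[o.CID] && o.CreatedAt.Format(time.RFC3339Nano) == after {
			continue // already persisted (or already in the mempool) at the boundary
		}
		out = append(out, o)
	}
	return out
}

Note how the accounting in the hunk then splits duplicates into two buckets: strippedCount covers what this call removed, dupesFiltered covers what the seenCIDs map caught, and totalDupes adds both.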
@@ -159,43 +167,50 @@
 
 			uniqueAdded := len(allNewOps) - beforeDedup
 
-			// Show fetch result with running totals
 			if !quiet {
 				opsPerSec := float64(originalBatchSize) / fetchDuration.Seconds()
-
-				if dupesFiltered > 0 {
-					f.logger.Printf(" → +%d unique (%d dupes) in %s • Running: %d/%d (%.0f ops/sec)",
-						uniqueAdded, dupesFiltered, fetchDuration, len(allNewOps), target, opsPerSec)
+				if dupesFiltered+strippedCount > 0 {
+					f.logger.Printf(" → +%d unique (%d dupes, %d boundary) in %s • Running: %d/%d (%.0f ops/sec)",
+						uniqueAdded, dupesFiltered, strippedCount, fetchDuration, len(allNewOps), target, opsPerSec)
 				} else {
 					f.logger.Printf(" → +%d unique in %s • Running: %d/%d (%.0f ops/sec)",
 						uniqueAdded, fetchDuration, len(allNewOps), target, opsPerSec)
 				}
 			}
 
-			// ✨ Save only if threshold met
+			// ✅ CRITICAL: Calculate NEW boundary CIDs from this fetch for next iteration
+			if len(batch) > 0 {
+				boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch)
+				currentBoundaryCIDs = newBoundaryCIDs
+				currentAfter = boundaryTime.Format(time.RFC3339Nano)
+
+				if !quiet && len(newBoundaryCIDs) > 1 {
+					f.logger.Printf(" Updated boundaries: %d CIDs at %s",
+						len(newBoundaryCIDs), currentAfter[:19])
+				}
+			}
+
+			// Save if threshold met
 			if err := mempool.SaveIfNeeded(); err != nil {
 				f.logger.Printf(" Warning: failed to save mempool: %v", err)
 			}
 
-			if !quiet && added > 0 {
-				cursor := mempool.GetLastTime()
-				f.logger.Printf(" Added to mempool: %d ops (total: %d, cursor: %s)",
-					added, mempool.Count(), cursor[:19])
-			}
 		} else if uniqueInBatch > 0 {
-			// No mempool provided - just collect
+			// No mempool - just collect
 			for _, op := range batchNewOps {
 				seenCIDs[op.CID] = true
 			}
 			allNewOps = append(allNewOps, batchNewOps...)
-		}
 
-		// Update cursor for next fetch
-		if len(batch) > 0 {
-			currentAfter = batch[len(batch)-1].CreatedAt.Format(time.RFC3339Nano)
+			// ✅ Still update boundaries even without mempool
+			if len(batch) > 0 {
+				boundaryTime, newBoundaryCIDs := f.operations.GetBoundaryCIDs(batch)
+				currentBoundaryCIDs = newBoundaryCIDs
+				currentAfter = boundaryTime.Format(time.RFC3339Nano)
+			}
 		}
 
-		// Check completeness
+		// Check if incomplete batch (caught up)
 		if originalBatchSize < batchSize {
 			if !quiet {
 				f.logger.Printf(" Incomplete batch (%d/%d) → caught up", originalBatchSize, batchSize)