+37
-4
internal/plc/bundle.go
+37
-4
internal/plc/bundle.go
···
262
262
if _, err := bm.db.GetBundleByNumber(ctx, bundleNum); err != nil {
263
263
bf.compressedHash = bm.hashFile(bf.path)
264
264
bf.uncompressedHash = bm.hash(bm.serializeJSONL(bf.operations))
265
-
cursor := "" // Unknown for existing files
265
+
266
+
// Calculate cursor from previous bundle
267
+
cursor := bm.calculateCursor(ctx, bundleNum)
268
+
266
269
bm.indexBundle(ctx, bundleNum, bf, cursor)
267
270
}
268
271
···
276
279
fetcher := newBundleFetcher(client, afterTime, prevCIDs)
277
280
278
281
ops, isComplete := fetcher.fetchUntilComplete(ctx, BUNDLE_SIZE)
282
+
283
+
log.Info(" Collected %d unique operations after %d fetches (complete=%v)",
284
+
len(ops), fetcher.fetchCount, isComplete)
279
285
280
286
if isComplete {
281
287
bf.operations = ops
282
288
if err := bm.save(bf); err != nil {
283
289
log.Error("Warning: failed to save bundle: %v", err)
284
290
} else {
285
-
cursor := afterTime // Store the cursor used
291
+
// The cursor is the afterTime that was used to fetch this bundle
292
+
cursor := afterTime
286
293
bm.indexBundle(ctx, bundleNum, bf, cursor)
287
-
log.Info("✓ Bundle %06d saved [%d ops, hash: %s...]",
288
-
bundleNum, len(ops), bf.uncompressedHash[:16])
294
+
log.Info("✓ Bundle %06d saved [%d ops, hash: %s..., cursor: %s]",
295
+
bundleNum, len(ops), bf.uncompressedHash[:16], cursor)
289
296
}
290
297
}
291
298
···
602
609
603
610
return bf.operations, nil
604
611
}
612
+
613
+
// calculateCursor determines the cursor value for a given bundle
614
+
// For bundle 1: returns empty string
615
+
// For bundle N: returns the end_time of bundle N-1 in RFC3339Nano format
616
+
func (bm *BundleManager) calculateCursor(ctx context.Context, bundleNum int) string {
617
+
if bundleNum == 1 {
618
+
return ""
619
+
}
620
+
621
+
// Try to get cursor from previous bundle in DB
622
+
if prevBundle, err := bm.db.GetBundleByNumber(ctx, bundleNum-1); err == nil {
623
+
return prevBundle.EndTime.Format(time.RFC3339Nano)
624
+
}
625
+
626
+
// If previous bundle not in DB, try to load it from file
627
+
prevBf := bm.newBundleFile(bundleNum - 1)
628
+
if prevBf.exists() {
629
+
if err := bm.load(prevBf); err == nil && len(prevBf.operations) > 0 {
630
+
// Return the createdAt of the last operation in previous bundle
631
+
lastOp := prevBf.operations[len(prevBf.operations)-1]
632
+
return lastOp.CreatedAt.Format(time.RFC3339Nano)
633
+
}
634
+
}
635
+
636
+
return ""
637
+
}