+20
-2
internal/api/handlers.go
+20
-2
internal/api/handlers.go
···
84
84
"hash": bundle.Hash,
85
85
"compressed_hash": bundle.CompressedHash,
86
86
"compressed_size": bundle.CompressedSize,
87
+
"uncompressed_size": bundle.UncompressedSize,
88
+
"compression_ratio": float64(bundle.UncompressedSize) / float64(bundle.CompressedSize),
89
+
"cursor": bundle.Cursor,
87
90
"prev_bundle_hash": bundle.PrevBundleHash,
88
91
"created_at": bundle.CreatedAt,
89
92
}
···
423
426
return
424
427
}
425
428
429
+
uniqueDIDCount, err := s.db.GetMempoolUniqueDIDCount(ctx)
430
+
if err != nil {
431
+
resp.error(err.Error(), http.StatusInternalServerError)
432
+
return
433
+
}
434
+
435
+
uncompressedSize, err := s.db.GetMempoolUncompressedSize(ctx)
436
+
if err != nil {
437
+
resp.error(err.Error(), http.StatusInternalServerError)
438
+
return
439
+
}
440
+
426
441
result := map[string]interface{}{
427
-
"operation_count": count,
428
-
"can_create_bundle": count >= plc.BUNDLE_SIZE,
442
+
"operation_count": count,
443
+
"unique_did_count": uniqueDIDCount,
444
+
"uncompressed_size": uncompressedSize,
445
+
"uncompressed_size_mb": float64(uncompressedSize) / 1024 / 1024,
446
+
"can_create_bundle": count >= plc.BUNDLE_SIZE,
429
447
}
430
448
431
449
if count > 0 {
+26
-19
internal/plc/bundle.go
+26
-19
internal/plc/bundle.go
···
262
262
if _, err := bm.db.GetBundleByNumber(ctx, bundleNum); err != nil {
263
263
bf.compressedHash = bm.hashFile(bf.path)
264
264
bf.uncompressedHash = bm.hash(bm.serializeJSONL(bf.operations))
265
-
bm.indexBundle(ctx, bundleNum, bf)
265
+
cursor := "" // Unknown for existing files
266
+
bm.indexBundle(ctx, bundleNum, bf, cursor)
266
267
}
267
268
268
269
return bf.operations, true, nil
···
276
277
277
278
ops, isComplete := fetcher.fetchUntilComplete(ctx, BUNDLE_SIZE)
278
279
279
-
log.Info(" Collected %d unique operations after %d fetches (complete=%v)",
280
-
len(ops), fetcher.fetchCount, isComplete)
281
-
282
280
if isComplete {
283
281
bf.operations = ops
284
282
if err := bm.save(bf); err != nil {
285
283
log.Error("Warning: failed to save bundle: %v", err)
286
284
} else {
287
-
bm.indexBundle(ctx, bundleNum, bf)
285
+
cursor := afterTime // Store the cursor used
286
+
bm.indexBundle(ctx, bundleNum, bf, cursor)
288
287
log.Info("✓ Bundle %06d saved [%d ops, hash: %s...]",
289
288
bundleNum, len(ops), bf.uncompressedHash[:16])
290
289
}
···
328
327
329
328
// ===== BUNDLE INDEXING =====
330
329
331
-
func (bm *BundleManager) indexBundle(ctx context.Context, bundleNum int, bf *bundleFile) error {
330
+
func (bm *BundleManager) indexBundle(ctx context.Context, bundleNum int, bf *bundleFile, cursor string) error {
332
331
prevHash := ""
333
332
if bundleNum > 1 {
334
333
if prev, err := bm.db.GetBundleByNumber(ctx, bundleNum-1); err == nil {
···
337
336
}
338
337
339
338
dids := bm.extractUniqueDIDs(bf.operations)
340
-
fileSize := bm.getFileSize(bf.path)
339
+
compressedFileSize := bm.getFileSize(bf.path)
340
+
341
+
// Calculate uncompressed size
342
+
uncompressedSize := int64(0)
343
+
for _, op := range bf.operations {
344
+
uncompressedSize += int64(len(op.RawJSON)) + 1 // +1 for newline
345
+
}
341
346
342
347
bundle := &storage.PLCBundle{
343
-
BundleNumber: bundleNum,
344
-
StartTime: bf.operations[0].CreatedAt,
345
-
EndTime: bf.operations[len(bf.operations)-1].CreatedAt,
346
-
DIDs: dids,
347
-
Hash: bf.uncompressedHash,
348
-
CompressedHash: bf.compressedHash,
349
-
CompressedSize: fileSize,
350
-
PrevBundleHash: prevHash,
351
-
Compressed: true,
352
-
CreatedAt: time.Now(),
348
+
BundleNumber: bundleNum,
349
+
StartTime: bf.operations[0].CreatedAt,
350
+
EndTime: bf.operations[len(bf.operations)-1].CreatedAt,
351
+
DIDs: dids,
352
+
Hash: bf.uncompressedHash,
353
+
CompressedHash: bf.compressedHash,
354
+
CompressedSize: compressedFileSize,
355
+
UncompressedSize: uncompressedSize,
356
+
Cursor: cursor,
357
+
PrevBundleHash: prevHash,
358
+
Compressed: true,
359
+
CreatedAt: time.Now(),
353
360
}
354
361
355
362
return bm.db.CreateBundle(ctx, bundle)
···
370
377
371
378
// ===== MEMPOOL BUNDLE CREATION =====
372
379
373
-
func (bm *BundleManager) CreateBundleFromMempool(ctx context.Context, operations []PLCOperation) (int, error) {
380
+
func (bm *BundleManager) CreateBundleFromMempool(ctx context.Context, operations []PLCOperation, cursor string) (int, error) {
374
381
if !bm.enabled {
375
382
return 0, fmt.Errorf("bundle manager disabled")
376
383
}
···
392
399
return 0, err
393
400
}
394
401
395
-
if err := bm.indexBundle(ctx, bundleNum, bf); err != nil {
402
+
if err := bm.indexBundle(ctx, bundleNum, bf, cursor); err != nil {
396
403
return 0, err
397
404
}
398
405
+22
-12
internal/plc/scanner.go
+22
-12
internal/plc/scanner.go
···
90
90
91
91
// Handle existing mempool first
92
92
if hasMempool, _ := s.hasSufficientMempool(ctx); hasMempool {
93
-
return s.handleMempoolOnly(ctx, metrics, cursor)
93
+
return s.handleMempoolOnly(ctx, metrics)
94
94
}
95
95
96
96
// Process bundles until incomplete or error
···
141
141
return count > 0, nil
142
142
}
143
143
144
-
func (s *Scanner) handleMempoolOnly(ctx context.Context, m *ScanMetrics, cursor *storage.ScanCursor) error {
144
+
func (s *Scanner) handleMempoolOnly(ctx context.Context, m *ScanMetrics) error {
145
145
count, _ := s.db.GetMempoolCount(ctx)
146
146
log.Info("→ Mempool has %d operations, continuing to fill it before fetching new bundles", count)
147
147
···
354
354
355
355
log.Info("→ Creating bundle from mempool (%d operations available)...", count)
356
356
357
-
bundleNum, ops, err := s.createBundleFromMempool(ctx)
357
+
// Updated to receive 4 values instead of 3
358
+
bundleNum, ops, cursor, err := s.createBundleFromMempool(ctx)
358
359
if err != nil {
359
360
return err
360
361
}
···
366
367
newEndpointsFound := sumCounts(m.endpointCounts) - countsBefore
367
368
368
369
m.totalProcessed += int64(len(ops))
369
-
m.newEndpoints += newEndpointsFound // NEW: Track new endpoints
370
+
m.newEndpoints += newEndpointsFound
370
371
m.currentBundle = bundleNum
371
372
372
373
if err := s.updateCursorForBundle(ctx, bundleNum, m.totalProcessed); err != nil {
373
374
log.Error("Warning: failed to update cursor: %v", err)
374
375
}
375
376
376
-
log.Info("✓ Created bundle %06d from mempool", bundleNum)
377
+
log.Info("✓ Created bundle %06d from mempool (cursor: %s)", bundleNum, cursor)
377
378
}
378
379
}
379
380
380
-
func (s *Scanner) createBundleFromMempool(ctx context.Context) (int, []PLCOperation, error) {
381
+
func (s *Scanner) createBundleFromMempool(ctx context.Context) (int, []PLCOperation, string, error) {
381
382
mempoolOps, err := s.db.GetMempoolOperations(ctx, BUNDLE_SIZE)
382
383
if err != nil {
383
-
return 0, nil, err
384
+
return 0, nil, "", err
384
385
}
385
386
386
387
ops, ids := s.deduplicateMempool(mempoolOps)
387
388
if len(ops) < BUNDLE_SIZE {
388
-
return 0, nil, fmt.Errorf("only got %d unique operations from mempool, need %d", len(ops), BUNDLE_SIZE)
389
+
return 0, nil, "", fmt.Errorf("only got %d unique operations from mempool, need %d", len(ops), BUNDLE_SIZE)
389
390
}
390
391
391
-
bundleNum, err := s.bundleManager.CreateBundleFromMempool(ctx, ops)
392
+
// Determine cursor from last bundle
393
+
cursor := ""
394
+
lastBundle, err := s.db.GetLastBundleNumber(ctx)
395
+
if err == nil && lastBundle > 0 {
396
+
if bundle, err := s.db.GetBundleByNumber(ctx, lastBundle); err == nil {
397
+
cursor = bundle.EndTime.Format(time.RFC3339Nano)
398
+
}
399
+
}
400
+
401
+
bundleNum, err := s.bundleManager.CreateBundleFromMempool(ctx, ops, cursor)
392
402
if err != nil {
393
-
return 0, nil, err
403
+
return 0, nil, "", err
394
404
}
395
405
396
406
if err := s.db.DeleteFromMempool(ctx, ids[:len(ops)]); err != nil {
397
-
return 0, nil, err
407
+
return 0, nil, "", err
398
408
}
399
409
400
-
return bundleNum, ops, nil
410
+
return bundleNum, ops, cursor, nil
401
411
}
402
412
403
413
func (s *Scanner) deduplicateMempool(mempoolOps []storage.MempoolOperation) ([]PLCOperation, []int64) {
+2
internal/storage/db.go
+2
internal/storage/db.go
···
39
39
DeleteFromMempool(ctx context.Context, ids []int64) error
40
40
GetFirstMempoolOperation(ctx context.Context) (*MempoolOperation, error)
41
41
GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error)
42
+
GetMempoolUniqueDIDCount(ctx context.Context) (int, error)
43
+
GetMempoolUncompressedSize(ctx context.Context) (int64, error)
42
44
43
45
// Metrics
44
46
StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error
+45
-14
internal/storage/sqlite.go
+45
-14
internal/storage/sqlite.go
···
92
92
start_time TIMESTAMP NOT NULL,
93
93
end_time TIMESTAMP NOT NULL,
94
94
dids TEXT NOT NULL,
95
-
hash TEXT NOT NULL, -- SHA256 of uncompressed JSONL
96
-
compressed_hash TEXT NOT NULL, -- SHA256 of compressed file
97
-
compressed_size INTEGER NOT NULL, -- Size of compressed file
98
-
prev_bundle_hash TEXT, -- Chain link
95
+
hash TEXT NOT NULL,
96
+
compressed_hash TEXT NOT NULL,
97
+
compressed_size INTEGER NOT NULL,
98
+
uncompressed_size INTEGER NOT NULL, -- NEW
99
+
cursor TEXT, -- NEW
100
+
prev_bundle_hash TEXT,
99
101
compressed BOOLEAN DEFAULT 1,
100
102
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
101
103
);
···
126
128
// GetBundleByNumber
127
129
func (s *SQLiteDB) GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error) {
128
130
query := `
129
-
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, compressed_size, prev_bundle_hash, compressed, created_at
131
+
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
132
+
compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
130
133
FROM plc_bundles
131
134
WHERE bundle_number = ?
132
135
`
···
134
137
var bundle PLCBundle
135
138
var didsJSON string
136
139
var prevHash sql.NullString
140
+
var cursor sql.NullString
137
141
138
142
err := s.db.QueryRowContext(ctx, query, bundleNumber).Scan(
139
143
&bundle.BundleNumber, &bundle.StartTime, &bundle.EndTime,
140
144
&didsJSON, &bundle.Hash, &bundle.CompressedHash,
141
-
&bundle.CompressedSize, &prevHash, &bundle.Compressed, &bundle.CreatedAt,
145
+
&bundle.CompressedSize, &bundle.UncompressedSize, &cursor,
146
+
&prevHash, &bundle.Compressed, &bundle.CreatedAt,
142
147
)
143
148
if err != nil {
144
149
return nil, err
···
146
151
147
152
if prevHash.Valid {
148
153
bundle.PrevBundleHash = prevHash.String
154
+
}
155
+
if cursor.Valid {
156
+
bundle.Cursor = cursor.String
149
157
}
150
158
151
159
json.Unmarshal([]byte(didsJSON), &bundle.DIDs)
···
341
349
query := `
342
350
INSERT INTO plc_bundles (
343
351
bundle_number, start_time, end_time, dids,
344
-
hash, compressed_hash, compressed_size, prev_bundle_hash, compressed
352
+
hash, compressed_hash, compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed
345
353
)
346
-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
354
+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
347
355
ON CONFLICT(bundle_number) DO UPDATE SET
348
356
start_time = excluded.start_time,
349
357
end_time = excluded.end_time,
···
351
359
hash = excluded.hash,
352
360
compressed_hash = excluded.compressed_hash,
353
361
compressed_size = excluded.compressed_size,
362
+
uncompressed_size = excluded.uncompressed_size,
363
+
cursor = excluded.cursor,
354
364
prev_bundle_hash = excluded.prev_bundle_hash,
355
365
compressed = excluded.compressed
356
366
`
357
367
_, err = s.db.ExecContext(ctx, query,
358
368
bundle.BundleNumber, bundle.StartTime, bundle.EndTime,
359
369
string(didsJSON), bundle.Hash, bundle.CompressedHash,
360
-
bundle.CompressedSize, bundle.PrevBundleHash, bundle.Compressed,
370
+
bundle.CompressedSize, bundle.UncompressedSize, bundle.Cursor,
371
+
bundle.PrevBundleHash, bundle.Compressed,
361
372
)
362
373
363
374
return err
364
375
}
365
376
377
+
// GetMempoolUniqueDIDCount returns the number of unique DIDs in mempool
378
+
func (s *SQLiteDB) GetMempoolUniqueDIDCount(ctx context.Context) (int, error) {
379
+
query := "SELECT COUNT(DISTINCT did) FROM plc_mempool"
380
+
var count int
381
+
err := s.db.QueryRowContext(ctx, query).Scan(&count)
382
+
return count, err
383
+
}
384
+
385
+
// GetMempoolUncompressedSize returns total uncompressed size of all operations
386
+
func (s *SQLiteDB) GetMempoolUncompressedSize(ctx context.Context) (int64, error) {
387
+
query := "SELECT COALESCE(SUM(LENGTH(operation)), 0) FROM plc_mempool"
388
+
var size int64
389
+
err := s.db.QueryRowContext(ctx, query).Scan(&size)
390
+
return size, err
391
+
}
392
+
366
393
// GetBundles
367
394
func (s *SQLiteDB) GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) {
368
395
query := `
···
457
484
return &bundle, nil
458
485
}
459
486
460
-
// scanBundles - Make sure it reads 10 columns
461
487
func (s *SQLiteDB) scanBundles(rows *sql.Rows) ([]*PLCBundle, error) {
462
488
var bundles []*PLCBundle
463
489
···
465
491
var bundle PLCBundle
466
492
var didsJSON string
467
493
var prevHash sql.NullString
494
+
var cursor sql.NullString
468
495
469
-
// Scan: bundle_number, start_time, end_time, dids, hash, compressed_hash, compressed_size, prev_bundle_hash, compressed, created_at
470
496
if err := rows.Scan(
471
497
&bundle.BundleNumber,
472
498
&bundle.StartTime,
473
499
&bundle.EndTime,
474
500
&didsJSON,
475
-
&bundle.Hash, // Uncompressed hash
476
-
&bundle.CompressedHash, // Compressed hash
477
-
&bundle.CompressedSize, // Compressed size
501
+
&bundle.Hash,
502
+
&bundle.CompressedHash,
503
+
&bundle.CompressedSize,
504
+
&bundle.UncompressedSize,
505
+
&cursor,
478
506
&prevHash,
479
507
&bundle.Compressed,
480
508
&bundle.CreatedAt,
···
484
512
485
513
if prevHash.Valid {
486
514
bundle.PrevBundleHash = prevHash.String
515
+
}
516
+
if cursor.Valid {
517
+
bundle.Cursor = cursor.String
487
518
}
488
519
489
520
json.Unmarshal([]byte(didsJSON), &bundle.DIDs)
+13
-11
internal/storage/types.go
+13
-11
internal/storage/types.go
···
106
106
107
107
// PLCBundle is the index record kept for one cached bundle of PLC operations.
type PLCBundle struct {
	// BundleNumber identifies the bundle; it is the lookup key used by
	// GetBundleByNumber.
	BundleNumber int
	// StartTime is the CreatedAt of the first operation in the bundle.
	StartTime time.Time
	// EndTime is the CreatedAt of the last operation in the bundle.
	EndTime time.Time
	// BoundaryCIDs is not populated by the indexing code visible here.
	// NOTE(review): presumably CIDs at the bundle boundary; confirm usage.
	BoundaryCIDs []string
	// DIDs lists the unique DIDs extracted from the bundle's operations.
	DIDs []string
	// Hash is the hash of the uncompressed JSONL serialization.
	Hash string
	// CompressedHash is the hash of the compressed bundle file.
	CompressedHash string
	// CompressedSize is the size of the compressed bundle file.
	CompressedSize int64
	// UncompressedSize is the sum of each operation's raw JSON length plus
	// one newline byte per operation.
	UncompressedSize int64
	// Cursor is the PLC cursor used to create this bundle; it is empty when
	// the cursor is unknown.
	Cursor string
	// PrevBundleHash links this bundle to its predecessor; it is empty when
	// no predecessor hash is available.
	PrevBundleHash string
	// Compressed records whether the bundle file is stored compressed.
	Compressed bool
	// CreatedAt is the time the index record was created.
	CreatedAt time.Time
}
121
123
122
124
// GetFilePath returns the computed file path for this bundle