+29
-13
internal/api/handlers.go
+29
-13
internal/api/handlers.go
···
401
401
func (s *Server) handleGetPLCBundleStats(w http.ResponseWriter, r *http.Request) {
402
402
resp := newResponse(w)
403
403
404
-
count, size, err := s.db.GetBundleStats(r.Context())
404
+
count, compressedSize, uncompressedSize, lastBundle, err := s.db.GetBundleStats(r.Context())
405
405
if err != nil {
406
406
resp.error(err.Error(), http.StatusInternalServerError)
407
407
return
408
408
}
409
409
410
410
resp.json(map[string]interface{}{
411
-
"plc_bundle_count": count,
412
-
"total_size": size,
413
-
"total_size_mb": float64(size) / 1024 / 1024,
411
+
"plc_bundle_count": count,
412
+
"last_bundle_number": lastBundle,
413
+
"total_compressed_size": compressedSize,
414
+
"total_compressed_size_mb": float64(compressedSize) / 1024 / 1024,
415
+
"total_compressed_size_gb": float64(compressedSize) / 1024 / 1024 / 1024,
416
+
"total_uncompressed_size": uncompressedSize,
417
+
"total_uncompressed_size_mb": float64(uncompressedSize) / 1024 / 1024,
418
+
"total_uncompressed_size_gb": float64(uncompressedSize) / 1024 / 1024 / 1024,
419
+
"compression_ratio": float64(uncompressedSize) / float64(compressedSize),
414
420
})
415
421
}
416
422
···
679
685
680
686
firstBundle, _ := s.db.GetBundleByNumber(ctx, 1)
681
687
lastBundleData, _ := s.db.GetBundleByNumber(ctx, lastBundle)
682
-
count, size, _ := s.db.GetBundleStats(ctx)
688
+
689
+
// Updated to receive 5 values instead of 3
690
+
count, compressedSize, uncompressedSize, _, err := s.db.GetBundleStats(ctx)
691
+
if err != nil {
692
+
resp.error(err.Error(), http.StatusInternalServerError)
693
+
return
694
+
}
683
695
684
696
resp.json(map[string]interface{}{
685
-
"chain_length": lastBundle,
686
-
"total_bundles": count,
687
-
"total_size_mb": float64(size) / 1024 / 1024,
688
-
"chain_start_time": firstBundle.StartTime,
689
-
"chain_end_time": lastBundleData.EndTime,
690
-
"chain_head_hash": lastBundleData.Hash,
691
-
"first_prev_hash": firstBundle.PrevBundleHash,
692
-
"last_prev_hash": lastBundleData.PrevBundleHash,
697
+
"chain_length": lastBundle,
698
+
"total_bundles": count,
699
+
"total_compressed_size": compressedSize,
700
+
"total_compressed_size_mb": float64(compressedSize) / 1024 / 1024,
701
+
"total_uncompressed_size": uncompressedSize,
702
+
"total_uncompressed_size_mb": float64(uncompressedSize) / 1024 / 1024,
703
+
"compression_ratio": float64(uncompressedSize) / float64(compressedSize),
704
+
"chain_start_time": firstBundle.StartTime,
705
+
"chain_end_time": lastBundleData.EndTime,
706
+
"chain_head_hash": lastBundleData.Hash,
707
+
"first_prev_hash": firstBundle.PrevBundleHash,
708
+
"last_prev_hash": lastBundleData.PrevBundleHash,
693
709
})
694
710
}
695
711
+2
-2
internal/plc/bundle.go
+2
-2
internal/plc/bundle.go
···
506
506
return 0
507
507
}
508
508
509
-
func (bm *BundleManager) GetStats(ctx context.Context) (int64, int64, error) {
509
+
func (bm *BundleManager) GetStats(ctx context.Context) (int64, int64, int64, int64, error) {
510
510
if !bm.enabled {
511
-
return 0, 0, nil
511
+
return 0, 0, 0, 0, nil
512
512
}
513
513
return bm.db.GetBundleStats(ctx)
514
514
}
+1
-1
internal/storage/db.go
+1
-1
internal/storage/db.go
···
28
28
GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error)
29
29
GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error)
30
30
GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error)
31
-
GetBundleStats(ctx context.Context) (int64, int64, error)
31
+
GetBundleStats(ctx context.Context) (count, compressedSize, uncompressedSize, lastBundle int64, err error)
32
32
GetLastBundleNumber(ctx context.Context) (int, error)
33
33
GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error)
34
34
+82
-26
internal/storage/sqlite.go
+82
-26
internal/storage/sqlite.go
···
95
95
hash TEXT NOT NULL,
96
96
compressed_hash TEXT NOT NULL,
97
97
compressed_size INTEGER NOT NULL,
98
-
uncompressed_size INTEGER NOT NULL, -- NEW
99
-
cursor TEXT, -- NEW
98
+
uncompressed_size INTEGER NOT NULL,
99
+
cumulative_compressed_size INTEGER NOT NULL, -- NEW
100
+
cumulative_uncompressed_size INTEGER NOT NULL, -- NEW
101
+
cursor TEXT,
100
102
prev_bundle_hash TEXT,
101
103
compressed BOOLEAN DEFAULT 1,
102
104
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
···
105
107
CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time);
106
108
CREATE INDEX IF NOT EXISTS idx_plc_bundles_hash ON plc_bundles(hash);
107
109
CREATE INDEX IF NOT EXISTS idx_plc_bundles_prev ON plc_bundles(prev_bundle_hash);
110
+
CREATE INDEX IF NOT EXISTS idx_plc_bundles_number_desc ON plc_bundles(bundle_number DESC);
108
111
109
112
-- NEW: Mempool for pending operations
110
113
CREATE TABLE IF NOT EXISTS plc_mempool (
···
129
132
func (s *SQLiteDB) GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error) {
130
133
query := `
131
134
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
132
-
compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
135
+
compressed_size, uncompressed_size, cumulative_compressed_size,
136
+
cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
133
137
FROM plc_bundles
134
138
WHERE bundle_number = ?
135
139
`
···
142
146
err := s.db.QueryRowContext(ctx, query, bundleNumber).Scan(
143
147
&bundle.BundleNumber, &bundle.StartTime, &bundle.EndTime,
144
148
&didsJSON, &bundle.Hash, &bundle.CompressedHash,
145
-
&bundle.CompressedSize, &bundle.UncompressedSize, &cursor,
146
-
&prevHash, &bundle.Compressed, &bundle.CreatedAt,
149
+
&bundle.CompressedSize, &bundle.UncompressedSize,
150
+
&bundle.CumulativeCompressedSize, &bundle.CumulativeUncompressedSize,
151
+
&cursor, &prevHash, &bundle.Compressed, &bundle.CreatedAt,
147
152
)
148
153
if err != nil {
149
154
return nil, err
···
346
351
return err
347
352
}
348
353
354
+
// Calculate cumulative sizes from previous bundle
355
+
if bundle.BundleNumber > 1 {
356
+
prevBundle, err := s.GetBundleByNumber(ctx, bundle.BundleNumber-1)
357
+
if err == nil && prevBundle != nil {
358
+
bundle.CumulativeCompressedSize = prevBundle.CumulativeCompressedSize + bundle.CompressedSize
359
+
bundle.CumulativeUncompressedSize = prevBundle.CumulativeUncompressedSize + bundle.UncompressedSize
360
+
} else {
361
+
// Fallback: this shouldn't happen, but calculate from scratch
362
+
bundle.CumulativeCompressedSize = bundle.CompressedSize
363
+
bundle.CumulativeUncompressedSize = bundle.UncompressedSize
364
+
}
365
+
} else {
366
+
// First bundle
367
+
bundle.CumulativeCompressedSize = bundle.CompressedSize
368
+
bundle.CumulativeUncompressedSize = bundle.UncompressedSize
369
+
}
370
+
349
371
query := `
350
372
INSERT INTO plc_bundles (
351
373
bundle_number, start_time, end_time, dids,
352
-
hash, compressed_hash, compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed
374
+
hash, compressed_hash, compressed_size, uncompressed_size,
375
+
cumulative_compressed_size, cumulative_uncompressed_size,
376
+
cursor, prev_bundle_hash, compressed
353
377
)
354
-
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
378
+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
355
379
ON CONFLICT(bundle_number) DO UPDATE SET
356
380
start_time = excluded.start_time,
357
381
end_time = excluded.end_time,
···
360
384
compressed_hash = excluded.compressed_hash,
361
385
compressed_size = excluded.compressed_size,
362
386
uncompressed_size = excluded.uncompressed_size,
387
+
cumulative_compressed_size = excluded.cumulative_compressed_size,
388
+
cumulative_uncompressed_size = excluded.cumulative_uncompressed_size,
363
389
cursor = excluded.cursor,
364
390
prev_bundle_hash = excluded.prev_bundle_hash,
365
391
compressed = excluded.compressed
···
367
393
_, err = s.db.ExecContext(ctx, query,
368
394
bundle.BundleNumber, bundle.StartTime, bundle.EndTime,
369
395
string(didsJSON), bundle.Hash, bundle.CompressedHash,
370
-
bundle.CompressedSize, bundle.UncompressedSize, bundle.Cursor,
371
-
bundle.PrevBundleHash, bundle.Compressed,
396
+
bundle.CompressedSize, bundle.UncompressedSize,
397
+
bundle.CumulativeCompressedSize, bundle.CumulativeUncompressedSize,
398
+
bundle.Cursor, bundle.PrevBundleHash, bundle.Compressed,
372
399
)
373
400
374
401
return err
···
393
420
// GetBundles
394
421
func (s *SQLiteDB) GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) {
395
422
query := `
396
-
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, compressed_size, prev_bundle_hash, compressed, created_at
423
+
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
424
+
compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
397
425
FROM plc_bundles
398
426
ORDER BY bundle_number DESC
399
427
LIMIT ?
···
411
439
// GetBundlesForDID
412
440
func (s *SQLiteDB) GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) {
413
441
query := `
414
-
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, compressed_size, prev_bundle_hash, compressed, created_at
442
+
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
443
+
compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
415
444
FROM plc_bundles
416
445
WHERE EXISTS (
417
446
SELECT 1 FROM json_each(dids)
···
436
465
437
466
if afterTime.IsZero() {
438
467
query = `
439
-
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, compressed_size, prev_bundle_hash, compressed, created_at
468
+
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
469
+
compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
440
470
FROM plc_bundles
441
471
ORDER BY start_time ASC
442
472
LIMIT 1
···
444
474
args = []interface{}{}
445
475
} else {
446
476
query = `
447
-
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, compressed_size, prev_bundle_hash, compressed, created_at
477
+
SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash,
478
+
compressed_size, uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
448
479
FROM plc_bundles
449
480
WHERE start_time >= ?
450
481
ORDER BY start_time ASC
···
456
487
var bundle PLCBundle
457
488
var didsJSON string
458
489
var prevHash sql.NullString
490
+
var cursor sql.NullString
459
491
460
492
err := s.db.QueryRowContext(ctx, query, args...).Scan(
461
493
&bundle.BundleNumber,
462
494
&bundle.StartTime,
463
495
&bundle.EndTime,
464
496
&didsJSON,
465
-
&bundle.Hash, // Uncompressed hash
466
-
&bundle.CompressedHash, // Compressed hash
467
-
&bundle.CompressedSize, // Compressed size (not FileSize!)
468
-
&prevHash, // Previous bundle hash
497
+
&bundle.Hash,
498
+
&bundle.CompressedHash,
499
+
&bundle.CompressedSize,
500
+
&bundle.UncompressedSize,
501
+
&cursor,
502
+
&prevHash,
469
503
&bundle.Compressed,
470
504
&bundle.CreatedAt,
471
505
)
···
479
513
if prevHash.Valid {
480
514
bundle.PrevBundleHash = prevHash.String
481
515
}
516
+
if cursor.Valid {
517
+
bundle.Cursor = cursor.String
518
+
}
482
519
483
520
json.Unmarshal([]byte(didsJSON), &bundle.DIDs)
484
521
return &bundle, nil
···
502
539
&bundle.CompressedHash,
503
540
&bundle.CompressedSize,
504
541
&bundle.UncompressedSize,
542
+
&bundle.CumulativeCompressedSize,
543
+
&bundle.CumulativeUncompressedSize,
505
544
&cursor,
506
545
&prevHash,
507
546
&bundle.Compressed,
···
524
563
return bundles, rows.Err()
525
564
}
526
565
527
-
// GetBundleStats - update to use compressed_size
528
-
func (s *SQLiteDB) GetBundleStats(ctx context.Context) (int64, int64, error) {
529
-
query := `
530
-
SELECT COUNT(*), COALESCE(SUM(compressed_size), 0)
531
-
FROM plc_bundles
532
-
`
566
+
func (s *SQLiteDB) GetBundleStats(ctx context.Context) (int64, int64, int64, int64, error) {
567
+
// Get count and last bundle number
568
+
var count, lastBundleNum int64
569
+
err := s.db.QueryRowContext(ctx, `
570
+
SELECT COUNT(*), COALESCE(MAX(bundle_number), 0)
571
+
FROM plc_bundles
572
+
`).Scan(&count, &lastBundleNum)
573
+
if err != nil {
574
+
return 0, 0, 0, 0, err
575
+
}
533
576
534
-
var count, totalSize int64
535
-
err := s.db.QueryRowContext(ctx, query).Scan(&count, &totalSize)
536
-
return count, totalSize, err
577
+
if lastBundleNum == 0 {
578
+
return 0, 0, 0, 0, nil
579
+
}
580
+
581
+
// Get cumulative sizes from last bundle (O(1) with index!)
582
+
var compressedSize, uncompressedSize int64
583
+
err = s.db.QueryRowContext(ctx, `
584
+
SELECT cumulative_compressed_size, cumulative_uncompressed_size
585
+
FROM plc_bundles
586
+
WHERE bundle_number = ?
587
+
`, lastBundleNum).Scan(&compressedSize, &uncompressedSize)
588
+
if err != nil {
589
+
return 0, 0, 0, 0, err
590
+
}
591
+
592
+
return count, compressedSize, uncompressedSize, lastBundleNum, nil
537
593
}
538
594
539
595
// UpsertEndpoint inserts or updates an endpoint
+15
-13
internal/storage/types.go
+15
-13
internal/storage/types.go
···
106
106
107
107
// PLCBundle represents a cached bundle of PLC operations
108
108
type PLCBundle struct {
109
-
BundleNumber int
110
-
StartTime time.Time
111
-
EndTime time.Time
112
-
BoundaryCIDs []string
113
-
DIDs []string
114
-
Hash string
115
-
CompressedHash string
116
-
CompressedSize int64
117
-
UncompressedSize int64 // NEW: uncompressed size
118
-
Cursor string // NEW: PLC cursor used to create this bundle
119
-
PrevBundleHash string
120
-
Compressed bool
121
-
CreatedAt time.Time
109
+
BundleNumber int
110
+
StartTime time.Time
111
+
EndTime time.Time
112
+
BoundaryCIDs []string
113
+
DIDs []string
114
+
Hash string
115
+
CompressedHash string
116
+
CompressedSize int64
117
+
UncompressedSize int64
118
+
CumulativeCompressedSize int64
119
+
CumulativeUncompressedSize int64
120
+
Cursor string
121
+
PrevBundleHash string
122
+
Compressed bool
123
+
CreatedAt time.Time
122
124
}
123
125
124
126
// GetFilePath returns the computed file path for this bundle