+4 -1  Makefile
+9 -2  cmd/atscanner.go

@@ -87 +87 @@
  	// Initialize workers
  	log.Info("Initializing scanners...")

- 	plcScanner := plc.NewScanner(db, cfg.PLC)
+ 	bundleManager, err := plc.NewBundleManager(cfg.PLC.BundleDir, cfg.PLC.DirectoryURL, db, cfg.PLC.IndexDIDs)
+ 	if err != nil {
+ 		log.Fatal("Failed to create bundle manager: %v", err)
+ 	}
+ 	defer bundleManager.Close()
+ 	log.Verbose("✓ Bundle manager initialized (shared)")
+
+ 	plcScanner := plc.NewScanner(db, cfg.PLC, bundleManager)
  	defer plcScanner.Close()
  	log.Verbose("✓ PLC scanner initialized")

@@ -114 +121 @@

  	// Start API server
  	log.Info("Starting API server on %s:%d...", cfg.API.Host, cfg.API.Port)
- 	apiServer := api.NewServer(db, cfg.API, cfg.PLC)
+ 	apiServer := api.NewServer(db, cfg.API, cfg.PLC, bundleManager)
  	go func() {
  		if err := apiServer.Start(); err != nil {
  			log.Fatal("API server error: %v", err)
+1 -1  go.mod

@@ -12 +12 @@

  require (
  	github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
+ 	github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7
  	github.com/gorilla/handlers v1.5.2
  	github.com/jackc/pgx/v5 v5.7.6
  )

  require (
- 	github.com/atscan/plcbundle v0.0.0-20251027193653-3678d57c1dee // indirect
  	github.com/felixge/httpsnoop v1.0.3 // indirect
  	github.com/jackc/pgpassfile v1.0.0 // indirect
  	github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
+2 -4  go.sum

@@ -1 +1 @@
  github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
  github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
- github.com/atscan/plcbundle v0.0.0-20251027192009-9350d30fd185 h1:E/fQ1jsaydY6x5JRv+gBiMZVHxKEGD4cK+JxZUZuskU=
- github.com/atscan/plcbundle v0.0.0-20251027192009-9350d30fd185/go.mod h1:vqyqs+zyaxFYtIp6I4+zSQD76oiylnGenzD7ZeA4cxs=
- github.com/atscan/plcbundle v0.0.0-20251027193653-3678d57c1dee h1:wepjgNZxBJGuWmVpplG2BTcoICGafaHALiQoXJV1Iwk=
- github.com/atscan/plcbundle v0.0.0-20251027193653-3678d57c1dee/go.mod h1:vqyqs+zyaxFYtIp6I4+zSQD76oiylnGenzD7ZeA4cxs=
+ github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7 h1:u5mCzLGQPSThUPjQnAn64xs3ZWuPltKpua1M+bMxtww=
+ github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7/go.mod h1:vqyqs+zyaxFYtIp6I4+zSQD76oiylnGenzD7ZeA4cxs=
  github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
  github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
  github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+199 -426  internal/api/handlers.go

@@ -2 +2 @@

  import (
  	"context"
- 	"crypto/sha256"
  	"database/sql"
- 	"encoding/hex"
  	"encoding/json"
  	"fmt"
+ 	"io"
  	"net/http"
- 	"os"
- 	"path/filepath"
  	"strconv"
  	"strings"
  	"time"

@@ -18 +15 @@
  	"github.com/atscan/atscanner/internal/monitor"
  	"github.com/atscan/atscanner/internal/plc"
  	"github.com/atscan/atscanner/internal/storage"
+ 	"github.com/atscan/plcbundle"
  	"github.com/gorilla/mux"
  )

@@ -40 +38 @@
  	http.Error(r.w, msg, code)
  }

- func (r *response) bundleHeaders(bundle *storage.PLCBundle) {
+ func (r *response) bundleHeaders(bundle *plcbundle.BundleMetadata) {
  	r.w.Header().Set("X-Bundle-Number", fmt.Sprintf("%d", bundle.BundleNumber))
  	r.w.Header().Set("X-Bundle-Hash", bundle.Hash)
  	r.w.Header().Set("X-Bundle-Compressed-Hash", bundle.CompressedHash)

@@ -77 +75 @@

  // ===== FORMATTING HELPERS =====

- func formatBundleResponse(bundle *storage.PLCBundle) map[string]interface{} {
+ func formatBundleResponse(bundle *plcbundle.BundleMetadata) map[string]interface{} {
  	return map[string]interface{}{
  		"plc_bundle_number": bundle.BundleNumber,
  		"start_time": bundle.StartTime,

@@ -703 +701 @@
  		return
  	}

- 	lastBundle, err := s.db.GetLastBundleNumber(ctx)
- 	if err != nil {
- 		resp.error(err.Error(), http.StatusInternalServerError)
- 		return
- 	}
-
+ 	lastBundle := s.bundleManager.GetLastBundleNumber()
  	resp.json(map[string]interface{}{
  		"total_unique_dids": totalDIDs,
  		"last_bundle": lastBundle,
@@ -719 +712 @@

  func (s *Server) handleGetPLCBundle(w http.ResponseWriter, r *http.Request) {
  	resp := newResponse(w)
-
  	bundleNum, err := getBundleNumber(r)
  	if err != nil {
  		resp.error("invalid bundle number", http.StatusBadRequest)
  		return
  	}

- 	// Try to get existing bundle
- 	bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
- 	if err == nil {
- 		// Bundle exists, return it normally
- 		resp.json(formatBundleResponse(bundle))
- 		return
- 	}
-
- 	// Bundle not found - check if it's the next upcoming bundle
- 	lastBundle, err := s.db.GetLastBundleNumber(r.Context())
+ 	// Get from library's index
+ 	index := s.bundleManager.GetIndex()
+ 	bundleMeta, err := index.GetBundle(bundleNum)
  	if err != nil {
- 		resp.error("bundle not found", http.StatusNotFound)
- 		return
- 	}
-
- 	if bundleNum == lastBundle+1 {
- 		// This is the upcoming bundle - return preview based on mempool
- 		upcomingBundle, err := s.createUpcomingBundlePreview(r.Context(), r, bundleNum)
- 		if err != nil {
- 			resp.error(fmt.Sprintf("failed to create upcoming bundle preview: %v", err), http.StatusInternalServerError)
+ 		// Check if it's upcoming bundle
+ 		lastBundle := index.GetLastBundle()
+ 		if lastBundle != nil && bundleNum == lastBundle.BundleNumber+1 {
+ 			upcomingBundle, err := s.createUpcomingBundlePreview(bundleNum)
+ 			if err != nil {
+ 				resp.error(err.Error(), http.StatusInternalServerError)
+ 				return
+ 			}
+ 			resp.json(upcomingBundle)
  			return
  		}
- 		resp.json(upcomingBundle)
+ 		resp.error("bundle not found", http.StatusNotFound)
  		return
  	}

- 	// Not an upcoming bundle, just not found
- 	resp.error("bundle not found", http.StatusNotFound)
+ 	resp.json(formatBundleMetadata(bundleMeta))
  }

- func (s *Server) createUpcomingBundlePreview(ctx context.Context, r *http.Request, bundleNum int) (map[string]interface{}, error) {
- 	// Get mempool stats
- 	mempoolCount, err := s.db.GetMempoolCount(ctx)
- 	if err != nil {
- 		return nil, err
+ // Helper to format library's BundleMetadata
+ func formatBundleMetadata(meta *plcbundle.BundleMetadata) map[string]interface{} {
+ 	return map[string]interface{}{
+ 		"plc_bundle_number": meta.BundleNumber,
+ 		"start_time": meta.StartTime,
+ 		"end_time": meta.EndTime,
+ 		"operation_count": meta.OperationCount,
+ 		"did_count": meta.DIDCount,
+ 		"hash": meta.Hash,
+ 		"compressed_hash": meta.CompressedHash,
+ 		"compressed_size": meta.CompressedSize,
+ 		"uncompressed_size": meta.UncompressedSize,
+ 		"compression_ratio": float64(meta.UncompressedSize) / float64(meta.CompressedSize),
+ 		"cursor": meta.Cursor,
+ 		"prev_bundle_hash": meta.PrevBundleHash,
+ 		"created_at": meta.CreatedAt,
  	}
+ }

- 	if mempoolCount == 0 {
+ func (s *Server) createUpcomingBundlePreview(bundleNum int) (map[string]interface{}, error) {
+ 	// Get mempool stats from library via wrapper
+ 	stats := s.bundleManager.GetMempoolStats()
+
+ 	count, ok := stats["count"].(int)
+ 	if !ok || count == 0 {
  		return map[string]interface{}{
  			"plc_bundle_number": bundleNum,
  			"is_upcoming": true,

@@ -773 +774 @@
  		}, nil
  	}

- 	// Get first and last operations for time range
- 	firstOp, err := s.db.GetFirstMempoolOperation(ctx)
- 	if err != nil {
- 		return nil, err
+ 	// Build response
+ 	result := map[string]interface{}{
+ 		"plc_bundle_number": bundleNum,
+ 		"is_upcoming": true,
+ 		"status": "filling",
+ 		"operation_count": count,
+ 		"target_operation_count": 10000,
+ 		"progress_percent": float64(count) / 100.0,
+ 		"operations_needed": 10000 - count,
  	}

- 	lastOp, err := s.db.GetLastMempoolOperation(ctx)
- 	if err != nil {
- 		return nil, err
+ 	if count >= 10000 {
+ 		result["status"] = "ready"
  	}

- 	// Get unique DID count
- 	uniqueDIDCount, err := s.db.GetMempoolUniqueDIDCount(ctx)
- 	if err != nil {
- 		return nil, err
+ 	// Add time range if available
+ 	if firstTime, ok := stats["first_time"]; ok {
+ 		result["start_time"] = firstTime
  	}
-
- 	// Get uncompressed size estimate
- 	uncompressedSize, err := s.db.GetMempoolUncompressedSize(ctx)
- 	if err != nil {
- 		return nil, err
+ 	if lastTime, ok := stats["last_time"]; ok {
+ 		result["current_end_time"] = lastTime
  	}

- 	// Estimate compressed size (typical ratio is ~0.1-0.15 for PLC data)
- 	estimatedCompressedSize := int64(float64(uncompressedSize) * 0.12)
-
- 	// Calculate completion estimate
- 	var estimatedCompletionTime *time.Time
- 	var operationsNeeded int
- 	var currentRate float64
-
- 	operationsNeeded = plc.BUNDLE_SIZE - mempoolCount
-
- 	if mempoolCount < plc.BUNDLE_SIZE && mempoolCount > 0 {
- 		timeSpan := lastOp.CreatedAt.Sub(firstOp.CreatedAt).Seconds()
- 		if timeSpan > 0 {
- 			currentRate = float64(mempoolCount) / timeSpan
- 			if currentRate > 0 {
- 				secondsNeeded := float64(operationsNeeded) / currentRate
- 				completionTime := time.Now().Add(time.Duration(secondsNeeded) * time.Second)
- 				estimatedCompletionTime = &completionTime
- 			}
- 		}
+ 	// Add size info if available
+ 	if sizeBytes, ok := stats["size_bytes"]; ok {
+ 		result["uncompressed_size"] = sizeBytes
+ 		result["estimated_compressed_size"] = int64(float64(sizeBytes.(int)) * 0.12)
  	}

- 	// Get previous bundle for cursor context
- 	var prevBundleHash string
- 	var cursor string
+ 	// Get previous bundle info
  	if bundleNum > 1 {
- 		prevBundle, err := s.db.GetBundleByNumber(ctx, bundleNum-1)
- 		if err == nil {
- 			prevBundleHash = prevBundle.Hash
- 			cursor = prevBundle.EndTime.Format(time.RFC3339Nano)
- 		}
- 	}
-
- 	// Determine bundle status
- 	status := "filling"
- 	if mempoolCount >= plc.BUNDLE_SIZE {
- 		status = "ready"
- 	}
-
- 	// Build upcoming bundle response
- 	result := map[string]interface{}{
- 		"plc_bundle_number": bundleNum,
- 		"is_upcoming": true,
- 		"status": status,
- 		"operation_count": mempoolCount,
- 		"target_operation_count": plc.BUNDLE_SIZE,
- 		"progress_percent": float64(mempoolCount) / float64(plc.BUNDLE_SIZE) * 100,
- 		"operations_needed": operationsNeeded,
- 		"did_count": uniqueDIDCount,
- 		"start_time": firstOp.CreatedAt,
- 		"current_end_time": lastOp.CreatedAt,
- 		"uncompressed_size": uncompressedSize,
- 		"estimated_compressed_size": estimatedCompressedSize,
- 		"compression_ratio": float64(uncompressedSize) / float64(estimatedCompressedSize),
- 		"prev_bundle_hash": prevBundleHash,
- 		"cursor": cursor,
- 	}
-
- 	if estimatedCompletionTime != nil {
- 		result["estimated_completion_time"] = *estimatedCompletionTime
- 		result["current_rate_per_second"] = currentRate
- 	}
-
- 	// Get actual mempool operations if requested (for DIDs list)
- 	if r.URL.Query().Get("include_dids") == "true" {
- 		ops, err := s.db.GetMempoolOperations(ctx, plc.BUNDLE_SIZE)
- 		if err == nil {
- 			// Extract unique DIDs
- 			didSet := make(map[string]bool)
- 			for _, op := range ops {
- 				didSet[op.DID] = true
- 			}
- 			dids := make([]string, 0, len(didSet))
- 			for did := range didSet {
- 				dids = append(dids, did)
- 			}
- 			result["dids"] = dids
+ 		if prevBundle, err := s.bundleManager.GetBundleMetadata(bundleNum - 1); err == nil {
+ 			result["prev_bundle_hash"] = prevBundle.Hash
+ 			result["cursor"] = prevBundle.EndTime.Format(time.RFC3339Nano)
  		}
  	}

@@ -888 +823 @@
  		return
  	}

- 	bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
+ 	// Get from library
+ 	dids, didCount, err := s.bundleManager.GetDIDsForBundle(r.Context(), bundleNum)
  	if err != nil {
  		resp.error("bundle not found", http.StatusNotFound)
  		return
  	}

- 	// Query DIDs from dids table instead
- 	dids, err := s.db.GetDIDsForBundle(r.Context(), bundleNum)
- 	if err != nil {
- 		resp.error(fmt.Sprintf("failed to get DIDs: %v", err), http.StatusInternalServerError)
- 		return
- 	}
-
  	resp.json(map[string]interface{}{
- 		"plc_bundle_number": bundle.BundleNumber,
- 		"did_count": bundle.DIDCount,
+ 		"plc_bundle_number": bundleNum,
+ 		"did_count": didCount,
  		"dids": dids,
  	})
  }

@@ -919 +848 @@

  	compressed := r.URL.Query().Get("compressed") != "false"

- 	bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
+ 	bundle, err := s.bundleManager.GetBundleMetadata(bundleNum)
  	if err == nil {
  		// Bundle exists, serve it normally
  		resp.bundleHeaders(bundle)

@@ -933 +862 @@
  	}

  	// Bundle not found - check if it's the upcoming bundle
- 	lastBundle, err := s.db.GetLastBundleNumber(r.Context())
- 	if err != nil {
- 		resp.error("bundle not found", http.StatusNotFound)
- 		return
- 	}
-
+ 	lastBundle := s.bundleManager.GetLastBundleNumber()
  	if bundleNum == lastBundle+1 {
  		// This is the upcoming bundle - serve from mempool
- 		s.serveUpcomingBundle(w, r, bundleNum)
+ 		s.serveUpcomingBundle(w, bundleNum)
  		return
  	}

@@ -949 +873 @@
  	resp.error("bundle not found", http.StatusNotFound)
  }

- func (s *Server) serveUpcomingBundle(w http.ResponseWriter, r *http.Request, bundleNum int) {
- 	ctx := r.Context()
+ func (s *Server) serveUpcomingBundle(w http.ResponseWriter, bundleNum int) {
+ 	// Get mempool stats
+ 	stats := s.bundleManager.GetMempoolStats()
+ 	count, ok := stats["count"].(int)

- 	// Get mempool count
- 	mempoolCount, err := s.db.GetMempoolCount(ctx)
- 	if err != nil {
- 		http.Error(w, fmt.Sprintf("failed to get mempool count: %v", err), http.StatusInternalServerError)
- 		return
- 	}
-
- 	if mempoolCount == 0 {
+ 	if !ok || count == 0 {
  		http.Error(w, "upcoming bundle is empty (no operations in mempool)", http.StatusNotFound)
  		return
  	}

- 	// Get mempool operations (up to BUNDLE_SIZE)
- 	mempoolOps, err := s.db.GetMempoolOperations(ctx, plc.BUNDLE_SIZE)
+ 	// Get operations from mempool
+ 	ops, err := s.bundleManager.GetMempoolOperations()
  	if err != nil {
  		http.Error(w, fmt.Sprintf("failed to get mempool operations: %v", err), http.StatusInternalServerError)
  		return
  	}

- 	if len(mempoolOps) == 0 {
- 		http.Error(w, "upcoming bundle is empty", http.StatusNotFound)
+ 	if len(ops) == 0 {
+ 		http.Error(w, "no operations in mempool", http.StatusNotFound)
  		return
  	}

- 	// Get time range
- 	firstOp := mempoolOps[0]
- 	lastOp := mempoolOps[len(mempoolOps)-1]
+ 	// Calculate times
+ 	firstOp := ops[0]
+ 	lastOp := ops[len(ops)-1]

  	// Extract unique DIDs
  	didSet := make(map[string]bool)
- 	for _, op := range mempoolOps {
+ 	for _, op := range ops {
  		didSet[op.DID] = true
  	}

+ 	// Calculate uncompressed size
+ 	uncompressedSize := int64(0)
+ 	for _, op := range ops {
+ 		uncompressedSize += int64(len(op.RawJSON)) + 1 // +1 for newline
+ 	}
+
  	// Get previous bundle hash
  	prevBundleHash := ""
  	if bundleNum > 1 {
- 		if prevBundle, err := s.db.GetBundleByNumber(ctx, bundleNum-1); err == nil {
+ 		if prevBundle, err := s.bundleManager.GetBundleMetadata(bundleNum - 1); err == nil {
  			prevBundleHash = prevBundle.Hash
  		}
  	}

- 	// Serialize operations to JSONL
- 	var buf []byte
- 	for _, mop := range mempoolOps {
- 		buf = append(buf, []byte(mop.Operation)...)
- 		buf = append(buf, '\n')
- 	}
-
- 	// Calculate size
- 	uncompressedSize := int64(len(buf))
-
  	// Set headers
  	w.Header().Set("X-Bundle-Number", fmt.Sprintf("%d", bundleNum))
  	w.Header().Set("X-Bundle-Is-Upcoming", "true")
  	w.Header().Set("X-Bundle-Status", "preview")
  	w.Header().Set("X-Bundle-Start-Time", firstOp.CreatedAt.Format(time.RFC3339Nano))
  	w.Header().Set("X-Bundle-Current-End-Time", lastOp.CreatedAt.Format(time.RFC3339Nano))
- 	w.Header().Set("X-Bundle-Operation-Count", fmt.Sprintf("%d", len(mempoolOps)))
- 	w.Header().Set("X-Bundle-Target-Count", fmt.Sprintf("%d", plc.BUNDLE_SIZE))
- 	w.Header().Set("X-Bundle-Progress-Percent", fmt.Sprintf("%.2f", float64(len(mempoolOps))/float64(plc.BUNDLE_SIZE)*100))
+ 	w.Header().Set("X-Bundle-Operation-Count", fmt.Sprintf("%d", len(ops)))
+ 	w.Header().Set("X-Bundle-Target-Count", "10000")
+ 	w.Header().Set("X-Bundle-Progress-Percent", fmt.Sprintf("%.2f", float64(len(ops))/100.0))
  	w.Header().Set("X-Bundle-DID-Count", fmt.Sprintf("%d", len(didSet)))
  	w.Header().Set("X-Bundle-Prev-Hash", prevBundleHash)
+ 	w.Header().Set("X-Uncompressed-Size", fmt.Sprintf("%d", uncompressedSize))

  	w.Header().Set("Content-Type", "application/jsonl")
  	w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d-upcoming.jsonl", bundleNum))
- 	w.Header().Set("Content-Length", fmt.Sprintf("%d", uncompressedSize))
- 	w.Header().Set("X-Uncompressed-Size", fmt.Sprintf("%d", uncompressedSize))

+ 	// Stream operations as JSONL
  	w.WriteHeader(http.StatusOK)
- 	w.Write(buf)
+
+ 	for _, op := range ops {
+ 		// Use RawJSON if available (preserves exact format)
+ 		if len(op.RawJSON) > 0 {
+ 			w.Write(op.RawJSON)
+ 		} else {
+ 			// Fallback to marshaling
+ 			data, _ := json.Marshal(op)
+ 			w.Write(data)
+ 		}
+ 		w.Write([]byte("\n"))
+ 	}
  }

- func (s *Server) serveCompressedBundle(w http.ResponseWriter, r *http.Request, bundle *storage.PLCBundle) {
+ func (s *Server) serveCompressedBundle(w http.ResponseWriter, r *http.Request, bundle *plcbundle.BundleMetadata) {
  	resp := newResponse(w)
- 	path := bundle.GetFilePath(s.plcBundleDir)

- 	file, err := os.Open(path)
+ 	// Use the new streaming API for compressed data
+ 	reader, err := s.bundleManager.StreamRaw(r.Context(), bundle.BundleNumber)
  	if err != nil {
- 		resp.error("bundle file not found on disk", http.StatusNotFound)
+ 		resp.error(fmt.Sprintf("error streaming compressed bundle: %v", err), http.StatusInternalServerError)
  		return
  	}
- 	defer file.Close()
-
- 	fileInfo, _ := file.Stat()
+ 	defer reader.Close()

  	w.Header().Set("Content-Type", "application/zstd")
  	w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl.zst", bundle.BundleNumber))
- 	w.Header().Set("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
- 	w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", fileInfo.Size()))
+ 	w.Header().Set("Content-Length", fmt.Sprintf("%d", bundle.CompressedSize))
+ 	w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", bundle.CompressedSize))

- 	http.ServeContent(w, r, filepath.Base(path), bundle.CreatedAt, file)
+ 	// Stream the data directly to the response
+ 	w.WriteHeader(http.StatusOK)
+ 	io.Copy(w, reader)
  }

- func (s *Server) serveUncompressedBundle(w http.ResponseWriter, r *http.Request, bundle *storage.PLCBundle) {
+ func (s *Server) serveUncompressedBundle(w http.ResponseWriter, r *http.Request, bundle *plcbundle.BundleMetadata) {
  	resp := newResponse(w)

- 	ops, err := s.bundleManager.LoadBundleOperations(r.Context(), bundle.BundleNumber)
+ 	// Use the new streaming API for decompressed data
+ 	reader, err := s.bundleManager.StreamDecompressed(r.Context(), bundle.BundleNumber)
  	if err != nil {
- 		resp.error(fmt.Sprintf("error loading bundle: %v", err), http.StatusInternalServerError)
+ 		resp.error(fmt.Sprintf("error streaming decompressed bundle: %v", err), http.StatusInternalServerError)
  		return
  	}
-
- 	// Serialize to JSONL
- 	var buf []byte
- 	for _, op := range ops {
- 		buf = append(buf, op.RawJSON...)
- 		buf = append(buf, '\n')
- 	}
-
- 	fileInfo, _ := os.Stat(bundle.GetFilePath(s.plcBundleDir))
- 	compressedSize := int64(0)
- 	if fileInfo != nil {
- 		compressedSize = fileInfo.Size()
- 	}
+ 	defer reader.Close()

  	w.Header().Set("Content-Type", "application/jsonl")
  	w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl", bundle.BundleNumber))
- 	w.Header().Set("Content-Length", fmt.Sprintf("%d", len(buf)))
- 	w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", compressedSize))
- 	w.Header().Set("X-Uncompressed-Size", fmt.Sprintf("%d", len(buf)))
- 	if compressedSize > 0 {
- 		w.Header().Set("X-Compression-Ratio", fmt.Sprintf("%.2f", float64(len(buf))/float64(compressedSize)))
+ 	w.Header().Set("Content-Length", fmt.Sprintf("%d", bundle.UncompressedSize))
+ 	w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", bundle.CompressedSize))
+ 	w.Header().Set("X-Uncompressed-Size", fmt.Sprintf("%d", bundle.UncompressedSize))
+ 	if bundle.CompressedSize > 0 {
+ 		w.Header().Set("X-Compression-Ratio", fmt.Sprintf("%.2f", float64(bundle.UncompressedSize)/float64(bundle.CompressedSize)))
  	}

+ 	// Stream the data directly to the response
  	w.WriteHeader(http.StatusOK)
- 	w.Write(buf)
+ 	io.Copy(w, reader)
  }

  func (s *Server) handleGetPLCBundles(w http.ResponseWriter, r *http.Request) {
  	resp := newResponse(w)
  	limit := getQueryInt(r, "limit", 50)

- 	bundles, err := s.db.GetBundles(r.Context(), limit)
- 	if err != nil {
- 		resp.error(err.Error(), http.StatusInternalServerError)
- 		return
- 	}
+ 	bundles := s.bundleManager.GetBundles(limit)

  	response := make([]map[string]interface{}, len(bundles))
  	for i, bundle := range bundles {
1102
1014
func (s *Server) handleGetPLCBundleStats(w http.ResponseWriter, r *http.Request) {
1103
1015
resp := newResponse(w)
1104
1016
1105
-
count, compressedSize, uncompressedSize, lastBundle, err := s.db.GetBundleStats(r.Context())
1106
-
if err != nil {
1107
-
resp.error(err.Error(), http.StatusInternalServerError)
1108
-
return
1109
-
}
1017
+
stats := s.bundleManager.GetBundleStats()
1018
+
1019
+
bundleCount := stats["bundle_count"].(int64)
1020
+
totalSize := stats["total_size"].(int64)
1021
+
totalUncompressedSize := stats["total_uncompressed_size"].(int64)
1022
+
lastBundle := stats["last_bundle"].(int64)
1110
1023
1111
1024
resp.json(map[string]interface{}{
1112
-
"plc_bundle_count": count,
1025
+
"plc_bundle_count": bundleCount,
1113
1026
"last_bundle_number": lastBundle,
1114
-
"total_compressed_size": compressedSize,
1115
-
"total_compressed_size_mb": float64(compressedSize) / 1024 / 1024,
1116
-
"total_compressed_size_gb": float64(compressedSize) / 1024 / 1024 / 1024,
1117
-
"total_uncompressed_size": uncompressedSize,
1118
-
"total_uncompressed_size_mb": float64(uncompressedSize) / 1024 / 1024,
1119
-
"total_uncompressed_size_gb": float64(uncompressedSize) / 1024 / 1024 / 1024,
1120
-
"compression_ratio": float64(uncompressedSize) / float64(compressedSize),
1027
+
"total_compressed_size": totalSize,
1028
+
"total_compressed_size_mb": float64(totalSize) / 1024 / 1024,
1029
+
"total_compressed_size_gb": float64(totalSize) / 1024 / 1024 / 1024,
1030
+
"total_uncompressed_size": totalUncompressedSize,
1031
+
"total_uncompressed_size_mb": float64(totalUncompressedSize) / 1024 / 1024,
1032
+
"total_uncompressed_size_gb": float64(totalUncompressedSize) / 1024 / 1024 / 1024,
1033
+
"overall_compression_ratio": float64(totalUncompressedSize) / float64(totalSize),
1121
1034
})
1122
1035
}
1123
1036
···
1125
1038
1126
1039
func (s *Server) handleGetMempoolStats(w http.ResponseWriter, r *http.Request) {
1127
1040
resp := newResponse(w)
1128
-
ctx := r.Context()
1129
1041
1130
-
count, err := s.db.GetMempoolCount(ctx)
1131
-
if err != nil {
1132
-
resp.error(err.Error(), http.StatusInternalServerError)
1133
-
return
1134
-
}
1042
+
// Get stats from library's mempool via wrapper method
1043
+
stats := s.bundleManager.GetMempoolStats()
1135
1044
1136
-
uniqueDIDCount, err := s.db.GetMempoolUniqueDIDCount(ctx)
1137
-
if err != nil {
1138
-
resp.error(err.Error(), http.StatusInternalServerError)
1139
-
return
1045
+
// Convert to API response format
1046
+
result := map[string]interface{}{
1047
+
"operation_count": stats["count"],
1048
+
"can_create_bundle": stats["can_create_bundle"],
1140
1049
}
1141
1050
1142
-
uncompressedSize, err := s.db.GetMempoolUncompressedSize(ctx)
1143
-
if err != nil {
1144
-
resp.error(err.Error(), http.StatusInternalServerError)
1145
-
return
1051
+
// Add size information
1052
+
if sizeBytes, ok := stats["size_bytes"]; ok {
1053
+
result["uncompressed_size"] = sizeBytes
1054
+
result["uncompressed_size_mb"] = float64(sizeBytes.(int)) / 1024 / 1024
1146
1055
}
1147
1056
1148
-
result := map[string]interface{}{
1149
-
"operation_count": count,
1150
-
"unique_did_count": uniqueDIDCount,
1151
-
"uncompressed_size": uncompressedSize,
1152
-
"uncompressed_size_mb": float64(uncompressedSize) / 1024 / 1024,
1153
-
"can_create_bundle": count >= plc.BUNDLE_SIZE,
1154
-
}
1057
+
// Add time range and calculate estimated completion
1058
+
if count, ok := stats["count"].(int); ok && count > 0 {
1059
+
if firstTime, ok := stats["first_time"].(time.Time); ok {
1060
+
result["mempool_start_time"] = firstTime
1155
1061
1156
-
if count > 0 {
1157
-
if firstOp, err := s.db.GetFirstMempoolOperation(ctx); err == nil && firstOp != nil {
1158
-
result["mempool_start_time"] = firstOp.CreatedAt
1062
+
if lastTime, ok := stats["last_time"].(time.Time); ok {
1063
+
result["mempool_end_time"] = lastTime
1159
1064
1160
-
if count < plc.BUNDLE_SIZE {
1161
-
if lastOp, err := s.db.GetLastMempoolOperation(ctx); err == nil && lastOp != nil {
1162
-
timeSpan := lastOp.CreatedAt.Sub(firstOp.CreatedAt).Seconds()
1065
+
// Calculate estimated next bundle time if not complete
1066
+
if count < 10000 {
1067
+
timeSpan := lastTime.Sub(firstTime).Seconds()
1163
1068
if timeSpan > 0 {
1164
1069
opsPerSecond := float64(count) / timeSpan
1165
1070
if opsPerSecond > 0 {
1166
-
remainingOps := plc.BUNDLE_SIZE - count
1071
+
remainingOps := 10000 - count
1167
1072
secondsNeeded := float64(remainingOps) / opsPerSecond
1168
-
result["estimated_next_bundle_time"] = time.Now().Add(time.Duration(secondsNeeded) * time.Second)
1073
+
estimatedTime := time.Now().Add(time.Duration(secondsNeeded) * time.Second)
1074
+
1075
+
result["estimated_next_bundle_time"] = estimatedTime
1076
+
result["current_rate_per_second"] = opsPerSecond
1169
1077
result["operations_needed"] = remainingOps
1170
-
result["current_rate_per_second"] = opsPerSecond
1171
1078
}
1172
1079
}
1080
+
result["progress_percent"] = float64(count) / 100.0
1081
+
} else {
1082
+
// Ready to create bundle
1083
+
result["estimated_next_bundle_time"] = time.Now()
1084
+
result["operations_needed"] = 0
1173
1085
}
1174
-
} else {
1175
-
result["estimated_next_bundle_time"] = time.Now()
1176
-
result["operations_needed"] = 0
1177
1086
}
1178
1087
}
1179
1088
} else {
1089
+
// Empty mempool
1180
1090
result["mempool_start_time"] = nil
1181
1091
result["estimated_next_bundle_time"] = nil
1182
1092
}
@@ -1201 +1111 @@

  // ===== VERIFICATION HANDLERS =====

- func (s *Server) handleVerifyPLCBundle(w http.ResponseWriter, r *http.Request) {
- 	resp := newResponse(w)
- 	vars := mux.Vars(r)
-
- 	bundleNumber, err := strconv.Atoi(vars["bundleNumber"])
- 	if err != nil {
- 		resp.error("Invalid bundle number", http.StatusBadRequest)
- 		return
- 	}
-
- 	bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNumber)
- 	if err != nil {
- 		resp.error("Bundle not found", http.StatusNotFound)
- 		return
- 	}
-
- 	// Fetch from PLC and verify
- 	remoteOps, prevCIDs, err := s.fetchRemoteBundleOps(r.Context(), bundleNumber)
- 	if err != nil {
- 		resp.error(fmt.Sprintf("Failed to fetch from PLC directory: %v", err), http.StatusInternalServerError)
- 		return
- 	}
-
- 	remoteHash := computeOperationsHash(remoteOps)
- 	verified := bundle.Hash == remoteHash
-
- 	resp.json(map[string]interface{}{
- 		"bundle_number": bundleNumber,
- 		"verified": verified,
- 		"local_hash": bundle.Hash,
- 		"remote_hash": remoteHash,
- 		"local_op_count": plc.BUNDLE_SIZE,
- 		"remote_op_count": len(remoteOps),
- 		"boundary_cids_used": len(prevCIDs),
- 	})
- }
-
- func (s *Server) fetchRemoteBundleOps(ctx context.Context, bundleNum int) ([]plc.PLCOperation, map[string]bool, error) {
- 	var after string
- 	var prevBoundaryCIDs map[string]bool
-
- 	if bundleNum > 1 {
- 		prevBundle, err := s.db.GetBundleByNumber(ctx, bundleNum-1)
- 		if err != nil {
- 			return nil, nil, fmt.Errorf("failed to get previous bundle: %w", err)
- 		}
-
- 		after = prevBundle.EndTime.Format("2006-01-02T15:04:05.000Z")
-
- 		if len(prevBundle.BoundaryCIDs) > 0 {
- 			prevBoundaryCIDs = make(map[string]bool)
- 			for _, cid := range prevBundle.BoundaryCIDs {
- 				prevBoundaryCIDs[cid] = true
- 			}
- 		}
- 	}
-
- 	var allRemoteOps []plc.PLCOperation
- 	seenCIDs := make(map[string]bool)
-
- 	for cid := range prevBoundaryCIDs {
- 		seenCIDs[cid] = true
- 	}
-
- 	currentAfter := after
- 	maxFetches := 20
-
- 	for fetchNum := 0; fetchNum < maxFetches && len(allRemoteOps) < plc.BUNDLE_SIZE; fetchNum++ {
- 		batch, err := s.plcClient.Export(ctx, plc.ExportOptions{
- 			Count: 1000,
- 			After: currentAfter,
- 		})
- 		if err != nil || len(batch) == 0 {
- 			break
- 		}
-
- 		for _, op := range batch {
- 			if !seenCIDs[op.CID] {
- 				seenCIDs[op.CID] = true
- 				allRemoteOps = append(allRemoteOps, op)
- 				if len(allRemoteOps) >= plc.BUNDLE_SIZE {
- 					break
- 				}
- 			}
- 		}
-
- 		if len(batch) > 0 {
- 			lastOp := batch[len(batch)-1]
- 			currentAfter = lastOp.CreatedAt.Format("2006-01-02T15:04:05.000Z")
- 		}
-
- 		if len(batch) < 1000 {
- 			break
- 		}
- 	}
-
- 	if len(allRemoteOps) > plc.BUNDLE_SIZE {
- 		allRemoteOps = allRemoteOps[:plc.BUNDLE_SIZE]
- 	}
-
- 	return allRemoteOps, prevBoundaryCIDs, nil
- }
-
  func (s *Server) handleVerifyChain(w http.ResponseWriter, r *http.Request) {
  	resp := newResponse(w)
- 	ctx := r.Context()

- 	lastBundle, err := s.db.GetLastBundleNumber(ctx)
- 	if err != nil {
- 		resp.error(err.Error(), http.StatusInternalServerError)
- 		return
- 	}
-
+ 	lastBundle := s.bundleManager.GetLastBundleNumber()
  	if lastBundle == 0 {
  		resp.json(map[string]interface{}{
  			"status": "empty",

@@ -1327 +1128 @@
  	var errorMsg string

  	for i := 1; i <= lastBundle; i++ {
- 		bundle, err := s.db.GetBundleByNumber(ctx, i)
+ 		bundle, err := s.bundleManager.GetBundleMetadata(i)
  		if err != nil {
  			valid = false
  			brokenAt = i

@@ -1336 +1137 @@
  		}

  		if i > 1 {
- 			prevBundle, err := s.db.GetBundleByNumber(ctx, i-1)
+ 			prevBundle, err := s.bundleManager.GetBundleMetadata(i - 1)
  			if err != nil {
  				valid = false
  				brokenAt = i

@@ -1368 +1169 @@

  func (s *Server) handleGetChainInfo(w http.ResponseWriter, r *http.Request) {
  	resp := newResponse(w)
- 	ctx := r.Context()

- 	lastBundle, err := s.db.GetLastBundleNumber(ctx)
- 	if err != nil {
- 		resp.error(err.Error(), http.StatusInternalServerError)
- 		return
- 	}
-
+ 	lastBundle := s.bundleManager.GetLastBundleNumber()
  	if lastBundle == 0 {
  		resp.json(map[string]interface{}{
  			"chain_length": 0,

@@ -1384 +1179 @@
  		return
  	}

- 	firstBundle, _ := s.db.GetBundleByNumber(ctx, 1)
- 	lastBundleData, _ := s.db.GetBundleByNumber(ctx, lastBundle)
-
- 	// Updated to receive 5 values instead of 3
- 	count, compressedSize, uncompressedSize, _, err := s.db.GetBundleStats(ctx)
- 	if err != nil {
- 		resp.error(err.Error(), http.StatusInternalServerError)
- 		return
- 	}
+ 	firstBundle, _ := s.bundleManager.GetBundleMetadata(1)
+ 	lastBundleData, _ := s.bundleManager.GetBundleMetadata(lastBundle)
+ 	stats := s.bundleManager.GetBundleStats()

  	resp.json(map[string]interface{}{
- 		"chain_length": lastBundle,
- 		"total_bundles": count,
- 		"total_compressed_size": compressedSize,
- 		"total_compressed_size_mb": float64(compressedSize) / 1024 / 1024,
- 		"total_uncompressed_size": uncompressedSize,
- 		"total_uncompressed_size_mb": float64(uncompressedSize) / 1024 / 1024,
- 		"compression_ratio": float64(uncompressedSize) / float64(compressedSize),
- 		"chain_start_time": firstBundle.StartTime,
- 		"chain_end_time": lastBundleData.EndTime,
- 		"chain_head_hash": lastBundleData.Hash,
- 		"first_prev_hash": firstBundle.PrevBundleHash,
- 		"last_prev_hash": lastBundleData.PrevBundleHash,
+ 		"chain_length": lastBundle,
+ 		"total_bundles": stats["bundle_count"],
+ 		"total_compressed_size": stats["total_size"],
+ 		"total_compressed_size_mb": float64(stats["total_size"].(int64)) / 1024 / 1024,
+ 		"chain_start_time": firstBundle.StartTime,
+ 		"chain_end_time": lastBundleData.EndTime,
+ 		"chain_head_hash": lastBundleData.Hash,
+ 		"first_prev_hash": firstBundle.PrevBundleHash,
+ 		"last_prev_hash": lastBundleData.PrevBundleHash,
  	})
  }

@@ -1427 +1213 @@
  		return
  	}

- 	startBundle := s.findStartBundle(ctx, afterTime)
+ 	startBundle := s.findStartBundle(afterTime)
  	ops := s.collectOperations(ctx, startBundle, afterTime, count)

  	w.Header().Set("Content-Type", "application/jsonl")

@@ -1467 +1253 @@
  	return time.Time{}, fmt.Errorf("invalid timestamp format")
  }

- func (s *Server) findStartBundle(ctx context.Context, afterTime time.Time) int {
+ func (s *Server) findStartBundle(afterTime time.Time) int {
  	if afterTime.IsZero() {
  		return 1
  	}

- 	foundBundle, err := s.db.GetBundleForTimestamp(ctx, afterTime)
- 	if err != nil {
- 		return 1
- 	}
-
+ 	foundBundle := s.bundleManager.FindBundleForTimestamp(afterTime)
  	if foundBundle > 1 {
  		return foundBundle - 1
  	}

@@ -1487 +1269 @@
  	var allOps []plc.PLCOperation
  	seenCIDs := make(map[string]bool)

- 	lastBundle, _ := s.db.GetLastBundleNumber(ctx)
+ 	lastBundle := s.bundleManager.GetLastBundleNumber()

  	for bundleNum := startBundle; bundleNum <= lastBundle && len(allOps) < count; bundleNum++ {
  		ops, err := s.bundleManager.LoadBundleOperations(ctx, bundleNum)

@@ -1647 +1429 @@
  	limit := getQueryInt(r, "limit", 0)
  	fromBundle := getQueryInt(r, "from", 1)

- 	history, err := s.db.GetPLCHistory(r.Context(), limit, fromBundle)
+ 	// Use BundleManager instead of database
+ 	history, err := s.bundleManager.GetPLCHistory(r.Context(), limit, fromBundle)
  	if err != nil {
  		resp.error(err.Error(), http.StatusInternalServerError)
  		return

@@ -1720 +1503 @@
  }

  // ===== UTILITY FUNCTIONS =====
-
- func computeOperationsHash(ops []plc.PLCOperation) string {
- 	var jsonlData []byte
- 	for _, op := range ops {
- 		jsonlData = append(jsonlData, op.RawJSON...)
- 		jsonlData = append(jsonlData, '\n')
- 	}
- 	hash := sha256.Sum256(jsonlData)
- 	return hex.EncodeToString(hash[:])
- }

  func normalizeEndpoint(endpoint string) string {
  	endpoint = strings.TrimPrefix(endpoint, "https://")
+2 -9  internal/api/server.go

@@ -18 +18 @@
  	router *mux.Router
  	server *http.Server
  	db storage.Database
- 	plcClient *plc.Client
  	plcBundleDir string
  	bundleManager *plc.BundleManager
  	plcIndexDIDs bool
  }

- func NewServer(db storage.Database, apiCfg config.APIConfig, plcCfg config.PLCConfig) *Server {
- 	bundleManager, err := plc.NewBundleManager(plcCfg.BundleDir, plcCfg.DirectoryURL, db, plcCfg.IndexDIDs)
- 	if err != nil {
- 		log.Fatal("Failed to create bundle manager: %v", err)
- 	}
-
+ func NewServer(db storage.Database, apiCfg config.APIConfig, plcCfg config.PLCConfig, bundleManager *plc.BundleManager) *Server {
  	s := &Server{
  		router: mux.NewRouter(),
  		db: db,
  		plcBundleDir: plcCfg.BundleDir,
- 		bundleManager: bundleManager,
+ 		bundleManager: bundleManager, // Use provided shared instance
  		plcIndexDIDs: plcCfg.IndexDIDs,
  	}

@@ -90 +84 @@
  	api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET")
  	api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET")
  	api.HandleFunc("/plc/bundles/{number}/download", s.handleDownloadPLCBundle).Methods("GET")
- 	api.HandleFunc("/plc/bundles/{bundleNumber}/verify", s.handleVerifyPLCBundle).Methods("POST")

  	// PLC history/metrics
  	api.HandleFunc("/plc/history", s.handleGetPLCHistory).Methods("GET")
+252 -38  internal/plc/manager.go

@@ -3 +3 @@
  import (
  	"context"
  	"fmt"
+ 	"io"
+ 	"sort"
  	"time"

  	"github.com/atscan/atscanner/internal/log"

@@ -62 +64 @@
  	return bm.libManager.LoadBundle(ctx, bundleNum)
  }

- // FetchAndSaveBundle fetches next bundle from PLC and saves to both disk and DB
+ // FetchAndSaveBundle fetches next bundle from PLC and saves
  func (bm *BundleManager) FetchAndSaveBundle(ctx context.Context) (*plcbundle.Bundle, error) {
  	// Fetch from PLC using library
  	bundle, err := bm.libManager.FetchNextBundle(ctx)

@@ -70 +72 @@
  		return nil, err
  	}

- 	// Save to disk (library)
+ 	// Save to disk (library handles this)
  	if err := bm.libManager.SaveBundle(ctx, bundle); err != nil {
  		return nil, fmt.Errorf("failed to save bundle to disk: %w", err)
  	}

- 	// Save to database
- 	if err := bm.saveBundleToDatabase(ctx, bundle); err != nil {
- 		return nil, fmt.Errorf("failed to save bundle to database: %w", err)
- 	}
-
- 	log.Info("✓ Saved bundle %06d (disk + database)", bundle.BundleNumber)
-
- 	return bundle, nil
- }
-
- // saveBundleToDatabase saves bundle metadata to PostgreSQL
- func (bm *BundleManager) saveBundleToDatabase(ctx context.Context, bundle *plcbundle.Bundle) error {
- 	// Convert library bundle to storage bundle
- 	dbBundle := &storage.PLCBundle{
- 		BundleNumber: bundle.BundleNumber,
- 		StartTime: bundle.StartTime,
- 		EndTime: bundle.EndTime,
- 		DIDCount: bundle.DIDCount,
- 		Hash: bundle.Hash,
- 		CompressedHash: bundle.CompressedHash,
- 		CompressedSize: bundle.CompressedSize,
- 		UncompressedSize: bundle.UncompressedSize,
- 		Cursor: bundle.Cursor,
- 		PrevBundleHash: bundle.PrevBundleHash,
- 		Compressed: bundle.Compressed,
- 		CreatedAt: bundle.CreatedAt,
- 	}
-
- 	// Save to database
- 	if err := bm.db.CreateBundle(ctx, dbBundle); err != nil {
- 		return err
- 	}
-
- 	// Index DIDs if enabled
+ 	// Index DIDs if enabled (still use database for this)
  	if bm.indexDIDs && len(bundle.Operations) > 0 {
  		if err := bm.indexBundleDIDs(ctx, bundle); err != nil {
  			log.Error("Failed to index DIDs for bundle %d: %v", bundle.BundleNumber, err)
- 			// Don't fail the entire operation
  		}
  	}

- 	return nil
+ 	log.Info("✓ Saved bundle %06d", bundle.BundleNumber)
+
+ 	return bundle, nil
  }

  // indexBundleDIDs indexes DIDs from a bundle into the database

@@ -171 +141 @@
  func (bm *BundleManager) GetChainInfo(ctx context.Context) (map[string]interface{}, error) {
  	return bm.libManager.GetInfo(), nil
  }
+
+ // GetMempoolStats returns mempool statistics from the library
+ func (bm *BundleManager) GetMempoolStats() map[string]interface{} {
+ 	return bm.libManager.GetMempoolStats()
+ }
+
+ // GetMempoolOperations returns all operations currently in mempool
+ func (bm *BundleManager) GetMempoolOperations() ([]PLCOperation, error) {
+ 	return bm.libManager.GetMempoolOperations()
+ }
+
+ // GetIndex returns the library's bundle index
+ func (bm *BundleManager) GetIndex() *plcbundle.Index {
+ 	return bm.libManager.GetIndex()
+ }
+
+ // GetLastBundleNumber returns the last bundle number
+ func (bm *BundleManager) GetLastBundleNumber() int {
+ 	index := bm.libManager.GetIndex()
+ 	lastBundle := index.GetLastBundle()
+ 	if lastBundle == nil {
+ 		return 0
+ 	}
+ 	return lastBundle.BundleNumber
+ }
+
+ // GetBundleMetadata gets bundle metadata by number
+ func (bm *BundleManager) GetBundleMetadata(bundleNum int) (*plcbundle.BundleMetadata, error) {
+ 	index := bm.libManager.GetIndex()
+ 	return index.GetBundle(bundleNum)
+ }
+
+ // GetBundles returns the most recent bundles (newest first)
+ func (bm *BundleManager) GetBundles(limit int) []*plcbundle.BundleMetadata {
+ 	index := bm.libManager.GetIndex()
+ 	allBundles := index.GetBundles()
+
+ 	// Determine how many bundles to return
+ 	count := limit
+ 	if count <= 0 || count > len(allBundles) {
+ 		count = len(allBundles)
+ 	}
+
+ 	// Build result in reverse order (newest first)
+ 	result := make([]*plcbundle.BundleMetadata, count)
+ 	for i := 0; i < count; i++ {
+ 		result[i] = allBundles[len(allBundles)-1-i]
+ 	}
+
+ 	return result
+ }
+
+ // GetBundleStats returns bundle statistics
+ func (bm *BundleManager) GetBundleStats() map[string]interface{} {
+ 	index := bm.libManager.GetIndex()
+ 	stats := index.GetStats()
+
+ 	// Convert to expected format
+ 	lastBundle := stats["last_bundle"]
+ 	if lastBundle == nil {
+ 		lastBundle = int64(0)
+ 	}
+
+ 	// Calculate total uncompressed size by iterating through all bundles
+ 	totalUncompressedSize := int64(0)
+ 	allBundles := index.GetBundles()
+ 	for _, bundle := range allBundles {
+ 		totalUncompressedSize += bundle.UncompressedSize
+ 	}
+
+ 	return map[string]interface{}{
+ 		"bundle_count": int64(stats["bundle_count"].(int)),
+ 		"total_size": stats["total_size"].(int64),
+ 		"total_uncompressed_size": totalUncompressedSize,
+ 		"last_bundle": int64(lastBundle.(int)),
+ 	}
+ }
+
+ // GetDIDsForBundle gets DIDs from a bundle (loads and extracts)
+ func (bm *BundleManager) GetDIDsForBundle(ctx context.Context, bundleNum int) ([]string, int, error) {
+ 	bundle, err := bm.libManager.LoadBundle(ctx, bundleNum)
+ 	if err != nil {
+ 		return nil, 0, err
+ 	}
+
+ 	// Extract unique DIDs
+ 	didSet := make(map[string]bool)
+ 	for _, op := range bundle.Operations {
+ 		didSet[op.DID] = true
+ 	}
+
+ 	dids := make([]string, 0, len(didSet))
+ 	for did := range didSet {
+ 		dids = append(dids, did)
+ 	}
+
+ 	return dids, bundle.DIDCount, nil
+ }
+
+ // FindBundleForTimestamp finds bundle containing a timestamp
+ func (bm *BundleManager) FindBundleForTimestamp(afterTime time.Time) int {
+ 	index := bm.libManager.GetIndex()
+ 	bundles := index.GetBundles()
+
+ 	// Find bundle containing this time
+ 	for _, bundle := range bundles {
+ 		if (bundle.StartTime.Before(afterTime) || bundle.StartTime.Equal(afterTime)) &&
+ 			(bundle.EndTime.After(afterTime) || bundle.EndTime.Equal(afterTime)) {
+ 			return bundle.BundleNumber
+ 		}
+ 	}
+
+ 	// Return closest bundle before this time
+ 	for i := len(bundles) - 1; i >= 0; i-- {
+ 		if bundles[i].EndTime.Before(afterTime) {
+ 			return bundles[i].BundleNumber
+ 		}
+ 	}
+
+ 	return 1 // Default to first bundle
+ }
+
+ // StreamRaw streams raw compressed bundle data
+ func (bm *BundleManager) StreamRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
+ 	return bm.libManager.StreamBundleRaw(ctx, bundleNumber)
+ }
+
+ // StreamDecompressed streams decompressed bundle data
+ func (bm *BundleManager) StreamDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error) {
+ 	return bm.libManager.StreamBundleDecompressed(ctx, bundleNumber)
+ }
+
+ // GetPLCHistory calculates historical statistics from the bundle index
+ func (bm *BundleManager) GetPLCHistory(ctx context.Context, limit int, fromBundle int) ([]*storage.PLCHistoryPoint, error) {
+ 	index := bm.libManager.GetIndex()
+ 	allBundles := index.GetBundles()
+
+ 	// Filter bundles >= fromBundle
+ 	var filtered []*plcbundle.BundleMetadata
+ 	for _, b := range allBundles {
+ 		if b.BundleNumber >= fromBundle {
+ 			filtered = append(filtered, b)
+ 		}
+ 	}
+
+ 	if len(filtered) == 0 {
+ 		return []*storage.PLCHistoryPoint{}, nil
+ 	}
+
+ 	// Sort bundles by bundle number to ensure proper cumulative calculation
+ 	sort.Slice(filtered, func(i, j int) bool {
+ 		return filtered[i].BundleNumber < filtered[j].BundleNumber
+ 	})
+
+ 	// Group by date
+ 	type dailyStat struct {
+ 		lastBundle int
+ 		bundleCount int
+ 		totalUncompressed int64
+ 		totalCompressed int64
+ 	}
+
+ 	dailyStats := make(map[string]*dailyStat)
+
+ 	// Map to store the cumulative values at the end of each date
+ 	dateCumulatives := make(map[string]struct {
+ 		uncompressed int64
+ 		compressed int64
+ 	})
+
+ 	// Calculate cumulative totals as we iterate through sorted bundles
+ 	cumulativeUncompressed := int64(0)
+ 	cumulativeCompressed := int64(0)
+
+ 	for _, bundle := range filtered {
+ 		dateStr := bundle.StartTime.Format("2006-01-02")
+
+ 		// Update cumulative totals
+ 		cumulativeUncompressed += bundle.UncompressedSize
+ 		cumulativeCompressed += bundle.CompressedSize
+
+ 		if stat, exists := dailyStats[dateStr]; exists {
+ 			// Update existing day
+ 			if bundle.BundleNumber > stat.lastBundle {
+ 				stat.lastBundle = bundle.BundleNumber
+ 			}
+ 			stat.bundleCount++
+ 			stat.totalUncompressed += bundle.UncompressedSize
+ 			stat.totalCompressed += bundle.CompressedSize
+ 		} else {
+ 			// Create new day entry
+ 			dailyStats[dateStr] = &dailyStat{
+ 				lastBundle: bundle.BundleNumber,
+ 				bundleCount: 1,
+ 				totalUncompressed: bundle.UncompressedSize,
+ 				totalCompressed: bundle.CompressedSize,
+ 			}
+ 		}
+
+ 		// Store the cumulative values at the end of this date
+ 		// (will be overwritten if there are multiple bundles on the same day)
+ 		dateCumulatives[dateStr] = struct {
+ 			uncompressed int64
+ 			compressed int64
+ 		}{
+ 			uncompressed: cumulativeUncompressed,
+ 			compressed: cumulativeCompressed,
+ 		}
+ 	}
+
+ 	// Convert map to sorted slice by date
+ 	var dates []string
+ 	for date := range dailyStats {
+ 		dates = append(dates, date)
+ 	}
+ 	sort.Strings(dates)
+
+ 	// Build history points with cumulative operations
+ 	var history []*storage.PLCHistoryPoint
+ 	cumulativeOps := 0
+
+ 	for _, date := range dates {
+ 		stat := dailyStats[date]
+ 		cumulativeOps += stat.bundleCount * 10000
+ 		cumulative := dateCumulatives[date]
+
+ 		history = append(history, &storage.PLCHistoryPoint{
+ 			Date: date,
+ 			BundleNumber: stat.lastBundle,
+ 			OperationCount: cumulativeOps,
+ 			UncompressedSize: stat.totalUncompressed,
+ 			CompressedSize: stat.totalCompressed,
+ 			CumulativeUncompressed: cumulative.uncompressed,
+ 			CumulativeCompressed: cumulative.compressed,
+ 		})
+ 	}
+
+ 	// Apply limit if specified
+ 	if limit > 0 && len(history) > limit {
+ 		history = history[:limit]
+ 	}
+
+ 	return history, nil
+ }
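Note: a hypothetical consumer of the wrapper methods added above — it prints the newest bundle's metadata and streams its decompressed JSONL to stdout. Only methods defined in this file (GetLastBundleNumber, GetBundleMetadata, StreamDecompressed) and metadata fields referenced elsewhere in the diff are used; dumpLatestBundle itself is not part of the change:

	// dumpLatestBundle shows typical read-only use of the shared BundleManager.
	func dumpLatestBundle(ctx context.Context, bm *plc.BundleManager) error {
		last := bm.GetLastBundleNumber()
		if last == 0 {
			return fmt.Errorf("no bundles in index")
		}

		meta, err := bm.GetBundleMetadata(last)
		if err != nil {
			return err
		}
		fmt.Printf("bundle %06d: %d DIDs, hash %s\n", meta.BundleNumber, meta.DIDCount, meta.Hash)

		reader, err := bm.StreamDecompressed(ctx, last)
		if err != nil {
			return err
		}
		defer reader.Close()

		_, err = io.Copy(os.Stdout, reader)
		return err
	}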
+3 -12  internal/plc/scanner.go

@@ -17 +17 @@
  	config config.PLCConfig
  }

- func NewScanner(db storage.Database, cfg config.PLCConfig) *Scanner {
+ func NewScanner(db storage.Database, cfg config.PLCConfig, bundleManager *BundleManager) *Scanner {
  	log.Verbose("NewScanner: IndexDIDs config = %v", cfg.IndexDIDs)

- 	bundleManager, err := NewBundleManager(cfg.BundleDir, cfg.DirectoryURL, db, cfg.IndexDIDs)
- 	if err != nil {
- 		log.Error("Failed to initialize bundle manager: %v", err)
- 		return nil
- 	}
-
  	return &Scanner{
- 		bundleManager: bundleManager,
+ 		bundleManager: bundleManager, // Use provided instance
  		db: db,
  		config: cfg,
  	}
  }

  func (s *Scanner) Close() {
- 	if s.bundleManager != nil {
- 		s.bundleManager.Close()
- 	}
+ 	// Don't close bundleManager here - it's shared
  }

  func (s *Scanner) Scan(ctx context.Context) error {

@@ -237 +229 @@

  // ScanMetrics tracks scan progress
  type ScanMetrics struct {
- 	totalFetched int64
  	totalProcessed int64
  	newEndpoints int64
  	endpointCounts map[string]int64
-21  internal/storage/db.go

@@ -50 +50 @@
  	GetScanCursor(ctx context.Context, source string) (*ScanCursor, error)
  	UpdateScanCursor(ctx context.Context, cursor *ScanCursor) error

- 	// Bundle operations
- 	CreateBundle(ctx context.Context, bundle *PLCBundle) error
- 	GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error)
- 	GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error)
- 	GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error)
- 	GetDIDsForBundle(ctx context.Context, bundleNum int) ([]string, error)
- 	GetBundleStats(ctx context.Context) (count, compressedSize, uncompressedSize, lastBundle int64, err error)
- 	GetLastBundleNumber(ctx context.Context) (int, error)
- 	GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error)
- 	GetPLCHistory(ctx context.Context, limit int, fromBundle int) ([]*PLCHistoryPoint, error)
-
- 	// Mempool operations
- 	AddToMempool(ctx context.Context, ops []MempoolOperation) error
- 	GetMempoolCount(ctx context.Context) (int, error)
- 	GetMempoolOperations(ctx context.Context, limit int) ([]MempoolOperation, error)
- 	DeleteFromMempool(ctx context.Context, ids []int64) error
- 	GetFirstMempoolOperation(ctx context.Context) (*MempoolOperation, error)
- 	GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error)
- 	GetMempoolUniqueDIDCount(ctx context.Context) (int, error)
- 	GetMempoolUncompressedSize(ctx context.Context) (int64, error)
-
  	// Metrics
  	StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error
  	GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error)
-531
internal/storage/postgres.go
-531
internal/storage/postgres.go
···
5
5
"database/sql"
6
6
"encoding/json"
7
7
"fmt"
8
-
"strings"
9
8
"time"
10
9
11
10
"github.com/atscan/atscanner/internal/log"
···
156
155
last_scan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
157
156
records_processed BIGINT DEFAULT 0
158
157
);
159
-
160
-
CREATE TABLE IF NOT EXISTS plc_bundles (
161
-
bundle_number INTEGER PRIMARY KEY,
162
-
start_time TIMESTAMP NOT NULL,
163
-
end_time TIMESTAMP NOT NULL,
164
-
did_count INTEGER NOT NULL DEFAULT 0,
165
-
hash TEXT NOT NULL,
166
-
compressed_hash TEXT NOT NULL,
167
-
compressed_size BIGINT NOT NULL,
168
-
uncompressed_size BIGINT NOT NULL,
169
-
cumulative_compressed_size BIGINT NOT NULL,
170
-
cumulative_uncompressed_size BIGINT NOT NULL,
171
-
cursor TEXT,
172
-
prev_bundle_hash TEXT,
173
-
compressed BOOLEAN DEFAULT true,
174
-
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
175
-
);
176
-
177
-
CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time);
178
-
CREATE INDEX IF NOT EXISTS idx_plc_bundles_hash ON plc_bundles(hash);
179
-
CREATE INDEX IF NOT EXISTS idx_plc_bundles_prev ON plc_bundles(prev_bundle_hash);
180
-
CREATE INDEX IF NOT EXISTS idx_plc_bundles_number_desc ON plc_bundles(bundle_number DESC);
181
-
182
-
CREATE TABLE IF NOT EXISTS plc_mempool (
183
-
id BIGSERIAL PRIMARY KEY,
184
-
did TEXT NOT NULL,
185
-
operation TEXT NOT NULL,
186
-
cid TEXT NOT NULL UNIQUE,
187
-
created_at TIMESTAMP NOT NULL,
188
-
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
189
-
);
190
-
191
-
CREATE INDEX IF NOT EXISTS idx_mempool_created_at ON plc_mempool(created_at);
192
-
CREATE INDEX IF NOT EXISTS idx_mempool_did ON plc_mempool(did);
193
-
CREATE UNIQUE INDEX IF NOT EXISTS idx_mempool_cid ON plc_mempool(cid);
194
158
195
159
-- Minimal dids table
196
160
CREATE TABLE IF NOT EXISTS dids (
···
1190
1154
}
1191
1155
}
1192
1156
return 0
1193
-
}
1194
-
1195
-
// ===== BUNDLE OPERATIONS =====
1196
-
1197
-
func (p *PostgresDB) CreateBundle(ctx context.Context, bundle *PLCBundle) error {
1198
-
// Calculate cumulative sizes from previous bundle
1199
-
if bundle.BundleNumber > 1 {
1200
-
prevBundle, err := p.GetBundleByNumber(ctx, bundle.BundleNumber-1)
1201
-
if err == nil && prevBundle != nil {
1202
-
bundle.CumulativeCompressedSize = prevBundle.CumulativeCompressedSize + bundle.CompressedSize
1203
-
bundle.CumulativeUncompressedSize = prevBundle.CumulativeUncompressedSize + bundle.UncompressedSize
1204
-
} else {
1205
-
bundle.CumulativeCompressedSize = bundle.CompressedSize
1206
-
bundle.CumulativeUncompressedSize = bundle.UncompressedSize
1207
-
}
1208
-
} else {
1209
-
bundle.CumulativeCompressedSize = bundle.CompressedSize
1210
-
bundle.CumulativeUncompressedSize = bundle.UncompressedSize
1211
-
}
1212
-
1213
-
query := `
1214
-
INSERT INTO plc_bundles (
1215
-
bundle_number, start_time, end_time, did_count,
1216
-
hash, compressed_hash, compressed_size, uncompressed_size,
1217
-
cumulative_compressed_size, cumulative_uncompressed_size,
1218
-
cursor, prev_bundle_hash, compressed
1219
-
)
1220
-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
1221
-
ON CONFLICT(bundle_number) DO UPDATE SET
1222
-
start_time = EXCLUDED.start_time,
1223
-
end_time = EXCLUDED.end_time,
1224
-
did_count = EXCLUDED.did_count,
1225
-
hash = EXCLUDED.hash,
1226
-
compressed_hash = EXCLUDED.compressed_hash,
1227
-
compressed_size = EXCLUDED.compressed_size,
1228
-
uncompressed_size = EXCLUDED.uncompressed_size,
1229
-
cumulative_compressed_size = EXCLUDED.cumulative_compressed_size,
1230
-
cumulative_uncompressed_size = EXCLUDED.cumulative_uncompressed_size,
1231
-
cursor = EXCLUDED.cursor,
1232
-
prev_bundle_hash = EXCLUDED.prev_bundle_hash,
1233
-
compressed = EXCLUDED.compressed
1234
-
`
1235
-
_, err := p.db.ExecContext(ctx, query,
1236
-
bundle.BundleNumber, bundle.StartTime, bundle.EndTime,
1237
-
bundle.DIDCount, bundle.Hash, bundle.CompressedHash,
1238
-
bundle.CompressedSize, bundle.UncompressedSize,
1239
-
bundle.CumulativeCompressedSize, bundle.CumulativeUncompressedSize,
1240
-
bundle.Cursor, bundle.PrevBundleHash, bundle.Compressed,
1241
-
)
1242
-
1243
-
return err
1244
-
}
1245
-
1246
-
func (p *PostgresDB) GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error) {
1247
-
query := `
1248
-
SELECT bundle_number, start_time, end_time, did_count, hash, compressed_hash,
1249
-
compressed_size, uncompressed_size, cumulative_compressed_size,
1250
-
cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
1251
-
FROM plc_bundles
1252
-
WHERE bundle_number = $1
1253
-
`
1254
-
1255
-
var bundle PLCBundle
1256
-
var prevHash sql.NullString
1257
-
var cursor sql.NullString
1258
-
1259
-
err := p.db.QueryRowContext(ctx, query, bundleNumber).Scan(
1260
-
&bundle.BundleNumber, &bundle.StartTime, &bundle.EndTime,
1261
-
&bundle.DIDCount, &bundle.Hash, &bundle.CompressedHash,
1262
-
&bundle.CompressedSize, &bundle.UncompressedSize,
1263
-
&bundle.CumulativeCompressedSize, &bundle.CumulativeUncompressedSize,
1264
-
&cursor, &prevHash, &bundle.Compressed, &bundle.CreatedAt,
1265
-
)
1266
-
if err != nil {
1267
-
return nil, err
1268
-
}
1269
-
1270
-
if prevHash.Valid {
1271
-
bundle.PrevBundleHash = prevHash.String
1272
-
}
1273
-
if cursor.Valid {
1274
-
bundle.Cursor = cursor.String
1275
-
}
1276
-
1277
-
return &bundle, nil
1278
-
}
1279
-
1280
-
func (p *PostgresDB) GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) {
1281
-
query := `
1282
-
SELECT bundle_number, start_time, end_time, did_count, hash, compressed_hash,
1283
-
compressed_size, uncompressed_size, cumulative_compressed_size,
1284
-
cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
1285
-
FROM plc_bundles
1286
-
ORDER BY bundle_number DESC
1287
-
LIMIT $1
1288
-
`
1289
-
1290
-
rows, err := p.db.QueryContext(ctx, query, limit)
1291
-
if err != nil {
1292
-
return nil, err
1293
-
}
1294
-
defer rows.Close()
1295
-
1296
-
return p.scanBundles(rows)
1297
-
}
1298
-
1299
-
func (p *PostgresDB) GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) {
1300
-
// Get bundle numbers from dids table
1301
-
var bundleNumbersJSON []byte
1302
-
err := p.db.QueryRowContext(ctx, `
1303
-
SELECT bundle_numbers FROM dids WHERE did = $1
1304
-
`, did).Scan(&bundleNumbersJSON)
1305
-
1306
-
if err == sql.ErrNoRows {
1307
-
return []*PLCBundle{}, nil
1308
-
}
1309
-
if err != nil {
1310
-
return nil, err
1311
-
}
1312
-
1313
-
var bundleNumbers []int
1314
-
if err := json.Unmarshal(bundleNumbersJSON, &bundleNumbers); err != nil {
1315
-
return nil, err
1316
-
}
1317
-
1318
-
if len(bundleNumbers) == 0 {
1319
-
return []*PLCBundle{}, nil
1320
-
}
1321
-
1322
-
// Build query with IN clause
1323
-
placeholders := make([]string, len(bundleNumbers))
1324
-
args := make([]interface{}, len(bundleNumbers))
1325
-
for i, num := range bundleNumbers {
1326
-
placeholders[i] = fmt.Sprintf("$%d", i+1)
1327
-
args[i] = num
1328
-
}
1329
-
1330
-
query := fmt.Sprintf(`
1331
-
SELECT bundle_number, start_time, end_time, did_count, hash, compressed_hash,
1332
-
compressed_size, uncompressed_size, cumulative_compressed_size,
1333
-
cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at
1334
-
FROM plc_bundles
1335
-
WHERE bundle_number IN (%s)
1336
-
ORDER BY bundle_number ASC
1337
-
`, strings.Join(placeholders, ","))
1338
-
1339
-
rows, err := p.db.QueryContext(ctx, query, args...)
1340
-
if err != nil {
1341
-
return nil, err
1342
-
}
1343
-
defer rows.Close()
1344
-
1345
-
return p.scanBundles(rows)
1346
-
}
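GetBundlesForDID relied on the dids table storing each identity's bundle membership as a JSON array in bundle_numbers: one row read, one json.Unmarshal, then an IN (...) query over plc_bundles (GetDIDsForBundle below runs the reverse lookup with a jsonb containment match). A distilled sketch of just the index read, assuming only the dids table from the schema above and imports of context, database/sql, and encoding/json:

// bundleNumbersForDID returns the bundle numbers recorded for a DID, or nil when
// the DID is unknown. The jsonb column scans cleanly into []byte.
func bundleNumbersForDID(ctx context.Context, db *sql.DB, did string) ([]int, error) {
	var raw []byte
	err := db.QueryRowContext(ctx,
		`SELECT bundle_numbers FROM dids WHERE did = $1`, did).Scan(&raw)
	if err == sql.ErrNoRows {
		return nil, nil // DID appears in no bundles
	}
	if err != nil {
		return nil, err
	}
	var nums []int
	if err := json.Unmarshal(raw, &nums); err != nil {
		return nil, err
	}
	return nums, nil
}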
1347
-
1348
-
func (p *PostgresDB) GetDIDsForBundle(ctx context.Context, bundleNum int) ([]string, error) {
1349
-
query := `
1350
-
SELECT did
1351
-
FROM dids
1352
-
WHERE bundle_numbers @> $1::jsonb
1353
-
ORDER BY did
1354
-
`
1355
-
1356
-
rows, err := p.db.QueryContext(ctx, query, fmt.Sprintf("[%d]", bundleNum))
1357
-
if err != nil {
1358
-
return nil, err
1359
-
}
1360
-
defer rows.Close()
1361
-
1362
-
var dids []string
1363
-
for rows.Next() {
1364
-
var did string
1365
-
if err := rows.Scan(&did); err != nil {
1366
-
return nil, err
1367
-
}
1368
-
dids = append(dids, did)
1369
-
}
1370
-
1371
-
return dids, rows.Err()
1372
-
}
1373
-
1374
-
func (p *PostgresDB) scanBundles(rows *sql.Rows) ([]*PLCBundle, error) {
1375
-
var bundles []*PLCBundle
1376
-
1377
-
for rows.Next() {
1378
-
var bundle PLCBundle
1379
-
var prevHash sql.NullString
1380
-
var cursor sql.NullString
1381
-
1382
-
if err := rows.Scan(
1383
-
&bundle.BundleNumber,
1384
-
&bundle.StartTime,
1385
-
&bundle.EndTime,
1386
-
&bundle.DIDCount,
1387
-
&bundle.Hash,
1388
-
&bundle.CompressedHash,
1389
-
&bundle.CompressedSize,
1390
-
&bundle.UncompressedSize,
1391
-
&bundle.CumulativeCompressedSize,
1392
-
&bundle.CumulativeUncompressedSize,
1393
-
&cursor,
1394
-
&prevHash,
1395
-
&bundle.Compressed,
1396
-
&bundle.CreatedAt,
1397
-
); err != nil {
1398
-
return nil, err
1399
-
}
1400
-
1401
-
if prevHash.Valid {
1402
-
bundle.PrevBundleHash = prevHash.String
1403
-
}
1404
-
if cursor.Valid {
1405
-
bundle.Cursor = cursor.String
1406
-
}
1407
-
1408
-
bundles = append(bundles, &bundle)
1409
-
}
1410
-
1411
-
return bundles, rows.Err()
1412
-
}
1413
-
1414
-
func (p *PostgresDB) GetBundleStats(ctx context.Context) (int64, int64, int64, int64, error) {
1415
-
var count, lastBundleNum int64
1416
-
err := p.db.QueryRowContext(ctx, `
1417
-
SELECT COUNT(*), COALESCE(MAX(bundle_number), 0)
1418
-
FROM plc_bundles
1419
-
`).Scan(&count, &lastBundleNum)
1420
-
if err != nil {
1421
-
return 0, 0, 0, 0, err
1422
-
}
1423
-
1424
-
if lastBundleNum == 0 {
1425
-
return 0, 0, 0, 0, nil
1426
-
}
1427
-
1428
-
var compressedSize, uncompressedSize int64
1429
-
err = p.db.QueryRowContext(ctx, `
1430
-
SELECT cumulative_compressed_size, cumulative_uncompressed_size
1431
-
FROM plc_bundles
1432
-
WHERE bundle_number = $1
1433
-
`, lastBundleNum).Scan(&compressedSize, &uncompressedSize)
1434
-
if err != nil {
1435
-
return 0, 0, 0, 0, err
1436
-
}
1437
-
1438
-
return count, compressedSize, uncompressedSize, lastBundleNum, nil
1439
-
}
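GetBundleStats read the archive totals from the single newest row instead of aggregating the whole table, which is only valid because CreateBundle keeps the cumulative columns consistent. An illustrative cross-check of that invariant; the helper is an assumption, with imports of context and database/sql:

// verifyCumulativeSizes reports whether the newest bundle's cumulative columns
// match a full SUM over plc_bundles. An empty table trivially passes.
func verifyCumulativeSizes(ctx context.Context, db *sql.DB) (bool, error) {
	var sumC, sumU int64
	if err := db.QueryRowContext(ctx, `
		SELECT COALESCE(SUM(compressed_size), 0), COALESCE(SUM(uncompressed_size), 0)
		FROM plc_bundles
	`).Scan(&sumC, &sumU); err != nil {
		return false, err
	}

	var lastC, lastU int64
	err := db.QueryRowContext(ctx, `
		SELECT cumulative_compressed_size, cumulative_uncompressed_size
		FROM plc_bundles
		ORDER BY bundle_number DESC
		LIMIT 1
	`).Scan(&lastC, &lastU)
	if err == sql.ErrNoRows {
		return sumC == 0 && sumU == 0, nil
	}
	if err != nil {
		return false, err
	}
	return sumC == lastC && sumU == lastU, nil
}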
1440
-
1441
-
func (p *PostgresDB) GetLastBundleNumber(ctx context.Context) (int, error) {
1442
-
query := "SELECT COALESCE(MAX(bundle_number), 0) FROM plc_bundles"
1443
-
var num int
1444
-
err := p.db.QueryRowContext(ctx, query).Scan(&num)
1445
-
return num, err
1446
-
}
1447
-
1448
-
func (p *PostgresDB) GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error) {
1449
-
query := `
1450
-
SELECT bundle_number
1451
-
FROM plc_bundles
1452
-
WHERE start_time <= $1 AND end_time >= $1
1453
-
ORDER BY bundle_number ASC
1454
-
LIMIT 1
1455
-
`
1456
-
1457
-
var bundleNum int
1458
-
err := p.db.QueryRowContext(ctx, query, afterTime).Scan(&bundleNum)
1459
-
if err == sql.ErrNoRows {
1460
-
query = `
1461
-
SELECT bundle_number
1462
-
FROM plc_bundles
1463
-
WHERE end_time < $1
1464
-
ORDER BY bundle_number DESC
1465
-
LIMIT 1
1466
-
`
1467
-
err = p.db.QueryRowContext(ctx, query, afterTime).Scan(&bundleNum)
1468
-
if err == sql.ErrNoRows {
1469
-
return 1, nil
1470
-
}
1471
-
if err != nil {
1472
-
return 0, err
1473
-
}
1474
-
return bundleNum, nil
1475
-
}
1476
-
if err != nil {
1477
-
return 0, err
1478
-
}
1479
-
1480
-
return bundleNum, nil
1481
-
}
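GetBundleForTimestamp encodes a three-step fallback that is easy to miss inside the nested error handling: prefer a bundle whose time range covers the timestamp, otherwise take the newest bundle that ended before it, and fall back to bundle 1 when nothing qualifies. The same decision order as a pure function, names illustrative only:

// pickStartBundle mirrors the fallback order of the removed lookup.
func pickStartBundle(covering, lastBefore int, haveCovering, haveBefore bool) int {
	switch {
	case haveCovering: // a bundle whose [start_time, end_time] contains the timestamp
		return covering
	case haveBefore: // newest bundle that ended before the timestamp
		return lastBefore
	default: // nothing bundled yet
		return 1
	}
}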
1482
-
1483
-
func (p *PostgresDB) GetPLCHistory(ctx context.Context, limit int, fromBundle int) ([]*PLCHistoryPoint, error) {
1484
-
query := `
1485
-
WITH daily_stats AS (
1486
-
SELECT
1487
-
DATE(start_time) as date,
1488
-
MAX(bundle_number) as last_bundle,
1489
-
COUNT(*) as bundle_count,
1490
-
SUM(uncompressed_size) as total_uncompressed,
1491
-
SUM(compressed_size) as total_compressed,
1492
-
MAX(cumulative_uncompressed_size) as cumulative_uncompressed,
1493
-
MAX(cumulative_compressed_size) as cumulative_compressed
1494
-
FROM plc_bundles
1495
-
WHERE bundle_number >= $1
1496
-
GROUP BY DATE(start_time)
1497
-
)
1498
-
SELECT
1499
-
date::text,
1500
-
last_bundle,
1501
-
SUM(bundle_count * 10000) OVER (ORDER BY date) as cumulative_operations,
1502
-
total_uncompressed,
1503
-
total_compressed,
1504
-
cumulative_uncompressed,
1505
-
cumulative_compressed
1506
-
FROM daily_stats
1507
-
ORDER BY date ASC
1508
-
`
1509
-
1510
-
if limit > 0 {
1511
-
query += fmt.Sprintf(" LIMIT %d", limit)
1512
-
}
1513
-
1514
-
rows, err := p.db.QueryContext(ctx, query, fromBundle)
1515
-
if err != nil {
1516
-
return nil, err
1517
-
}
1518
-
defer rows.Close()
1519
-
1520
-
var history []*PLCHistoryPoint
1521
-
for rows.Next() {
1522
-
var point PLCHistoryPoint
1523
-
var cumulativeOps int64
1524
-
1525
-
err := rows.Scan(
1526
-
&point.Date,
1527
-
&point.BundleNumber,
1528
-
&cumulativeOps,
1529
-
&point.UncompressedSize,
1530
-
&point.CompressedSize,
1531
-
&point.CumulativeUncompressed,
1532
-
&point.CumulativeCompressed,
1533
-
)
1534
-
if err != nil {
1535
-
return nil, err
1536
-
}
1537
-
1538
-
point.OperationCount = int(cumulativeOps)
1539
-
1540
-
history = append(history, &point)
1541
-
}
1542
-
1543
-
return history, rows.Err()
1544
-
}
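Worth noting: GetPLCHistory never counted operations directly. The window expression SUM(bundle_count * 10000) OVER (ORDER BY date) treats every bundle as exactly 10,000 operations and accumulates that estimate per day. The same arithmetic spelled out as a sketch:

// estimateCumulativeOps reproduces the removed query's estimate: a running total
// of 10,000 operations per bundle, one entry per day.
func estimateCumulativeOps(bundlesPerDay []int) []int64 {
	const opsPerBundle = 10000 // fixed assumption baked into the SQL above
	out := make([]int64, len(bundlesPerDay))
	var running int64
	for i, n := range bundlesPerDay {
		running += int64(n) * opsPerBundle
		out[i] = running
	}
	return out
}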
1545
-
1546
-
// ===== MEMPOOL OPERATIONS =====
1547
-
1548
-
func (p *PostgresDB) AddToMempool(ctx context.Context, ops []MempoolOperation) error {
1549
-
if len(ops) == 0 {
1550
-
return nil
1551
-
}
1552
-
1553
-
tx, err := p.db.BeginTx(ctx, nil)
1554
-
if err != nil {
1555
-
return err
1556
-
}
1557
-
defer tx.Rollback()
1558
-
1559
-
stmt, err := tx.PrepareContext(ctx, `
1560
-
INSERT INTO plc_mempool (did, operation, cid, created_at)
1561
-
VALUES ($1, $2, $3, $4)
1562
-
ON CONFLICT(cid) DO NOTHING
1563
-
`)
1564
-
if err != nil {
1565
-
return err
1566
-
}
1567
-
defer stmt.Close()
1568
-
1569
-
for _, op := range ops {
1570
-
_, err := stmt.ExecContext(ctx, op.DID, op.Operation, op.CID, op.CreatedAt)
1571
-
if err != nil {
1572
-
return err
1573
-
}
1574
-
}
1575
-
1576
-
return tx.Commit()
1577
-
}
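AddToMempool ran the whole batch in one transaction and leaned on the unique cid index plus ON CONFLICT(cid) DO NOTHING, so resubmitting an overlapping batch only inserted operations that were not already queued. An illustrative retry sketch against the method removed here, assuming an import of the pre-change internal storage package:

// requeue shows the idempotency the unique cid index provided: a repeated or
// overlapping batch cannot create duplicate mempool rows.
func requeue(ctx context.Context, db *storage.PostgresDB, batch []storage.MempoolOperation) error {
	if err := db.AddToMempool(ctx, batch); err != nil {
		return err
	}
	// Safe to call again with the same batch; already-queued CIDs are skipped.
	return db.AddToMempool(ctx, batch)
}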
1578
-
1579
-
func (p *PostgresDB) GetMempoolCount(ctx context.Context) (int, error) {
1580
-
query := "SELECT COUNT(*) FROM plc_mempool"
1581
-
var count int
1582
-
err := p.db.QueryRowContext(ctx, query).Scan(&count)
1583
-
return count, err
1584
-
}
1585
-
1586
-
func (p *PostgresDB) GetMempoolOperations(ctx context.Context, limit int) ([]MempoolOperation, error) {
1587
-
query := `
1588
-
SELECT id, did, operation, cid, created_at, added_at
1589
-
FROM plc_mempool
1590
-
ORDER BY created_at ASC
1591
-
LIMIT $1
1592
-
`
1593
-
1594
-
rows, err := p.db.QueryContext(ctx, query, limit)
1595
-
if err != nil {
1596
-
return nil, err
1597
-
}
1598
-
defer rows.Close()
1599
-
1600
-
var ops []MempoolOperation
1601
-
for rows.Next() {
1602
-
var op MempoolOperation
1603
-
err := rows.Scan(&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt)
1604
-
if err != nil {
1605
-
return nil, err
1606
-
}
1607
-
ops = append(ops, op)
1608
-
}
1609
-
1610
-
return ops, rows.Err()
1611
-
}
1612
-
1613
-
func (p *PostgresDB) DeleteFromMempool(ctx context.Context, ids []int64) error {
1614
-
if len(ids) == 0 {
1615
-
return nil
1616
-
}
1617
-
1618
-
placeholders := make([]string, len(ids))
1619
-
args := make([]interface{}, len(ids))
1620
-
for i, id := range ids {
1621
-
placeholders[i] = fmt.Sprintf("$%d", i+1)
1622
-
args[i] = id
1623
-
}
1624
-
1625
-
query := fmt.Sprintf("DELETE FROM plc_mempool WHERE id IN (%s)",
1626
-
strings.Join(placeholders, ","))
1627
-
1628
-
_, err := p.db.ExecContext(ctx, query, args...)
1629
-
return err
1630
-
}
1631
-
1632
-
func (p *PostgresDB) GetFirstMempoolOperation(ctx context.Context) (*MempoolOperation, error) {
1633
-
query := `
1634
-
SELECT id, did, operation, cid, created_at, added_at
1635
-
FROM plc_mempool
1636
-
ORDER BY created_at ASC, id ASC
1637
-
LIMIT 1
1638
-
`
1639
-
1640
-
var op MempoolOperation
1641
-
err := p.db.QueryRowContext(ctx, query).Scan(
1642
-
&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt,
1643
-
)
1644
-
if err == sql.ErrNoRows {
1645
-
return nil, nil
1646
-
}
1647
-
if err != nil {
1648
-
return nil, err
1649
-
}
1650
-
1651
-
return &op, nil
1652
-
}
1653
-
1654
-
func (p *PostgresDB) GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error) {
1655
-
query := `
1656
-
SELECT id, did, operation, cid, created_at, added_at
1657
-
FROM plc_mempool
1658
-
ORDER BY created_at DESC, id DESC
1659
-
LIMIT 1
1660
-
`
1661
-
1662
-
var op MempoolOperation
1663
-
err := p.db.QueryRowContext(ctx, query).Scan(
1664
-
&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt,
1665
-
)
1666
-
if err == sql.ErrNoRows {
1667
-
return nil, nil
1668
-
}
1669
-
if err != nil {
1670
-
return nil, err
1671
-
}
1672
-
1673
-
return &op, nil
1674
-
}
1675
-
1676
-
func (p *PostgresDB) GetMempoolUniqueDIDCount(ctx context.Context) (int, error) {
1677
-
query := "SELECT COUNT(DISTINCT did) FROM plc_mempool"
1678
-
var count int
1679
-
err := p.db.QueryRowContext(ctx, query).Scan(&count)
1680
-
return count, err
1681
-
}
1682
-
1683
-
func (p *PostgresDB) GetMempoolUncompressedSize(ctx context.Context) (int64, error) {
1684
-
query := "SELECT COALESCE(SUM(LENGTH(operation)), 0) FROM plc_mempool"
1685
-
var size int64
1686
-
err := p.db.QueryRowContext(ctx, query).Scan(&size)
1687
-
return size, err
1688
1157
}
1689
1158
1690
1159
// ===== CURSOR OPERATIONS =====
-10
internal/storage/types.go
···
153
153
CumulativeCompressed int64 `json:"cumulative_compressed"`
154
154
}
155
155
156
-
// MempoolOperation represents an operation waiting to be bundled
157
-
type MempoolOperation struct {
158
-
ID int64
159
-
DID string
160
-
Operation string
161
-
CID string
162
-
CreatedAt time.Time
163
-
AddedAt time.Time
164
-
}
165
-
166
156
// ScanCursor stores scanning progress
167
157
type ScanCursor struct {
168
158
Source string