update

Changed files
+119 -1309
internal
+8
Makefile
··· 1 + all: run 2 + 3 + run: 4 + go run cmd/atscanner.go -verbose 5 + 6 + clean-db: 7 + dropdb -U atscanner atscanner 8 + createdb atscanner -O atscanner
+8 -2
go.mod
··· 1 1 module github.com/atscan/atscanner 2 2 3 - go 1.22 3 + go 1.23.0 4 4 5 5 require ( 6 6 github.com/gorilla/mux v1.8.1 ··· 18 18 19 19 require ( 20 20 github.com/felixge/httpsnoop v1.0.3 // indirect 21 - github.com/lib/pq v1.10.9 // indirect 21 + github.com/jackc/pgpassfile v1.0.0 // indirect 22 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect 23 + github.com/jackc/pgx/v5 v5.7.6 // indirect 24 + github.com/jackc/puddle/v2 v2.2.2 // indirect 25 + golang.org/x/crypto v0.37.0 // indirect 26 + golang.org/x/sync v0.13.0 // indirect 27 + golang.org/x/text v0.24.0 // indirect 22 28 )
+21
go.sum
··· 1 1 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= 2 2 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= 3 + github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 3 4 github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= 4 5 github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= 5 6 github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= 6 7 github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= 7 8 github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= 8 9 github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= 10 + github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= 11 + github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= 12 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= 13 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= 14 + github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= 15 + github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= 16 + github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= 17 + github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= 9 18 github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= 10 19 github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= 11 20 github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= 12 21 github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= 13 22 github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI= 14 23 github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= 24 + github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 25 + github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 26 + github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= 27 + github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 28 + golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= 29 + golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= 30 + golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= 31 + golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= 32 + golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= 33 + golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= 15 34 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= 16 35 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 36 + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= 37 + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 17 38 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 18 39 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+8 -1
internal/api/handlers.go
··· 194 194 return 195 195 } 196 196 197 + // Get the last bundle number where this DID appeared 198 + if len(didRecord.BundleNumbers) == 0 { 199 + resp.error("DID has no bundle history", http.StatusInternalServerError) 200 + return 201 + } 202 + 203 + lastBundleNum := didRecord.BundleNumbers[len(didRecord.BundleNumbers)-1] 204 + 197 205 // Load last bundle to get latest operation 198 - lastBundleNum := didRecord.LastSeenBundle 199 206 ops, err := s.bundleManager.LoadBundleOperations(r.Context(), lastBundleNum) 200 207 if err != nil { 201 208 resp.error(fmt.Sprintf("failed to load bundle: %v", err), http.StatusInternalServerError)
+4 -2
internal/plc/bundle.go
··· 375 375 return err 376 376 } 377 377 378 + start := time.Now() 378 379 // Index DIDs synchronously (will use bulk inserts for speed) 379 - if err := bm.db.AddBundleDIDs(ctx, bundleNum, dids, firstSeenAt, lastSeenAt); err != nil { 380 + if err := bm.db.AddBundleDIDs(ctx, bundleNum, dids); err != nil { 380 381 log.Error("Failed to index DIDs for bundle %06d: %v", bundleNum, err) 381 382 // Don't return error - bundle is already created 382 383 // DID indexing can be retried later 383 384 } else { 384 - log.Verbose("✓ Indexed %d unique DIDs for bundle %06d", len(dids), bundleNum) 385 + elapsed := time.Since(start) 386 + log.Verbose("✓ Indexed %d unique DIDs for bundle %06d in %v", len(dids), bundleNum, elapsed) 385 387 } 386 388 387 389 return nil
+2 -4
internal/storage/db.go
··· 9 9 // NewDatabase creates a database connection based on type 10 10 func NewDatabase(dbType, connectionString string) (Database, error) { 11 11 switch dbType { 12 - case "sqlite": 13 - return NewSQLiteDB(connectionString) 14 12 case "postgres", "postgresql": 15 13 return NewPostgresDB(connectionString) 16 14 default: ··· 61 59 GetEndpointStats(ctx context.Context) (*EndpointStats, error) 62 60 63 61 // DID operations 64 - UpsertDID(ctx context.Context, did *DIDRecord) error 62 + UpsertDID(ctx context.Context, did string, bundleNum int) error 65 63 GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) 66 - AddBundleDIDs(ctx context.Context, bundleNum int, dids []string, firstSeenAt, lastSeenAt time.Time) error 64 + AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error 67 65 GetTotalDIDCount(ctx context.Context) (int64, error) 68 66 }
+65 -133
internal/storage/postgres.go
··· 126 126 CREATE INDEX IF NOT EXISTS idx_mempool_did ON plc_mempool(did); 127 127 CREATE UNIQUE INDEX IF NOT EXISTS idx_mempool_cid ON plc_mempool(cid); 128 128 129 - CREATE TABLE IF NOT EXISTS dids ( 130 - did TEXT PRIMARY KEY, 131 - first_seen_bundle INTEGER NOT NULL, 132 - last_seen_bundle INTEGER NOT NULL, 133 - bundle_numbers JSONB NOT NULL, 134 - operation_count INTEGER DEFAULT 1, 135 - first_seen_at TIMESTAMP NOT NULL, 136 - last_seen_at TIMESTAMP NOT NULL 137 - ); 129 + -- Minimal dids table 130 + CREATE TABLE IF NOT EXISTS dids ( 131 + did TEXT PRIMARY KEY, 132 + bundle_numbers JSONB NOT NULL DEFAULT '[]'::jsonb, 133 + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 134 + ); 138 135 139 - CREATE INDEX IF NOT EXISTS idx_dids_did ON dids(did); 140 - CREATE INDEX IF NOT EXISTS idx_dids_last_bundle ON dids(last_seen_bundle); 141 - CREATE INDEX IF NOT EXISTS idx_dids_first_seen ON dids(first_seen_at); 142 - CREATE INDEX IF NOT EXISTS idx_dids_last_seen ON dids(last_seen_at); 143 - CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers); 136 + CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers); 137 + CREATE INDEX IF NOT EXISTS idx_dids_created_at ON dids(created_at); 144 138 ` 145 139 146 140 _, err := p.db.Exec(schema) ··· 881 875 882 876 // ===== DID OPERATIONS ===== 883 877 884 - func (p *PostgresDB) UpsertDID(ctx context.Context, did *DIDRecord) error { 885 - bundleNumbersJSON, err := json.Marshal(did.BundleNumbers) 886 - if err != nil { 887 - return err 888 - } 889 - 878 + func (p *PostgresDB) UpsertDID(ctx context.Context, did string, bundleNum int) error { 890 879 query := ` 891 - INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at) 892 - VALUES ($1, $2, $3, $4, $5, $6, $7) 893 - ON CONFLICT(did) DO UPDATE SET 894 - last_seen_bundle = EXCLUDED.last_seen_bundle, 895 - bundle_numbers = EXCLUDED.bundle_numbers, 896 - operation_count = EXCLUDED.operation_count, 897 - last_seen_at = EXCLUDED.last_seen_at 898 - ` 899 - _, err = p.db.ExecContext(ctx, query, 900 - did.DID, did.FirstSeenBundle, did.LastSeenBundle, 901 - bundleNumbersJSON, did.OperationCount, 902 - did.FirstSeenAt, did.LastSeenAt) 880 + INSERT INTO dids (did, bundle_numbers, created_at) 881 + VALUES ($1, jsonb_build_array($2), CURRENT_TIMESTAMP) 882 + ON CONFLICT(did) DO UPDATE SET 883 + bundle_numbers = CASE 884 + WHEN dids.bundle_numbers ? $2::text THEN dids.bundle_numbers 885 + ELSE dids.bundle_numbers || jsonb_build_array($2) 886 + END 887 + ` 888 + _, err := p.db.ExecContext(ctx, query, did, bundleNum) 903 889 return err 904 890 } 905 891 906 892 func (p *PostgresDB) GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) { 907 893 query := ` 908 - SELECT did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at 909 - FROM dids 910 - WHERE did = $1 911 - ` 894 + SELECT did, bundle_numbers, created_at 895 + FROM dids 896 + WHERE did = $1 897 + ` 912 898 913 899 var record DIDRecord 914 900 var bundleNumbersJSON []byte 915 901 916 902 err := p.db.QueryRowContext(ctx, query, did).Scan( 917 - &record.DID, &record.FirstSeenBundle, &record.LastSeenBundle, 918 - &bundleNumbersJSON, &record.OperationCount, 919 - &record.FirstSeenAt, &record.LastSeenAt, 903 + &record.DID, 904 + &bundleNumbersJSON, 905 + &record.CreatedAt, 920 906 ) 921 907 if err != nil { 922 908 return nil, err ··· 929 915 return &record, nil 930 916 } 931 917 932 - func (p *PostgresDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string, firstSeenAt, lastSeenAt time.Time) error { 918 + func (p *PostgresDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error { 933 919 if len(dids) == 0 { 934 920 return nil 935 921 } ··· 940 926 } 941 927 defer tx.Rollback() 942 928 943 - // Get existing DIDs 944 - existingDIDs := make(map[string][]int) 945 - if len(dids) > 0 { 946 - placeholders := make([]string, len(dids)) 947 - args := make([]interface{}, len(dids)) 948 - for i, did := range dids { 949 - placeholders[i] = fmt.Sprintf("$%d", i+1) 950 - args[i] = did 951 - } 952 - 953 - query := fmt.Sprintf(` 954 - SELECT did, bundle_numbers 955 - FROM dids 956 - WHERE did IN (%s) 957 - `, strings.Join(placeholders, ",")) 958 - 959 - rows, err := tx.QueryContext(ctx, query, args...) 960 - if err != nil { 961 - return err 962 - } 963 - 964 - for rows.Next() { 965 - var did string 966 - var bundleNumbersJSON []byte 967 - if err := rows.Scan(&did, &bundleNumbersJSON); err != nil { 968 - rows.Close() 969 - return err 970 - } 971 - 972 - var bundles []int 973 - if err := json.Unmarshal(bundleNumbersJSON, &bundles); err != nil { 974 - rows.Close() 975 - return err 976 - } 977 - 978 - existingDIDs[did] = bundles 979 - } 980 - rows.Close() 981 - 982 - if err := rows.Err(); err != nil { 983 - return err 984 - } 929 + // Create temporary table 930 + _, err = tx.ExecContext(ctx, ` 931 + CREATE TEMP TABLE temp_dids (did TEXT PRIMARY KEY) ON COMMIT DROP 932 + `) 933 + if err != nil { 934 + return err 985 935 } 986 936 987 - // Batch upsert 988 - batchSize := 500 937 + // Bulk insert DIDs into temp table 938 + batchSize := 5000 989 939 for i := 0; i < len(dids); i += batchSize { 990 940 end := i + batchSize 991 941 if end > len(dids) { ··· 993 943 } 994 944 batch := dids[i:end] 995 945 996 - if err := p.bulkUpsertDIDsSimplified(ctx, tx, bundleNum, batch, existingDIDs, firstSeenAt, lastSeenAt); err != nil { 997 - return err 946 + placeholders := make([]string, len(batch)) 947 + args := make([]interface{}, len(batch)) 948 + for j, did := range batch { 949 + placeholders[j] = fmt.Sprintf("($%d)", j+1) 950 + args[j] = did 998 951 } 999 - } 1000 952 1001 - return tx.Commit() 1002 - } 953 + query := fmt.Sprintf(`INSERT INTO temp_dids VALUES %s ON CONFLICT DO NOTHING`, 954 + strings.Join(placeholders, ",")) 1003 955 1004 - func (p *PostgresDB) bulkUpsertDIDsSimplified(ctx context.Context, tx *sql.Tx, bundleNum int, dids []string, existingDIDs map[string][]int, firstSeenAt, lastSeenAt time.Time) error { 1005 - if len(dids) == 0 { 1006 - return nil 956 + if _, err := tx.ExecContext(ctx, query, args...); err != nil { 957 + return err 958 + } 1007 959 } 1008 960 1009 - var values []string 1010 - var args []interface{} 1011 - argIdx := 1 1012 - 1013 - for _, did := range dids { 1014 - bundles := existingDIDs[did] 1015 - 1016 - alreadyHas := false 1017 - for _, b := range bundles { 1018 - if b == bundleNum { 1019 - alreadyHas = true 1020 - break 1021 - } 1022 - } 961 + // Step 1: Insert new DIDs 962 + _, err = tx.ExecContext(ctx, ` 963 + INSERT INTO dids (did, bundle_numbers, created_at) 964 + SELECT td.did, $1::jsonb, CURRENT_TIMESTAMP 965 + FROM temp_dids td 966 + WHERE NOT EXISTS (SELECT 1 FROM dids WHERE dids.did = td.did) 967 + `, fmt.Sprintf("[%d]", bundleNum)) 1023 968 1024 - if !alreadyHas { 1025 - bundles = append(bundles, bundleNum) 1026 - } 969 + if err != nil { 970 + return err 971 + } 1027 972 1028 - bundleNumbersJSON, _ := json.Marshal(bundles) 973 + // Step 2: Update existing DIDs (only if bundle not already present) 974 + _, err = tx.ExecContext(ctx, ` 975 + UPDATE dids 976 + SET bundle_numbers = bundle_numbers || $1::jsonb 977 + FROM temp_dids 978 + WHERE dids.did = temp_dids.did 979 + AND NOT (bundle_numbers @> $1::jsonb) 980 + `, fmt.Sprintf("[%d]", bundleNum)) 1029 981 1030 - if len(existingDIDs[did]) > 0 { 1031 - values = append(values, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)", 1032 - argIdx, argIdx+1, argIdx+2, argIdx+3, argIdx+4, argIdx+5, argIdx+6)) 1033 - args = append(args, did, bundleNum, bundleNum, bundleNumbersJSON, len(bundles), firstSeenAt, lastSeenAt) 1034 - argIdx += 7 1035 - } else { 1036 - values = append(values, fmt.Sprintf("($%d, $%d, $%d, $%d, 1, $%d, $%d)", 1037 - argIdx, argIdx+1, argIdx+2, argIdx+3, argIdx+4, argIdx+5)) 1038 - args = append(args, did, bundleNum, bundleNum, bundleNumbersJSON, firstSeenAt, lastSeenAt) 1039 - argIdx += 6 1040 - } 982 + if err != nil { 983 + return err 1041 984 } 1042 985 1043 - query := fmt.Sprintf(` 1044 - INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at) 1045 - VALUES %s 1046 - ON CONFLICT(did) DO UPDATE SET 1047 - last_seen_bundle = EXCLUDED.last_seen_bundle, 1048 - bundle_numbers = EXCLUDED.bundle_numbers, 1049 - operation_count = EXCLUDED.operation_count, 1050 - last_seen_at = EXCLUDED.last_seen_at 1051 - `, strings.Join(values, ",")) 1052 - 1053 - _, err := tx.ExecContext(ctx, query, args...) 1054 - return err 986 + return tx.Commit() 1055 987 } 1056 988 1057 989 func (p *PostgresDB) GetTotalDIDCount(ctx context.Context) (int64, error) {
-1160
internal/storage/sqlite.go
··· 1 - package storage 2 - 3 - import ( 4 - "context" 5 - "database/sql" 6 - "encoding/json" 7 - "fmt" 8 - "strings" 9 - "time" 10 - 11 - _ "github.com/mattn/go-sqlite3" 12 - ) 13 - 14 - type SQLiteDB struct { 15 - db *sql.DB 16 - } 17 - 18 - func NewSQLiteDB(path string) (*SQLiteDB, error) { 19 - db, err := sql.Open("sqlite3", path) 20 - if err != nil { 21 - return nil, err 22 - } 23 - 24 - // Performance optimizations 25 - pragmas := []string{ 26 - "PRAGMA journal_mode=WAL", 27 - "PRAGMA synchronous=NORMAL", // Faster than FULL, still safe with WAL 28 - "PRAGMA cache_size=-64000", // 64MB cache (negative = KB) 29 - "PRAGMA temp_store=MEMORY", // Store temp tables in memory 30 - "PRAGMA mmap_size=268435456", // 256MB memory-mapped I/O 31 - "PRAGMA page_size=4096", // Optimal page size 32 - "PRAGMA busy_timeout=5000", // Wait up to 5s for locks 33 - } 34 - 35 - for _, pragma := range pragmas { 36 - if _, err := db.Exec(pragma); err != nil { 37 - return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err) 38 - } 39 - } 40 - 41 - return &SQLiteDB{db: db}, nil 42 - } 43 - 44 - func (s *SQLiteDB) Close() error { 45 - return s.db.Close() 46 - } 47 - 48 - func (s *SQLiteDB) Migrate() error { 49 - schema := ` 50 - -- Endpoints table (replaces pds_servers) 51 - CREATE TABLE IF NOT EXISTS endpoints ( 52 - id INTEGER PRIMARY KEY AUTOINCREMENT, 53 - endpoint_type TEXT NOT NULL DEFAULT 'pds', 54 - endpoint TEXT NOT NULL, 55 - discovered_at TIMESTAMP NOT NULL, 56 - last_checked TIMESTAMP, 57 - status INTEGER DEFAULT 0, 58 - user_count INTEGER DEFAULT 0, 59 - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 60 - UNIQUE(endpoint_type, endpoint) 61 - ); 62 - 63 - CREATE INDEX IF NOT EXISTS idx_endpoints_type_endpoint ON endpoints(endpoint_type, endpoint); 64 - CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status); 65 - CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type); 66 - CREATE INDEX IF NOT EXISTS idx_endpoints_user_count ON endpoints(user_count); 67 - 68 - CREATE TABLE IF NOT EXISTS pds_scans ( 69 - id INTEGER PRIMARY KEY AUTOINCREMENT, 70 - pds_id INTEGER NOT NULL, 71 - status INTEGER NOT NULL, 72 - response_time REAL, 73 - scan_data TEXT, 74 - scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 75 - FOREIGN KEY (pds_id) REFERENCES endpoints(id) ON DELETE CASCADE 76 - ); 77 - 78 - CREATE INDEX IF NOT EXISTS idx_pds_scans_pds_id ON pds_scans(pds_id); 79 - CREATE INDEX IF NOT EXISTS idx_pds_scans_scanned_at ON pds_scans(scanned_at); 80 - 81 - CREATE TABLE IF NOT EXISTS plc_metrics ( 82 - id INTEGER PRIMARY KEY AUTOINCREMENT, 83 - total_dids INTEGER, 84 - total_pds INTEGER, 85 - unique_pds INTEGER, 86 - scan_duration_ms INTEGER, 87 - error_count INTEGER, 88 - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 89 - ); 90 - 91 - CREATE TABLE IF NOT EXISTS scan_cursors ( 92 - source TEXT PRIMARY KEY, 93 - last_bundle_number INTEGER DEFAULT 0, 94 - last_scan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 95 - records_processed INTEGER DEFAULT 0 96 - ); 97 - 98 - CREATE TABLE IF NOT EXISTS plc_bundles ( 99 - bundle_number INTEGER PRIMARY KEY, 100 - start_time TIMESTAMP NOT NULL, 101 - end_time TIMESTAMP NOT NULL, 102 - dids TEXT NOT NULL, 103 - hash TEXT NOT NULL, 104 - compressed_hash TEXT NOT NULL, 105 - compressed_size INTEGER NOT NULL, 106 - uncompressed_size INTEGER NOT NULL, 107 - cumulative_compressed_size INTEGER NOT NULL, 108 - cumulative_uncompressed_size INTEGER NOT NULL, 109 - cursor TEXT, 110 - prev_bundle_hash TEXT, 111 - compressed BOOLEAN DEFAULT 1, 112 - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 113 - ); 114 - 115 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time); 116 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_hash ON plc_bundles(hash); 117 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_prev ON plc_bundles(prev_bundle_hash); 118 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_number_desc ON plc_bundles(bundle_number DESC); 119 - 120 - CREATE TABLE IF NOT EXISTS plc_mempool ( 121 - id INTEGER PRIMARY KEY AUTOINCREMENT, 122 - did TEXT NOT NULL, 123 - operation TEXT NOT NULL, 124 - cid TEXT NOT NULL UNIQUE, 125 - created_at TIMESTAMP NOT NULL, 126 - added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 127 - ); 128 - 129 - CREATE INDEX IF NOT EXISTS idx_mempool_created_at ON plc_mempool(created_at); 130 - CREATE INDEX IF NOT EXISTS idx_mempool_did ON plc_mempool(did); 131 - CREATE UNIQUE INDEX IF NOT EXISTS idx_mempool_cid ON plc_mempool(cid); 132 - 133 - -- NEW: DIDs table 134 - CREATE TABLE IF NOT EXISTS dids ( 135 - did TEXT PRIMARY KEY, 136 - first_seen_bundle INTEGER NOT NULL, 137 - last_seen_bundle INTEGER NOT NULL, 138 - bundle_numbers TEXT NOT NULL, -- JSON array of bundle numbers 139 - operation_count INTEGER DEFAULT 1, 140 - first_seen_at TIMESTAMP NOT NULL, 141 - last_seen_at TIMESTAMP NOT NULL 142 - ); 143 - 144 - CREATE INDEX IF NOT EXISTS idx_dids_last_bundle ON dids(did); 145 - CREATE INDEX IF NOT EXISTS idx_dids_last_bundle ON dids(last_seen_bundle); 146 - CREATE INDEX IF NOT EXISTS idx_dids_first_seen ON dids(first_seen_at); 147 - CREATE INDEX IF NOT EXISTS idx_dids_last_seen ON dids(last_seen_at); 148 - ` 149 - 150 - _, err := s.db.Exec(schema) 151 - return err 152 - } 153 - 154 - // GetBundleByNumber 155 - func (s *SQLiteDB) GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error) { 156 - query := ` 157 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 158 - compressed_size, uncompressed_size, cumulative_compressed_size, 159 - cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 160 - FROM plc_bundles 161 - WHERE bundle_number = ? 162 - ` 163 - 164 - var bundle PLCBundle 165 - var didsJSON string 166 - var prevHash sql.NullString 167 - var cursor sql.NullString 168 - 169 - err := s.db.QueryRowContext(ctx, query, bundleNumber).Scan( 170 - &bundle.BundleNumber, &bundle.StartTime, &bundle.EndTime, 171 - &didsJSON, &bundle.Hash, &bundle.CompressedHash, 172 - &bundle.CompressedSize, &bundle.UncompressedSize, 173 - &bundle.CumulativeCompressedSize, &bundle.CumulativeUncompressedSize, 174 - &cursor, &prevHash, &bundle.Compressed, &bundle.CreatedAt, 175 - ) 176 - if err != nil { 177 - return nil, err 178 - } 179 - 180 - if prevHash.Valid { 181 - bundle.PrevBundleHash = prevHash.String 182 - } 183 - if cursor.Valid { 184 - bundle.Cursor = cursor.String 185 - } 186 - 187 - json.Unmarshal([]byte(didsJSON), &bundle.DIDs) 188 - return &bundle, nil 189 - } 190 - 191 - // GetBundleForTimestamp finds the bundle that should contain operations at or after the given time 192 - func (s *SQLiteDB) GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error) { 193 - query := ` 194 - SELECT bundle_number 195 - FROM plc_bundles 196 - WHERE start_time <= ? AND end_time >= ? 197 - ORDER BY bundle_number ASC 198 - LIMIT 1 199 - ` 200 - 201 - var bundleNum int 202 - err := s.db.QueryRowContext(ctx, query, afterTime, afterTime).Scan(&bundleNum) 203 - if err == sql.ErrNoRows { 204 - // No exact match, find the closest bundle before this time 205 - query = ` 206 - SELECT bundle_number 207 - FROM plc_bundles 208 - WHERE end_time < ? 209 - ORDER BY bundle_number DESC 210 - LIMIT 1 211 - ` 212 - err = s.db.QueryRowContext(ctx, query, afterTime).Scan(&bundleNum) 213 - if err == sql.ErrNoRows { 214 - return 1, nil // Start from first bundle 215 - } 216 - if err != nil { 217 - return 0, err 218 - } 219 - return bundleNum, nil // Return the bundle just before 220 - } 221 - if err != nil { 222 - return 0, err 223 - } 224 - 225 - return bundleNum, nil 226 - } 227 - 228 - // GetLastBundleNumber gets the highest bundle number 229 - func (s *SQLiteDB) GetLastBundleNumber(ctx context.Context) (int, error) { 230 - query := "SELECT COALESCE(MAX(bundle_number), 0) FROM plc_bundles" 231 - var num int 232 - err := s.db.QueryRowContext(ctx, query).Scan(&num) 233 - return num, err 234 - } 235 - 236 - // AddToMempool adds operations to the mempool 237 - func (s *SQLiteDB) AddToMempool(ctx context.Context, ops []MempoolOperation) error { 238 - if len(ops) == 0 { 239 - return nil 240 - } 241 - 242 - tx, err := s.db.BeginTx(ctx, nil) 243 - if err != nil { 244 - return err 245 - } 246 - defer tx.Rollback() 247 - 248 - // ✅ Use ON CONFLICT to skip duplicates 249 - stmt, err := tx.PrepareContext(ctx, ` 250 - INSERT INTO plc_mempool (did, operation, cid, created_at) 251 - VALUES (?, ?, ?, ?) 252 - ON CONFLICT(cid) DO NOTHING 253 - `) 254 - if err != nil { 255 - return err 256 - } 257 - defer stmt.Close() 258 - 259 - for _, op := range ops { 260 - _, err := stmt.ExecContext(ctx, op.DID, op.Operation, op.CID, op.CreatedAt) 261 - if err != nil { 262 - return err 263 - } 264 - } 265 - 266 - return tx.Commit() 267 - } 268 - 269 - // GetMempoolCount returns number of operations in mempool 270 - func (s *SQLiteDB) GetMempoolCount(ctx context.Context) (int, error) { 271 - query := "SELECT COUNT(*) FROM plc_mempool" 272 - var count int 273 - err := s.db.QueryRowContext(ctx, query).Scan(&count) 274 - return count, err 275 - } 276 - 277 - // GetMempoolOperations retrieves operations from mempool ordered by timestamp 278 - func (s *SQLiteDB) GetMempoolOperations(ctx context.Context, limit int) ([]MempoolOperation, error) { 279 - query := ` 280 - SELECT id, did, operation, cid, created_at, added_at 281 - FROM plc_mempool 282 - ORDER BY created_at ASC 283 - LIMIT ? 284 - ` 285 - 286 - rows, err := s.db.QueryContext(ctx, query, limit) 287 - if err != nil { 288 - return nil, err 289 - } 290 - defer rows.Close() 291 - 292 - var ops []MempoolOperation 293 - for rows.Next() { 294 - var op MempoolOperation 295 - err := rows.Scan(&op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt) 296 - if err != nil { 297 - return nil, err 298 - } 299 - ops = append(ops, op) 300 - } 301 - 302 - return ops, rows.Err() 303 - } 304 - 305 - // DeleteFromMempool removes operations from mempool 306 - func (s *SQLiteDB) DeleteFromMempool(ctx context.Context, ids []int64) error { 307 - if len(ids) == 0 { 308 - return nil 309 - } 310 - 311 - placeholders := make([]string, len(ids)) 312 - args := make([]interface{}, len(ids)) 313 - for i, id := range ids { 314 - placeholders[i] = "?" 315 - args[i] = id 316 - } 317 - 318 - query := fmt.Sprintf("DELETE FROM plc_mempool WHERE id IN (%s)", 319 - strings.Join(placeholders, ",")) 320 - 321 - _, err := s.db.ExecContext(ctx, query, args...) 322 - return err 323 - } 324 - 325 - // GetFirstMempoolOperation retrieves the oldest operation from mempool 326 - func (s *SQLiteDB) GetFirstMempoolOperation(ctx context.Context) (*MempoolOperation, error) { 327 - query := ` 328 - SELECT id, did, operation, cid, created_at, added_at 329 - FROM plc_mempool 330 - ORDER BY created_at ASC, id ASC 331 - LIMIT 1 332 - ` 333 - 334 - var op MempoolOperation 335 - err := s.db.QueryRowContext(ctx, query).Scan( 336 - &op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt, 337 - ) 338 - if err == sql.ErrNoRows { 339 - return nil, nil // No operations in mempool 340 - } 341 - if err != nil { 342 - return nil, err 343 - } 344 - 345 - return &op, nil 346 - } 347 - 348 - // GetLastMempoolOperation retrieves the most recent operation from mempool 349 - func (s *SQLiteDB) GetLastMempoolOperation(ctx context.Context) (*MempoolOperation, error) { 350 - query := ` 351 - SELECT id, did, operation, cid, created_at, added_at 352 - FROM plc_mempool 353 - ORDER BY created_at DESC, id DESC 354 - LIMIT 1 355 - ` 356 - 357 - var op MempoolOperation 358 - err := s.db.QueryRowContext(ctx, query).Scan( 359 - &op.ID, &op.DID, &op.Operation, &op.CID, &op.CreatedAt, &op.AddedAt, 360 - ) 361 - if err == sql.ErrNoRows { 362 - return nil, nil // No operations in mempool 363 - } 364 - if err != nil { 365 - return nil, err 366 - } 367 - 368 - return &op, nil 369 - } 370 - 371 - func (s *SQLiteDB) CreateBundle(ctx context.Context, bundle *PLCBundle) error { 372 - didsJSON, err := json.Marshal(bundle.DIDs) 373 - if err != nil { 374 - return err 375 - } 376 - 377 - // Calculate cumulative sizes from previous bundle 378 - if bundle.BundleNumber > 1 { 379 - prevBundle, err := s.GetBundleByNumber(ctx, bundle.BundleNumber-1) 380 - if err == nil && prevBundle != nil { 381 - bundle.CumulativeCompressedSize = prevBundle.CumulativeCompressedSize + bundle.CompressedSize 382 - bundle.CumulativeUncompressedSize = prevBundle.CumulativeUncompressedSize + bundle.UncompressedSize 383 - } else { 384 - // Fallback: this shouldn't happen, but calculate from scratch 385 - bundle.CumulativeCompressedSize = bundle.CompressedSize 386 - bundle.CumulativeUncompressedSize = bundle.UncompressedSize 387 - } 388 - } else { 389 - // First bundle 390 - bundle.CumulativeCompressedSize = bundle.CompressedSize 391 - bundle.CumulativeUncompressedSize = bundle.UncompressedSize 392 - } 393 - 394 - query := ` 395 - INSERT INTO plc_bundles ( 396 - bundle_number, start_time, end_time, dids, 397 - hash, compressed_hash, compressed_size, uncompressed_size, 398 - cumulative_compressed_size, cumulative_uncompressed_size, 399 - cursor, prev_bundle_hash, compressed 400 - ) 401 - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 402 - ON CONFLICT(bundle_number) DO UPDATE SET 403 - start_time = excluded.start_time, 404 - end_time = excluded.end_time, 405 - dids = excluded.dids, 406 - hash = excluded.hash, 407 - compressed_hash = excluded.compressed_hash, 408 - compressed_size = excluded.compressed_size, 409 - uncompressed_size = excluded.uncompressed_size, 410 - cumulative_compressed_size = excluded.cumulative_compressed_size, 411 - cumulative_uncompressed_size = excluded.cumulative_uncompressed_size, 412 - cursor = excluded.cursor, 413 - prev_bundle_hash = excluded.prev_bundle_hash, 414 - compressed = excluded.compressed 415 - ` 416 - _, err = s.db.ExecContext(ctx, query, 417 - bundle.BundleNumber, bundle.StartTime, bundle.EndTime, 418 - string(didsJSON), bundle.Hash, bundle.CompressedHash, 419 - bundle.CompressedSize, bundle.UncompressedSize, 420 - bundle.CumulativeCompressedSize, bundle.CumulativeUncompressedSize, 421 - bundle.Cursor, bundle.PrevBundleHash, bundle.Compressed, 422 - ) 423 - 424 - return err 425 - } 426 - 427 - // GetMempoolUniqueDIDCount returns the number of unique DIDs in mempool 428 - func (s *SQLiteDB) GetMempoolUniqueDIDCount(ctx context.Context) (int, error) { 429 - query := "SELECT COUNT(DISTINCT did) FROM plc_mempool" 430 - var count int 431 - err := s.db.QueryRowContext(ctx, query).Scan(&count) 432 - return count, err 433 - } 434 - 435 - // GetMempoolUncompressedSize returns total uncompressed size of all operations 436 - func (s *SQLiteDB) GetMempoolUncompressedSize(ctx context.Context) (int64, error) { 437 - query := "SELECT COALESCE(SUM(LENGTH(operation)), 0) FROM plc_mempool" 438 - var size int64 439 - err := s.db.QueryRowContext(ctx, query).Scan(&size) 440 - return size, err 441 - } 442 - 443 - // GetBundles 444 - func (s *SQLiteDB) GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) { 445 - query := ` 446 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 447 - compressed_size, uncompressed_size, cumulative_compressed_size, 448 - cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 449 - FROM plc_bundles 450 - ORDER BY bundle_number DESC 451 - LIMIT ? 452 - ` 453 - 454 - rows, err := s.db.QueryContext(ctx, query, limit) 455 - if err != nil { 456 - return nil, err 457 - } 458 - defer rows.Close() 459 - 460 - return s.scanBundles(rows) 461 - } 462 - 463 - // GetBundlesForDID 464 - func (s *SQLiteDB) GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) { 465 - query := ` 466 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 467 - compressed_size, uncompressed_size, cumulative_compressed_size, 468 - cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 469 - FROM plc_bundles 470 - WHERE EXISTS ( 471 - SELECT 1 FROM json_each(dids) 472 - WHERE json_each.value = ? 473 - ) 474 - ORDER BY bundle_number ASC 475 - ` 476 - 477 - rows, err := s.db.QueryContext(ctx, query, did) 478 - if err != nil { 479 - return nil, err 480 - } 481 - defer rows.Close() 482 - 483 - return s.scanBundles(rows) 484 - } 485 - 486 - // GetBundle retrieves bundle by time (if needed, otherwise can be removed) 487 - func (s *SQLiteDB) GetBundle(ctx context.Context, afterTime time.Time) (*PLCBundle, error) { 488 - var query string 489 - var args []interface{} 490 - 491 - if afterTime.IsZero() { 492 - query = ` 493 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 494 - compressed_size, uncompressed_size, cumulative_compressed_size, 495 - cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 496 - FROM plc_bundles 497 - ORDER BY start_time ASC 498 - LIMIT 1 499 - ` 500 - args = []interface{}{} 501 - } else { 502 - query = ` 503 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 504 - compressed_size, uncompressed_size, cumulative_compressed_size, 505 - cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 506 - FROM plc_bundles 507 - WHERE start_time >= ? 508 - ORDER BY start_time ASC 509 - LIMIT 1 510 - ` 511 - args = []interface{}{afterTime} 512 - } 513 - 514 - var bundle PLCBundle 515 - var didsJSON string 516 - var prevHash sql.NullString 517 - var cursor sql.NullString 518 - 519 - err := s.db.QueryRowContext(ctx, query, args...).Scan( 520 - &bundle.BundleNumber, 521 - &bundle.StartTime, 522 - &bundle.EndTime, 523 - &didsJSON, 524 - &bundle.Hash, 525 - &bundle.CompressedHash, 526 - &bundle.CompressedSize, 527 - &bundle.UncompressedSize, 528 - &bundle.CumulativeCompressedSize, 529 - &bundle.CumulativeUncompressedSize, 530 - &cursor, 531 - &prevHash, 532 - &bundle.Compressed, 533 - &bundle.CreatedAt, 534 - ) 535 - if err == sql.ErrNoRows { 536 - return nil, nil 537 - } 538 - if err != nil { 539 - return nil, err 540 - } 541 - 542 - if prevHash.Valid { 543 - bundle.PrevBundleHash = prevHash.String 544 - } 545 - if cursor.Valid { 546 - bundle.Cursor = cursor.String 547 - } 548 - 549 - json.Unmarshal([]byte(didsJSON), &bundle.DIDs) 550 - return &bundle, nil 551 - } 552 - 553 - func (s *SQLiteDB) scanBundles(rows *sql.Rows) ([]*PLCBundle, error) { 554 - var bundles []*PLCBundle 555 - 556 - for rows.Next() { 557 - var bundle PLCBundle 558 - var didsJSON string 559 - var prevHash sql.NullString 560 - var cursor sql.NullString 561 - 562 - if err := rows.Scan( 563 - &bundle.BundleNumber, 564 - &bundle.StartTime, 565 - &bundle.EndTime, 566 - &didsJSON, 567 - &bundle.Hash, 568 - &bundle.CompressedHash, 569 - &bundle.CompressedSize, 570 - &bundle.UncompressedSize, 571 - &bundle.CumulativeCompressedSize, 572 - &bundle.CumulativeUncompressedSize, 573 - &cursor, 574 - &prevHash, 575 - &bundle.Compressed, 576 - &bundle.CreatedAt, 577 - ); err != nil { 578 - return nil, err 579 - } 580 - 581 - if prevHash.Valid { 582 - bundle.PrevBundleHash = prevHash.String 583 - } 584 - if cursor.Valid { 585 - bundle.Cursor = cursor.String 586 - } 587 - 588 - json.Unmarshal([]byte(didsJSON), &bundle.DIDs) 589 - bundles = append(bundles, &bundle) 590 - } 591 - 592 - return bundles, rows.Err() 593 - } 594 - 595 - func (s *SQLiteDB) GetBundleStats(ctx context.Context) (int64, int64, int64, int64, error) { 596 - // Get count and last bundle number 597 - var count, lastBundleNum int64 598 - err := s.db.QueryRowContext(ctx, ` 599 - SELECT COUNT(*), COALESCE(MAX(bundle_number), 0) 600 - FROM plc_bundles 601 - `).Scan(&count, &lastBundleNum) 602 - if err != nil { 603 - return 0, 0, 0, 0, err 604 - } 605 - 606 - if lastBundleNum == 0 { 607 - return 0, 0, 0, 0, nil 608 - } 609 - 610 - // Get cumulative sizes from last bundle (O(1) with index!) 611 - var compressedSize, uncompressedSize int64 612 - err = s.db.QueryRowContext(ctx, ` 613 - SELECT cumulative_compressed_size, cumulative_uncompressed_size 614 - FROM plc_bundles 615 - WHERE bundle_number = ? 616 - `, lastBundleNum).Scan(&compressedSize, &uncompressedSize) 617 - if err != nil { 618 - return 0, 0, 0, 0, err 619 - } 620 - 621 - return count, compressedSize, uncompressedSize, lastBundleNum, nil 622 - } 623 - 624 - // UpsertEndpoint inserts or updates an endpoint 625 - func (s *SQLiteDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error { 626 - query := ` 627 - INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status) 628 - VALUES (?, ?, ?, ?, ?) 629 - ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET 630 - last_checked = excluded.last_checked 631 - RETURNING id 632 - ` 633 - err := s.db.QueryRowContext(ctx, query, 634 - endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt, 635 - endpoint.LastChecked, endpoint.Status).Scan(&endpoint.ID) 636 - return err 637 - } 638 - 639 - // EndpointExists checks if an endpoint already exists 640 - func (s *SQLiteDB) EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) { 641 - query := "SELECT EXISTS(SELECT 1 FROM endpoints WHERE endpoint = ? AND endpoint_type = ?)" 642 - var exists bool 643 - err := s.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&exists) 644 - return exists, err 645 - } 646 - 647 - // GetEndpointIDByEndpoint gets the ID for an endpoint 648 - func (s *SQLiteDB) GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) { 649 - query := "SELECT id FROM endpoints WHERE endpoint = ? AND endpoint_type = ?" 650 - var id int64 651 - err := s.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&id) 652 - return id, err 653 - } 654 - 655 - // GetEndpoint retrieves an endpoint by endpoint string and type 656 - func (s *SQLiteDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) { 657 - query := ` 658 - SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at 659 - FROM endpoints 660 - WHERE endpoint = ? AND endpoint_type = ? 661 - ` 662 - 663 - var ep Endpoint 664 - var lastChecked sql.NullTime 665 - 666 - err := s.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan( 667 - &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 668 - &ep.Status, &ep.UserCount, &ep.UpdatedAt, 669 - ) 670 - if err != nil { 671 - return nil, err 672 - } 673 - 674 - if lastChecked.Valid { 675 - ep.LastChecked = lastChecked.Time 676 - } 677 - 678 - return &ep, nil 679 - } 680 - 681 - // GetEndpointByID retrieves an endpoint by ID 682 - func (s *SQLiteDB) GetEndpointByID(ctx context.Context, id int64) (*Endpoint, error) { 683 - query := ` 684 - SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at 685 - FROM endpoints 686 - WHERE id = ? 687 - ` 688 - 689 - var ep Endpoint 690 - var lastChecked sql.NullTime 691 - 692 - err := s.db.QueryRowContext(ctx, query, id).Scan( 693 - &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 694 - &ep.Status, &ep.UserCount, &ep.UpdatedAt, 695 - ) 696 - if err != nil { 697 - return nil, err 698 - } 699 - 700 - if lastChecked.Valid { 701 - ep.LastChecked = lastChecked.Time 702 - } 703 - 704 - return &ep, nil 705 - } 706 - 707 - // GetEndpoints retrieves multiple endpoints 708 - func (s *SQLiteDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) { 709 - query := ` 710 - SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, updated_at 711 - FROM endpoints 712 - WHERE 1=1 713 - ` 714 - args := []interface{}{} 715 - 716 - if filter != nil { 717 - if filter.Type != "" { 718 - query += " AND endpoint_type = ?" 719 - args = append(args, filter.Type) 720 - } 721 - if filter.Status != "" { 722 - statusInt := EndpointStatusUnknown 723 - switch filter.Status { 724 - case "online": 725 - statusInt = EndpointStatusOnline 726 - case "offline": 727 - statusInt = EndpointStatusOffline 728 - } 729 - query += " AND status = ?" 730 - args = append(args, statusInt) 731 - } 732 - if filter.MinUserCount > 0 { 733 - query += " AND user_count >= ?" 734 - args = append(args, filter.MinUserCount) 735 - } 736 - } 737 - 738 - query += " ORDER BY user_count DESC" 739 - 740 - if filter != nil && filter.Limit > 0 { 741 - query += fmt.Sprintf(" LIMIT %d OFFSET %d", filter.Limit, filter.Offset) 742 - } 743 - 744 - rows, err := s.db.QueryContext(ctx, query, args...) 745 - if err != nil { 746 - return nil, err 747 - } 748 - defer rows.Close() 749 - 750 - var endpoints []*Endpoint 751 - for rows.Next() { 752 - var ep Endpoint 753 - var lastChecked sql.NullTime 754 - 755 - err := rows.Scan( 756 - &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 757 - &ep.Status, &ep.UserCount, &ep.UpdatedAt, 758 - ) 759 - if err != nil { 760 - return nil, err 761 - } 762 - 763 - if lastChecked.Valid { 764 - ep.LastChecked = lastChecked.Time 765 - } 766 - 767 - endpoints = append(endpoints, &ep) 768 - } 769 - 770 - return endpoints, rows.Err() 771 - } 772 - 773 - // UpdateEndpointStatus updates the status and creates a scan record 774 - func (s *SQLiteDB) UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error { 775 - tx, err := s.db.BeginTx(ctx, nil) 776 - if err != nil { 777 - return err 778 - } 779 - defer tx.Rollback() 780 - 781 - // Calculate user count from scan data 782 - userCount := 0 783 - if update.ScanData != nil { 784 - userCount = update.ScanData.DIDCount 785 - } 786 - 787 - // Update main endpoints record 788 - query := ` 789 - UPDATE endpoints 790 - SET status = ?, last_checked = ?, user_count = ?, updated_at = ? 791 - WHERE id = ? 792 - ` 793 - _, err = tx.ExecContext(ctx, query, update.Status, update.LastChecked, userCount, time.Now(), endpointID) 794 - if err != nil { 795 - return err 796 - } 797 - 798 - // Marshal scan data 799 - var scanDataJSON []byte 800 - if update.ScanData != nil { 801 - scanDataJSON, _ = json.Marshal(update.ScanData) 802 - } 803 - 804 - // Insert scan history (reuse pds_scans table or rename it to endpoint_scans) 805 - scanQuery := ` 806 - INSERT INTO pds_scans (pds_id, status, response_time, scan_data) 807 - VALUES (?, ?, ?, ?) 808 - ` 809 - _, err = tx.ExecContext(ctx, scanQuery, endpointID, update.Status, update.ResponseTime, string(scanDataJSON)) 810 - if err != nil { 811 - return err 812 - } 813 - 814 - return tx.Commit() 815 - } 816 - 817 - // GetEndpointScans retrieves scan history for an endpoint 818 - func (s *SQLiteDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) { 819 - query := ` 820 - SELECT id, pds_id, status, response_time, scan_data, scanned_at 821 - FROM pds_scans 822 - WHERE pds_id = ? 823 - ORDER BY scanned_at DESC 824 - LIMIT ? 825 - ` 826 - 827 - rows, err := s.db.QueryContext(ctx, query, endpointID, limit) 828 - if err != nil { 829 - return nil, err 830 - } 831 - defer rows.Close() 832 - 833 - var scans []*EndpointScan 834 - for rows.Next() { 835 - var scan EndpointScan 836 - var responseTime sql.NullFloat64 837 - var scanDataJSON sql.NullString 838 - 839 - err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &scanDataJSON, &scan.ScannedAt) 840 - if err != nil { 841 - return nil, err 842 - } 843 - 844 - if responseTime.Valid { 845 - scan.ResponseTime = responseTime.Float64 846 - } 847 - 848 - if scanDataJSON.Valid && scanDataJSON.String != "" { 849 - var scanData EndpointScanData 850 - if err := json.Unmarshal([]byte(scanDataJSON.String), &scanData); err == nil { 851 - scan.ScanData = &scanData 852 - } 853 - } 854 - 855 - scans = append(scans, &scan) 856 - } 857 - 858 - return scans, rows.Err() 859 - } 860 - 861 - // GetEndpointStats returns aggregate statistics about all endpoints 862 - func (s *SQLiteDB) GetEndpointStats(ctx context.Context) (*EndpointStats, error) { 863 - query := ` 864 - SELECT 865 - COUNT(*) as total_endpoints, 866 - SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_endpoints, 867 - SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_endpoints, 868 - (SELECT AVG(response_time) FROM pds_scans WHERE response_time > 0 869 - AND scanned_at > datetime('now', '-1 hour')) as avg_response_time, 870 - SUM(user_count) as total_dids 871 - FROM endpoints 872 - ` 873 - 874 - var stats EndpointStats 875 - var avgResponseTime sql.NullFloat64 876 - 877 - err := s.db.QueryRowContext(ctx, query).Scan( 878 - &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints, 879 - &avgResponseTime, &stats.TotalDIDs, 880 - ) 881 - 882 - if avgResponseTime.Valid { 883 - stats.AvgResponseTime = avgResponseTime.Float64 884 - } 885 - 886 - // Get counts by type 887 - typeQuery := ` 888 - SELECT endpoint_type, COUNT(*) 889 - FROM endpoints 890 - GROUP BY endpoint_type 891 - ` 892 - rows, err := s.db.QueryContext(ctx, typeQuery) 893 - if err == nil { 894 - defer rows.Close() 895 - stats.ByType = make(map[string]int64) 896 - for rows.Next() { 897 - var typ string 898 - var count int64 899 - if err := rows.Scan(&typ, &count); err == nil { 900 - stats.ByType[typ] = count 901 - } 902 - } 903 - } 904 - 905 - return &stats, err 906 - } 907 - 908 - // GetScanCursor retrieves cursor with bundle number 909 - func (s *SQLiteDB) GetScanCursor(ctx context.Context, source string) (*ScanCursor, error) { 910 - query := "SELECT source, last_bundle_number, last_scan_time, records_processed FROM scan_cursors WHERE source = ?" 911 - 912 - var cursor ScanCursor 913 - err := s.db.QueryRowContext(ctx, query, source).Scan( 914 - &cursor.Source, &cursor.LastBundleNumber, &cursor.LastScanTime, &cursor.RecordsProcessed, 915 - ) 916 - if err == sql.ErrNoRows { 917 - return &ScanCursor{ 918 - Source: source, 919 - LastBundleNumber: 0, 920 - LastScanTime: time.Time{}, 921 - }, nil 922 - } 923 - return &cursor, err 924 - } 925 - 926 - // UpdateScanCursor updates cursor with bundle number 927 - func (s *SQLiteDB) UpdateScanCursor(ctx context.Context, cursor *ScanCursor) error { 928 - query := ` 929 - INSERT INTO scan_cursors (source, last_bundle_number, last_scan_time, records_processed) 930 - VALUES (?, ?, ?, ?) 931 - ON CONFLICT(source) DO UPDATE SET 932 - last_bundle_number = excluded.last_bundle_number, 933 - last_scan_time = excluded.last_scan_time, 934 - records_processed = excluded.records_processed 935 - ` 936 - _, err := s.db.ExecContext(ctx, query, cursor.Source, cursor.LastBundleNumber, cursor.LastScanTime, cursor.RecordsProcessed) 937 - return err 938 - } 939 - 940 - // StorePLCMetrics stores PLC scan metrics 941 - func (s *SQLiteDB) StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error { 942 - query := ` 943 - INSERT INTO plc_metrics (total_dids, total_pds, unique_pds, scan_duration_ms, error_count) 944 - VALUES (?, ?, ?, ?, ?) 945 - ` 946 - _, err := s.db.ExecContext(ctx, query, metrics.TotalDIDs, metrics.TotalPDS, 947 - metrics.UniquePDS, metrics.ScanDuration, metrics.ErrorCount) 948 - return err 949 - } 950 - 951 - // GetPLCMetrics retrieves recent PLC metrics 952 - func (s *SQLiteDB) GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error) { 953 - query := ` 954 - SELECT total_dids, total_pds, unique_pds, scan_duration_ms, error_count, created_at 955 - FROM plc_metrics 956 - ORDER BY created_at DESC 957 - LIMIT ? 958 - ` 959 - 960 - rows, err := s.db.QueryContext(ctx, query, limit) 961 - if err != nil { 962 - return nil, err 963 - } 964 - defer rows.Close() 965 - 966 - var metrics []*PLCMetrics 967 - for rows.Next() { 968 - var m PLCMetrics 969 - if err := rows.Scan(&m.TotalDIDs, &m.TotalPDS, &m.UniquePDS, &m.ScanDuration, &m.ErrorCount, &m.LastScanTime); err != nil { 970 - return nil, err 971 - } 972 - metrics = append(metrics, &m) 973 - } 974 - 975 - return metrics, rows.Err() 976 - } 977 - 978 - // UpsertDID inserts or updates a DID record 979 - func (s *SQLiteDB) UpsertDID(ctx context.Context, did *DIDRecord) error { 980 - bundleNumbersJSON, err := json.Marshal(did.BundleNumbers) 981 - if err != nil { 982 - return err 983 - } 984 - 985 - query := ` 986 - INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at) 987 - VALUES (?, ?, ?, ?, ?, ?, ?) 988 - ON CONFLICT(did) DO UPDATE SET 989 - last_seen_bundle = excluded.last_seen_bundle, 990 - bundle_numbers = excluded.bundle_numbers, 991 - operation_count = excluded.operation_count, 992 - last_seen_at = excluded.last_seen_at 993 - ` 994 - _, err = s.db.ExecContext(ctx, query, 995 - did.DID, did.FirstSeenBundle, did.LastSeenBundle, 996 - string(bundleNumbersJSON), did.OperationCount, 997 - did.FirstSeenAt, did.LastSeenAt) 998 - return err 999 - } 1000 - 1001 - // GetDIDRecord retrieves a DID record 1002 - func (s *SQLiteDB) GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) { 1003 - query := ` 1004 - SELECT did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at 1005 - FROM dids 1006 - WHERE did = ? 1007 - ` 1008 - 1009 - var record DIDRecord 1010 - var bundleNumbersJSON string 1011 - 1012 - err := s.db.QueryRowContext(ctx, query, did).Scan( 1013 - &record.DID, &record.FirstSeenBundle, &record.LastSeenBundle, 1014 - &bundleNumbersJSON, &record.OperationCount, 1015 - &record.FirstSeenAt, &record.LastSeenAt, 1016 - ) 1017 - if err != nil { 1018 - return nil, err 1019 - } 1020 - 1021 - // Parse bundle numbers 1022 - if err := json.Unmarshal([]byte(bundleNumbersJSON), &record.BundleNumbers); err != nil { 1023 - return nil, err 1024 - } 1025 - 1026 - return &record, nil 1027 - } 1028 - 1029 - // AddBundleDIDs adds DIDs for a bundle (optimized bulk operation) 1030 - func (s *SQLiteDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string, firstSeenAt, lastSeenAt time.Time) error { 1031 - if len(dids) == 0 { 1032 - return nil 1033 - } 1034 - 1035 - tx, err := s.db.BeginTx(ctx, nil) 1036 - if err != nil { 1037 - return err 1038 - } 1039 - defer tx.Rollback() 1040 - 1041 - // Get existing DIDs to update their bundle_numbers arrays 1042 - existingDIDs := make(map[string][]int) 1043 - if len(dids) > 0 { 1044 - // Build query to fetch existing DIDs 1045 - placeholders := make([]string, len(dids)) 1046 - args := make([]interface{}, len(dids)) 1047 - for i, did := range dids { 1048 - placeholders[i] = "?" 1049 - args[i] = did 1050 - } 1051 - 1052 - query := fmt.Sprintf(` 1053 - SELECT did, bundle_numbers 1054 - FROM dids 1055 - WHERE did IN (%s) 1056 - `, strings.Join(placeholders, ",")) 1057 - 1058 - rows, err := tx.QueryContext(ctx, query, args...) 1059 - if err != nil { 1060 - return err 1061 - } 1062 - 1063 - for rows.Next() { 1064 - var did, bundleNumbersJSON string 1065 - if err := rows.Scan(&did, &bundleNumbersJSON); err != nil { 1066 - rows.Close() 1067 - return err 1068 - } 1069 - 1070 - var bundles []int 1071 - if err := json.Unmarshal([]byte(bundleNumbersJSON), &bundles); err != nil { 1072 - rows.Close() 1073 - return err 1074 - } 1075 - 1076 - existingDIDs[did] = bundles 1077 - } 1078 - rows.Close() 1079 - 1080 - if err := rows.Err(); err != nil { 1081 - return err 1082 - } 1083 - } 1084 - 1085 - // Batch upsert 1086 - batchSize := 500 1087 - for i := 0; i < len(dids); i += batchSize { 1088 - end := i + batchSize 1089 - if end > len(dids) { 1090 - end = len(dids) 1091 - } 1092 - batch := dids[i:end] 1093 - 1094 - if err := s.bulkUpsertDIDsSimplified(ctx, tx, bundleNum, batch, existingDIDs, firstSeenAt, lastSeenAt); err != nil { 1095 - return err 1096 - } 1097 - } 1098 - 1099 - return tx.Commit() 1100 - } 1101 - 1102 - func (s *SQLiteDB) bulkUpsertDIDsSimplified(ctx context.Context, tx *sql.Tx, bundleNum int, dids []string, existingDIDs map[string][]int, firstSeenAt, lastSeenAt time.Time) error { 1103 - if len(dids) == 0 { 1104 - return nil 1105 - } 1106 - 1107 - var values []string 1108 - var args []interface{} 1109 - 1110 - for _, did := range dids { 1111 - // Check if DID exists and append bundle number 1112 - bundles := existingDIDs[did] 1113 - 1114 - // Avoid duplicates 1115 - alreadyHas := false 1116 - for _, b := range bundles { 1117 - if b == bundleNum { 1118 - alreadyHas = true 1119 - break 1120 - } 1121 - } 1122 - 1123 - if !alreadyHas { 1124 - bundles = append(bundles, bundleNum) 1125 - } 1126 - 1127 - bundleNumbersJSON, _ := json.Marshal(bundles) 1128 - 1129 - if len(existingDIDs[did]) > 0 { 1130 - // Update existing 1131 - values = append(values, "(?, ?, ?, ?, ?, ?, ?)") 1132 - args = append(args, did, bundleNum, bundleNum, string(bundleNumbersJSON), len(bundles), firstSeenAt, lastSeenAt) 1133 - } else { 1134 - // New DID 1135 - values = append(values, "(?, ?, ?, ?, 1, ?, ?)") 1136 - args = append(args, did, bundleNum, bundleNum, string(bundleNumbersJSON), firstSeenAt, lastSeenAt) 1137 - } 1138 - } 1139 - 1140 - query := fmt.Sprintf(` 1141 - INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at) 1142 - VALUES %s 1143 - ON CONFLICT(did) DO UPDATE SET 1144 - last_seen_bundle = excluded.last_seen_bundle, 1145 - bundle_numbers = excluded.bundle_numbers, 1146 - operation_count = excluded.operation_count, 1147 - last_seen_at = excluded.last_seen_at 1148 - `, strings.Join(values, ",")) 1149 - 1150 - _, err := tx.ExecContext(ctx, query, args...) 1151 - return err 1152 - } 1153 - 1154 - // GetTotalDIDCount returns the total number of unique DIDs 1155 - func (s *SQLiteDB) GetTotalDIDCount(ctx context.Context) (int64, error) { 1156 - query := "SELECT COUNT(*) FROM dids" 1157 - var count int64 1158 - err := s.db.QueryRowContext(ctx, query).Scan(&count) 1159 - return count, err 1160 - }
+3 -7
internal/storage/types.go
··· 153 153 154 154 // DIDRecord represents a DID entry in the database 155 155 type DIDRecord struct { 156 - DID string 157 - FirstSeenBundle int 158 - LastSeenBundle int 159 - BundleNumbers []int // Bundle numbers where this DID appears 160 - OperationCount int 161 - FirstSeenAt time.Time 162 - LastSeenAt time.Time 156 + DID string `json:"did"` 157 + BundleNumbers []int `json:"bundle_numbers"` 158 + CreatedAt time.Time `json:"created_at"` 163 159 }