internal/api/handlers.go  +45 -20

···
import (
    "context"
    "crypto/sha256"
+     "database/sql"
    "encoding/hex"
    "encoding/json"
    "fmt"
···
    vars := mux.Vars(r)
    did := vars["did"]

-     bundles, err := s.db.GetBundlesForDID(r.Context(), did)
+     // Fast lookup using dids table
+     didRecord, err := s.db.GetDIDRecord(r.Context(), did)
    if err != nil {
-         resp.error(err.Error(), http.StatusInternalServerError)
+         if err == sql.ErrNoRows {
+             resp.error("DID not found", http.StatusNotFound)
+         } else {
+             resp.error(err.Error(), http.StatusInternalServerError)
+         }
        return
    }

-     if len(bundles) == 0 {
-         resp.error("DID not found in bundles", http.StatusNotFound)
-         return
-     }
-
-     lastBundle := bundles[len(bundles)-1]
-     ops, err := s.bundleManager.LoadBundleOperations(r.Context(), lastBundle.BundleNumber)
+     // Load last bundle to get latest operation
+     lastBundleNum := didRecord.LastSeenBundle
+     ops, err := s.bundleManager.LoadBundleOperations(r.Context(), lastBundleNum)
    if err != nil {
        resp.error(fmt.Sprintf("failed to load bundle: %v", err), http.StatusInternalServerError)
        return
···
    vars := mux.Vars(r)
    did := vars["did"]

-     bundles, err := s.db.GetBundlesForDID(r.Context(), did)
+     // Fast lookup using dids table
+     didRecord, err := s.db.GetDIDRecord(r.Context(), did)
    if err != nil {
-         resp.error(err.Error(), http.StatusInternalServerError)
-         return
-     }
-
-     if len(bundles) == 0 {
-         resp.error("DID not found in bundles", http.StatusNotFound)
+         if err == sql.ErrNoRows {
+             resp.error("DID not found", http.StatusNotFound)
+         } else {
+             resp.error(err.Error(), http.StatusInternalServerError)
+         }
        return
    }

    var allOperations []plc.DIDHistoryEntry
    var currentOp *plc.PLCOperation

-     for _, bundle := range bundles {
-         ops, err := s.bundleManager.LoadBundleOperations(r.Context(), bundle.BundleNumber)
+     // Load operations from each bundle
+     for _, bundleNum := range didRecord.BundleNumbers {
+         ops, err := s.bundleManager.LoadBundleOperations(r.Context(), bundleNum)
        if err != nil {
-             log.Error("Warning: failed to load bundle: %v", err)
+             log.Error("Warning: failed to load bundle %d: %v", bundleNum, err)
            continue
        }
···
            if op.DID == did {
                entry := plc.DIDHistoryEntry{
                    Operation: op,
-                     PLCBundle: fmt.Sprintf("%06d", bundle.BundleNumber),
+                     PLCBundle: fmt.Sprintf("%06d", bundleNum),
                }
                allOperations = append(allOperations, entry)
                currentOp = &op
···
        DID:        did,
        Current:    currentOp,
        Operations: allOperations,
+     })
+ }
+
+ func (s *Server) handleGetDIDStats(w http.ResponseWriter, r *http.Request) {
+     resp := newResponse(w)
+     ctx := r.Context()
+
+     totalDIDs, err := s.db.GetTotalDIDCount(ctx)
+     if err != nil {
+         resp.error(err.Error(), http.StatusInternalServerError)
+         return
+     }
+
+     lastBundle, err := s.db.GetLastBundleNumber(ctx)
+     if err != nil {
+         resp.error(err.Error(), http.StatusInternalServerError)
+         return
+     }
+
+     resp.json(map[string]interface{}{
+         "total_unique_dids": totalDIDs,
+         "last_bundle":       lastBundle,
    })
}
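A small hardening worth considering for both handlers above (not part of this diff): if the storage layer ever wraps the driver error, a direct err == sql.ErrNoRows comparison stops matching, while errors.Is still does. A minimal sketch, using a hypothetical isNotFound helper:

// isNotFound is a hypothetical helper, not in this PR; it needs "errors" and
// "database/sql" imported. errors.Is also matches wrapped errors, unlike a
// plain == comparison against sql.ErrNoRows.
func isNotFound(err error) bool {
    return errors.Is(err, sql.ErrNoRows)
}

The handlers would then branch on isNotFound(err) instead of comparing against sql.ErrNoRows directly.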
internal/api/server.go  +1

···
    // DID routes
    api.HandleFunc("/plc/did/{did}", s.handleGetDID).Methods("GET")
    api.HandleFunc("/plc/did/{did}/history", s.handleGetDIDHistory).Methods("GET")
+     api.HandleFunc("/plc/dids/stats", s.handleGetDIDStats).Methods("GET") // NEW

    // Mempool routes
    api.HandleFunc("/mempool/stats", s.handleGetMempoolStats).Methods("GET")
internal/plc/bundle.go  +21 -3

···
        uncompressedSize += int64(len(op.RawJSON)) + 1 // +1 for newline
    }

+     // Get time range from operations
+     firstSeenAt := bf.operations[0].CreatedAt
+     lastSeenAt := bf.operations[len(bf.operations)-1].CreatedAt
+
    bundle := &storage.PLCBundle{
        BundleNumber: bundleNum,
-         StartTime: bf.operations[0].CreatedAt,
-         EndTime: bf.operations[len(bf.operations)-1].CreatedAt,
+         StartTime: firstSeenAt,
+         EndTime: lastSeenAt,
        DIDs: dids,
        Hash: bf.uncompressedHash,
        CompressedHash: bf.compressedHash,
···
        CreatedAt: time.Now(),
    }

-     return bm.db.CreateBundle(ctx, bundle)
+     // Create bundle first
+     if err := bm.db.CreateBundle(ctx, bundle); err != nil {
+         return err
+     }
+
+     // Index DIDs synchronously (will use bulk inserts for speed)
+     if err := bm.db.AddBundleDIDs(ctx, bundleNum, dids, firstSeenAt, lastSeenAt); err != nil {
+         log.Error("Failed to index DIDs for bundle %06d: %v", bundleNum, err)
+         // Don't return error - bundle is already created
+         // DID indexing can be retried later
+     } else {
+         log.Verbose("✓ Indexed %d unique DIDs for bundle %06d", len(dids), bundleNum)
+     }
+
+     return nil
}

func (bm *BundleManager) extractUniqueDIDs(ops []PLCOperation) []string {
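The "can be retried later" note above implies a backfill path for bundles whose DID indexing failed, or that predate the dids table. A minimal sketch of what that could look like, reusing pieces already present in this PR; ReindexBundle itself is hypothetical, and it assumes LoadBundleOperations is available on BundleManager just as the handlers use it:

// ReindexBundle is a hypothetical helper, not part of this PR: it rebuilds
// the dids index for a single bundle from its stored operations, so a failed
// or missing indexing pass can be retried later.
func (bm *BundleManager) ReindexBundle(ctx context.Context, bundleNum int) error {
    ops, err := bm.LoadBundleOperations(ctx, bundleNum)
    if err != nil {
        return err
    }
    if len(ops) == 0 {
        return nil
    }

    dids := bm.extractUniqueDIDs(ops)
    firstSeenAt := ops[0].CreatedAt
    lastSeenAt := ops[len(ops)-1].CreatedAt

    return bm.db.AddBundleDIDs(ctx, bundleNum, dids, firstSeenAt, lastSeenAt)
}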
internal/storage/db.go  +6

···
    StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error
    GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error)
    GetEndpointStats(ctx context.Context) (*EndpointStats, error)
+
+     // DID operations
+     UpsertDID(ctx context.Context, did *DIDRecord) error
+     GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error)
+     AddBundleDIDs(ctx context.Context, bundleNum int, dids []string, firstSeenAt, lastSeenAt time.Time) error
+     GetTotalDIDCount(ctx context.Context) (int64, error)
}
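A cheap way to keep the SQLite implementation and this growing interface in lockstep is a compile-time assertion; a sketch only, assuming the interface above is named Database (substitute the actual name from db.go):

// Compile-time check, not part of this PR. "Database" is an assumed name for
// the interface extended above; replace it with the real one.
var _ Database = (*SQLiteDB)(nil)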
internal/storage/sqlite.go  +211 -16

···
CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type);
CREATE INDEX IF NOT EXISTS idx_endpoints_user_count ON endpoints(user_count);

- -- Keep pds_scans table (or rename to endpoint_scans later)
CREATE TABLE IF NOT EXISTS pds_scans (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pds_id INTEGER NOT NULL,
···
CREATE INDEX IF NOT EXISTS idx_pds_scans_pds_id ON pds_scans(pds_id);
CREATE INDEX IF NOT EXISTS idx_pds_scans_scanned_at ON pds_scans(scanned_at);

- -- Metrics
CREATE TABLE IF NOT EXISTS plc_metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    total_dids INTEGER,
···
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

- -- Scan cursors with bundle number
CREATE TABLE IF NOT EXISTS scan_cursors (
    source TEXT PRIMARY KEY,
    last_bundle_number INTEGER DEFAULT 0,
···
    records_processed INTEGER DEFAULT 0
);

- -- Bundles with dual hashing
CREATE TABLE IF NOT EXISTS plc_bundles (
    bundle_number INTEGER PRIMARY KEY,
    start_time TIMESTAMP NOT NULL,
···
    compressed_hash TEXT NOT NULL,
    compressed_size INTEGER NOT NULL,
    uncompressed_size INTEGER NOT NULL,
-     cumulative_compressed_size INTEGER NOT NULL, -- NEW
-     cumulative_uncompressed_size INTEGER NOT NULL, -- NEW
+     cumulative_compressed_size INTEGER NOT NULL,
+     cumulative_uncompressed_size INTEGER NOT NULL,
    cursor TEXT,
    prev_bundle_hash TEXT,
    compressed BOOLEAN DEFAULT 1,
···
CREATE INDEX IF NOT EXISTS idx_plc_bundles_prev ON plc_bundles(prev_bundle_hash);
CREATE INDEX IF NOT EXISTS idx_plc_bundles_number_desc ON plc_bundles(bundle_number DESC);

- -- NEW: Mempool for pending operations
- CREATE TABLE IF NOT EXISTS plc_mempool (
-     id INTEGER PRIMARY KEY AUTOINCREMENT,
-     did TEXT NOT NULL,
-     operation TEXT NOT NULL,
-     cid TEXT NOT NULL UNIQUE, -- ✅ Add UNIQUE constraint
-     created_at TIMESTAMP NOT NULL,
-     added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- );
+ CREATE TABLE IF NOT EXISTS plc_mempool (
+     id INTEGER PRIMARY KEY AUTOINCREMENT,
+     did TEXT NOT NULL,
+     operation TEXT NOT NULL,
+     cid TEXT NOT NULL UNIQUE,
+     created_at TIMESTAMP NOT NULL,
+     added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ );

CREATE INDEX IF NOT EXISTS idx_mempool_created_at ON plc_mempool(created_at);
CREATE INDEX IF NOT EXISTS idx_mempool_did ON plc_mempool(did);
- CREATE UNIQUE INDEX IF NOT EXISTS idx_mempool_cid ON plc_mempool(cid);
+ CREATE UNIQUE INDEX IF NOT EXISTS idx_mempool_cid ON plc_mempool(cid);
+
+ -- NEW: DIDs table
+ CREATE TABLE IF NOT EXISTS dids (
+     did TEXT PRIMARY KEY,
+     first_seen_bundle INTEGER NOT NULL,
+     last_seen_bundle INTEGER NOT NULL,
+     bundle_numbers TEXT NOT NULL, -- JSON array of bundle numbers
+     operation_count INTEGER DEFAULT 1,
+     first_seen_at TIMESTAMP NOT NULL,
+     last_seen_at TIMESTAMP NOT NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_dids_last_bundle ON dids(last_seen_bundle);
+ CREATE INDEX IF NOT EXISTS idx_dids_first_seen ON dids(first_seen_at);
+ CREATE INDEX IF NOT EXISTS idx_dids_last_seen ON dids(last_seen_at);
`

    _, err := s.db.Exec(schema)
···

    return metrics, rows.Err()
}
+
+ // UpsertDID inserts or updates a DID record
+ func (s *SQLiteDB) UpsertDID(ctx context.Context, did *DIDRecord) error {
+     bundleNumbersJSON, err := json.Marshal(did.BundleNumbers)
+     if err != nil {
+         return err
+     }
+
+     query := `
+         INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at)
+         VALUES (?, ?, ?, ?, ?, ?, ?)
+         ON CONFLICT(did) DO UPDATE SET
+             last_seen_bundle = excluded.last_seen_bundle,
+             bundle_numbers = excluded.bundle_numbers,
+             operation_count = excluded.operation_count,
+             last_seen_at = excluded.last_seen_at
+     `
+     _, err = s.db.ExecContext(ctx, query,
+         did.DID, did.FirstSeenBundle, did.LastSeenBundle,
+         string(bundleNumbersJSON), did.OperationCount,
+         did.FirstSeenAt, did.LastSeenAt)
+     return err
+ }
+
+ // GetDIDRecord retrieves a DID record
+ func (s *SQLiteDB) GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) {
+     query := `
+         SELECT did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at
+         FROM dids
+         WHERE did = ?
+     `
+
+     var record DIDRecord
+     var bundleNumbersJSON string
+
+     err := s.db.QueryRowContext(ctx, query, did).Scan(
+         &record.DID, &record.FirstSeenBundle, &record.LastSeenBundle,
+         &bundleNumbersJSON, &record.OperationCount,
+         &record.FirstSeenAt, &record.LastSeenAt,
+     )
+     if err != nil {
+         return nil, err
+     }
+
+     // Parse bundle numbers
+     if err := json.Unmarshal([]byte(bundleNumbersJSON), &record.BundleNumbers); err != nil {
+         return nil, err
+     }
+
+     return &record, nil
+ }
+
+ // AddBundleDIDs adds DIDs for a bundle (optimized bulk operation)
+ func (s *SQLiteDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string, firstSeenAt, lastSeenAt time.Time) error {
+     if len(dids) == 0 {
+         return nil
+     }
+
+     tx, err := s.db.BeginTx(ctx, nil)
+     if err != nil {
+         return err
+     }
+     defer tx.Rollback()
+
+     // Get existing DIDs to update their bundle_numbers arrays
+     existingDIDs := make(map[string][]int)
+     if len(dids) > 0 {
+         // Build query to fetch existing DIDs
+         placeholders := make([]string, len(dids))
+         args := make([]interface{}, len(dids))
+         for i, did := range dids {
+             placeholders[i] = "?"
+             args[i] = did
+         }
+
+         query := fmt.Sprintf(`
+             SELECT did, bundle_numbers
+             FROM dids
+             WHERE did IN (%s)
+         `, strings.Join(placeholders, ","))
+
+         rows, err := tx.QueryContext(ctx, query, args...)
+         if err != nil {
+             return err
+         }
+
+         for rows.Next() {
+             var did, bundleNumbersJSON string
+             if err := rows.Scan(&did, &bundleNumbersJSON); err != nil {
+                 rows.Close()
+                 return err
+             }
+
+             var bundles []int
+             if err := json.Unmarshal([]byte(bundleNumbersJSON), &bundles); err != nil {
+                 rows.Close()
+                 return err
+             }
+
+             existingDIDs[did] = bundles
+         }
+         rows.Close()
+
+         if err := rows.Err(); err != nil {
+             return err
+         }
+     }
+
+     // Batch upsert
+     batchSize := 500
+     for i := 0; i < len(dids); i += batchSize {
+         end := i + batchSize
+         if end > len(dids) {
+             end = len(dids)
+         }
+         batch := dids[i:end]
+
+         if err := s.bulkUpsertDIDsSimplified(ctx, tx, bundleNum, batch, existingDIDs, firstSeenAt, lastSeenAt); err != nil {
+             return err
+         }
+     }
+
+     return tx.Commit()
+ }
+
+ func (s *SQLiteDB) bulkUpsertDIDsSimplified(ctx context.Context, tx *sql.Tx, bundleNum int, dids []string, existingDIDs map[string][]int, firstSeenAt, lastSeenAt time.Time) error {
+     if len(dids) == 0 {
+         return nil
+     }
+
+     var values []string
+     var args []interface{}
+
+     for _, did := range dids {
+         // Check if DID exists and append bundle number
+         bundles := existingDIDs[did]
+
+         // Avoid duplicates
+         alreadyHas := false
+         for _, b := range bundles {
+             if b == bundleNum {
+                 alreadyHas = true
+                 break
+             }
+         }
+
+         if !alreadyHas {
+             bundles = append(bundles, bundleNum)
+         }
+
+         bundleNumbersJSON, _ := json.Marshal(bundles)
+
+         if len(existingDIDs[did]) > 0 {
+             // Update existing
+             values = append(values, "(?, ?, ?, ?, ?, ?, ?)")
+             args = append(args, did, bundleNum, bundleNum, string(bundleNumbersJSON), len(bundles), firstSeenAt, lastSeenAt)
+         } else {
+             // New DID
+             values = append(values, "(?, ?, ?, ?, 1, ?, ?)")
+             args = append(args, did, bundleNum, bundleNum, string(bundleNumbersJSON), firstSeenAt, lastSeenAt)
+         }
+     }
+
+     query := fmt.Sprintf(`
+         INSERT INTO dids (did, first_seen_bundle, last_seen_bundle, bundle_numbers, operation_count, first_seen_at, last_seen_at)
+         VALUES %s
+         ON CONFLICT(did) DO UPDATE SET
+             last_seen_bundle = excluded.last_seen_bundle,
+             bundle_numbers = excluded.bundle_numbers,
+             operation_count = excluded.operation_count,
+             last_seen_at = excluded.last_seen_at
+     `, strings.Join(values, ","))
+
+     _, err := tx.ExecContext(ctx, query, args...)
+     return err
+ }
+
+ // GetTotalDIDCount returns the total number of unique DIDs
+ func (s *SQLiteDB) GetTotalDIDCount(ctx context.Context) (int64, error) {
+     query := "SELECT COUNT(*) FROM dids"
+     var count int64
+     err := s.db.QueryRowContext(ctx, query).Scan(&count)
+     return count, err
+ }
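One thing to watch in AddBundleDIDs above: the existing-DID lookup binds one parameter per DID in a single IN (...) query, and some SQLite builds cap bound parameters (999 by default on older versions). A hedged sketch of chunking that lookup the same way the upsert is already batched; fetchExistingDIDs is a hypothetical helper, not part of this PR, and it assumes the same imports sqlite.go already uses:

// fetchExistingDIDs is a hypothetical helper, not in this PR: it loads
// bundle_numbers for already-indexed DIDs in fixed-size chunks so the
// number of bound parameters per query stays well under SQLite's limit.
func fetchExistingDIDs(ctx context.Context, tx *sql.Tx, dids []string) (map[string][]int, error) {
    existing := make(map[string][]int)
    const chunkSize = 500

    for start := 0; start < len(dids); start += chunkSize {
        end := start + chunkSize
        if end > len(dids) {
            end = len(dids)
        }
        chunk := dids[start:end]

        placeholders := make([]string, len(chunk))
        args := make([]interface{}, len(chunk))
        for i, did := range chunk {
            placeholders[i] = "?"
            args[i] = did
        }

        query := fmt.Sprintf(
            "SELECT did, bundle_numbers FROM dids WHERE did IN (%s)",
            strings.Join(placeholders, ","))

        rows, err := tx.QueryContext(ctx, query, args...)
        if err != nil {
            return nil, err
        }
        for rows.Next() {
            var did, bundleNumbersJSON string
            if err := rows.Scan(&did, &bundleNumbersJSON); err != nil {
                rows.Close()
                return nil, err
            }
            var bundles []int
            if err := json.Unmarshal([]byte(bundleNumbersJSON), &bundles); err != nil {
                rows.Close()
                return nil, err
            }
            existing[did] = bundles
        }
        rows.Close()
        if err := rows.Err(); err != nil {
            return nil, err
        }
    }
    return existing, nil
}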
internal/storage/types.go  +11

···
    LastScanTime     time.Time
    RecordsProcessed int64
}
+
+ // DIDRecord represents a DID entry in the database
+ type DIDRecord struct {
+     DID             string
+     FirstSeenBundle int
+     LastSeenBundle  int
+     BundleNumbers   []int // Bundle numbers where this DID appears
+     OperationCount  int
+     FirstSeenAt     time.Time
+     LastSeenAt      time.Time
+ }
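For orientation, a usage sketch (not part of this PR) of how a record round-trips through UpsertDID, with BundleNumbers serialized into the bundle_numbers TEXT column; the DID value is a placeholder, and db, ctx, and log stand in for whatever the caller already has:

// Illustrative only: first sighting of a DID in bundle 42.
rec := &storage.DIDRecord{
    DID:             "did:plc:example123",
    FirstSeenBundle: 42,
    LastSeenBundle:  42,
    BundleNumbers:   []int{42},
    OperationCount:  1,
    FirstSeenAt:     time.Now().UTC(),
    LastSeenAt:      time.Now().UTC(),
}
if err := db.UpsertDID(ctx, rec); err != nil {
    log.Error("failed to upsert DID record: %v", err)
}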