update

Changed files
+146 -70
internal
+14 -7
internal/api/handlers.go
··· 47 47 r.w.Header().Set("X-Bundle-Start-Time", bundle.StartTime.Format(time.RFC3339Nano)) 48 48 r.w.Header().Set("X-Bundle-End-Time", bundle.EndTime.Format(time.RFC3339Nano)) 49 49 r.w.Header().Set("X-Bundle-Operation-Count", fmt.Sprintf("%d", plc.BUNDLE_SIZE)) 50 - r.w.Header().Set("X-Bundle-DID-Count", fmt.Sprintf("%d", len(bundle.DIDs))) 50 + r.w.Header().Set("X-Bundle-DID-Count", fmt.Sprintf("%d", bundle.DIDCount)) 51 51 } 52 52 53 53 // ===== REQUEST HELPERS ===== ··· 83 83 "start_time": bundle.StartTime, 84 84 "end_time": bundle.EndTime, 85 85 "operation_count": plc.BUNDLE_SIZE, 86 - "did_count": len(bundle.DIDs), 86 + "did_count": bundle.DIDCount, // Use DIDCount instead of len(DIDs) 87 87 "hash": bundle.Hash, 88 88 "compressed_hash": bundle.CompressedHash, 89 89 "compressed_size": bundle.CompressedSize, ··· 809 809 "progress_percent": float64(mempoolCount) / float64(plc.BUNDLE_SIZE) * 100, 810 810 "operations_needed": operationsNeeded, 811 811 "did_count": uniqueDIDCount, 812 - "start_time": firstOp.CreatedAt, // This is FIXED once first op exists 813 - "current_end_time": lastOp.CreatedAt, // This will change as more ops arrive 812 + "start_time": firstOp.CreatedAt, 813 + "current_end_time": lastOp.CreatedAt, 814 814 "uncompressed_size": uncompressedSize, 815 815 "estimated_compressed_size": estimatedCompressedSize, 816 816 "compression_ratio": float64(uncompressedSize) / float64(estimatedCompressedSize), ··· 823 823 result["current_rate_per_second"] = currentRate 824 824 } 825 825 826 - // Get actual mempool operations if requested 826 + // Get actual mempool operations if requested (for DIDs list) 827 827 if r.URL.Query().Get("include_dids") == "true" { 828 828 ops, err := s.db.GetMempoolOperations(ctx, plc.BUNDLE_SIZE) 829 829 if err == nil { ··· 858 858 return 859 859 } 860 860 861 + // Query DIDs from dids table instead 862 + dids, err := s.db.GetDIDsForBundle(r.Context(), bundleNum) 863 + if err != nil { 864 + resp.error(fmt.Sprintf("failed to get DIDs: %v", err), http.StatusInternalServerError) 865 + return 866 + } 867 + 861 868 resp.json(map[string]interface{}{ 862 869 "plc_bundle_number": bundle.BundleNumber, 863 - "did_count": len(bundle.DIDs), 864 - "dids": bundle.DIDs, 870 + "did_count": bundle.DIDCount, 871 + "dids": dids, 865 872 }) 866 873 }
+38 -18
internal/plc/bundle.go
··· 32 32 33 33 func NewBundleManager(dir string, enabled bool, db storage.Database, indexDIDs bool) (*BundleManager, error) { 34 34 if !enabled { 35 + log.Verbose("BundleManager disabled (enabled=false)") 35 36 return &BundleManager{enabled: false}, nil 36 37 } 37 38 ··· 49 50 return nil, err 50 51 } 51 52 53 + log.Verbose("BundleManager initialized: enabled=%v, indexDIDs=%v, dir=%s", enabled, indexDIDs, dir) 54 + 52 55 return &BundleManager{ 53 56 dir: dir, 54 57 enabled: enabled, 55 58 encoder: encoder, 56 59 decoder: decoder, 57 60 db: db, 58 - indexDIDs: indexDIDs, // NEW 61 + indexDIDs: indexDIDs, 59 62 }, nil 60 63 } 61 64 ··· 337 340 // ===== BUNDLE INDEXING ===== 338 341 339 342 func (bm *BundleManager) indexBundle(ctx context.Context, bundleNum int, bf *bundleFile, cursor string) error { 343 + log.Verbose("indexBundle called for bundle %06d: indexDIDs=%v", bundleNum, bm.indexDIDs) 344 + 340 345 prevHash := "" 341 346 if bundleNum > 1 { 342 347 if prev, err := bm.db.GetBundleByNumber(ctx, bundleNum-1); err == nil { ··· 345 350 } 346 351 347 352 dids := bm.extractUniqueDIDs(bf.operations) 353 + log.Verbose("Extracted %d unique DIDs from bundle %06d", len(dids), bundleNum) 354 + 348 355 compressedFileSize := bm.getFileSize(bf.path) 349 356 350 357 // Calculate uncompressed size 351 358 uncompressedSize := int64(0) 352 359 for _, op := range bf.operations { 353 - uncompressedSize += int64(len(op.RawJSON)) + 1 // +1 for newline 360 + uncompressedSize += int64(len(op.RawJSON)) + 1 354 361 } 355 362 356 363 // Get time range from operations ··· 361 368 BundleNumber: bundleNum, 362 369 StartTime: firstSeenAt, 363 370 EndTime: lastSeenAt, 364 - DIDs: dids, 371 + DIDCount: len(dids), 365 372 Hash: bf.uncompressedHash, 366 373 CompressedHash: bf.compressedHash, 367 374 CompressedSize: compressedFileSize, ··· 372 379 CreatedAt: time.Now().UTC(), 373 380 } 374 381 382 + log.Verbose("About to create bundle %06d in database (DIDCount=%d)", bundleNum, bundle.DIDCount) 
383 + 375 384 // Create bundle first 376 385 if err := bm.db.CreateBundle(ctx, bundle); err != nil { 386 + log.Error("Failed to create bundle %06d in database: %v", bundleNum, err) 377 387 return err 378 388 } 379 389 380 - // NEW: Only index DIDs if enabled 390 + log.Verbose("Bundle %06d created successfully in database", bundleNum) 391 + 392 + // Index DIDs if enabled 381 393 if bm.indexDIDs { 382 394 start := time.Now() 395 + log.Verbose("Starting DID indexing for bundle %06d: %d unique DIDs", bundleNum, len(dids)) 383 396 384 - // Extract handle and PDS for each DID using centralized helper 397 + // Extract handle and PDS for each DID 385 398 didInfoMap := ExtractDIDInfoMap(bf.operations) 399 + log.Verbose("Extracted info for %d DIDs from operations", len(didInfoMap)) 386 400 387 - if err := bm.db.AddBundleDIDs(ctx, bundleNum, dids); err != nil { 388 - log.Error("Failed to index DIDs for bundle %06d: %v", bundleNum, err) 389 - // Don't return error - bundle is already created 390 - } else { 391 - // Update handle and PDS for each DID 392 - for did, info := range didInfoMap { 393 - // Validate handle length before saving 394 - validHandle := ValidateHandle(info.Handle) 401 + successCount := 0 402 + errorCount := 0 403 + invalidHandleCount := 0 395 404 396 - if err := bm.db.UpsertDID(ctx, did, bundleNum, validHandle, info.PDS); err != nil { 397 - log.Error("Failed to update DID %s metadata: %v", did, err) 398 - } 405 + // Upsert each DID with handle, pds, and bundle number 406 + for did, info := range didInfoMap { 407 + validHandle := ValidateHandle(info.Handle) 408 + if info.Handle != "" && validHandle == "" { 409 + //log.Verbose("Bundle %06d: Skipping invalid handle for DID %s (length: %d)", bundleNum, did, len(info.Handle)) 410 + invalidHandleCount++ 399 411 } 400 412 401 - elapsed := time.Since(start) 402 - log.Verbose("✓ Indexed %d unique DIDs for bundle %06d in %v", len(dids), bundleNum, elapsed) 413 + if err := bm.db.UpsertDID(ctx, did, bundleNum, 
validHandle, info.PDS); err != nil { 414 + log.Error("Failed to index DID %s for bundle %06d: %v", did, bundleNum, err) 415 + errorCount++ 416 + } else { 417 + successCount++ 418 + } 403 419 } 420 + 421 + elapsed := time.Since(start) 422 + log.Info("✓ Indexed bundle %06d: %d DIDs succeeded, %d errors, %d invalid handles in %v", 423 + bundleNum, successCount, errorCount, invalidHandleCount, elapsed) 404 424 } else { 405 425 log.Verbose("⊘ Skipped DID indexing for bundle %06d (disabled in config)", bundleNum) 406 426 }
+3 -1
internal/plc/scanner.go
··· 21 21 } 22 22 23 23 func NewScanner(db storage.Database, cfg config.PLCConfig) *Scanner { 24 - bundleManager, err := NewBundleManager(cfg.BundleDir, cfg.UseCache, db, cfg.IndexDIDs) // NEW: pass IndexDIDs 24 + log.Verbose("NewScanner: IndexDIDs config = %v", cfg.IndexDIDs) 25 + 26 + bundleManager, err := NewBundleManager(cfg.BundleDir, cfg.UseCache, db, cfg.IndexDIDs) 25 27 if err != nil { 26 28 log.Error("Warning: failed to initialize bundle manager: %v", err) 27 29 bundleManager = &BundleManager{enabled: false}
+1
internal/storage/db.go
··· 55 55 GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, error) 56 56 GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) 57 57 GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) 58 + GetDIDsForBundle(ctx context.Context, bundleNum int) ([]string, error) 58 59 GetBundleStats(ctx context.Context) (count, compressedSize, uncompressedSize, lastBundle int64, err error) 59 60 GetLastBundleNumber(ctx context.Context) (int, error) 60 61 GetBundleForTimestamp(ctx context.Context, afterTime time.Time) (int, error)
+89 -43
internal/storage/postgres.go
··· 157 157 records_processed BIGINT DEFAULT 0 158 158 ); 159 159 160 - CREATE TABLE IF NOT EXISTS plc_bundles ( 161 - bundle_number INTEGER PRIMARY KEY, 162 - start_time TIMESTAMP NOT NULL, 163 - end_time TIMESTAMP NOT NULL, 164 - dids JSONB NOT NULL, 165 - hash TEXT NOT NULL, 166 - compressed_hash TEXT NOT NULL, 167 - compressed_size BIGINT NOT NULL, 168 - uncompressed_size BIGINT NOT NULL, 169 - cumulative_compressed_size BIGINT NOT NULL, 170 - cumulative_uncompressed_size BIGINT NOT NULL, 171 - cursor TEXT, 172 - prev_bundle_hash TEXT, 173 - compressed BOOLEAN DEFAULT true, 174 - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 175 - ); 160 + CREATE TABLE IF NOT EXISTS plc_bundles ( 161 + bundle_number INTEGER PRIMARY KEY, 162 + start_time TIMESTAMP NOT NULL, 163 + end_time TIMESTAMP NOT NULL, 164 + did_count INTEGER NOT NULL DEFAULT 0, 165 + hash TEXT NOT NULL, 166 + compressed_hash TEXT NOT NULL, 167 + compressed_size BIGINT NOT NULL, 168 + uncompressed_size BIGINT NOT NULL, 169 + cumulative_compressed_size BIGINT NOT NULL, 170 + cumulative_uncompressed_size BIGINT NOT NULL, 171 + cursor TEXT, 172 + prev_bundle_hash TEXT, 173 + compressed BOOLEAN DEFAULT true, 174 + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 175 + ); 176 176 177 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time); 178 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_hash ON plc_bundles(hash); 179 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_prev ON plc_bundles(prev_bundle_hash); 180 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_number_desc ON plc_bundles(bundle_number DESC); 181 - CREATE INDEX IF NOT EXISTS idx_plc_bundles_dids ON plc_bundles USING gin(dids); 177 + CREATE INDEX IF NOT EXISTS idx_plc_bundles_time ON plc_bundles(start_time, end_time); 178 + CREATE INDEX IF NOT EXISTS idx_plc_bundles_hash ON plc_bundles(hash); 179 + CREATE INDEX IF NOT EXISTS idx_plc_bundles_prev ON plc_bundles(prev_bundle_hash); 180 + CREATE INDEX IF NOT EXISTS 
idx_plc_bundles_number_desc ON plc_bundles(bundle_number DESC); 182 181 183 182 CREATE TABLE IF NOT EXISTS plc_mempool ( 184 183 id BIGSERIAL PRIMARY KEY, ··· 1168 1167 // ===== BUNDLE OPERATIONS ===== 1169 1168 1170 1169 func (p *PostgresDB) CreateBundle(ctx context.Context, bundle *PLCBundle) error { 1171 - didsJSON, err := json.Marshal(bundle.DIDs) 1172 - if err != nil { 1173 - return err 1174 - } 1175 - 1176 1170 // Calculate cumulative sizes from previous bundle 1177 1171 if bundle.BundleNumber > 1 { 1178 1172 prevBundle, err := p.GetBundleByNumber(ctx, bundle.BundleNumber-1) ··· 1190 1184 1191 1185 query := ` 1192 1186 INSERT INTO plc_bundles ( 1193 - bundle_number, start_time, end_time, dids, 1187 + bundle_number, start_time, end_time, did_count, 1194 1188 hash, compressed_hash, compressed_size, uncompressed_size, 1195 1189 cumulative_compressed_size, cumulative_uncompressed_size, 1196 1190 cursor, prev_bundle_hash, compressed ··· 1199 1193 ON CONFLICT(bundle_number) DO UPDATE SET 1200 1194 start_time = EXCLUDED.start_time, 1201 1195 end_time = EXCLUDED.end_time, 1202 - dids = EXCLUDED.dids, 1196 + did_count = EXCLUDED.did_count, 1203 1197 hash = EXCLUDED.hash, 1204 1198 compressed_hash = EXCLUDED.compressed_hash, 1205 1199 compressed_size = EXCLUDED.compressed_size, ··· 1210 1204 prev_bundle_hash = EXCLUDED.prev_bundle_hash, 1211 1205 compressed = EXCLUDED.compressed 1212 1206 ` 1213 - _, err = p.db.ExecContext(ctx, query, 1207 + _, err := p.db.ExecContext(ctx, query, 1214 1208 bundle.BundleNumber, bundle.StartTime, bundle.EndTime, 1215 - didsJSON, bundle.Hash, bundle.CompressedHash, 1209 + bundle.DIDCount, bundle.Hash, bundle.CompressedHash, 1216 1210 bundle.CompressedSize, bundle.UncompressedSize, 1217 1211 bundle.CumulativeCompressedSize, bundle.CumulativeUncompressedSize, 1218 1212 bundle.Cursor, bundle.PrevBundleHash, bundle.Compressed, ··· 1223 1217 1224 1218 func (p *PostgresDB) GetBundleByNumber(ctx context.Context, bundleNumber int) (*PLCBundle, 
error) { 1225 1219 query := ` 1226 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 1220 + SELECT bundle_number, start_time, end_time, did_count, hash, compressed_hash, 1227 1221 compressed_size, uncompressed_size, cumulative_compressed_size, 1228 1222 cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 1229 1223 FROM plc_bundles ··· 1231 1225 ` 1232 1226 1233 1227 var bundle PLCBundle 1234 - var didsJSON []byte 1235 1228 var prevHash sql.NullString 1236 1229 var cursor sql.NullString 1237 1230 1238 1231 err := p.db.QueryRowContext(ctx, query, bundleNumber).Scan( 1239 1232 &bundle.BundleNumber, &bundle.StartTime, &bundle.EndTime, 1240 - &didsJSON, &bundle.Hash, &bundle.CompressedHash, 1233 + &bundle.DIDCount, &bundle.Hash, &bundle.CompressedHash, 1241 1234 &bundle.CompressedSize, &bundle.UncompressedSize, 1242 1235 &bundle.CumulativeCompressedSize, &bundle.CumulativeUncompressedSize, 1243 1236 &cursor, &prevHash, &bundle.Compressed, &bundle.CreatedAt, ··· 1253 1246 bundle.Cursor = cursor.String 1254 1247 } 1255 1248 1256 - json.Unmarshal(didsJSON, &bundle.DIDs) 1257 1249 return &bundle, nil 1258 1250 } 1259 1251 1260 1252 func (p *PostgresDB) GetBundles(ctx context.Context, limit int) ([]*PLCBundle, error) { 1261 1253 query := ` 1262 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 1254 + SELECT bundle_number, start_time, end_time, did_count, hash, compressed_hash, 1263 1255 compressed_size, uncompressed_size, cumulative_compressed_size, 1264 1256 cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 1265 1257 FROM plc_bundles ··· 1277 1269 } 1278 1270 1279 1271 func (p *PostgresDB) GetBundlesForDID(ctx context.Context, did string) ([]*PLCBundle, error) { 1280 - query := ` 1281 - SELECT bundle_number, start_time, end_time, dids, hash, compressed_hash, 1272 + // Get bundle numbers from dids table 1273 + var bundleNumbersJSON []byte 1274 + err := 
p.db.QueryRowContext(ctx, ` 1275 + SELECT bundle_numbers FROM dids WHERE did = $1 1276 + `, did).Scan(&bundleNumbersJSON) 1277 + 1278 + if err == sql.ErrNoRows { 1279 + return []*PLCBundle{}, nil 1280 + } 1281 + if err != nil { 1282 + return nil, err 1283 + } 1284 + 1285 + var bundleNumbers []int 1286 + if err := json.Unmarshal(bundleNumbersJSON, &bundleNumbers); err != nil { 1287 + return nil, err 1288 + } 1289 + 1290 + if len(bundleNumbers) == 0 { 1291 + return []*PLCBundle{}, nil 1292 + } 1293 + 1294 + // Build query with IN clause 1295 + placeholders := make([]string, len(bundleNumbers)) 1296 + args := make([]interface{}, len(bundleNumbers)) 1297 + for i, num := range bundleNumbers { 1298 + placeholders[i] = fmt.Sprintf("$%d", i+1) 1299 + args[i] = num 1300 + } 1301 + 1302 + query := fmt.Sprintf(` 1303 + SELECT bundle_number, start_time, end_time, did_count, hash, compressed_hash, 1282 1304 compressed_size, uncompressed_size, cumulative_compressed_size, 1283 1305 cumulative_uncompressed_size, cursor, prev_bundle_hash, compressed, created_at 1284 1306 FROM plc_bundles 1285 - WHERE dids ? $1 1307 + WHERE bundle_number IN (%s) 1286 1308 ORDER BY bundle_number ASC 1287 - ` 1309 + `, strings.Join(placeholders, ",")) 1288 1310 1289 - rows, err := p.db.QueryContext(ctx, query, did) 1311 + rows, err := p.db.QueryContext(ctx, query, args...) 
1290 1312 if err != nil { 1291 1313 return nil, err 1292 1314 } ··· 1295 1317 return p.scanBundles(rows) 1296 1318 } 1297 1319 1320 + func (p *PostgresDB) GetDIDsForBundle(ctx context.Context, bundleNum int) ([]string, error) { 1321 + query := ` 1322 + SELECT did 1323 + FROM dids 1324 + WHERE bundle_numbers @> $1::jsonb 1325 + ORDER BY did 1326 + ` 1327 + 1328 + rows, err := p.db.QueryContext(ctx, query, fmt.Sprintf("[%d]", bundleNum)) 1329 + if err != nil { 1330 + return nil, err 1331 + } 1332 + defer rows.Close() 1333 + 1334 + var dids []string 1335 + for rows.Next() { 1336 + var did string 1337 + if err := rows.Scan(&did); err != nil { 1338 + return nil, err 1339 + } 1340 + dids = append(dids, did) 1341 + } 1342 + 1343 + return dids, rows.Err() 1344 + } 1345 + 1298 1346 func (p *PostgresDB) scanBundles(rows *sql.Rows) ([]*PLCBundle, error) { 1299 1347 var bundles []*PLCBundle 1300 1348 1301 1349 for rows.Next() { 1302 1350 var bundle PLCBundle 1303 - var didsJSON []byte 1304 1351 var prevHash sql.NullString 1305 1352 var cursor sql.NullString 1306 1353 ··· 1308 1355 &bundle.BundleNumber, 1309 1356 &bundle.StartTime, 1310 1357 &bundle.EndTime, 1311 - &didsJSON, 1358 + &bundle.DIDCount, 1312 1359 &bundle.Hash, 1313 1360 &bundle.CompressedHash, 1314 1361 &bundle.CompressedSize, ··· 1330 1377 bundle.Cursor = cursor.String 1331 1378 } 1332 1379 1333 - json.Unmarshal(didsJSON, &bundle.DIDs) 1334 1380 bundles = append(bundles, &bundle) 1335 1381 } 1336 1382
+1 -1
internal/storage/types.go
··· 119 119 StartTime time.Time 120 120 EndTime time.Time 121 121 BoundaryCIDs []string 122 - DIDs []string 122 + DIDCount int // Changed from DIDs []string 123 123 Hash string 124 124 CompressedHash string 125 125 CompressedSize int64