update

Changed files
+163 -70
cmd
internal
+3
cmd/atscanner.go
··· 43 43 } 44 44 defer db.Close() 45 45 46 + // Set scan retention from config 47 + db.SetScanRetention(cfg.PDS.ScanRetention) 48 + 46 49 // Run migrations 47 50 if err := db.Migrate(); err != nil { 48 51 log.Fatal("Failed to run migrations: %v", err)
+1
config.yaml
··· 15 15 timeout: "30s" 16 16 workers: 20 17 17 recheck_interval: "5m" 18 + scan_retention: 3 18 19 19 20 api: 20 21 host: "0.0.0.0"
+27 -15
internal/api/handlers.go
··· 243 243 if pds.LatestScan != nil { 244 244 response["user_count"] = pds.LatestScan.UserCount 245 245 response["response_time"] = pds.LatestScan.ResponseTime 246 + if pds.LatestScan.Version != "" { // NEW: Add this block 247 + response["version"] = pds.LatestScan.Version 248 + } 246 249 if !pds.LatestScan.ScannedAt.IsZero() { 247 250 response["last_scan"] = pds.LatestScan.ScannedAt 248 251 } ··· 276 279 // Start with list item formatting 277 280 response := formatPDSListItem(&pds.PDSListItem) 278 281 279 - // Add server_info from latest scan 280 - if pds.LatestScan != nil && pds.LatestScan.ServerInfo != nil { 281 - response["server_info"] = pds.LatestScan.ServerInfo 282 + // Add server_info and version from latest scan (PDSDetail's LatestScan takes precedence) 283 + if pds.LatestScan != nil { 284 + // Override with detail-specific scan data 285 + response["user_count"] = pds.LatestScan.UserCount 286 + response["response_time"] = pds.LatestScan.ResponseTime 287 + 288 + if pds.LatestScan.Version != "" { 289 + response["version"] = pds.LatestScan.Version 290 + } 291 + 292 + if !pds.LatestScan.ScannedAt.IsZero() { 293 + response["last_scan"] = pds.LatestScan.ScannedAt 294 + } 295 + 296 + if pds.LatestScan.ServerInfo != nil { 297 + response["server_info"] = pds.LatestScan.ServerInfo 298 + } 282 299 } 283 300 284 301 // Add full IP info 285 302 if pds.IPInfo != nil { 286 - response["ip_info"] = pds.IPInfo // This now includes raw_data 303 + response["ip_info"] = pds.IPInfo 287 304 } 288 305 289 306 return response ··· 302 319 scanMap["response_time"] = scan.ResponseTime 303 320 } 304 321 305 - // NEW: Use the top-level UserCount field first 322 + // NEW: Add version if available 323 + if scan.Version != "" { 324 + scanMap["version"] = scan.Version 325 + } 326 + 327 + // Use the top-level UserCount field first 306 328 if scan.UserCount > 0 { 307 329 scanMap["user_count"] = scan.UserCount 308 330 } else if scan.ScanData != nil && scan.ScanData.Metadata != nil { ··· 315 337 } 316 338 317 339 if scan.ScanData != nil { 318 - // Metadata is already map[string]interface{}, no type assertion needed 319 - // if scan.ScanData.Metadata != nil { 320 - // // Extract user_count from metadata 321 - // if userCount, ok := scan.ScanData.Metadata["user_count"].(int); ok { 322 - // scanMap["user_count"] = userCount 323 - // } else if userCount, ok := scan.ScanData.Metadata["user_count"].(float64); ok { 324 - // scanMap["user_count"] = int(userCount) 325 - // } 326 - // } -- OLD Block (replaced by above) 327 - 328 340 // Include DID count if available 329 341 if scan.ScanData.DIDCount > 0 { 330 342 scanMap["did_count"] = scan.ScanData.DIDCount
+5 -1
internal/config/config.go
··· 16 16 17 17 type DatabaseConfig struct { 18 18 Path string `yaml:"path"` 19 - Type string `yaml:"type"` // sqlite, postgres 19 + Type string `yaml:"type"` // postgres 20 20 } 21 21 22 22 type PLCConfig struct { ··· 33 33 Timeout time.Duration `yaml:"timeout"` 34 34 Workers int `yaml:"workers"` 35 35 RecheckInterval time.Duration `yaml:"recheck_interval"` 36 + ScanRetention int `yaml:"scan_retention"` 36 37 } 37 38 38 39 type APIConfig struct { ··· 73 74 } 74 75 if cfg.PDS.Workers == 0 { 75 76 cfg.PDS.Workers = 10 77 + } 78 + if cfg.PDS.ScanRetention == 0 { 79 + cfg.PDS.ScanRetention = 3 76 80 } 77 81 if cfg.API.Port == 0 { 78 82 cfg.API.Port = 8080
+23 -5
internal/pds/client.go
··· 111 111 return &desc, nil 112 112 } 113 113 114 - // CheckHealth performs a basic health check 115 - func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, error) { 114 + // CheckHealth performs a basic health check, ensuring the endpoint returns JSON with a "version" 115 + func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, error) { 116 116 startTime := time.Now() 117 117 118 118 url := fmt.Sprintf("%s/xrpc/_health", endpoint) 119 119 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 120 120 if err != nil { 121 - return false, 0, err 121 + return false, 0, "", err 122 122 } 123 123 124 124 resp, err := c.httpClient.Do(req) 125 125 duration := time.Since(startTime) 126 126 127 127 if err != nil { 128 - return false, duration, err 128 + return false, duration, "", err 129 129 } 130 130 defer resp.Body.Close() 131 131 132 - return resp.StatusCode == http.StatusOK, duration, nil 132 + if resp.StatusCode != http.StatusOK { 133 + return false, duration, "", fmt.Errorf("health check returned status %d", resp.StatusCode) 134 + } 135 + 136 + // Decode the JSON response and check for "version" 137 + var healthResponse struct { 138 + Version string `json:"version"` 139 + } 140 + 141 + if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err != nil { 142 + return false, duration, "", fmt.Errorf("failed to decode health JSON: %w", err) 143 + } 144 + 145 + if healthResponse.Version == "" { 146 + return false, duration, "", fmt.Errorf("health JSON response missing 'version' field") 147 + } 148 + 149 + // All checks passed 150 + return true, duration, healthResponse.Version, nil 133 151 }
+5 -2
internal/pds/scanner.go
··· 42 42 return err 43 43 } 44 44 45 + // 2. ADD THIS BLOCK TO SHUFFLE THE LIST 45 46 if len(servers) > 0 { 46 47 // Create a new random source to avoid using the global one 47 48 r := rand.New(rand.NewSource(time.Now().UnixNano())) ··· 108 109 s.db.UpdateEndpointIP(ctx, ep.ID, ip, time.Now().UTC()) 109 110 110 111 // STEP 2: Health check 111 - available, responseTime, err := s.client.CheckHealth(ctx, ep.Endpoint) 112 + available, responseTime, version, err := s.client.CheckHealth(ctx, ep.Endpoint) // CHANGED: receive version 112 113 if err != nil || !available { 113 114 errMsg := "health check failed" 114 115 if err != nil { ··· 140 141 ResponseTime: responseTime, 141 142 Description: desc, 142 143 DIDs: dids, 144 + Version: version, // CHANGED: Pass version 143 145 }) 144 146 145 147 // STEP 5: Fetch IP info if needed (async, with backoff) ··· 174 176 EndpointID: endpointID, 175 177 Status: result.Status, 176 178 ResponseTime: result.ResponseTime.Seconds() * 1000, // Convert to ms 177 - UserCount: userCount, // NEW: Set the top-level field 179 + UserCount: userCount, 180 + Version: result.Version, // NEW: Set the version field 178 181 ScanData: scanData, 179 182 ScannedAt: time.Now().UTC(), 180 183 }
+1
internal/pds/types.go
··· 37 37 ErrorMessage string 38 38 Description *ServerDescription 39 39 DIDs []string 40 + Version string // NEW: Add this field to pass the version 40 41 }
+1
internal/storage/db.go
··· 29 29 GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) 30 30 UpdateEndpointIP(ctx context.Context, endpointID int64, ip string, resolvedAt time.Time) error 31 31 SaveEndpointScan(ctx context.Context, scan *EndpointScan) error 32 + SetScanRetention(retention int) 32 33 UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error 33 34 34 35 // PDS virtual endpoints (created via JOINs)
+94 -47
internal/storage/postgres.go
··· 14 14 ) 15 15 16 16 type PostgresDB struct { 17 - db *sql.DB 18 - pool *pgxpool.Pool // Add this for COPY support 17 + db *sql.DB 18 + pool *pgxpool.Pool 19 + scanRetention int 19 20 } 20 21 21 22 func NewPostgresDB(connString string) (*PostgresDB, error) { ··· 42 43 return nil, fmt.Errorf("failed to create pgx pool: %w", err) 43 44 } 44 45 45 - return &PostgresDB{db: db, pool: pool}, nil 46 + return &PostgresDB{ 47 + db: db, 48 + pool: pool, 49 + scanRetention: 3, 50 + }, nil 46 51 } 47 52 48 53 func (p *PostgresDB) Close() error { ··· 99 104 endpoint_id BIGINT NOT NULL, 100 105 status INTEGER NOT NULL, 101 106 response_time DOUBLE PRECISION, 102 - user_count BIGINT, -- NEW: Add user_count column 107 + user_count BIGINT, 108 + version TEXT, 103 109 scan_data JSONB, 104 110 scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 105 111 FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE ··· 107 113 108 114 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_endpoint_status_scanned ON endpoint_scans(endpoint_id, status, scanned_at DESC); 109 115 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_scanned_at ON endpoint_scans(scanned_at); 110 - CREATE INDEX IF NOT EXISTS idx_endpoint_scans_user_count ON endpoint_scans(user_count DESC NULLS LAST); -- NEW: Index for sorting 116 + CREATE INDEX IF NOT EXISTS idx_endpoint_scans_user_count ON endpoint_scans(user_count DESC NULLS LAST); 111 117 112 118 CREATE TABLE IF NOT EXISTS plc_metrics ( 113 119 id BIGSERIAL PRIMARY KEY, ··· 345 351 346 352 // ===== SCAN OPERATIONS ===== 347 353 354 + func (p *PostgresDB) SetScanRetention(retention int) { 355 + p.scanRetention = retention 356 + } 357 + 348 358 func (p *PostgresDB) SaveEndpointScan(ctx context.Context, scan *EndpointScan) error { 349 359 var scanDataJSON []byte 350 360 if scan.ScanData != nil { 351 361 scanDataJSON, _ = json.Marshal(scan.ScanData) 352 362 } 353 363 364 + tx, err := p.db.BeginTx(ctx, nil) 365 + if err != nil { 366 + return err 367 + } 368 + defer tx.Rollback() 369 + 354 370 query := ` 355 - INSERT INTO endpoint_scans (endpoint_id, status, response_time, user_count, scan_data, scanned_at) 356 - VALUES ($1, $2, $3, $4, $5, $6) 371 + INSERT INTO endpoint_scans (endpoint_id, status, response_time, user_count, version, scan_data, scanned_at) 372 + VALUES ($1, $2, $3, $4, $5, $6, $7) 373 + ` 374 + _, err = tx.ExecContext(ctx, query, scan.EndpointID, scan.Status, scan.ResponseTime, scan.UserCount, scan.Version, scanDataJSON, scan.ScannedAt) 375 + if err != nil { 376 + return err 377 + } 378 + 379 + // Use configured retention value 380 + cleanupQuery := ` 381 + DELETE FROM endpoint_scans 382 + WHERE endpoint_id = $1 383 + AND id NOT IN ( 384 + SELECT id 385 + FROM endpoint_scans 386 + WHERE endpoint_id = $1 387 + ORDER BY scanned_at DESC 388 + LIMIT $2 389 + ) 357 390 ` 358 - _, err := p.db.ExecContext(ctx, query, scan.EndpointID, scan.Status, scan.ResponseTime, scan.UserCount, scanDataJSON, scan.ScannedAt) 359 - return err 391 + _, err = tx.ExecContext(ctx, cleanupQuery, scan.EndpointID, p.scanRetention) 392 + if err != nil { 393 + return err 394 + } 395 + 396 + return tx.Commit() 360 397 } 361 398 362 399 func (p *PostgresDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) { 363 400 query := ` 364 - SELECT id, endpoint_id, status, response_time, user_count, scan_data, scanned_at 365 - FROM endpoint_scans 366 - WHERE endpoint_id = $1 401 + SELECT id, endpoint_id, status, response_time, user_count, version, scan_data, scanned_at 402 + FROM endpoint_scans 403 + WHERE endpoint_id = $1 367 404 ORDER BY scanned_at DESC 368 405 LIMIT $2 369 406 ` ··· 378 415 for rows.Next() { 379 416 var scan EndpointScan 380 417 var responseTime sql.NullFloat64 381 - var userCount sql.NullInt64 // NEW: Use NullInt64 418 + var userCount sql.NullInt64 419 + var version sql.NullString // NEW 382 420 var scanDataJSON []byte 383 421 384 - err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &userCount, &scanDataJSON, &scan.ScannedAt) 422 + err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &userCount, &version, &scanDataJSON, &scan.ScannedAt) 385 423 if err != nil { 386 424 return nil, err 387 425 } ··· 390 428 scan.ResponseTime = responseTime.Float64 391 429 } 392 430 393 - if userCount.Valid { // NEW: Check validity and set 431 + if userCount.Valid { 394 432 scan.UserCount = userCount.Int64 395 433 } 396 434 435 + if version.Valid { // NEW 436 + scan.Version = version.String 437 + } 438 + 397 439 if len(scanDataJSON) > 0 { 398 440 var scanData EndpointScanData 399 441 if err := json.Unmarshal(scanDataJSON, &scanData); err == nil { ··· 411 453 412 454 func (p *PostgresDB) GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) { 413 455 query := ` 414 - SELECT 415 - e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip, 416 - latest.user_count, latest.response_time, latest.scanned_at, 417 - i.city, i.country, i.country_code, i.asn, i.asn_org, 456 + SELECT 457 + e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip, 458 + latest.user_count, latest.response_time, latest.version, latest.scanned_at, 459 + i.city, i.country, i.country_code, i.asn, i.asn_org, 418 460 i.is_datacenter, i.is_vpn, i.latitude, i.longitude 419 461 FROM endpoints e 420 - LEFT JOIN LATERAL ( 421 - SELECT 422 - -- COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count, -- OLD 423 - user_count, -- NEW: Use the column directly 424 - response_time, 425 - scanned_at 462 + LEFT JOIN LATERAL ( 463 + SELECT 464 + user_count, 465 + response_time, 466 + version, 467 + scanned_at 426 468 FROM endpoint_scans 427 469 WHERE endpoint_id = e.id AND status = 1 428 470 ORDER BY scanned_at DESC ··· 478 520 var lat, lon sql.NullFloat64 479 521 var userCount sql.NullInt32 480 522 var responseTime sql.NullFloat64 523 + var version sql.NullString // ADD THIS LINE 481 524 var scannedAt sql.NullTime 482 525 483 526 err := rows.Scan( 484 527 &item.ID, &item.Endpoint, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, 485 - &userCount, &responseTime, &scannedAt, 528 + &userCount, &responseTime, &version, &scannedAt, // ADD &version HERE 486 529 &city, &country, &countryCode, &asn, &asnOrg, 487 530 &isDatacenter, &isVPN, &lat, &lon, 488 531 ) ··· 499 542 item.LatestScan = &struct { 500 543 UserCount int 501 544 ResponseTime float64 545 + Version string 502 546 ScannedAt time.Time 503 547 }{ 504 548 UserCount: int(userCount.Int32), 505 549 ResponseTime: responseTime.Float64, 550 + Version: version.String, 506 551 ScannedAt: scannedAt.Time, 507 552 } 508 553 } ··· 531 576 532 577 func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) { 533 578 query := ` 534 - SELECT 535 - e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip, 536 - latest.user_count, 537 - latest.response_time, 538 - latest.scan_data->'metadata'->'server_info' as server_info, 539 - latest.scanned_at, 540 - i.city, i.country, i.country_code, i.asn, i.asn_org, 541 - i.is_datacenter, i.is_vpn, i.latitude, i.longitude, 542 - i.raw_data -- NEW: Select the raw_data column 543 - FROM endpoints e 544 - LEFT JOIN LATERAL ( 545 - SELECT scan_data, response_time, scanned_at, user_count -- NEW: Select user_count 546 - FROM endpoint_scans 547 - WHERE endpoint_id = e.id 579 + SELECT 580 + e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip, 581 + latest.user_count, 582 + latest.response_time, 583 + latest.version, -- ADD THIS LINE 584 + latest.scan_data->'metadata'->'server_info' as server_info, 585 + latest.scanned_at, 586 + i.city, i.country, i.country_code, i.asn, i.asn_org, 587 + i.is_datacenter, i.is_vpn, i.latitude, i.longitude, 588 + i.raw_data 589 + FROM endpoints e 590 + LEFT JOIN LATERAL ( 591 + SELECT scan_data, response_time, version, scanned_at, user_count -- ADD version HERE 592 + FROM endpoint_scans 593 + WHERE endpoint_id = e.id 548 594 ORDER BY scanned_at DESC 549 595 LIMIT 1 550 596 ) latest ON true ··· 559 605 var lat, lon sql.NullFloat64 560 606 var userCount sql.NullInt32 561 607 var responseTime sql.NullFloat64 608 + var version sql.NullString // ADD THIS LINE 562 609 var serverInfoJSON []byte 563 610 var scannedAt sql.NullTime 564 - var rawDataJSON []byte // NEW: Add a variable for raw_data 611 + var rawDataJSON []byte 565 612 566 613 err := p.db.QueryRowContext(ctx, query, endpoint).Scan( 567 614 &detail.ID, &detail.Endpoint, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, 568 - &userCount, &responseTime, &serverInfoJSON, &scannedAt, 615 + &userCount, &responseTime, &version, &serverInfoJSON, &scannedAt, // ADD &version HERE 569 616 &city, &country, &countryCode, &asn, &asnOrg, 570 617 &isDatacenter, &isVPN, &lat, &lon, 571 - &rawDataJSON, // NEW: Scan into the variable 618 + &rawDataJSON, 572 619 ) 573 620 if err != nil { 574 621 return nil, err ··· 588 635 detail.LatestScan = &struct { 589 636 UserCount int 590 637 ResponseTime float64 638 + Version string // ADD THIS LINE 591 639 ServerInfo interface{} 592 640 ScannedAt time.Time 593 641 }{ 594 642 UserCount: int(userCount.Int32), 595 643 ResponseTime: responseTime.Float64, 644 + Version: version.String, // ADD THIS LINE 596 645 ServerInfo: serverInfo, 597 646 ScannedAt: scannedAt.Time, 598 647 } ··· 632 681 WITH latest_scans AS ( 633 682 SELECT DISTINCT ON (endpoint_id) 634 683 endpoint_id, 635 - -- COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count, -- OLD 636 - user_count, -- NEW: Use the column directly 684 + user_count, 637 685 status 638 686 FROM endpoint_scans 639 687 WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds') ··· 705 753 WITH latest_pds_scans AS ( 706 754 SELECT DISTINCT ON (endpoint_id) 707 755 endpoint_id, 708 - -- COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count -- OLD 709 - user_count -- NEW: Use the column directly 756 + user_count 710 757 FROM endpoint_scans 711 758 WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds') 712 759 ORDER BY endpoint_id, scanned_at DESC
+3
internal/storage/types.go
··· 51 51 Status int 52 52 ResponseTime float64 53 53 UserCount int64 54 + Version string // NEW: Add this field 54 55 ScanData *EndpointScanData 55 56 ScannedAt time.Time 56 57 } ··· 191 192 LatestScan *struct { 192 193 UserCount int 193 194 ResponseTime float64 195 + Version string // NEW: Add this 194 196 ScannedAt time.Time 195 197 } 196 198 ··· 206 208 LatestScan *struct { 207 209 UserCount int 208 210 ResponseTime float64 211 + Version string // ADD THIS LINE 209 212 ServerInfo interface{} // Full server description 210 213 ScannedAt time.Time 211 214 }