Update: millisecond log timestamps, larger repo batch size, optimized GetPDSDetail query

Changed files
+40 -24
internal
log
pds
storage
+2 -2
internal/log/log.go
··· 28 28 errorLog = log.New(os.Stderr, "", 0) 29 29 } 30 30 31 - // timestamp returns current time in ISO 8601 format 31 + // timestamp returns current time with milliseconds (local time, no timezone) 32 32 func timestamp() string { 33 - return time.Now().Format(time.RFC3339) 33 + return time.Now().Format("2006-01-02T15:04:05.000") 34 34 } 35 35 36 36 func Verbose(format string, v ...interface{}) {
+1 -1
internal/pds/scanner.go
··· 195 195 196 196 // Save repos in batches (only tracks changes) 197 197 if len(repoList) > 0 { 198 - batchSize := 10000 198 + batchSize := 200000 199 199 200 200 log.Verbose("Processing %d repos for %s (tracking changes only)", len(repoList), ep.Endpoint) 201 201
+37 -21
internal/storage/postgres.go
··· 741 741 742 742 func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) { 743 743 query := ` 744 - WITH target_endpoint AS ( 744 + WITH target_endpoint AS MATERIALIZED ( -- MATERIALIZED fence for optimization 745 745 SELECT 746 746 e.id, 747 747 e.endpoint, ··· 752 752 e.ip, 753 753 e.ipv6 754 754 FROM endpoints e 755 - WHERE e.endpoint = $1 AND e.endpoint_type = 'pds' 756 - ), 757 - aliases_agg AS ( 758 - SELECT 759 - te.server_did, 760 - array_agg(e.endpoint ORDER BY e.discovered_at) FILTER (WHERE e.endpoint != te.endpoint) as aliases, 761 - MIN(e.discovered_at) as first_discovered_at 762 - FROM target_endpoint te 763 - LEFT JOIN endpoints e ON te.server_did = e.server_did 764 - AND e.endpoint_type = 'pds' 765 - AND te.server_did IS NOT NULL 766 - GROUP BY te.server_did 755 + WHERE e.endpoint = $1 756 + AND e.endpoint_type = 'pds' 757 + LIMIT 1 -- Early termination since we expect exactly 1 row 767 758 ) 768 759 SELECT 769 760 te.id, ··· 783 774 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, 784 775 i.latitude, i.longitude, 785 776 i.raw_data, 786 - COALESCE(aa.aliases, ARRAY[]::text[]) as aliases, 787 - aa.first_discovered_at 777 + -- Inline aliases aggregation (avoid second CTE) 778 + COALESCE( 779 + ARRAY( 780 + SELECT e2.endpoint 781 + FROM endpoints e2 782 + WHERE e2.server_did = te.server_did 783 + AND e2.endpoint_type = 'pds' 784 + AND e2.endpoint != te.endpoint 785 + AND te.server_did IS NOT NULL 786 + ORDER BY e2.discovered_at 787 + ), 788 + ARRAY[]::text[] 789 + ) as aliases, 790 + -- Inline first_discovered_at (avoid aggregation) 791 + CASE 792 + WHEN te.server_did IS NOT NULL THEN ( 793 + SELECT MIN(e3.discovered_at) 794 + FROM endpoints e3 795 + WHERE e3.server_did = te.server_did 796 + AND e3.endpoint_type = 'pds' 797 + ) 798 + ELSE NULL 799 + END as first_discovered_at 788 800 FROM target_endpoint te 789 - LEFT JOIN aliases_agg aa ON te.server_did = aa.server_did 790 801 LEFT JOIN LATERAL ( 
791 - SELECT scan_data, response_time, version, scanned_at, user_count 792 - FROM endpoint_scans 793 - WHERE endpoint_id = te.id 794 - ORDER BY scanned_at DESC 802 + SELECT 803 + es.scan_data, 804 + es.response_time, 805 + es.version, 806 + es.scanned_at, 807 + es.user_count 808 + FROM endpoint_scans es 809 + WHERE es.endpoint_id = te.id 810 + ORDER BY es.scanned_at DESC 795 811 LIMIT 1 796 812 ) latest ON true 797 - LEFT JOIN ip_infos i ON te.ip = i.ip 813 + LEFT JOIN ip_infos i ON te.ip = i.ip; 798 814 ` 799 815 800 816 detail := &PDSDetail{}