better pds scan & endpoint validation

Changed files
+159 -97
cmd
import-labels
internal
+2 -2
cmd/import-labels/main.go
··· 143 if lineCount%100000 == 0 { 144 elapsed := time.Since(startTime).Seconds() 145 rate := float64(lineCount) / elapsed 146 - fmt.Printf(" ... processed %,d lines (%.0f lines/sec)\n", lineCount, rate) 147 } 148 } 149 ··· 164 fmt.Println("Import Summary") 165 fmt.Println("========================================") 166 fmt.Printf("✓ Import completed in %v\n", totalTime) 167 - fmt.Printf("Total lines processed: %,d\n", lineCount) 168 }
··· 143 if lineCount%100000 == 0 { 144 elapsed := time.Since(startTime).Seconds() 145 rate := float64(lineCount) / elapsed 146 + fmt.Printf(" ... processed %d lines (%.0f lines/sec)\n", lineCount, rate) 147 } 148 } 149 ··· 164 fmt.Println("Import Summary") 165 fmt.Println("========================================") 166 fmt.Printf("✓ Import completed in %v\n", totalTime) 167 + fmt.Printf("Total lines processed: %d\n", lineCount) 168 }
-1
go.mod
··· 11 require github.com/klauspost/compress v1.18.1 12 13 require ( 14 - github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d 15 github.com/gorilla/handlers v1.5.2 16 github.com/jackc/pgx/v5 v5.7.6 17 tangled.org/atscan.net/plcbundle v0.3.6
··· 11 require github.com/klauspost/compress v1.18.1 12 13 require ( 14 github.com/gorilla/handlers v1.5.2 15 github.com/jackc/pgx/v5 v5.7.6 16 tangled.org/atscan.net/plcbundle v0.3.6
-2
go.sum
··· 1 - github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= 2 - github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= 3 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= 4 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 5 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
··· 1 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= 2 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 3 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+1
internal/api/handlers.go
··· 248 "endpoint": pds.Endpoint, 249 "discovered_at": pds.DiscoveredAt, 250 "status": statusToString(pds.Status), 251 } 252 253 // Add server_did if available
··· 248 "endpoint": pds.Endpoint, 249 "discovered_at": pds.DiscoveredAt, 250 "status": statusToString(pds.Status), 251 + "valid": pds.Valid, // NEW 252 } 253 254 // Add server_did if available
+44 -45
internal/pds/client.go
··· 84 } 85 86 // DescribeServer fetches com.atproto.server.describeServer 87 - func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, error) { 88 url := fmt.Sprintf("%s/xrpc/com.atproto.server.describeServer", endpoint) 89 90 - //fmt.Println(url) 91 92 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 93 if err != nil { 94 - return nil, err 95 } 96 97 - resp, err := c.httpClient.Do(req) 98 if err != nil { 99 - return nil, err 100 } 101 defer resp.Body.Close() 102 103 if resp.StatusCode != http.StatusOK { 104 - return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 105 } 106 107 var desc ServerDescription 108 if err := json.NewDecoder(resp.Body).Decode(&desc); err != nil { 109 - return nil, err 110 } 111 112 - return &desc, nil 113 } 114 115 // CheckHealth performs a basic health check, ensuring the endpoint returns JSON with a "version" 116 - // Returns: available, responseTime, version, usedIP, error 117 - func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, string, error) { 118 startTime := time.Now() 119 120 url := fmt.Sprintf("%s/xrpc/_health", endpoint) 121 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 122 if err != nil { 123 - return false, 0, "", "", err 124 - } 125 - 126 - // Create a custom dialer to track which IP was actually used 127 - var usedIP string 128 - transport := &http.Transport{ 129 - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { 130 - conn, err := (&net.Dialer{ 131 - Timeout: 30 * time.Second, 132 - KeepAlive: 30 * time.Second, 133 - }).DialContext(ctx, network, addr) 134 - 135 - if err == nil && conn != nil { 136 - if remoteAddr := conn.RemoteAddr(); remoteAddr != nil { 137 - // Extract IP from "ip:port" format 138 - if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok { 139 - usedIP = tcpAddr.IP.String() 140 - } 141 - } 142 - } 143 - 144 - return conn, err 145 - }, 146 } 147 148 - // Create a client with our custom transport 149 - client := &http.Client{ 150 - Timeout: c.httpClient.Timeout, 151 - Transport: transport, 152 - } 153 - 154 - resp, err := client.Do(req) 155 duration := time.Since(startTime) 156 157 if err != nil { 158 - return false, duration, "", usedIP, err 159 } 160 defer resp.Body.Close() 161 162 if resp.StatusCode != http.StatusOK { 163 - return false, duration, "", usedIP, fmt.Errorf("health check returned status %d", resp.StatusCode) 164 } 165 166 // Decode the JSON response and check for "version" ··· 169 } 170 171 if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err != nil { 172 - return false, duration, "", usedIP, fmt.Errorf("failed to decode health JSON: %w", err) 173 } 174 175 if healthResponse.Version == "" { 176 - return false, duration, "", usedIP, fmt.Errorf("health JSON response missing 'version' field") 177 } 178 179 // All checks passed 180 - return true, duration, healthResponse.Version, usedIP, nil 181 }
··· 84 } 85 86 // DescribeServer fetches com.atproto.server.describeServer 87 + // Returns: description, responseTime, usedIP, error 88 + func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, time.Duration, string, error) { 89 + startTime := time.Now() 90 url := fmt.Sprintf("%s/xrpc/com.atproto.server.describeServer", endpoint) 91 92 + // Track which IP was used 93 + var usedIP string 94 + transport := &http.Transport{ 95 + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { 96 + conn, err := (&net.Dialer{ 97 + Timeout: 30 * time.Second, 98 + KeepAlive: 30 * time.Second, 99 + }).DialContext(ctx, network, addr) 100 + 101 + if err == nil && conn != nil { 102 + if remoteAddr := conn.RemoteAddr(); remoteAddr != nil { 103 + if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok { 104 + usedIP = tcpAddr.IP.String() 105 + } 106 + } 107 + } 108 + return conn, err 109 + }, 110 + } 111 + 112 + client := &http.Client{ 113 + Timeout: c.httpClient.Timeout, 114 + Transport: transport, 115 + } 116 117 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 118 if err != nil { 119 + return nil, 0, "", err 120 } 121 122 + resp, err := client.Do(req) 123 + responseTime := time.Since(startTime) 124 + 125 if err != nil { 126 + return nil, responseTime, usedIP, err 127 } 128 defer resp.Body.Close() 129 130 if resp.StatusCode != http.StatusOK { 131 + return nil, responseTime, usedIP, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 132 } 133 134 var desc ServerDescription 135 if err := json.NewDecoder(resp.Body).Decode(&desc); err != nil { 136 + return nil, responseTime, usedIP, err 137 } 138 139 + return &desc, responseTime, usedIP, nil 140 } 141 142 // CheckHealth performs a basic health check, ensuring the endpoint returns JSON with a "version" 143 + // Returns: available, responseTime, version, error 144 + func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, error) { 145 startTime := time.Now() 146 147 url := fmt.Sprintf("%s/xrpc/_health", endpoint) 148 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 149 if err != nil { 150 + return false, 0, "", err 151 } 152 153 + resp, err := c.httpClient.Do(req) 154 duration := time.Since(startTime) 155 156 if err != nil { 157 + return false, duration, "", err 158 } 159 defer resp.Body.Close() 160 161 if resp.StatusCode != http.StatusOK { 162 + return false, duration, "", fmt.Errorf("health check returned status %d", resp.StatusCode) 163 } 164 165 // Decode the JSON response and check for "version" ··· 168 } 169 170 if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err != nil { 171 + return false, duration, "", fmt.Errorf("failed to decode health JSON: %w", err) 172 } 173 174 if healthResponse.Version == "" { 175 + return false, duration, "", fmt.Errorf("health JSON response missing 'version' field") 176 } 177 178 // All checks passed 179 + return true, duration, healthResponse.Version, nil 180 }
+30 -26
internal/pds/scanner.go
··· 8 "sync/atomic" 9 "time" 10 11 - "github.com/acarl005/stripansi" 12 "github.com/atscan/atscand/internal/config" 13 "github.com/atscan/atscand/internal/ipinfo" 14 "github.com/atscan/atscand/internal/log" ··· 40 servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{ 41 Type: "pds", 42 OnlyStale: true, 43 RecheckInterval: s.config.RecheckInterval, 44 }) 45 if err != nil { ··· 127 // STEP 1: Resolve IPs (both IPv4 and IPv6) 128 ips, err := ipinfo.ExtractIPsFromEndpoint(ep.Endpoint) 129 if err != nil { 130 - // Mark as offline due to DNS failure 131 s.saveScanResult(ctx, ep.ID, &ScanResult{ 132 Status: storage.EndpointStatusOffline, 133 ErrorMessage: fmt.Sprintf("DNS resolution failed: %v", err), ··· 146 go s.updateIPInfoIfNeeded(ctx, ips.IPv6) 147 } 148 149 - // STEP 2: Health check (now returns which IP was used) 150 - available, responseTime, version, usedIP, err := s.client.CheckHealth(ctx, ep.Endpoint) 151 - if err != nil || !available { 152 - errMsg := "health check failed" 153 - if err != nil { 154 - errMsg = err.Error() 155 - } 156 s.saveScanResult(ctx, ep.ID, &ScanResult{ 157 Status: storage.EndpointStatusOffline, 158 - ResponseTime: responseTime, 159 - ErrorMessage: errMsg, 160 - UsedIP: usedIP, // Save even if failed 161 }) 162 return 163 } 164 165 - // STEP 3: Fetch PDS-specific data 166 - desc, err := s.client.DescribeServer(ctx, ep.Endpoint) 167 - if err != nil { 168 - log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(ep.Endpoint), err) 169 - } else if desc != nil && desc.DID != "" { 170 s.db.UpdateEndpointServerDID(ctx, ep.ID, desc.DID) 171 } 172 173 - // Fetch repos with full info 174 repoList, err := s.client.ListRepos(ctx, ep.Endpoint) 175 if err != nil { 176 log.Verbose("Warning: failed to list repos for %s: %v", ep.Endpoint, err) 177 repoList = []Repo{} 178 } 179 180 - // Convert to DIDs for backward compatibility 181 dids := make([]string, len(repoList)) 182 for i, repo := range repoList { 183 dids[i] = repo.DID 184 } 185 186 - // STEP 4: SAVE scan result 187 s.saveScanResult(ctx, ep.ID, &ScanResult{ 188 Status: storage.EndpointStatusOnline, 189 - ResponseTime: responseTime, 190 Description: desc, 191 DIDs: dids, 192 Version: version, 193 - UsedIP: usedIP, // NEW: Save which IP was used 194 }) 195 196 - // Save repos in batches (only tracks changes) 197 if len(repoList) > 0 { 198 batchSize := 100_000 199 ··· 235 236 log.Verbose("✓ Processed %d repos for %s", len(repoList), ep.Endpoint) 237 } 238 - 239 - // IP info fetch already started at the beginning (step 1.5) 240 - // It will complete in the background 241 } 242 243 func (s *Scanner) saveScanResult(ctx context.Context, endpointID int64, result *ScanResult) {
··· 8 "sync/atomic" 9 "time" 10 11 "github.com/atscan/atscand/internal/config" 12 "github.com/atscan/atscand/internal/ipinfo" 13 "github.com/atscan/atscand/internal/log" ··· 39 servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{ 40 Type: "pds", 41 OnlyStale: true, 42 + OnlyValid: true, 43 RecheckInterval: s.config.RecheckInterval, 44 }) 45 if err != nil { ··· 127 // STEP 1: Resolve IPs (both IPv4 and IPv6) 128 ips, err := ipinfo.ExtractIPsFromEndpoint(ep.Endpoint) 129 if err != nil { 130 s.saveScanResult(ctx, ep.ID, &ScanResult{ 131 Status: storage.EndpointStatusOffline, 132 ErrorMessage: fmt.Sprintf("DNS resolution failed: %v", err), ··· 145 go s.updateIPInfoIfNeeded(ctx, ips.IPv6) 146 } 147 148 + // STEP 2: Call describeServer (primary health check + metadata) 149 + desc, descResponseTime, usedIP, err := s.client.DescribeServer(ctx, ep.Endpoint) 150 + if err != nil { 151 s.saveScanResult(ctx, ep.ID, &ScanResult{ 152 Status: storage.EndpointStatusOffline, 153 + ResponseTime: descResponseTime, 154 + ErrorMessage: fmt.Sprintf("describeServer failed: %v", err), 155 + UsedIP: usedIP, 156 }) 157 return 158 } 159 160 + // Update server DID immediately 161 + if desc.DID != "" { 162 s.db.UpdateEndpointServerDID(ctx, ep.ID, desc.DID) 163 } 164 165 + // STEP 3: Call _health to get version 166 + available, healthResponseTime, version, err := s.client.CheckHealth(ctx, ep.Endpoint) 167 + if err != nil || !available { 168 + log.Verbose("Warning: _health check failed for %s: %v", ep.Endpoint, err) 169 + // Server is online (describeServer worked) but _health failed 170 + // Continue with empty version 171 + version = "" 172 + } 173 + 174 + // Calculate average response time from both calls 175 + avgResponseTime := descResponseTime 176 + if available { 177 + avgResponseTime = (descResponseTime + healthResponseTime) / 2 178 + } 179 + 180 + // STEP 4: Fetch repos 181 repoList, err := s.client.ListRepos(ctx, ep.Endpoint) 182 if err != nil { 183 log.Verbose("Warning: failed to list repos for %s: %v", ep.Endpoint, err) 184 repoList = []Repo{} 185 } 186 187 + // Convert to DIDs 188 dids := make([]string, len(repoList)) 189 for i, repo := range repoList { 190 dids[i] = repo.DID 191 } 192 193 + // STEP 5: SAVE scan result 194 s.saveScanResult(ctx, ep.ID, &ScanResult{ 195 Status: storage.EndpointStatusOnline, 196 + ResponseTime: avgResponseTime, 197 Description: desc, 198 DIDs: dids, 199 Version: version, 200 + UsedIP: usedIP, // Only from describeServer 201 }) 202 203 + // STEP 6: Save repos in batches (only tracks changes) 204 if len(repoList) > 0 { 205 batchSize := 100_000 206 ··· 242 243 log.Verbose("✓ Processed %d repos for %s", len(repoList), ep.Endpoint) 244 } 245 } 246 247 func (s *Scanner) saveScanResult(ctx context.Context, endpointID int64, result *ScanResult) {
+2
internal/plc/scanner.go
··· 190 } 191 192 func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error { 193 return s.db.UpsertEndpoint(ctx, &storage.Endpoint{ 194 EndpointType: epType, 195 Endpoint: endpoint, 196 DiscoveredAt: discoveredAt, 197 LastChecked: time.Time{}, 198 Status: storage.EndpointStatusUnknown, 199 }) 200 } 201
··· 190 } 191 192 func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error { 193 + valid := validateEndpoint(endpoint) 194 return s.db.UpsertEndpoint(ctx, &storage.Endpoint{ 195 EndpointType: epType, 196 Endpoint: endpoint, 197 DiscoveredAt: discoveredAt, 198 LastChecked: time.Time{}, 199 Status: storage.EndpointStatusUnknown, 200 + Valid: valid, 201 }) 202 } 203
+49
internal/plc/types.go
··· 1 package plc 2 3 import ( 4 plclib "tangled.org/atscan.net/plcbundle/plc" 5 ) 6 ··· 38 Confidence float64 `json:"confidence"` 39 Detectors []string `json:"detectors"` 40 }
··· 1 package plc 2 3 import ( 4 + "net/url" 5 + "strings" 6 + 7 plclib "tangled.org/atscan.net/plcbundle/plc" 8 ) 9 ··· 41 Confidence float64 `json:"confidence"` 42 Detectors []string `json:"detectors"` 43 } 44 + 45 + // validateEndpoint checks if endpoint is in correct format: https://<domain> 46 + func validateEndpoint(endpoint string) bool { 47 + // Must not be empty 48 + if endpoint == "" { 49 + return false 50 + } 51 + 52 + // Must not have trailing slash 53 + if strings.HasSuffix(endpoint, "/") { 54 + return false 55 + } 56 + 57 + // Parse URL 58 + u, err := url.Parse(endpoint) 59 + if err != nil { 60 + return false 61 + } 62 + 63 + // Must use https scheme 64 + if u.Scheme != "https" { 65 + return false 66 + } 67 + 68 + // Must have a host 69 + if u.Host == "" { 70 + return false 71 + } 72 + 73 + // Must not have path (except empty) 74 + if u.Path != "" && u.Path != "/" { 75 + return false 76 + } 77 + 78 + // Must not have query parameters 79 + if u.RawQuery != "" { 80 + return false 81 + } 82 + 83 + // Must not have fragment 84 + if u.Fragment != "" { 85 + return false 86 + } 87 + 88 + return true 89 + }
+24 -17
internal/storage/postgres.go
··· 84 ip TEXT, 85 ipv6 TEXT, 86 ip_resolved_at TIMESTAMP, 87 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 88 UNIQUE(endpoint_type, endpoint) 89 ); ··· 95 CREATE INDEX IF NOT EXISTS idx_endpoints_ipv6 ON endpoints(ipv6); 96 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did ON endpoints(server_did); 97 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did_type_discovered ON endpoints(server_did, endpoint_type, discovered_at); 98 99 -- IP infos table (IP as PRIMARY KEY) 100 CREATE TABLE IF NOT EXISTS ip_infos ( ··· 208 209 func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error { 210 query := ` 211 - INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at) 212 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 213 ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET 214 last_checked = EXCLUDED.last_checked, 215 status = EXCLUDED.status, ··· 225 WHEN (EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '') OR (EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '') THEN EXCLUDED.ip_resolved_at 226 ELSE endpoints.ip_resolved_at 227 END, 228 updated_at = CURRENT_TIMESTAMP 229 RETURNING id 230 ` 231 err := p.db.QueryRowContext(ctx, query, 232 endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt, 233 - endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPv6, endpoint.IPResolvedAt).Scan(&endpoint.ID) 234 return err 235 } 236 ··· 251 func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) { 252 query := ` 253 SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, 254 - ip, ipv6, ip_resolved_at, updated_at 255 FROM endpoints 256 WHERE endpoint = $1 AND endpoint_type = $2 257 ` ··· 262 263 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan( 264 &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 265 - &ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.UpdatedAt, 266 ) 267 if err != nil { 268 return nil, err ··· 288 query := ` 289 SELECT DISTINCT ON (COALESCE(server_did, id::text)) 290 id, endpoint_type, endpoint, server_did, discovered_at, last_checked, status, 291 - ip, ipv6, ip_resolved_at, updated_at 292 FROM endpoints 293 WHERE 1=1 294 ` ··· 300 query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx) 301 args = append(args, filter.Type) 302 argIdx++ 303 } 304 if filter.Status != "" { 305 statusInt := EndpointStatusUnknown ··· 566 last_checked, 567 status, 568 ip, 569 - ipv6 570 FROM endpoints 571 WHERE endpoint_type = 'pds' 572 ORDER BY COALESCE(server_did, id::text), discovered_at ASC 573 ) 574 SELECT 575 - e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip, e.ipv6, 576 latest.user_count, latest.response_time, latest.version, latest.scanned_at, 577 i.city, i.country, i.country_code, i.asn, i.asn_org, 578 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, ··· 643 var scannedAt sql.NullTime 644 645 err := rows.Scan( 646 - &item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, &ipv6, 647 &userCount, &responseTime, &version, &scannedAt, 648 &city, &country, &countryCode, &asn, &asnOrg, 649 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, ··· 705 706 func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) { 707 query := ` 708 - WITH target_endpoint AS MATERIALIZED ( -- MATERIALIZED fence for optimization 709 SELECT 710 e.id, 711 e.endpoint, ··· 714 e.last_checked, 715 e.status, 716 e.ip, 717 - e.ipv6 718 FROM endpoints e 719 WHERE e.endpoint = $1 720 AND e.endpoint_type = 'pds' 721 - LIMIT 1 -- Early termination since we expect exactly 1 row 722 ) 723 SELECT 724 te.id, ··· 729 te.status, 730 te.ip, 731 te.ipv6, 732 latest.user_count, 733 latest.response_time, 734 latest.version, ··· 738 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, 739 i.latitude, i.longitude, 740 i.raw_data, 741 - -- Inline aliases aggregation (avoid second CTE) 742 COALESCE( 743 ARRAY( 744 SELECT e2.endpoint ··· 751 ), 752 ARRAY[]::text[] 753 ) as aliases, 754 - -- Inline first_discovered_at (avoid aggregation) 755 CASE 756 WHEN te.server_did IS NOT NULL THEN ( 757 SELECT MIN(e3.discovered_at) ··· 792 var firstDiscoveredAt sql.NullTime 793 794 err := p.db.QueryRowContext(ctx, query, endpoint).Scan( 795 - &detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, &ipv6, 796 &userCount, &responseTime, &version, &serverInfoJSON, &scannedAt, 797 &city, &country, &countryCode, &asn, &asnOrg, 798 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, ··· 819 // Set aliases and is_primary 820 detail.Aliases = aliases 821 if serverDID.Valid && serverDID.String != "" && firstDiscoveredAt.Valid { 822 - // Has server_did - check if this is the first discovered 823 detail.IsPrimary = detail.DiscoveredAt.Equal(firstDiscoveredAt.Time) || 824 detail.DiscoveredAt.Before(firstDiscoveredAt.Time) 825 } else { 826 - // No server_did means unique server 827 detail.IsPrimary = true 828 } 829
··· 84 ip TEXT, 85 ipv6 TEXT, 86 ip_resolved_at TIMESTAMP, 87 + valid BOOLEAN DEFAULT true, 88 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 89 UNIQUE(endpoint_type, endpoint) 90 ); ··· 96 CREATE INDEX IF NOT EXISTS idx_endpoints_ipv6 ON endpoints(ipv6); 97 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did ON endpoints(server_did); 98 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did_type_discovered ON endpoints(server_did, endpoint_type, discovered_at); 99 + CREATE INDEX IF NOT EXISTS idx_endpoints_valid ON endpoints(valid); 100 101 -- IP infos table (IP as PRIMARY KEY) 102 CREATE TABLE IF NOT EXISTS ip_infos ( ··· 210 211 func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error { 212 query := ` 213 + INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at, valid) 214 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 215 ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET 216 last_checked = EXCLUDED.last_checked, 217 status = EXCLUDED.status, ··· 227 WHEN (EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '') OR (EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '') THEN EXCLUDED.ip_resolved_at 228 ELSE endpoints.ip_resolved_at 229 END, 230 + valid = EXCLUDED.valid, 231 updated_at = CURRENT_TIMESTAMP 232 RETURNING id 233 ` 234 err := p.db.QueryRowContext(ctx, query, 235 endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt, 236 + endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPv6, endpoint.IPResolvedAt, endpoint.Valid).Scan(&endpoint.ID) 237 return err 238 } 239 ··· 254 func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) { 255 query := ` 256 SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, 257 + ip, ipv6, ip_resolved_at, valid, updated_at 258 FROM endpoints 259 WHERE endpoint = $1 AND endpoint_type = $2 260 ` ··· 265 266 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan( 267 &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 268 + &ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.Valid, &ep.UpdatedAt, 269 ) 270 if err != nil { 271 return nil, err ··· 291 query := ` 292 SELECT DISTINCT ON (COALESCE(server_did, id::text)) 293 id, endpoint_type, endpoint, server_did, discovered_at, last_checked, status, 294 + ip, ipv6, ip_resolved_at, valid, updated_at 295 FROM endpoints 296 WHERE 1=1 297 ` ··· 303 query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx) 304 args = append(args, filter.Type) 305 argIdx++ 306 + } 307 + 308 + // NEW: Filter by valid flag 309 + if filter.OnlyValid { 310 + query += fmt.Sprintf(" AND valid = true", argIdx) 311 } 312 if filter.Status != "" { 313 statusInt := EndpointStatusUnknown ··· 574 last_checked, 575 status, 576 ip, 577 + ipv6, 578 + valid 579 FROM endpoints 580 WHERE endpoint_type = 'pds' 581 ORDER BY COALESCE(server_did, id::text), discovered_at ASC 582 ) 583 SELECT 584 + e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip, e.ipv6, e.valid, 585 latest.user_count, latest.response_time, latest.version, latest.scanned_at, 586 i.city, i.country, i.country_code, i.asn, i.asn_org, 587 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, ··· 652 var scannedAt sql.NullTime 653 654 err := rows.Scan( 655 + &item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, &ipv6, &item.Valid, 656 &userCount, &responseTime, &version, &scannedAt, 657 &city, &country, &countryCode, &asn, &asnOrg, 658 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, ··· 714 715 func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) { 716 query := ` 717 + WITH target_endpoint AS MATERIALIZED ( 718 SELECT 719 e.id, 720 e.endpoint, ··· 723 e.last_checked, 724 e.status, 725 e.ip, 726 + e.ipv6, 727 + e.valid 728 FROM endpoints e 729 WHERE e.endpoint = $1 730 AND e.endpoint_type = 'pds' 731 + LIMIT 1 732 ) 733 SELECT 734 te.id, ··· 739 te.status, 740 te.ip, 741 te.ipv6, 742 + te.valid, 743 latest.user_count, 744 latest.response_time, 745 latest.version, ··· 749 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, 750 i.latitude, i.longitude, 751 i.raw_data, 752 COALESCE( 753 ARRAY( 754 SELECT e2.endpoint ··· 761 ), 762 ARRAY[]::text[] 763 ) as aliases, 764 CASE 765 WHEN te.server_did IS NOT NULL THEN ( 766 SELECT MIN(e3.discovered_at) ··· 801 var firstDiscoveredAt sql.NullTime 802 803 err := p.db.QueryRowContext(ctx, query, endpoint).Scan( 804 + &detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, &ipv6, &detail.Valid, 805 &userCount, &responseTime, &version, &serverInfoJSON, &scannedAt, 806 &city, &country, &countryCode, &asn, &asnOrg, 807 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, ··· 828 // Set aliases and is_primary 829 detail.Aliases = aliases 830 if serverDID.Valid && serverDID.String != "" && firstDiscoveredAt.Valid { 831 detail.IsPrimary = detail.DiscoveredAt.Equal(firstDiscoveredAt.Time) || 832 detail.DiscoveredAt.Before(firstDiscoveredAt.Time) 833 } else { 834 detail.IsPrimary = true 835 } 836
+7 -4
internal/storage/types.go
··· 26 LastChecked time.Time 27 Status int 28 IP string 29 - IPv6 string // NEW 30 IPResolvedAt time.Time 31 UpdatedAt time.Time 32 } 33 ··· 76 77 // EndpointFilter for querying endpoints 78 type EndpointFilter struct { 79 - Type string // "pds", "labeler", etc. 80 Status string 81 MinUserCount int64 82 OnlyStale bool 83 RecheckInterval time.Duration 84 - Random bool // NEW: Return results in random order 85 Limit int 86 Offset int 87 } ··· 213 LastChecked time.Time 214 Status int 215 IP string 216 - IPv6 string // NEW 217 218 // From latest endpoint_scans (via JOIN) 219 LatestScan *struct {
··· 26 LastChecked time.Time 27 Status int 28 IP string 29 + IPv6 string 30 IPResolvedAt time.Time 31 + Valid bool 32 UpdatedAt time.Time 33 } 34 ··· 77 78 // EndpointFilter for querying endpoints 79 type EndpointFilter struct { 80 + Type string 81 Status string 82 MinUserCount int64 83 OnlyStale bool 84 + OnlyValid bool 85 RecheckInterval time.Duration 86 + Random bool 87 Limit int 88 Offset int 89 } ··· 215 LastChecked time.Time 216 Status int 217 IP string 218 + IPv6 string 219 + Valid bool // NEW 220 221 // From latest endpoint_scans (via JOIN) 222 LatestScan *struct {