update

Changed files
+1256 -518
internal
utils
+184 -42
internal/api/handlers.go
··· 102 102 "discovered_at": ep.DiscoveredAt, 103 103 "last_checked": ep.LastChecked, 104 104 "status": statusToString(ep.Status), 105 - "user_count": ep.UserCount, 105 + // REMOVED: "user_count": ep.UserCount, // No longer exists 106 106 } 107 107 108 108 // Add IP if available ··· 110 110 response["ip"] = ep.IP 111 111 } 112 112 113 - // Extract specific fields from ip_info 114 - if ep.IPInfo != nil { 115 - // Extract location info 116 - if location, ok := ep.IPInfo["location"].(map[string]interface{}); ok { 117 - if city, ok := location["city"].(string); ok { 118 - response["city"] = city 119 - } 120 - if countryCode, ok := location["country_code"].(string); ok { 121 - response["country_code"] = countryCode 122 - } 123 - if country, ok := location["country"].(string); ok { 124 - response["country"] = country 125 - } 126 - } 127 - 128 - // Extract ASN info 129 - if asn, ok := ep.IPInfo["asn"].(map[string]interface{}); ok { 130 - if asnNumber, ok := asn["asn"].(float64); ok { 131 - response["asn"] = int(asnNumber) 132 - } 133 - } 134 - 135 - // Optionally include full ip_info for detailed view 136 - // response["ip_info"] = ep.IPInfo 137 - } 113 + // REMOVED: IP info extraction - no longer in Endpoint struct 114 + // IPInfo is now in separate table, joined only in PDS handlers 138 115 139 116 return response 140 117 } ··· 159 136 Type: r.URL.Query().Get("type"), 160 137 Status: r.URL.Query().Get("status"), 161 138 MinUserCount: getQueryInt64(r, "min_user_count", 0), 162 - Limit: getQueryInt(r, "limit", 0), 139 + Limit: getQueryInt(r, "limit", 50), 163 140 Offset: getQueryInt(r, "offset", 0), 164 141 } 165 142 ··· 177 154 resp.json(response) 178 155 } 179 156 180 - func (s *Server) handleGetEndpoint(w http.ResponseWriter, r *http.Request) { 157 + func (s *Server) handleGetEndpointStats(w http.ResponseWriter, r *http.Request) { 158 + resp := newResponse(w) 159 + stats, err := s.db.GetEndpointStats(r.Context()) 160 + if err != nil { 161 + resp.error(err.Error(), http.StatusInternalServerError) 162 + return 163 + } 164 + resp.json(stats) 165 + } 166 + 167 + // ===== PDS HANDLERS ===== 168 + 169 + func (s *Server) handleGetPDSList(w http.ResponseWriter, r *http.Request) { 170 + resp := newResponse(w) 171 + 172 + filter := &storage.EndpointFilter{ 173 + Type: "pds", 174 + Status: r.URL.Query().Get("status"), 175 + MinUserCount: getQueryInt64(r, "min_user_count", 0), 176 + Limit: getQueryInt(r, "limit", 50), 177 + Offset: getQueryInt(r, "offset", 0), 178 + } 179 + 180 + pdsServers, err := s.db.GetPDSList(r.Context(), filter) 181 + if err != nil { 182 + resp.error(err.Error(), http.StatusInternalServerError) 183 + return 184 + } 185 + 186 + response := make([]map[string]interface{}, len(pdsServers)) 187 + for i, pds := range pdsServers { 188 + response[i] = formatPDSListItem(pds) 189 + } 190 + 191 + resp.json(response) 192 + } 193 + 194 + func (s *Server) handleGetPDSDetail(w http.ResponseWriter, r *http.Request) { 181 195 resp := newResponse(w) 182 196 vars := mux.Vars(r) 183 - endpoint := vars["endpoint"] 184 - endpointType := r.URL.Query().Get("type") 185 - if endpointType == "" { 186 - endpointType = "pds" 187 - } 197 + endpoint := "https://" + normalizeEndpoint(vars["endpoint"]) 188 198 189 - endpoint = "https://" + normalizeEndpoint(endpoint) 190 - ep, err := s.db.GetEndpoint(r.Context(), endpoint, endpointType) 199 + // FIX: Use r.Context() instead of ctx 200 + pds, err := s.db.GetPDSDetail(r.Context(), endpoint) 191 201 if err != nil { 192 - resp.error("Endpoint not found", http.StatusNotFound) 202 + resp.error("PDS not found", http.StatusNotFound) 193 203 return 194 204 } 195 205 196 - scans, _ := s.db.GetEndpointScans(r.Context(), ep.ID, 10) 206 + // Get recent scans 207 + scans, _ := s.db.GetEndpointScans(r.Context(), pds.ID, 10) 197 208 198 - result := formatEndpointResponse(ep) 199 - result["recent_scans"] = scans 209 + result := formatPDSDetail(pds) 210 + result["recent_scans"] = formatScans(scans) 200 211 201 - result["ipinfo"] = ep.IPInfo 202 212 resp.json(result) 203 213 } 204 214 205 - func (s *Server) handleGetEndpointStats(w http.ResponseWriter, r *http.Request) { 215 + func (s *Server) handleGetPDSStats(w http.ResponseWriter, r *http.Request) { 206 216 resp := newResponse(w) 207 - stats, err := s.db.GetEndpointStats(r.Context()) 217 + ctx := r.Context() 218 + 219 + // Get PDS-specific stats 220 + stats, err := s.db.GetPDSStats(ctx) 208 221 if err != nil { 209 222 resp.error(err.Error(), http.StatusInternalServerError) 210 223 return 211 224 } 225 + 212 226 resp.json(stats) 227 + } 228 + 229 + func formatPDSListItem(pds *storage.PDSListItem) map[string]interface{} { 230 + response := map[string]interface{}{ 231 + "id": pds.ID, 232 + "endpoint": pds.Endpoint, 233 + "discovered_at": pds.DiscoveredAt, 234 + "status": statusToString(pds.Status), 235 + } 236 + 237 + // Add last_checked if available 238 + if !pds.LastChecked.IsZero() { 239 + response["last_checked"] = pds.LastChecked 240 + } 241 + 242 + // Add data from latest scan (if available) 243 + if pds.LatestScan != nil { 244 + response["user_count"] = pds.LatestScan.UserCount 245 + response["response_time"] = pds.LatestScan.ResponseTime 246 + if !pds.LatestScan.ScannedAt.IsZero() { 247 + response["last_scan"] = pds.LatestScan.ScannedAt 248 + } 249 + } 250 + 251 + // Add IP if available 252 + if pds.IP != "" { 253 + response["ip"] = pds.IP 254 + } 255 + 256 + // Add IP info (from ip_infos table via JOIN) 257 + if pds.IPInfo != nil { 258 + if pds.IPInfo.City != "" { 259 + response["city"] = pds.IPInfo.City 260 + } 261 + if pds.IPInfo.Country != "" { 262 + response["country"] = pds.IPInfo.Country 263 + } 264 + if pds.IPInfo.CountryCode != "" { 265 + response["country_code"] = pds.IPInfo.CountryCode 266 + } 267 + if pds.IPInfo.ASN > 0 { 268 + response["asn"] = pds.IPInfo.ASN 269 + } 270 + } 271 + 272 + return response 273 + } 274 + 275 + func formatPDSDetail(pds *storage.PDSDetail) map[string]interface{} { 276 + // Start with list item formatting 277 + response := formatPDSListItem(&pds.PDSListItem) 278 + 279 + // Add server_info from latest scan 280 + if pds.LatestScan != nil && pds.LatestScan.ServerInfo != nil { 281 + response["server_info"] = pds.LatestScan.ServerInfo 282 + } 283 + 284 + // Add full IP info 285 + if pds.IPInfo != nil { 286 + ipInfoMap := make(map[string]interface{}) 287 + 288 + if pds.IP != "" { 289 + ipInfoMap["ip"] = pds.IP 290 + } 291 + if pds.IPInfo.City != "" { 292 + ipInfoMap["city"] = pds.IPInfo.City 293 + } 294 + if pds.IPInfo.Country != "" { 295 + ipInfoMap["country"] = pds.IPInfo.Country 296 + } 297 + if pds.IPInfo.CountryCode != "" { 298 + ipInfoMap["country_code"] = pds.IPInfo.CountryCode 299 + } 300 + if pds.IPInfo.ASN > 0 { 301 + ipInfoMap["asn"] = pds.IPInfo.ASN 302 + } 303 + if pds.IPInfo.ASNOrg != "" { 304 + ipInfoMap["asn_org"] = pds.IPInfo.ASNOrg 305 + } 306 + ipInfoMap["is_datacenter"] = pds.IPInfo.IsDatacenter 307 + ipInfoMap["is_vpn"] = pds.IPInfo.IsVPN 308 + 309 + if pds.IPInfo.Latitude != 0 || pds.IPInfo.Longitude != 0 { 310 + ipInfoMap["latitude"] = pds.IPInfo.Latitude 311 + ipInfoMap["longitude"] = pds.IPInfo.Longitude 312 + } 313 + 314 + if len(ipInfoMap) > 0 { 315 + response["ip_info"] = ipInfoMap 316 + } 317 + } 318 + 319 + return response 320 + } 321 + 322 + func formatScans(scans []*storage.EndpointScan) []map[string]interface{} { 323 + result := make([]map[string]interface{}, len(scans)) 324 + for i, scan := range scans { 325 + scanMap := map[string]interface{}{ 326 + "id": scan.ID, 327 + "status": statusToString(scan.Status), 328 + "scanned_at": scan.ScannedAt, 329 + } 330 + 331 + if scan.ResponseTime > 0 { 332 + scanMap["response_time"] = scan.ResponseTime 333 + } 334 + 335 + if scan.ScanData != nil { 336 + // Metadata is already map[string]interface{}, no type assertion needed 337 + if scan.ScanData.Metadata != nil { 338 + // Extract user_count from metadata 339 + if userCount, ok := scan.ScanData.Metadata["user_count"].(int); ok { 340 + scanMap["user_count"] = userCount 341 + } else if userCount, ok := scan.ScanData.Metadata["user_count"].(float64); ok { 342 + scanMap["user_count"] = int(userCount) 343 + } 344 + } 345 + 346 + // Include DID count if available 347 + if scan.ScanData.DIDCount > 0 { 348 + scanMap["did_count"] = scan.ScanData.DIDCount 349 + } 350 + } 351 + 352 + result[i] = scanMap 353 + } 354 + return result 213 355 } 214 356 215 357 // ===== DID HANDLERS =====
+6 -2
internal/api/server.go
··· 56 56 func (s *Server) setupRoutes() { 57 57 api := s.router.PathPrefix("/api/v1").Subrouter() 58 58 59 - // Endpoint routes (replaces PDS routes) 59 + // Generic endpoints (keep as-is) 60 60 api.HandleFunc("/endpoints", s.handleGetEndpoints).Methods("GET") 61 61 api.HandleFunc("/endpoints/stats", s.handleGetEndpointStats).Methods("GET") 62 - api.HandleFunc("/endpoints/{endpoint}", s.handleGetEndpoint).Methods("GET") 62 + 63 + // NEW: PDS-specific endpoints (virtual, created via JOINs) 64 + api.HandleFunc("/pds", s.handleGetPDSList).Methods("GET") 65 + api.HandleFunc("/pds/{endpoint}", s.handleGetPDSDetail).Methods("GET") 66 + api.HandleFunc("/pds/stats", s.handleGetPDSStats).Methods("GET") 63 67 64 68 // PLC Bundle routes 65 69 api.HandleFunc("/plc/bundles", s.handleGetPLCBundles).Methods("GET")
+104 -101
internal/pds/scanner.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 "sync" 6 7 "time" 7 8 ··· 44 45 45 46 // Worker pool 46 47 jobs := make(chan *storage.Endpoint, len(servers)) 47 - results := make(chan *PDSStatus, len(servers)) 48 + var wg sync.WaitGroup 48 49 49 - var wg sync.WaitGroup 50 50 for i := 0; i < s.config.Workers; i++ { 51 51 wg.Add(1) 52 52 go func() { 53 53 defer wg.Done() 54 - s.worker(ctx, jobs, results) 54 + s.worker(ctx, jobs) 55 55 }() 56 56 } 57 57 58 - go func() { 59 - for _, server := range servers { 60 - jobs <- server 61 - } 62 - close(jobs) 63 - }() 64 - 65 - go func() { 66 - wg.Wait() 67 - close(results) 68 - }() 69 - 70 - // Process results 71 - successCount := 0 72 - failureCount := 0 73 - totalUsers := int64(0) 74 - 75 - for status := range results { 76 - // Determine status code 77 - statusCode := storage.PDSStatusOffline 78 - if status.Available { 79 - statusCode = storage.PDSStatusOnline 80 - } 58 + // Send jobs 59 + for _, server := range servers { 60 + jobs <- server 61 + } 62 + close(jobs) 81 63 82 - // Build scan data 83 - scanData := &storage.EndpointScanData{ 84 - ServerInfo: status.Description, 85 - DIDs: status.DIDs, 86 - DIDCount: len(status.DIDs), 87 - } 64 + // Wait for completion 65 + wg.Wait() 88 66 89 - // Update using Endpoint ID 90 - if err := s.db.UpdateEndpointStatus(ctx, status.EndpointID, &storage.EndpointUpdate{ 91 - Status: statusCode, 92 - LastChecked: status.LastChecked, 93 - ResponseTime: status.ResponseTime.Seconds() * 1000, // Convert to ms 94 - ScanData: scanData, 95 - }); err != nil { 96 - log.Error("Error updating endpoint ID %d: %v", status.EndpointID, err) 97 - } 98 - 99 - if status.Available { 100 - successCount++ 101 - totalUsers += int64(len(status.DIDs)) 102 - } else { 103 - failureCount++ 104 - } 105 - } 106 - 107 - log.Info("PDS scan completed: %d available, %d unavailable, %d total users in %v", 108 - successCount, failureCount, totalUsers, time.Since(startTime)) 67 + log.Info("PDS scan completed in %v", time.Since(startTime)) 109 68 110 69 return nil 111 70 } 112 71 113 - func (s *Scanner) worker(ctx context.Context, jobs <-chan *storage.Endpoint, results chan<- *PDSStatus) { 72 + func (s *Scanner) worker(ctx context.Context, jobs <-chan *storage.Endpoint) { 114 73 for server := range jobs { 115 74 select { 116 75 case <-ctx.Done(): 117 76 return 118 77 default: 119 - status := s.scanPDS(ctx, server.ID, server.Endpoint) 120 - results <- status 78 + s.scanAndSaveEndpoint(ctx, server) 121 79 } 122 80 } 123 81 } 124 82 125 - func (s *Scanner) scanPDS(ctx context.Context, endpointID int64, endpoint string) *PDSStatus { 126 - status := &PDSStatus{ 127 - EndpointID: endpointID, 128 - Endpoint: endpoint, 129 - LastChecked: time.Now().UTC(), 83 + func (s *Scanner) scanAndSaveEndpoint(ctx context.Context, ep *storage.Endpoint) { 84 + // STEP 1: Resolve IP (before any network call) 85 + ip, err := ipinfo.ExtractIPFromEndpoint(ep.Endpoint) 86 + if err != nil { 87 + // Mark as offline due to DNS failure 88 + s.saveScanResult(ctx, ep.ID, &ScanResult{ 89 + Status: storage.EndpointStatusOffline, 90 + ErrorMessage: fmt.Sprintf("DNS resolution failed: %v", err), 91 + }) 92 + return 130 93 } 131 94 132 - // Health check 133 - available, responseTime, err := s.client.CheckHealth(ctx, endpoint) 134 - status.Available = available 135 - status.ResponseTime = responseTime 95 + // Update IP immediately 96 + s.db.UpdateEndpointIP(ctx, ep.ID, ip, time.Now()) 97 + 98 + // STEP 2: Health check 99 + available, responseTime, err := s.client.CheckHealth(ctx, ep.Endpoint) 100 + if err != nil || !available { 101 + errMsg := "health check failed" 102 + if err != nil { 103 + errMsg = err.Error() 104 + } 105 + s.saveScanResult(ctx, ep.ID, &ScanResult{ 106 + Status: storage.EndpointStatusOffline, 107 + ResponseTime: responseTime, 108 + ErrorMessage: errMsg, 109 + }) 110 + return 111 + } 136 112 113 + // STEP 3: Fetch PDS-specific data 114 + desc, err := s.client.DescribeServer(ctx, ep.Endpoint) 137 115 if err != nil { 138 - status.ErrorMessage = err.Error() 139 - return status 116 + log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(ep.Endpoint), err) 140 117 } 141 118 142 - if !available { 143 - status.ErrorMessage = "health check failed" 144 - return status 119 + dids, err := s.client.ListRepos(ctx, ep.Endpoint) 120 + if err != nil { 121 + log.Verbose("Warning: failed to list repos for %s: %v", ep.Endpoint, err) 122 + dids = []string{} 145 123 } 146 124 147 - // Fetch and update IP info if needed 148 - s.updateIPInfoIfNeeded(ctx, endpointID, endpoint) 125 + // STEP 4: SAVE IMMEDIATELY 126 + s.saveScanResult(ctx, ep.ID, &ScanResult{ 127 + Status: storage.EndpointStatusOnline, 128 + ResponseTime: responseTime, 129 + Description: desc, 130 + DIDs: dids, 131 + }) 132 + 133 + // STEP 5: Fetch IP info if needed (async, with backoff) 134 + go s.updateIPInfoIfNeeded(ctx, ip) 135 + } 149 136 150 - // Describe server 151 - desc, err := s.client.DescribeServer(ctx, endpoint) 152 - if err != nil { 153 - log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(endpoint), err) 154 - } else { 155 - status.Description = desc 137 + func (s *Scanner) saveScanResult(ctx context.Context, endpointID int64, result *ScanResult) { 138 + // Build scan_data with PDS-specific info in Metadata 139 + scanData := &storage.EndpointScanData{ 140 + DIDCount: len(result.DIDs), 141 + Metadata: make(map[string]interface{}), 156 142 } 157 143 158 - // Optionally list repos (DIDs) - commented out by default for performance 159 - dids, err := s.client.ListRepos(ctx, endpoint) 160 - if err != nil { 161 - log.Verbose("Warning: failed to list repos for %s: %v", endpoint, err) 162 - status.DIDs = []string{} 144 + // Add PDS-specific metadata 145 + if result.Status == storage.EndpointStatusOnline { 146 + scanData.Metadata["user_count"] = len(result.DIDs) 147 + if result.Description != nil { 148 + scanData.Metadata["server_info"] = result.Description 149 + } 163 150 } else { 164 - status.DIDs = dids 165 - log.Verbose(" → Found %d users on %s", len(dids), endpoint) 151 + // Include error message for offline status 152 + if result.ErrorMessage != "" { 153 + scanData.Metadata["error"] = result.ErrorMessage 154 + } 166 155 } 167 156 168 - return status 157 + // Save scan record 158 + scan := &storage.EndpointScan{ 159 + EndpointID: endpointID, 160 + Status: result.Status, 161 + ResponseTime: result.ResponseTime.Seconds() * 1000, // Convert to ms 162 + ScanData: scanData, 163 + ScannedAt: time.Now(), 164 + } 165 + 166 + if err := s.db.SaveEndpointScan(ctx, scan); err != nil { 167 + log.Error("Failed to save scan for endpoint %d: %v", endpointID, err) 168 + } 169 + 170 + // Update endpoint status 171 + update := &storage.EndpointUpdate{ 172 + Status: result.Status, 173 + LastChecked: time.Now(), 174 + ResponseTime: result.ResponseTime.Seconds() * 1000, 175 + } 176 + 177 + if err := s.db.UpdateEndpointStatus(ctx, endpointID, update); err != nil { 178 + log.Error("Failed to update endpoint status for %d: %v", endpointID, err) 179 + } 169 180 } 170 181 171 - func (s *Scanner) updateIPInfoIfNeeded(ctx context.Context, endpointID int64, endpoint string) { 182 + func (s *Scanner) updateIPInfoIfNeeded(ctx context.Context, ip string) { 172 183 // Check if IP info client is in backoff 173 184 if s.ipInfoClient.IsInBackoff() { 174 - // Silently skip during backoff period 175 - return 176 - } 177 - 178 - // Extract IP from endpoint 179 - ip, err := ipinfo.ExtractIPFromEndpoint(endpoint) 180 - if err != nil { 181 - log.Verbose("Failed to extract IP from %s: %v", endpoint, err) 182 185 return 183 186 } 184 187 185 188 // Check if we need to update IP info 186 - shouldUpdate, err := s.db.ShouldUpdateIPInfo(ctx, endpointID, ip) 189 + exists, needsUpdate, err := s.db.ShouldUpdateIPInfo(ctx, ip) 187 190 if err != nil { 188 191 log.Verbose("Failed to check IP info status: %v", err) 189 192 return 190 193 } 191 194 192 - if !shouldUpdate { 193 - return // IP hasn't changed, no need to fetch 195 + if exists && !needsUpdate { 196 + return // IP info is fresh 194 197 } 195 198 196 199 // Fetch IP info from ipapi.is 197 - log.Verbose("Fetching IP info for %s (%s)", endpoint, ip) 200 + log.Verbose("Fetching IP info for %s", ip) 198 201 ipInfo, err := s.ipInfoClient.GetIPInfo(ctx, ip) 199 202 if err != nil { 200 203 // Log only once when backoff starts ··· 207 210 } 208 211 209 212 // Update database 210 - if err := s.db.UpdateEndpointIPInfo(ctx, endpointID, ip, ipInfo); err != nil { 211 - log.Error("Failed to update IP info for endpoint %d: %v", endpointID, err) 213 + if err := s.db.UpsertIPInfo(ctx, ip, ipInfo); err != nil { 214 + log.Error("Failed to update IP info for %s: %v", ip, err) 212 215 } else { 213 - log.Verbose("✓ Updated IP info for %s: %s", endpoint, ip) 216 + log.Verbose("✓ Updated IP info for %s", ip) 214 217 } 215 218 }
+8
internal/pds/types.go
··· 30 30 Description *ServerDescription 31 31 DIDs []string 32 32 } 33 + 34 + type ScanResult struct { 35 + Status int 36 + ResponseTime time.Duration 37 + ErrorMessage string 38 + Description *ServerDescription 39 + DIDs []string 40 + }
+12 -4
internal/storage/db.go
··· 24 24 UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error 25 25 GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) 26 26 GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) 27 - UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error 28 27 EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) 29 28 GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) 30 29 GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) 30 + UpdateEndpointIP(ctx context.Context, endpointID int64, ip string, resolvedAt time.Time) error 31 + SaveEndpointScan(ctx context.Context, scan *EndpointScan) error 32 + UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error 31 33 32 - // IP info operations 33 - ShouldUpdateIPInfo(ctx context.Context, endpointID int64, currentIP string) (bool, error) 34 - UpdateEndpointIPInfo(ctx context.Context, endpointID int64, ip string, ipInfo map[string]interface{}) error 34 + // PDS virtual endpoints (created via JOINs) 35 + GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) 36 + GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) 37 + GetPDSStats(ctx context.Context) (*PDSStats, error) 38 + 39 + // IP operations (IP as primary key) 40 + UpsertIPInfo(ctx context.Context, ip string, ipInfo map[string]interface{}) error 41 + GetIPInfo(ctx context.Context, ip string) (*IPInfo, error) 42 + ShouldUpdateIPInfo(ctx context.Context, ip string) (exists bool, needsUpdate bool, err error) 35 43 36 44 // Cursor operations 37 45 GetScanCursor(ctx context.Context, source string) (*ScanCursor, error)
+725 -362
internal/storage/postgres.go
··· 54 54 55 55 func (p *PostgresDB) Migrate() error { 56 56 schema := ` 57 - -- Endpoints table 57 + -- Endpoints table (NO user_count, NO ip_info) 58 58 CREATE TABLE IF NOT EXISTS endpoints ( 59 59 id BIGSERIAL PRIMARY KEY, 60 60 endpoint_type TEXT NOT NULL DEFAULT 'pds', ··· 62 62 discovered_at TIMESTAMP NOT NULL, 63 63 last_checked TIMESTAMP, 64 64 status INTEGER DEFAULT 0, 65 - user_count BIGINT DEFAULT 0, 66 65 ip TEXT, 67 - ip_info JSONB, 66 + ip_resolved_at TIMESTAMP, 68 67 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 69 68 UNIQUE(endpoint_type, endpoint) 70 69 ); ··· 72 71 CREATE INDEX IF NOT EXISTS idx_endpoints_type_endpoint ON endpoints(endpoint_type, endpoint); 73 72 CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status); 74 73 CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type); 75 - CREATE INDEX IF NOT EXISTS idx_endpoints_user_count ON endpoints(user_count); 76 74 CREATE INDEX IF NOT EXISTS idx_endpoints_ip ON endpoints(ip); 77 - CREATE INDEX IF NOT EXISTS idx_endpoints_ip_info ON endpoints USING gin(ip_info); 78 75 79 - CREATE TABLE IF NOT EXISTS pds_scans ( 76 + -- IP infos table (IP as PRIMARY KEY) 77 + CREATE TABLE IF NOT EXISTS ip_infos ( 78 + ip TEXT PRIMARY KEY, 79 + city TEXT, 80 + country TEXT, 81 + country_code TEXT, 82 + asn INTEGER, 83 + asn_org TEXT, 84 + is_datacenter BOOLEAN, 85 + is_vpn BOOLEAN, 86 + latitude REAL, 87 + longitude REAL, 88 + raw_data JSONB, 89 + fetched_at TIMESTAMP NOT NULL, 90 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 91 + ); 92 + 93 + CREATE INDEX IF NOT EXISTS idx_ip_infos_country_code ON ip_infos(country_code); 94 + CREATE INDEX IF NOT EXISTS idx_ip_infos_asn ON ip_infos(asn); 95 + 96 + -- Endpoint scans (renamed from pds_scans) 97 + CREATE TABLE IF NOT EXISTS endpoint_scans ( 80 98 id BIGSERIAL PRIMARY KEY, 81 - pds_id BIGINT NOT NULL, 99 + endpoint_id BIGINT NOT NULL, 82 100 status INTEGER NOT NULL, 83 101 response_time DOUBLE PRECISION, 84 102 scan_data JSONB, 85 103 scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 86 - FOREIGN KEY (pds_id) REFERENCES endpoints(id) ON DELETE CASCADE 104 + FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE 87 105 ); 88 106 89 - CREATE INDEX IF NOT EXISTS idx_pds_scans_pds_id ON pds_scans(pds_id); 90 - CREATE INDEX IF NOT EXISTS idx_pds_scans_scanned_at ON pds_scans(scanned_at); 91 - CREATE INDEX IF NOT EXISTS idx_pds_scans_scan_data ON pds_scans USING gin(scan_data); 107 + CREATE INDEX IF NOT EXISTS idx_endpoint_scans_endpoint_status_scanned ON endpoint_scans(endpoint_id, status, scanned_at DESC); 108 + CREATE INDEX IF NOT EXISTS idx_endpoint_scans_scanned_at ON endpoint_scans(scanned_at); 92 109 93 110 CREATE TABLE IF NOT EXISTS plc_metrics ( 94 111 id BIGSERIAL PRIMARY KEY, ··· 143 160 CREATE INDEX IF NOT EXISTS idx_mempool_did ON plc_mempool(did); 144 161 CREATE UNIQUE INDEX IF NOT EXISTS idx_mempool_cid ON plc_mempool(cid); 145 162 146 - -- Minimal dids table 147 - CREATE TABLE IF NOT EXISTS dids ( 148 - did TEXT PRIMARY KEY, 149 - bundle_numbers JSONB NOT NULL DEFAULT '[]'::jsonb, 150 - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 151 - ); 163 + -- Minimal dids table 164 + CREATE TABLE IF NOT EXISTS dids ( 165 + did TEXT PRIMARY KEY, 166 + bundle_numbers JSONB NOT NULL DEFAULT '[]'::jsonb, 167 + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 168 + ); 152 169 153 - CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers); 154 - CREATE INDEX IF NOT EXISTS idx_dids_created_at ON dids(created_at); 170 + CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers); 171 + CREATE INDEX IF NOT EXISTS idx_dids_created_at ON dids(created_at); 155 172 ` 156 173 157 174 _, err := p.db.Exec(schema) 158 175 return err 159 176 } 160 177 178 + // ===== ENDPOINT OPERATIONS ===== 179 + 180 + func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error { 181 + query := ` 182 + INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ip_resolved_at) 183 + VALUES ($1, $2, $3, $4, $5, $6, $7) 184 + ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET 185 + last_checked = EXCLUDED.last_checked, 186 + status = EXCLUDED.status, 187 + ip = CASE 188 + WHEN EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '' THEN EXCLUDED.ip 189 + ELSE endpoints.ip 190 + END, 191 + ip_resolved_at = CASE 192 + WHEN EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '' THEN EXCLUDED.ip_resolved_at 193 + ELSE endpoints.ip_resolved_at 194 + END, 195 + updated_at = CURRENT_TIMESTAMP 196 + RETURNING id 197 + ` 198 + err := p.db.QueryRowContext(ctx, query, 199 + endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt, 200 + endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPResolvedAt).Scan(&endpoint.ID) 201 + return err 202 + } 203 + 204 + func (p *PostgresDB) EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) { 205 + query := "SELECT EXISTS(SELECT 1 FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2)" 206 + var exists bool 207 + err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&exists) 208 + return exists, err 209 + } 210 + 211 + func (p *PostgresDB) GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) { 212 + query := "SELECT id FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2" 213 + var id int64 214 + err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&id) 215 + return id, err 216 + } 217 + 218 + func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) { 219 + query := ` 220 + SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, 221 + ip, ip_resolved_at, updated_at 222 + FROM endpoints 223 + WHERE endpoint = $1 AND endpoint_type = $2 224 + ` 225 + 226 + var ep Endpoint 227 + var lastChecked, ipResolvedAt sql.NullTime 228 + var ip sql.NullString 229 + 230 + err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan( 231 + &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 232 + &ep.Status, &ip, &ipResolvedAt, &ep.UpdatedAt, 233 + ) 234 + if err != nil { 235 + return nil, err 236 + } 237 + 238 + if lastChecked.Valid { 239 + ep.LastChecked = lastChecked.Time 240 + } 241 + if ip.Valid { 242 + ep.IP = ip.String 243 + } 244 + if ipResolvedAt.Valid { 245 + ep.IPResolvedAt = ipResolvedAt.Time 246 + } 247 + 248 + return &ep, nil 249 + } 250 + 251 + func (p *PostgresDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) { 252 + query := ` 253 + SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, 254 + ip, ip_resolved_at, updated_at 255 + FROM endpoints 256 + WHERE 1=1 257 + ` 258 + args := []interface{}{} 259 + argIdx := 1 260 + 261 + if filter != nil { 262 + if filter.Type != "" { 263 + query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx) 264 + args = append(args, filter.Type) 265 + argIdx++ 266 + } 267 + if filter.Status != "" { 268 + statusInt := EndpointStatusUnknown 269 + switch filter.Status { 270 + case "online": 271 + statusInt = EndpointStatusOnline 272 + case "offline": 273 + statusInt = EndpointStatusOffline 274 + } 275 + query += fmt.Sprintf(" AND status = $%d", argIdx) 276 + args = append(args, statusInt) 277 + argIdx++ 278 + } 279 + } 280 + 281 + query += " ORDER BY id DESC" 282 + 283 + if filter != nil && filter.Limit > 0 { 284 + query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1) 285 + args = append(args, filter.Limit, filter.Offset) 286 + } 287 + 288 + rows, err := p.db.QueryContext(ctx, query, args...) 289 + if err != nil { 290 + return nil, err 291 + } 292 + defer rows.Close() 293 + 294 + var endpoints []*Endpoint 295 + for rows.Next() { 296 + var ep Endpoint 297 + var lastChecked, ipResolvedAt sql.NullTime 298 + var ip sql.NullString 299 + 300 + err := rows.Scan( 301 + &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 302 + &ep.Status, &ip, &ipResolvedAt, &ep.UpdatedAt, 303 + ) 304 + if err != nil { 305 + return nil, err 306 + } 307 + 308 + if lastChecked.Valid { 309 + ep.LastChecked = lastChecked.Time 310 + } 311 + if ip.Valid { 312 + ep.IP = ip.String 313 + } 314 + if ipResolvedAt.Valid { 315 + ep.IPResolvedAt = ipResolvedAt.Time 316 + } 317 + 318 + endpoints = append(endpoints, &ep) 319 + } 320 + 321 + return endpoints, rows.Err() 322 + } 323 + 324 + func (p *PostgresDB) UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error { 325 + query := ` 326 + UPDATE endpoints 327 + SET status = $1, last_checked = $2, updated_at = $3 328 + WHERE id = $4 329 + ` 330 + _, err := p.db.ExecContext(ctx, query, update.Status, update.LastChecked, time.Now().UTC(), endpointID) 331 + return err 332 + } 333 + 334 + func (p *PostgresDB) UpdateEndpointIP(ctx context.Context, endpointID int64, ip string, resolvedAt time.Time) error { 335 + query := ` 336 + UPDATE endpoints 337 + SET ip = $1, ip_resolved_at = $2, updated_at = $3 338 + WHERE id = $4 339 + ` 340 + _, err := p.db.ExecContext(ctx, query, ip, resolvedAt, time.Now(), endpointID) 341 + return err 342 + } 343 + 344 + // ===== SCAN OPERATIONS ===== 345 + 346 + func (p *PostgresDB) SaveEndpointScan(ctx context.Context, scan *EndpointScan) error { 347 + var scanDataJSON []byte 348 + if scan.ScanData != nil { 349 + scanDataJSON, _ = json.Marshal(scan.ScanData) 350 + } 351 + 352 + query := ` 353 + INSERT INTO endpoint_scans (endpoint_id, status, response_time, scan_data, scanned_at) 354 + VALUES ($1, $2, $3, $4, $5) 355 + ` 356 + _, err := p.db.ExecContext(ctx, query, scan.EndpointID, scan.Status, scan.ResponseTime, scanDataJSON, scan.ScannedAt) 357 + return err 358 + } 359 + 360 + func (p *PostgresDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) { 361 + query := ` 362 + SELECT id, endpoint_id, status, response_time, scan_data, scanned_at 363 + FROM endpoint_scans 364 + WHERE endpoint_id = $1 365 + ORDER BY scanned_at DESC 366 + LIMIT $2 367 + ` 368 + 369 + rows, err := p.db.QueryContext(ctx, query, endpointID, limit) 370 + if err != nil { 371 + return nil, err 372 + } 373 + defer rows.Close() 374 + 375 + var scans []*EndpointScan 376 + for rows.Next() { 377 + var scan EndpointScan 378 + var responseTime sql.NullFloat64 379 + var scanDataJSON []byte 380 + 381 + err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &scanDataJSON, &scan.ScannedAt) 382 + if err != nil { 383 + return nil, err 384 + } 385 + 386 + if responseTime.Valid { 387 + scan.ResponseTime = responseTime.Float64 388 + } 389 + 390 + if len(scanDataJSON) > 0 { 391 + var scanData EndpointScanData 392 + if err := json.Unmarshal(scanDataJSON, &scanData); err == nil { 393 + scan.ScanData = &scanData 394 + } 395 + } 396 + 397 + scans = append(scans, &scan) 398 + } 399 + 400 + return scans, rows.Err() 401 + } 402 + 403 + // ===== PDS VIRTUAL ENDPOINTS ===== 404 + 405 + func (p *PostgresDB) GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) { 406 + query := ` 407 + SELECT 408 + e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip, 409 + latest.user_count, latest.response_time, latest.scanned_at, 410 + i.city, i.country, i.country_code, i.asn, i.asn_org, 411 + i.is_datacenter, i.is_vpn, i.latitude, i.longitude 412 + FROM endpoints e 413 + LEFT JOIN LATERAL ( 414 + SELECT 415 + COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count, 416 + response_time, 417 + scanned_at 418 + FROM endpoint_scans 419 + WHERE endpoint_id = e.id AND status = 1 420 + ORDER BY scanned_at DESC 421 + LIMIT 1 422 + ) latest ON true 423 + LEFT JOIN ip_infos i ON e.ip = i.ip 424 + WHERE e.endpoint_type = 'pds' 425 + ` 426 + 427 + args := []interface{}{} 428 + argIdx := 1 429 + 430 + if filter != nil { 431 + if filter.Status != "" { 432 + statusInt := EndpointStatusUnknown 433 + switch filter.Status { 434 + case "online": 435 + statusInt = EndpointStatusOnline 436 + case "offline": 437 + statusInt = EndpointStatusOffline 438 + } 439 + query += fmt.Sprintf(" AND e.status = $%d", argIdx) 440 + args = append(args, statusInt) 441 + argIdx++ 442 + } 443 + 444 + if filter.MinUserCount > 0 { 445 + query += fmt.Sprintf(" AND latest.user_count >= $%d", argIdx) 446 + args = append(args, filter.MinUserCount) 447 + argIdx++ 448 + } 449 + } 450 + 451 + query += " ORDER BY latest.user_count DESC NULLS LAST" 452 + 453 + if filter != nil && filter.Limit > 0 { 454 + query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1) 455 + args = append(args, filter.Limit, filter.Offset) 456 + } 457 + 458 + rows, err := p.db.QueryContext(ctx, query, args...) 459 + if err != nil { 460 + return nil, err 461 + } 462 + defer rows.Close() 463 + 464 + var items []*PDSListItem 465 + for rows.Next() { 466 + item := &PDSListItem{} 467 + var ip, city, country, countryCode, asnOrg sql.NullString 468 + var asn sql.NullInt32 469 + var isDatacenter, isVPN sql.NullBool 470 + var lat, lon sql.NullFloat64 471 + var userCount sql.NullInt32 472 + var responseTime sql.NullFloat64 473 + var scannedAt sql.NullTime 474 + 475 + err := rows.Scan( 476 + &item.ID, &item.Endpoint, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, 477 + &userCount, &responseTime, &scannedAt, 478 + &city, &country, &countryCode, &asn, &asnOrg, 479 + &isDatacenter, &isVPN, &lat, &lon, 480 + ) 481 + if err != nil { 482 + return nil, err 483 + } 484 + 485 + if ip.Valid { 486 + item.IP = ip.String 487 + } 488 + 489 + // Add latest scan data if available 490 + if userCount.Valid { 491 + item.LatestScan = &struct { 492 + UserCount int 493 + ResponseTime float64 494 + ScannedAt time.Time 495 + }{ 496 + UserCount: int(userCount.Int32), 497 + ResponseTime: responseTime.Float64, 498 + ScannedAt: scannedAt.Time, 499 + } 500 + } 501 + 502 + // Add IP info if available 503 + if city.Valid || country.Valid { 504 + item.IPInfo = &IPInfo{ 505 + IP: ip.String, 506 + City: city.String, 507 + Country: country.String, 508 + CountryCode: countryCode.String, 509 + ASN: int(asn.Int32), 510 + ASNOrg: asnOrg.String, 511 + IsDatacenter: isDatacenter.Bool, 512 + IsVPN: isVPN.Bool, 513 + Latitude: float32(lat.Float64), 514 + Longitude: float32(lon.Float64), 515 + } 516 + } 517 + 518 + items = append(items, item) 519 + } 520 + 521 + return items, rows.Err() 522 + } 523 + 524 + func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) { 525 + query := ` 526 + SELECT 527 + e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip, 528 + COALESCE((latest.scan_data->'metadata'->>'user_count')::int, 0) as user_count, 529 + latest.response_time, 530 + latest.scan_data->'metadata'->'server_info' as server_info, 531 + latest.scanned_at, 532 + i.city, i.country, i.country_code, i.asn, i.asn_org, 533 + i.is_datacenter, i.is_vpn, i.latitude, i.longitude 534 + FROM endpoints e 535 + LEFT JOIN LATERAL ( 536 + SELECT scan_data, response_time, scanned_at 537 + FROM endpoint_scans 538 + WHERE endpoint_id = e.id 539 + ORDER BY scanned_at DESC 540 + LIMIT 1 541 + ) latest ON true 542 + LEFT JOIN ip_infos i ON e.ip = i.ip 543 + WHERE e.endpoint = $1 AND e.endpoint_type = 'pds' 544 + ` 545 + 546 + detail := &PDSDetail{} 547 + var ip, city, country, countryCode, asnOrg sql.NullString 548 + var asn sql.NullInt32 549 + var isDatacenter, isVPN sql.NullBool 550 + var lat, lon sql.NullFloat64 551 + var userCount sql.NullInt32 552 + var responseTime sql.NullFloat64 553 + var serverInfoJSON []byte 554 + var scannedAt sql.NullTime 555 + 556 + err := p.db.QueryRowContext(ctx, query, endpoint).Scan( 557 + &detail.ID, &detail.Endpoint, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, 558 + &userCount, &responseTime, &serverInfoJSON, &scannedAt, 559 + &city, &country, &countryCode, &asn, &asnOrg, 560 + &isDatacenter, &isVPN, &lat, &lon, 561 + ) 562 + if err != nil { 563 + return nil, err 564 + } 565 + 566 + if ip.Valid { 567 + detail.IP = ip.String 568 + } 569 + 570 + // Parse latest scan data 571 + if userCount.Valid { 572 + var serverInfo interface{} 573 + if len(serverInfoJSON) > 0 { 574 + json.Unmarshal(serverInfoJSON, &serverInfo) 575 + } 576 + 577 + detail.LatestScan = &struct { 578 + UserCount int 579 + ResponseTime float64 580 + ServerInfo interface{} 581 + ScannedAt time.Time 582 + }{ 583 + UserCount: int(userCount.Int32), 584 + ResponseTime: responseTime.Float64, 585 + ServerInfo: serverInfo, 586 + ScannedAt: scannedAt.Time, 587 + } 588 + } 589 + 590 + // Parse IP info 591 + if city.Valid || country.Valid { 592 + detail.IPInfo = &IPInfo{ 593 + IP: ip.String, 594 + City: city.String, 595 + Country: country.String, 596 + CountryCode: countryCode.String, 597 + ASN: int(asn.Int32), 598 + ASNOrg: asnOrg.String, 599 + IsDatacenter: isDatacenter.Bool, 600 + IsVPN: isVPN.Bool, 601 + Latitude: float32(lat.Float64), 602 + Longitude: float32(lon.Float64), 603 + } 604 + } 605 + 606 + return detail, nil 607 + } 608 + 609 + func (p *PostgresDB) GetPDSStats(ctx context.Context) (*PDSStats, error) { 610 + // PDS stats - aggregate from latest scans 611 + query := ` 612 + WITH latest_scans AS ( 613 + SELECT DISTINCT ON (endpoint_id) 614 + endpoint_id, 615 + COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count, 616 + status 617 + FROM endpoint_scans 618 + WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds') 619 + ORDER BY endpoint_id, scanned_at DESC 620 + ) 621 + SELECT 622 + COUNT(*) as total, 623 + SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online, 624 + SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline, 625 + SUM(user_count) as total_users 626 + FROM latest_scans 627 + ` 628 + 629 + stats := &PDSStats{} 630 + err := p.db.QueryRowContext(ctx, query).Scan( 631 + &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints, &stats.TotalDIDs, 632 + ) 633 + 634 + return stats, err 635 + } 636 + 637 + func (p *PostgresDB) GetEndpointStats(ctx context.Context) (*EndpointStats, error) { 638 + query := ` 639 + SELECT 640 + COUNT(*) as total_endpoints, 641 + SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_endpoints, 642 + SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_endpoints 643 + FROM endpoints 644 + ` 645 + 646 + var stats EndpointStats 647 + err := p.db.QueryRowContext(ctx, query).Scan( 648 + &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints, 649 + ) 650 + 651 + // Get average response time from recent scans 652 + avgQuery := ` 653 + SELECT AVG(response_time) 654 + FROM endpoint_scans 655 + WHERE response_time > 0 AND scanned_at > NOW() - INTERVAL '1 hour' 656 + ` 657 + var avgResponseTime sql.NullFloat64 658 + p.db.QueryRowContext(ctx, avgQuery).Scan(&avgResponseTime) 659 + if avgResponseTime.Valid { 660 + stats.AvgResponseTime = avgResponseTime.Float64 661 + } 662 + 663 + // Get counts by type 664 + typeQuery := ` 665 + SELECT endpoint_type, COUNT(*) 666 + FROM endpoints 667 + GROUP BY endpoint_type 668 + ` 669 + rows, err := p.db.QueryContext(ctx, typeQuery) 670 + if err == nil { 671 + defer rows.Close() 672 + stats.ByType = make(map[string]int64) 673 + for rows.Next() { 674 + var typ string 675 + var count int64 676 + if err := rows.Scan(&typ, &count); err == nil { 677 + stats.ByType[typ] = count 678 + } 679 + } 680 + } 681 + 682 + // Get total DIDs from latest PDS scans 683 + didQuery := ` 684 + WITH latest_pds_scans AS ( 685 + SELECT DISTINCT ON (endpoint_id) 686 + endpoint_id, 687 + COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count 688 + FROM endpoint_scans 689 + WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds') 690 + ORDER BY endpoint_id, scanned_at DESC 691 + ) 692 + SELECT SUM(user_count) FROM latest_pds_scans 693 + ` 694 + var totalDIDs sql.NullInt64 695 + p.db.QueryRowContext(ctx, didQuery).Scan(&totalDIDs) 696 + if totalDIDs.Valid { 697 + stats.TotalDIDs = totalDIDs.Int64 698 + } 699 + 700 + return &stats, err 701 + } 702 + 703 + // ===== IP INFO OPERATIONS ===== 704 + 705 + func (p *PostgresDB) UpsertIPInfo(ctx context.Context, ip string, ipInfo map[string]interface{}) error { 706 + rawDataJSON, _ := json.Marshal(ipInfo) 707 + 708 + // Extract fields from ipInfo map 709 + city := extractString(ipInfo, "location", "city") 710 + country := extractString(ipInfo, "location", "country") 711 + countryCode := extractString(ipInfo, "location", "country_code") 712 + asn := extractInt(ipInfo, "asn", "asn") 713 + asnOrg := extractString(ipInfo, "asn", "org") 714 + isDatacenter := extractBool(ipInfo, "company", "type", "hosting") 715 + isVPN := extractBool(ipInfo, "security", "vpn") 716 + lat := extractFloat(ipInfo, "location", "latitude") 717 + lon := extractFloat(ipInfo, "location", "longitude") 718 + 719 + query := ` 720 + INSERT INTO ip_infos (ip, city, country, country_code, asn, asn_org, is_datacenter, is_vpn, latitude, longitude, raw_data, fetched_at) 721 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 722 + ON CONFLICT(ip) DO UPDATE SET 723 + city = EXCLUDED.city, 724 + country = EXCLUDED.country, 725 + country_code = EXCLUDED.country_code, 726 + asn = EXCLUDED.asn, 727 + asn_org = EXCLUDED.asn_org, 728 + is_datacenter = EXCLUDED.is_datacenter, 729 + is_vpn = EXCLUDED.is_vpn, 730 + latitude = EXCLUDED.latitude, 731 + longitude = EXCLUDED.longitude, 732 + raw_data = EXCLUDED.raw_data, 733 + fetched_at = EXCLUDED.fetched_at, 734 + updated_at = CURRENT_TIMESTAMP 735 + ` 736 + _, err := p.db.ExecContext(ctx, query, ip, city, country, countryCode, asn, asnOrg, isDatacenter, isVPN, lat, lon, rawDataJSON, time.Now()) 737 + return err 738 + } 739 + 740 + func (p *PostgresDB) GetIPInfo(ctx context.Context, ip string) (*IPInfo, error) { 741 + query := ` 742 + SELECT ip, city, country, country_code, asn, asn_org, is_datacenter, is_vpn, 743 + latitude, longitude, raw_data, fetched_at, updated_at 744 + FROM ip_infos 745 + WHERE ip = $1 746 + ` 747 + 748 + info := &IPInfo{} 749 + var rawDataJSON []byte 750 + 751 + err := p.db.QueryRowContext(ctx, query, ip).Scan( 752 + &info.IP, &info.City, &info.Country, &info.CountryCode, &info.ASN, &info.ASNOrg, 753 + &info.IsDatacenter, &info.IsVPN, &info.Latitude, &info.Longitude, 754 + &rawDataJSON, &info.FetchedAt, &info.UpdatedAt, 755 + ) 756 + if err != nil { 757 + return nil, err 758 + } 759 + 760 + if len(rawDataJSON) > 0 { 761 + json.Unmarshal(rawDataJSON, &info.RawData) 762 + } 763 + 764 + return info, nil 765 + } 766 + 767 + func (p *PostgresDB) ShouldUpdateIPInfo(ctx context.Context, ip string) (bool, bool, error) { 768 + query := `SELECT fetched_at FROM ip_infos WHERE ip = $1` 769 + 770 + var fetchedAt time.Time 771 + err := p.db.QueryRowContext(ctx, query, ip).Scan(&fetchedAt) 772 + if err == sql.ErrNoRows { 773 + return false, true, nil // Doesn't exist, needs update 774 + } 775 + if err != nil { 776 + return false, false, err 777 + } 778 + 779 + // Check if older than 30 days 780 + needsUpdate := time.Since(fetchedAt) > 30*24*time.Hour 781 + return true, needsUpdate, nil 782 + } 783 + 784 + // ===== HELPER FUNCTIONS ===== 785 + 786 + func extractString(data map[string]interface{}, keys ...string) string { 787 + current := data 788 + for i, key := range keys { 789 + if i == len(keys)-1 { 790 + if val, ok := current[key].(string); ok { 791 + return val 792 + } 793 + return "" 794 + } 795 + if nested, ok := current[key].(map[string]interface{}); ok { 796 + current = nested 797 + } else { 798 + return "" 799 + } 800 + } 801 + return "" 802 + } 803 + 804 + func extractInt(data map[string]interface{}, keys ...string) int { 805 + current := data 806 + for i, key := range keys { 807 + if i == len(keys)-1 { 808 + if val, ok := current[key].(float64); ok { 809 + return int(val) 810 + } 811 + if val, ok := current[key].(int); ok { 812 + return val 813 + } 814 + return 0 815 + } 816 + if nested, ok := current[key].(map[string]interface{}); ok { 817 + current = nested 818 + } else { 819 + return 0 820 + } 821 + } 822 + return 0 823 + } 824 + 825 + func extractFloat(data map[string]interface{}, keys ...string) float32 { 826 + current := data 827 + for i, key := range keys { 828 + if i == len(keys)-1 { 829 + if val, ok := current[key].(float64); ok { 830 + return float32(val) 831 + } 832 + return 0 833 + } 834 + if nested, ok := current[key].(map[string]interface{}); ok { 835 + current = nested 836 + } else { 837 + return 0 838 + } 839 + } 840 + return 0 841 + } 842 + 843 + func extractBool(data map[string]interface{}, keys ...string) bool { 844 + current := data 845 + for i, key := range keys { 846 + if i == len(keys)-1 { 847 + if val, ok := current[key].(bool); ok { 848 + return val 849 + } 850 + // Check if it's a string that matches (for type="hosting") 851 + if val, ok := current[key].(string); ok { 852 + // For cases like company.type == "hosting" 853 + expectedValue := keys[len(keys)-1] 854 + return val == expectedValue 855 + } 856 + return false 857 + } 858 + if nested, ok := current[key].(map[string]interface{}); ok { 859 + current = nested 860 + } else { 861 + return false 862 + } 863 + } 864 + return false 865 + } 866 + 161 867 // ===== BUNDLE OPERATIONS ===== 162 868 163 869 func (p *PostgresDB) CreateBundle(ctx context.Context, bundle *PLCBundle) error { ··· 541 1247 var size int64 542 1248 err := p.db.QueryRowContext(ctx, query).Scan(&size) 543 1249 return size, err 544 - } 545 - 546 - // ===== ENDPOINT OPERATIONS ===== 547 - 548 - func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error { 549 - var ipInfoJSON []byte 550 - var err error 551 - if endpoint.IPInfo != nil { 552 - ipInfoJSON, err = json.Marshal(endpoint.IPInfo) 553 - if err != nil { 554 - return fmt.Errorf("failed to marshal ip_info: %w", err) 555 - } 556 - } 557 - 558 - query := ` 559 - INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ip_info) 560 - VALUES ($1, $2, $3, $4, $5, $6, $7) 561 - ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET 562 - last_checked = EXCLUDED.last_checked, 563 - ip = CASE 564 - WHEN EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '' THEN EXCLUDED.ip 565 - ELSE endpoints.ip 566 - END, 567 - ip_info = CASE 568 - WHEN EXCLUDED.ip_info IS NOT NULL THEN EXCLUDED.ip_info 569 - ELSE endpoints.ip_info 570 - END 571 - RETURNING id 572 - ` 573 - err = p.db.QueryRowContext(ctx, query, 574 - endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt, 575 - endpoint.LastChecked, endpoint.Status, endpoint.IP, ipInfoJSON).Scan(&endpoint.ID) 576 - return err 577 - } 578 - 579 - func (p *PostgresDB) EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) { 580 - query := "SELECT EXISTS(SELECT 1 FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2)" 581 - var exists bool 582 - err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&exists) 583 - return exists, err 584 - } 585 - 586 - func (p *PostgresDB) GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) { 587 - query := "SELECT id FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2" 588 - var id int64 589 - err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&id) 590 - return id, err 591 - } 592 - 593 - func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) { 594 - query := ` 595 - SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, 596 - ip, ip_info, updated_at 597 - FROM endpoints 598 - WHERE endpoint = $1 AND endpoint_type = $2 599 - ` 600 - 601 - var ep Endpoint 602 - var lastChecked sql.NullTime 603 - var ip sql.NullString 604 - var ipInfoJSON []byte 605 - 606 - err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan( 607 - &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 608 - &ep.Status, &ep.UserCount, &ip, &ipInfoJSON, &ep.UpdatedAt, 609 - ) 610 - if err != nil { 611 - return nil, err 612 - } 613 - 614 - if lastChecked.Valid { 615 - ep.LastChecked = lastChecked.Time 616 - } 617 - 618 - if ip.Valid { 619 - ep.IP = ip.String 620 - } 621 - 622 - if len(ipInfoJSON) > 0 { 623 - var ipInfo map[string]interface{} 624 - if err := json.Unmarshal(ipInfoJSON, &ipInfo); err == nil { 625 - ep.IPInfo = ipInfo 626 - } 627 - } 628 - 629 - return &ep, nil 630 - } 631 - 632 - func (p *PostgresDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) { 633 - query := ` 634 - SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, user_count, 635 - ip, ip_info, updated_at 636 - FROM endpoints 637 - WHERE 1=1 638 - ` 639 - args := []interface{}{} 640 - argIdx := 1 641 - 642 - if filter != nil { 643 - if filter.Type != "" { 644 - query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx) 645 - args = append(args, filter.Type) 646 - argIdx++ 647 - } 648 - if filter.Status != "" { 649 - statusInt := EndpointStatusUnknown 650 - switch filter.Status { 651 - case "online": 652 - statusInt = EndpointStatusOnline 653 - case "offline": 654 - statusInt = EndpointStatusOffline 655 - } 656 - query += fmt.Sprintf(" AND status = $%d", argIdx) 657 - args = append(args, statusInt) 658 - argIdx++ 659 - } 660 - if filter.MinUserCount > 0 { 661 - query += fmt.Sprintf(" AND user_count >= $%d", argIdx) 662 - args = append(args, filter.MinUserCount) 663 - argIdx++ 664 - } 665 - } 666 - 667 - query += " ORDER BY user_count DESC" 668 - 669 - if filter != nil && filter.Limit > 0 { 670 - query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1) 671 - args = append(args, filter.Limit, filter.Offset) 672 - } 673 - 674 - rows, err := p.db.QueryContext(ctx, query, args...) 675 - if err != nil { 676 - return nil, err 677 - } 678 - defer rows.Close() 679 - 680 - var endpoints []*Endpoint 681 - for rows.Next() { 682 - var ep Endpoint 683 - var lastChecked sql.NullTime 684 - var ip sql.NullString 685 - var ipInfoJSON []byte 686 - 687 - err := rows.Scan( 688 - &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 689 - &ep.Status, &ep.UserCount, &ip, &ipInfoJSON, &ep.UpdatedAt, 690 - ) 691 - if err != nil { 692 - return nil, err 693 - } 694 - 695 - if lastChecked.Valid { 696 - ep.LastChecked = lastChecked.Time 697 - } 698 - 699 - if ip.Valid { 700 - ep.IP = ip.String 701 - } 702 - 703 - if len(ipInfoJSON) > 0 { 704 - var ipInfo map[string]interface{} 705 - if err := json.Unmarshal(ipInfoJSON, &ipInfo); err == nil { 706 - ep.IPInfo = ipInfo 707 - } 708 - } 709 - 710 - endpoints = append(endpoints, &ep) 711 - } 712 - 713 - return endpoints, rows.Err() 714 - } 715 - 716 - func (p *PostgresDB) UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error { 717 - tx, err := p.db.BeginTx(ctx, nil) 718 - if err != nil { 719 - return err 720 - } 721 - defer tx.Rollback() 722 - 723 - userCount := 0 724 - if update.ScanData != nil { 725 - userCount = update.ScanData.DIDCount 726 - } 727 - 728 - query := ` 729 - UPDATE endpoints 730 - SET status = $1, last_checked = $2, user_count = $3, updated_at = $4 731 - WHERE id = $5 732 - ` 733 - _, err = tx.ExecContext(ctx, query, update.Status, update.LastChecked, userCount, time.Now().UTC(), endpointID) 734 - if err != nil { 735 - return err 736 - } 737 - 738 - var scanDataJSON []byte 739 - if update.ScanData != nil { 740 - scanDataJSON, _ = json.Marshal(update.ScanData) 741 - } 742 - 743 - scanQuery := ` 744 - INSERT INTO pds_scans (pds_id, status, response_time, scan_data) 745 - VALUES ($1, $2, $3, $4) 746 - ` 747 - _, err = tx.ExecContext(ctx, scanQuery, endpointID, update.Status, update.ResponseTime, scanDataJSON) 748 - if err != nil { 749 - return err 750 - } 751 - 752 - // Keep only the 3 most recent scans per endpoint 753 - cleanupQuery := ` 754 - DELETE FROM pds_scans 755 - WHERE pds_id = $1 756 - AND id NOT IN ( 757 - SELECT id FROM pds_scans 758 - WHERE pds_id = $1 759 - ORDER BY scanned_at DESC 760 - LIMIT 3 761 - ) 762 - ` 763 - _, err = tx.ExecContext(ctx, cleanupQuery, endpointID) 764 - if err != nil { 765 - return err 766 - } 767 - 768 - return tx.Commit() 769 - } 770 - 771 - func (p *PostgresDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) { 772 - query := ` 773 - SELECT id, pds_id, status, response_time, scan_data, scanned_at 774 - FROM pds_scans 775 - WHERE pds_id = $1 776 - ORDER BY scanned_at DESC 777 - LIMIT $2 778 - ` 779 - 780 - rows, err := p.db.QueryContext(ctx, query, endpointID, limit) 781 - if err != nil { 782 - return nil, err 783 - } 784 - defer rows.Close() 785 - 786 - var scans []*EndpointScan 787 - for rows.Next() { 788 - var scan EndpointScan 789 - var responseTime sql.NullFloat64 790 - var scanDataJSON []byte 791 - 792 - err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &scanDataJSON, &scan.ScannedAt) 793 - if err != nil { 794 - return nil, err 795 - } 796 - 797 - if responseTime.Valid { 798 - scan.ResponseTime = responseTime.Float64 799 - } 800 - 801 - if len(scanDataJSON) > 0 { 802 - var scanData EndpointScanData 803 - if err := json.Unmarshal(scanDataJSON, &scanData); err == nil { 804 - scan.ScanData = &scanData 805 - } 806 - } 807 - 808 - scans = append(scans, &scan) 809 - } 810 - 811 - return scans, rows.Err() 812 - } 813 - 814 - func (p *PostgresDB) GetEndpointStats(ctx context.Context) (*EndpointStats, error) { 815 - query := ` 816 - SELECT 817 - COUNT(*) as total_endpoints, 818 - SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_endpoints, 819 - SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_endpoints, 820 - (SELECT AVG(response_time) FROM pds_scans WHERE response_time > 0 821 - AND scanned_at > NOW() - INTERVAL '1 hour') as avg_response_time, 822 - SUM(user_count) as total_dids 823 - FROM endpoints 824 - ` 825 - 826 - var stats EndpointStats 827 - var avgResponseTime sql.NullFloat64 828 - 829 - err := p.db.QueryRowContext(ctx, query).Scan( 830 - &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints, 831 - &avgResponseTime, &stats.TotalDIDs, 832 - ) 833 - 834 - if avgResponseTime.Valid { 835 - stats.AvgResponseTime = avgResponseTime.Float64 836 - } 837 - 838 - typeQuery := ` 839 - SELECT endpoint_type, COUNT(*) 840 - FROM endpoints 841 - GROUP BY endpoint_type 842 - ` 843 - rows, err := p.db.QueryContext(ctx, typeQuery) 844 - if err == nil { 845 - defer rows.Close() 846 - stats.ByType = make(map[string]int64) 847 - for rows.Next() { 848 - var typ string 849 - var count int64 850 - if err := rows.Scan(&typ, &count); err == nil { 851 - stats.ByType[typ] = count 852 - } 853 - } 854 - } 855 - 856 - return &stats, err 857 - } 858 - 859 - // Add method to check if IP needs update 860 - func (p *PostgresDB) ShouldUpdateIPInfo(ctx context.Context, endpointID int64, currentIP string) (bool, error) { 861 - query := `SELECT ip FROM endpoints WHERE id = $1` 862 - 863 - var storedIP sql.NullString 864 - err := p.db.QueryRowContext(ctx, query, endpointID).Scan(&storedIP) 865 - if err != nil { 866 - return false, err 867 - } 868 - 869 - // Update if no IP stored or IP changed 870 - return !storedIP.Valid || storedIP.String != currentIP, nil 871 - } 872 - 873 - // Update IP info for an endpoint 874 - func (p *PostgresDB) UpdateEndpointIPInfo(ctx context.Context, endpointID int64, ip string, ipInfo map[string]interface{}) error { 875 - ipInfoJSON, err := json.Marshal(ipInfo) 876 - if err != nil { 877 - return fmt.Errorf("failed to marshal ip_info: %w", err) 878 - } 879 - 880 - query := ` 881 - UPDATE endpoints 882 - SET ip = $1, ip_info = $2, updated_at = $3 883 - WHERE id = $4 884 - ` 885 - _, err = p.db.ExecContext(ctx, query, ip, ipInfoJSON, time.Now(), endpointID) 886 - return err 887 1250 } 888 1251 889 1252 // ===== CURSOR OPERATIONS =====
+57 -7
internal/storage/types.go
··· 18 18 // Endpoint represents any AT Protocol service endpoint 19 19 type Endpoint struct { 20 20 ID int64 21 - EndpointType string // "pds", "labeler", etc. 21 + EndpointType string 22 22 Endpoint string 23 23 DiscoveredAt time.Time 24 24 LastChecked time.Time 25 25 Status int 26 - UserCount int64 27 26 IP string 28 - IPInfo map[string]interface{} 27 + IPResolvedAt time.Time 29 28 UpdatedAt time.Time 30 29 } 31 30 ··· 39 38 40 39 // EndpointScanData contains data from an endpoint scan 41 40 type EndpointScanData struct { 42 - ServerInfo interface{} `json:"server_info,omitempty"` 43 - DIDs []string `json:"dids,omitempty"` 44 - DIDCount int `json:"did_count"` 45 - Metadata interface{} `json:"metadata,omitempty"` // Type-specific metadata 41 + ServerInfo interface{} `json:"server_info,omitempty"` 42 + DIDs []string `json:"dids,omitempty"` 43 + DIDCount int `json:"did_count"` 44 + Metadata map[string]interface{} `json:"metadata,omitempty"` 46 45 } 47 46 48 47 // EndpointScan represents a historical endpoint scan ··· 159 158 BundleNumbers []int `json:"bundle_numbers"` 160 159 CreatedAt time.Time `json:"created_at"` 161 160 } 161 + 162 + // IPInfo represents IP information (stored with IP as primary key) 163 + type IPInfo struct { 164 + IP string 165 + City string 166 + Country string 167 + CountryCode string 168 + ASN int 169 + ASNOrg string 170 + IsDatacenter bool 171 + IsVPN bool 172 + Latitude float32 173 + Longitude float32 174 + RawData map[string]interface{} 175 + FetchedAt time.Time 176 + UpdatedAt time.Time 177 + } 178 + 179 + // PDSListItem is a virtual type created by JOIN for /pds endpoint 180 + type PDSListItem struct { 181 + // From endpoints table 182 + ID int64 183 + Endpoint string 184 + DiscoveredAt time.Time 185 + LastChecked time.Time 186 + Status int 187 + IP string 188 + 189 + // From latest endpoint_scans (via JOIN) 190 + LatestScan *struct { 191 + UserCount int 192 + ResponseTime float64 193 + ScannedAt time.Time 194 + } 195 + 196 + // From ip_infos table (via JOIN on endpoints.ip) 197 + IPInfo *IPInfo 198 + } 199 + 200 + // PDSDetail is extended version for /pds/{endpoint} 201 + type PDSDetail struct { 202 + PDSListItem 203 + 204 + // Additional data from latest scan 205 + LatestScan *struct { 206 + UserCount int 207 + ResponseTime float64 208 + ServerInfo interface{} // Full server description 209 + ScannedAt time.Time 210 + } 211 + }
+160
utils/migrate-ipinfo.sh
··· 1 + #!/bin/bash 2 + # migrate_ipinfo.sh - Migrate IP info from endpoints to ip_infos table 3 + 4 + # Configuration (edit these) 5 + DB_HOST="localhost" 6 + DB_PORT="5432" 7 + DB_NAME="atscanner" 8 + DB_USER="atscanner" 9 + DB_PASSWORD="Noor1kooz5eeFai9leZagh5ua5eihai4" 10 + 11 + # Colors for output 12 + RED='\033[0;31m' 13 + GREEN='\033[0;32m' 14 + YELLOW='\033[1;33m' 15 + NC='\033[0m' # No Color 16 + 17 + echo -e "${GREEN}=== IP Info Migration Script ===${NC}" 18 + echo "" 19 + 20 + # Export password for psql 21 + export PGPASSWORD="$DB_PASSWORD" 22 + 23 + # Check if we can connect 24 + echo -e "${YELLOW}Testing database connection...${NC}" 25 + if ! psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -c "SELECT 1;" > /dev/null 2>&1; then 26 + echo -e "${RED}Error: Cannot connect to database${NC}" 27 + exit 1 28 + fi 29 + echo -e "${GREEN}✓ Connected to database${NC}" 30 + echo "" 31 + 32 + # Create ip_infos table if it doesn't exist 33 + echo -e "${YELLOW}Creating ip_infos table...${NC}" 34 + psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" << 'SQL' 35 + CREATE TABLE IF NOT EXISTS ip_infos ( 36 + ip TEXT PRIMARY KEY, 37 + city TEXT, 38 + country TEXT, 39 + country_code TEXT, 40 + asn INTEGER, 41 + asn_org TEXT, 42 + is_datacenter BOOLEAN, 43 + is_vpn BOOLEAN, 44 + latitude REAL, 45 + longitude REAL, 46 + raw_data JSONB, 47 + fetched_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 48 + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 49 + ); 50 + 51 + CREATE INDEX IF NOT EXISTS idx_ip_infos_country_code ON ip_infos(country_code); 52 + CREATE INDEX IF NOT EXISTS idx_ip_infos_asn ON ip_infos(asn); 53 + SQL 54 + 55 + if [ $? -eq 0 ]; then 56 + echo -e "${GREEN}✓ ip_infos table ready${NC}" 57 + else 58 + echo -e "${RED}✗ Failed to create table${NC}" 59 + exit 1 60 + fi 61 + echo "" 62 + 63 + # Count how many endpoints have IP info 64 + echo -e "${YELLOW}Checking existing data...${NC}" 65 + ENDPOINT_COUNT=$(psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -t -c \ 66 + "SELECT COUNT(*) FROM endpoints WHERE ip IS NOT NULL AND ip != '' AND ip_info IS NOT NULL;") 67 + echo -e "Endpoints with IP info: ${GREEN}${ENDPOINT_COUNT}${NC}" 68 + 69 + EXISTING_IP_COUNT=$(psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -t -c \ 70 + "SELECT COUNT(*) FROM ip_infos;") 71 + echo -e "Existing IPs in ip_infos table: ${GREEN}${EXISTING_IP_COUNT}${NC}" 72 + echo "" 73 + 74 + # Migrate data 75 + echo -e "${YELLOW}Migrating IP info data...${NC}" 76 + psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" << 'SQL' 77 + -- Migrate IP info from endpoints to ip_infos 78 + -- Only insert IPs that don't already exist in ip_infos 79 + INSERT INTO ip_infos ( 80 + ip, 81 + city, 82 + country, 83 + country_code, 84 + asn, 85 + asn_org, 86 + is_datacenter, 87 + is_vpn, 88 + latitude, 89 + longitude, 90 + raw_data, 91 + fetched_at, 92 + updated_at 93 + ) 94 + SELECT DISTINCT ON (e.ip) 95 + e.ip, 96 + e.ip_info->'location'->>'city' AS city, 97 + e.ip_info->'location'->>'country' AS country, 98 + e.ip_info->'location'->>'country_code' AS country_code, 99 + (e.ip_info->'asn'->>'asn')::INTEGER AS asn, 100 + e.ip_info->'asn'->>'org' AS asn_org, 101 + -- Check if company type is "hosting" for datacenter detection 102 + CASE 103 + WHEN e.ip_info->'company'->>'type' = 'hosting' THEN true 104 + ELSE false 105 + END AS is_datacenter, 106 + -- Check VPN from security field 107 + COALESCE((e.ip_info->'security'->>'vpn')::BOOLEAN, false) AS is_vpn, 108 + -- Latitude and longitude 109 + (e.ip_info->'location'->>'latitude')::REAL AS latitude, 110 + (e.ip_info->'location'->>'longitude')::REAL AS longitude, 111 + -- Store full raw data 112 + e.ip_info AS raw_data, 113 + COALESCE(e.updated_at, CURRENT_TIMESTAMP) AS fetched_at, 114 + CURRENT_TIMESTAMP AS updated_at 115 + FROM endpoints e 116 + WHERE 117 + e.ip IS NOT NULL 118 + AND e.ip != '' 119 + AND e.ip_info IS NOT NULL 120 + AND NOT EXISTS ( 121 + SELECT 1 FROM ip_infos WHERE ip_infos.ip = e.ip 122 + ) 123 + ORDER BY e.ip, e.updated_at DESC NULLS LAST; 124 + SQL 125 + 126 + if [ $? -eq 0 ]; then 127 + echo -e "${GREEN}✓ Data migration completed${NC}" 128 + else 129 + echo -e "${RED}✗ Migration failed${NC}" 130 + exit 1 131 + fi 132 + echo "" 133 + 134 + # Show results 135 + echo -e "${YELLOW}Migration summary:${NC}" 136 + NEW_IP_COUNT=$(psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -t -c \ 137 + "SELECT COUNT(*) FROM ip_infos;") 138 + MIGRATED=$((NEW_IP_COUNT - EXISTING_IP_COUNT)) 139 + echo -e "Total IPs now in ip_infos: ${GREEN}${NEW_IP_COUNT}${NC}" 140 + echo -e "Newly migrated: ${GREEN}${MIGRATED}${NC}" 141 + echo "" 142 + 143 + # Show sample data 144 + echo -e "${YELLOW}Sample migrated data:${NC}" 145 + psql -h "$DB_HOST" -ps "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -c \ 146 + "SELECT ip, city, country, country_code, asn, is_datacenter, is_vpn FROM ip_infos LIMIT 5;" 147 + echo "" 148 + 149 + # Optional: Drop old columns (commented out for safety) 150 + echo -e "${YELLOW}Cleanup options:${NC}" 151 + echo -e "To remove old ip_info column from endpoints table, run:" 152 + echo -e "${RED} ALTER TABLE endpoints DROP COLUMN IF EXISTS ip_info;${NC}" 153 + echo -e "To remove old user_count column from endpoints table, run:" 154 + echo -e "${RED} ALTER TABLE endpoints DROP COLUMN IF EXISTS user_count;${NC}" 155 + echo "" 156 + 157 + echo -e "${GREEN}=== Migration Complete ===${NC}" 158 + 159 + # Unset password 160 + unset PGPASSWORD