+1
-1
config.yaml
+1
-1
config.yaml
+21
-39
internal/api/handlers.go
+21
-39
internal/api/handlers.go
···
283
283
284
284
// Add full IP info
285
285
if pds.IPInfo != nil {
286
-
ipInfoMap := make(map[string]interface{})
287
-
288
-
if pds.IP != "" {
289
-
ipInfoMap["ip"] = pds.IP
290
-
}
291
-
if pds.IPInfo.City != "" {
292
-
ipInfoMap["city"] = pds.IPInfo.City
293
-
}
294
-
if pds.IPInfo.Country != "" {
295
-
ipInfoMap["country"] = pds.IPInfo.Country
296
-
}
297
-
if pds.IPInfo.CountryCode != "" {
298
-
ipInfoMap["country_code"] = pds.IPInfo.CountryCode
299
-
}
300
-
if pds.IPInfo.ASN > 0 {
301
-
ipInfoMap["asn"] = pds.IPInfo.ASN
302
-
}
303
-
if pds.IPInfo.ASNOrg != "" {
304
-
ipInfoMap["asn_org"] = pds.IPInfo.ASNOrg
305
-
}
306
-
ipInfoMap["is_datacenter"] = pds.IPInfo.IsDatacenter
307
-
ipInfoMap["is_vpn"] = pds.IPInfo.IsVPN
308
-
309
-
if pds.IPInfo.Latitude != 0 || pds.IPInfo.Longitude != 0 {
310
-
ipInfoMap["latitude"] = pds.IPInfo.Latitude
311
-
ipInfoMap["longitude"] = pds.IPInfo.Longitude
312
-
}
313
-
314
-
if len(ipInfoMap) > 0 {
315
-
response["ip_info"] = ipInfoMap
316
-
}
286
+
response["ip_info"] = pds.IPInfo // This now includes raw_data
317
287
}
318
288
319
289
return response
···
332
302
scanMap["response_time"] = scan.ResponseTime
333
303
}
334
304
305
+
// NEW: Use the top-level UserCount field first
306
+
if scan.UserCount > 0 {
307
+
scanMap["user_count"] = scan.UserCount
308
+
} else if scan.ScanData != nil && scan.ScanData.Metadata != nil {
309
+
// Fallback to metadata for older scans
310
+
if userCount, ok := scan.ScanData.Metadata["user_count"].(int); ok {
311
+
scanMap["user_count"] = userCount
312
+
} else if userCount, ok := scan.ScanData.Metadata["user_count"].(float64); ok {
313
+
scanMap["user_count"] = int(userCount)
314
+
}
315
+
}
316
+
335
317
if scan.ScanData != nil {
336
318
// Metadata is already map[string]interface{}, no type assertion needed
337
-
if scan.ScanData.Metadata != nil {
338
-
// Extract user_count from metadata
339
-
if userCount, ok := scan.ScanData.Metadata["user_count"].(int); ok {
340
-
scanMap["user_count"] = userCount
341
-
} else if userCount, ok := scan.ScanData.Metadata["user_count"].(float64); ok {
342
-
scanMap["user_count"] = int(userCount)
343
-
}
344
-
}
319
+
// if scan.ScanData.Metadata != nil {
320
+
// // Extract user_count from metadata
321
+
// if userCount, ok := scan.ScanData.Metadata["user_count"].(int); ok {
322
+
// scanMap["user_count"] = userCount
323
+
// } else if userCount, ok := scan.ScanData.Metadata["user_count"].(float64); ok {
324
+
// scanMap["user_count"] = int(userCount)
325
+
// }
326
+
// } -- OLD Block (replaced by above)
345
327
346
328
// Include DID count if available
347
329
if scan.ScanData.DIDCount > 0 {
+1
-1
internal/api/server.go
+1
-1
internal/api/server.go
···
62
62
63
63
// NEW: PDS-specific endpoints (virtual, created via JOINs)
64
64
api.HandleFunc("/pds", s.handleGetPDSList).Methods("GET")
65
-
api.HandleFunc("/pds/{endpoint}", s.handleGetPDSDetail).Methods("GET")
66
65
api.HandleFunc("/pds/stats", s.handleGetPDSStats).Methods("GET")
66
+
api.HandleFunc("/pds/{endpoint}", s.handleGetPDSDetail).Methods("GET")
67
67
68
68
// PLC Bundle routes
69
69
api.HandleFunc("/plc/bundles", s.handleGetPLCBundles).Methods("GET")
+21
-5
internal/pds/scanner.go
+21
-5
internal/pds/scanner.go
···
3
3
import (
4
4
"context"
5
5
"fmt"
6
+
"math/rand"
6
7
"sync"
7
8
"time"
8
9
···
41
42
return err
42
43
}
43
44
44
-
log.Info("Scanning %d PDS servers...", len(servers))
45
+
if len(servers) > 0 {
46
+
// Create a new random source to avoid using the global one
47
+
r := rand.New(rand.NewSource(time.Now().UnixNano()))
48
+
// Shuffle the servers slice in place
49
+
r.Shuffle(len(servers), func(i, j int) {
50
+
servers[i], servers[j] = servers[j], servers[i]
51
+
})
52
+
log.Info("Randomized scan order for %d PDS servers...", len(servers))
53
+
} else {
54
+
log.Info("Scanning 0 PDS servers...")
55
+
return nil // No need to continue if there are no servers
56
+
}
45
57
46
58
// Worker pool
47
59
jobs := make(chan *storage.Endpoint, len(servers))
···
93
105
}
94
106
95
107
// Update IP immediately
96
-
s.db.UpdateEndpointIP(ctx, ep.ID, ip, time.Now())
108
+
s.db.UpdateEndpointIP(ctx, ep.ID, ip, time.Now().UTC())
97
109
98
110
// STEP 2: Health check
99
111
available, responseTime, err := s.client.CheckHealth(ctx, ep.Endpoint)
···
141
153
Metadata: make(map[string]interface{}),
142
154
}
143
155
156
+
var userCount int64 // NEW: Declare user count
157
+
144
158
// Add PDS-specific metadata
145
159
if result.Status == storage.EndpointStatusOnline {
146
-
scanData.Metadata["user_count"] = len(result.DIDs)
160
+
userCount = int64(len(result.DIDs)) // NEW: Get user count
161
+
scanData.Metadata["user_count"] = userCount // Keep in JSON for completeness
147
162
if result.Description != nil {
148
163
scanData.Metadata["server_info"] = result.Description
149
164
}
···
159
174
EndpointID: endpointID,
160
175
Status: result.Status,
161
176
ResponseTime: result.ResponseTime.Seconds() * 1000, // Convert to ms
177
+
UserCount: userCount, // NEW: Set the top-level field
162
178
ScanData: scanData,
163
-
ScannedAt: time.Now(),
179
+
ScannedAt: time.Now().UTC(),
164
180
}
165
181
166
182
if err := s.db.SaveEndpointScan(ctx, scan); err != nil {
···
170
186
// Update endpoint status
171
187
update := &storage.EndpointUpdate{
172
188
Status: result.Status,
173
-
LastChecked: time.Now(),
189
+
LastChecked: time.Now().UTC(),
174
190
ResponseTime: result.ResponseTime.Seconds() * 1000,
175
191
}
176
192
+1
-1
internal/plc/bundle.go
+1
-1
internal/plc/bundle.go
+2
-2
internal/plc/scanner.go
+2
-2
internal/plc/scanner.go
···
530
530
return s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
531
531
Source: "plc_directory",
532
532
LastBundleNumber: m.currentBundle - 1,
533
-
LastScanTime: time.Now(),
533
+
LastScanTime: time.Now().UTC(),
534
534
RecordsProcessed: cursor.RecordsProcessed + m.totalProcessed,
535
535
})
536
536
}
···
539
539
return s.db.UpdateScanCursor(ctx, &storage.ScanCursor{
540
540
Source: "plc_directory",
541
541
LastBundleNumber: bundle,
542
-
LastScanTime: time.Now(),
542
+
LastScanTime: time.Now().UTC(),
543
543
RecordsProcessed: totalProcessed,
544
544
})
545
545
}
+80
-58
internal/storage/postgres.go
+80
-58
internal/storage/postgres.go
···
94
94
CREATE INDEX IF NOT EXISTS idx_ip_infos_asn ON ip_infos(asn);
95
95
96
96
-- Endpoint scans (renamed from pds_scans)
97
-
CREATE TABLE IF NOT EXISTS endpoint_scans (
98
-
id BIGSERIAL PRIMARY KEY,
99
-
endpoint_id BIGINT NOT NULL,
100
-
status INTEGER NOT NULL,
101
-
response_time DOUBLE PRECISION,
102
-
scan_data JSONB,
103
-
scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
104
-
FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE
105
-
);
97
+
CREATE TABLE IF NOT EXISTS endpoint_scans (
98
+
id BIGSERIAL PRIMARY KEY,
99
+
endpoint_id BIGINT NOT NULL,
100
+
status INTEGER NOT NULL,
101
+
response_time DOUBLE PRECISION,
102
+
user_count BIGINT, -- NEW: Add user_count column
103
+
scan_data JSONB,
104
+
scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
105
+
FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE
106
+
);
106
107
107
-
CREATE INDEX IF NOT EXISTS idx_endpoint_scans_endpoint_status_scanned ON endpoint_scans(endpoint_id, status, scanned_at DESC);
108
-
CREATE INDEX IF NOT EXISTS idx_endpoint_scans_scanned_at ON endpoint_scans(scanned_at);
108
+
CREATE INDEX IF NOT EXISTS idx_endpoint_scans_endpoint_status_scanned ON endpoint_scans(endpoint_id, status, scanned_at DESC);
109
+
CREATE INDEX IF NOT EXISTS idx_endpoint_scans_scanned_at ON endpoint_scans(scanned_at);
110
+
CREATE INDEX IF NOT EXISTS idx_endpoint_scans_user_count ON endpoint_scans(user_count DESC NULLS LAST); -- NEW: Index for sorting
109
111
110
-
CREATE TABLE IF NOT EXISTS plc_metrics (
112
+
CREATE TABLE IF NOT EXISTS plc_metrics (
111
113
id BIGSERIAL PRIMARY KEY,
112
114
total_dids BIGINT,
113
115
total_pds BIGINT,
···
337
339
SET ip = $1, ip_resolved_at = $2, updated_at = $3
338
340
WHERE id = $4
339
341
`
340
-
_, err := p.db.ExecContext(ctx, query, ip, resolvedAt, time.Now(), endpointID)
342
+
_, err := p.db.ExecContext(ctx, query, ip, resolvedAt, time.Now().UTC(), endpointID)
341
343
return err
342
344
}
343
345
···
350
352
}
351
353
352
354
query := `
353
-
INSERT INTO endpoint_scans (endpoint_id, status, response_time, scan_data, scanned_at)
354
-
VALUES ($1, $2, $3, $4, $5)
355
-
`
356
-
_, err := p.db.ExecContext(ctx, query, scan.EndpointID, scan.Status, scan.ResponseTime, scanDataJSON, scan.ScannedAt)
355
+
INSERT INTO endpoint_scans (endpoint_id, status, response_time, user_count, scan_data, scanned_at)
356
+
VALUES ($1, $2, $3, $4, $5, $6)
357
+
`
358
+
_, err := p.db.ExecContext(ctx, query, scan.EndpointID, scan.Status, scan.ResponseTime, scan.UserCount, scanDataJSON, scan.ScannedAt)
357
359
return err
358
360
}
359
361
360
362
func (p *PostgresDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) {
361
363
query := `
362
-
SELECT id, endpoint_id, status, response_time, scan_data, scanned_at
363
-
FROM endpoint_scans
364
-
WHERE endpoint_id = $1
364
+
SELECT id, endpoint_id, status, response_time, user_count, scan_data, scanned_at
365
+
FROM endpoint_scans
366
+
WHERE endpoint_id = $1
365
367
ORDER BY scanned_at DESC
366
368
LIMIT $2
367
369
`
···
376
378
for rows.Next() {
377
379
var scan EndpointScan
378
380
var responseTime sql.NullFloat64
381
+
var userCount sql.NullInt64 // NEW: Use NullInt64
379
382
var scanDataJSON []byte
380
383
381
-
err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &scanDataJSON, &scan.ScannedAt)
384
+
err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &userCount, &scanDataJSON, &scan.ScannedAt)
382
385
if err != nil {
383
386
return nil, err
384
387
}
···
387
390
scan.ResponseTime = responseTime.Float64
388
391
}
389
392
393
+
if userCount.Valid { // NEW: Check validity and set
394
+
scan.UserCount = userCount.Int64
395
+
}
396
+
390
397
if len(scanDataJSON) > 0 {
391
398
var scanData EndpointScanData
392
399
if err := json.Unmarshal(scanDataJSON, &scanData); err == nil {
···
404
411
405
412
func (p *PostgresDB) GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) {
406
413
query := `
407
-
SELECT
408
-
e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip,
409
-
latest.user_count, latest.response_time, latest.scanned_at,
410
-
i.city, i.country, i.country_code, i.asn, i.asn_org,
414
+
SELECT
415
+
e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip,
416
+
latest.user_count, latest.response_time, latest.scanned_at,
417
+
i.city, i.country, i.country_code, i.asn, i.asn_org,
411
418
i.is_datacenter, i.is_vpn, i.latitude, i.longitude
412
419
FROM endpoints e
413
-
LEFT JOIN LATERAL (
414
-
SELECT
415
-
COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count,
416
-
response_time,
417
-
scanned_at
420
+
LEFT JOIN LATERAL (
421
+
SELECT
422
+
-- COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count, -- OLD
423
+
user_count, -- NEW: Use the column directly
424
+
response_time,
425
+
scanned_at
418
426
FROM endpoint_scans
419
427
WHERE endpoint_id = e.id AND status = 1
420
428
ORDER BY scanned_at DESC
···
523
531
524
532
func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) {
525
533
query := `
526
-
SELECT
527
-
e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip,
528
-
COALESCE((latest.scan_data->'metadata'->>'user_count')::int, 0) as user_count,
529
-
latest.response_time,
530
-
latest.scan_data->'metadata'->'server_info' as server_info,
531
-
latest.scanned_at,
532
-
i.city, i.country, i.country_code, i.asn, i.asn_org,
533
-
i.is_datacenter, i.is_vpn, i.latitude, i.longitude
534
-
FROM endpoints e
535
-
LEFT JOIN LATERAL (
536
-
SELECT scan_data, response_time, scanned_at
537
-
FROM endpoint_scans
538
-
WHERE endpoint_id = e.id
534
+
SELECT
535
+
e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip,
536
+
latest.user_count,
537
+
latest.response_time,
538
+
latest.scan_data->'metadata'->'server_info' as server_info,
539
+
latest.scanned_at,
540
+
i.city, i.country, i.country_code, i.asn, i.asn_org,
541
+
i.is_datacenter, i.is_vpn, i.latitude, i.longitude,
542
+
i.raw_data -- NEW: Select the raw_data column
543
+
FROM endpoints e
544
+
LEFT JOIN LATERAL (
545
+
SELECT scan_data, response_time, scanned_at, user_count -- NEW: Select user_count
546
+
FROM endpoint_scans
547
+
WHERE endpoint_id = e.id
539
548
ORDER BY scanned_at DESC
540
549
LIMIT 1
541
550
) latest ON true
···
552
561
var responseTime sql.NullFloat64
553
562
var serverInfoJSON []byte
554
563
var scannedAt sql.NullTime
564
+
var rawDataJSON []byte // NEW: Add a variable for raw_data
555
565
556
566
err := p.db.QueryRowContext(ctx, query, endpoint).Scan(
557
567
&detail.ID, &detail.Endpoint, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip,
558
568
&userCount, &responseTime, &serverInfoJSON, &scannedAt,
559
569
&city, &country, &countryCode, &asn, &asnOrg,
560
570
&isDatacenter, &isVPN, &lat, &lon,
571
+
&rawDataJSON, // NEW: Scan into the variable
561
572
)
562
573
if err != nil {
563
574
return nil, err
···
600
611
IsVPN: isVPN.Bool,
601
612
Latitude: float32(lat.Float64),
602
613
Longitude: float32(lon.Float64),
614
+
// RawData is unmarshaled below
615
+
}
616
+
617
+
// NEW: Unmarshal the raw_data JSON
618
+
if len(rawDataJSON) > 0 {
619
+
if err := json.Unmarshal(rawDataJSON, &detail.IPInfo.RawData); err != nil {
620
+
// Log the error but don't fail the request
621
+
fmt.Printf("Warning: failed to unmarshal raw_data for IP %s: %v\n", ip.String, err)
622
+
}
603
623
}
604
624
}
605
625
···
609
629
func (p *PostgresDB) GetPDSStats(ctx context.Context) (*PDSStats, error) {
610
630
// PDS stats - aggregate from latest scans
611
631
query := `
612
-
WITH latest_scans AS (
613
-
SELECT DISTINCT ON (endpoint_id)
614
-
endpoint_id,
615
-
COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count,
616
-
status
617
-
FROM endpoint_scans
618
-
WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds')
632
+
WITH latest_scans AS (
633
+
SELECT DISTINCT ON (endpoint_id)
634
+
endpoint_id,
635
+
-- COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count, -- OLD
636
+
user_count, -- NEW: Use the column directly
637
+
status
638
+
FROM endpoint_scans
639
+
WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds')
619
640
ORDER BY endpoint_id, scanned_at DESC
620
641
)
621
642
SELECT
···
681
702
682
703
// Get total DIDs from latest PDS scans
683
704
didQuery := `
684
-
WITH latest_pds_scans AS (
685
-
SELECT DISTINCT ON (endpoint_id)
686
-
endpoint_id,
687
-
COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count
688
-
FROM endpoint_scans
689
-
WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds')
690
-
ORDER BY endpoint_id, scanned_at DESC
705
+
WITH latest_pds_scans AS (
706
+
SELECT DISTINCT ON (endpoint_id)
707
+
endpoint_id,
708
+
-- COALESCE((scan_data->'metadata'->>'user_count')::int, 0) as user_count -- OLD
709
+
user_count -- NEW: Use the column directly
710
+
FROM endpoint_scans
711
+
WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds')
712
+
ORDER BY endpoint_id, scanned_at DESC
691
713
)
692
714
SELECT SUM(user_count) FROM latest_pds_scans
693
715
`
···
733
755
fetched_at = EXCLUDED.fetched_at,
734
756
updated_at = CURRENT_TIMESTAMP
735
757
`
736
-
_, err := p.db.ExecContext(ctx, query, ip, city, country, countryCode, asn, asnOrg, isDatacenter, isVPN, lat, lon, rawDataJSON, time.Now())
758
+
_, err := p.db.ExecContext(ctx, query, ip, city, country, countryCode, asn, asnOrg, isDatacenter, isVPN, lat, lon, rawDataJSON, time.Now().UTC())
737
759
return err
738
760
}
739
761
+14
-13
internal/storage/types.go
+14
-13
internal/storage/types.go
···
50
50
EndpointID int64
51
51
Status int
52
52
ResponseTime float64
53
+
UserCount int64
53
54
ScanData *EndpointScanData
54
55
ScannedAt time.Time
55
56
}
···
161
162
162
163
// IPInfo represents IP information (stored with IP as primary key)
163
164
type IPInfo struct {
164
-
IP string
165
-
City string
166
-
Country string
167
-
CountryCode string
168
-
ASN int
169
-
ASNOrg string
170
-
IsDatacenter bool
171
-
IsVPN bool
172
-
Latitude float32
173
-
Longitude float32
174
-
RawData map[string]interface{}
175
-
FetchedAt time.Time
176
-
UpdatedAt time.Time
165
+
IP string `json:"ip"`
166
+
City string `json:"city,omitempty"`
167
+
Country string `json:"country,omitempty"`
168
+
CountryCode string `json:"country_code,omitempty"`
169
+
ASN int `json:"asn,omitempty"`
170
+
ASNOrg string `json:"asn_org,omitempty"`
171
+
IsDatacenter bool `json:"is_datacenter"`
172
+
IsVPN bool `json:"is_vpn"`
173
+
Latitude float32 `json:"latitude,omitempty"`
174
+
Longitude float32 `json:"longitude,omitempty"`
175
+
RawData map[string]interface{} `json:"raw_data,omitempty"`
176
+
FetchedAt time.Time `json:"fetched_at"`
177
+
UpdatedAt time.Time `json:"updated_at"`
177
178
}
178
179
179
180
// PDSListItem is a virtual type created by JOIN for /pds endpoint