+1
-1
Makefile
+1
-1
Makefile
+159
cmd/atscand/main.go
+159
cmd/atscand/main.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"flag"
6
+
"fmt"
7
+
"os"
8
+
"os/signal"
9
+
"syscall"
10
+
"time"
11
+
12
+
"github.com/atscan/atscand/internal/api"
13
+
"github.com/atscan/atscand/internal/config"
14
+
"github.com/atscan/atscand/internal/log"
15
+
"github.com/atscan/atscand/internal/pds"
16
+
"github.com/atscan/atscand/internal/plc"
17
+
"github.com/atscan/atscand/internal/storage"
18
+
"github.com/atscan/atscand/internal/worker"
19
+
)
20
+
21
+
const VERSION = "1.0.0"
22
+
23
+
func main() {
24
+
configPath := flag.String("config", "config.yaml", "path to config file")
25
+
verbose := flag.Bool("verbose", false, "enable verbose logging")
26
+
flag.Parse()
27
+
28
+
// Load configuration
29
+
cfg, err := config.Load(*configPath)
30
+
if err != nil {
31
+
fmt.Fprintf(os.Stderr, "Failed to load config: %v\n", err)
32
+
os.Exit(1)
33
+
}
34
+
35
+
// Override verbose setting if flag is provided
36
+
if *verbose {
37
+
cfg.API.Verbose = true
38
+
}
39
+
40
+
// Initialize logger
41
+
log.Init(cfg.API.Verbose)
42
+
43
+
// Print banner
44
+
log.Banner(VERSION)
45
+
46
+
// Print configuration summary
47
+
log.PrintConfig(map[string]string{
48
+
"Database Type": cfg.Database.Type,
49
+
"Database Path": cfg.Database.Path, // Will be auto-redacted
50
+
"PLC Directory": cfg.PLC.DirectoryURL,
51
+
"PLC Scan Interval": cfg.PLC.ScanInterval.String(),
52
+
"PLC Bundle Dir": cfg.PLC.BundleDir,
53
+
"PLC Cache": fmt.Sprintf("%v", cfg.PLC.UseCache),
54
+
"PLC Index DIDs": fmt.Sprintf("%v", cfg.PLC.IndexDIDs),
55
+
"PDS Scan Interval": cfg.PDS.ScanInterval.String(),
56
+
"PDS Workers": fmt.Sprintf("%d", cfg.PDS.Workers),
57
+
"PDS Timeout": cfg.PDS.Timeout.String(),
58
+
"API Host": cfg.API.Host,
59
+
"API Port": fmt.Sprintf("%d", cfg.API.Port),
60
+
"Verbose Logging": fmt.Sprintf("%v", cfg.API.Verbose),
61
+
})
62
+
63
+
// Initialize database using factory pattern
64
+
db, err := storage.NewDatabase(cfg.Database.Type, cfg.Database.Path)
65
+
if err != nil {
66
+
log.Fatal("Failed to initialize database: %v", err)
67
+
}
68
+
defer func() {
69
+
log.Info("Closing database connection...")
70
+
db.Close()
71
+
}()
72
+
73
+
// Set scan retention from config
74
+
if cfg.PDS.ScanRetention > 0 {
75
+
db.SetScanRetention(cfg.PDS.ScanRetention)
76
+
log.Verbose("Scan retention set to %d scans per endpoint", cfg.PDS.ScanRetention)
77
+
}
78
+
79
+
// Run migrations
80
+
if err := db.Migrate(); err != nil {
81
+
log.Fatal("Failed to run migrations: %v", err)
82
+
}
83
+
84
+
ctx, cancel := context.WithCancel(context.Background())
85
+
defer cancel()
86
+
87
+
// Initialize workers
88
+
log.Info("Initializing scanners...")
89
+
90
+
bundleManager, err := plc.NewBundleManager(cfg.PLC.BundleDir, cfg.PLC.DirectoryURL, db, cfg.PLC.IndexDIDs)
91
+
if err != nil {
92
+
log.Fatal("Failed to create bundle manager: %v", err)
93
+
}
94
+
defer bundleManager.Close()
95
+
log.Verbose("✓ Bundle manager initialized (shared)")
96
+
97
+
plcScanner := plc.NewScanner(db, cfg.PLC, bundleManager)
98
+
defer plcScanner.Close()
99
+
log.Verbose("✓ PLC scanner initialized")
100
+
101
+
pdsScanner := pds.NewScanner(db, cfg.PDS)
102
+
log.Verbose("✓ PDS scanner initialized")
103
+
104
+
scheduler := worker.NewScheduler()
105
+
106
+
// Schedule PLC directory scan
107
+
scheduler.AddJob("plc_scan", cfg.PLC.ScanInterval, func() {
108
+
if err := plcScanner.Scan(ctx); err != nil {
109
+
log.Error("PLC scan error: %v", err)
110
+
}
111
+
})
112
+
log.Verbose("✓ PLC scan job scheduled (interval: %s)", cfg.PLC.ScanInterval)
113
+
114
+
// Schedule PDS availability checks
115
+
scheduler.AddJob("pds_scan", cfg.PDS.ScanInterval, func() {
116
+
if err := pdsScanner.ScanAll(ctx); err != nil {
117
+
log.Error("PDS scan error: %v", err)
118
+
}
119
+
})
120
+
log.Verbose("✓ PDS scan job scheduled (interval: %s)", cfg.PDS.ScanInterval)
121
+
122
+
// Start API server
123
+
log.Info("Starting API server on %s:%d...", cfg.API.Host, cfg.API.Port)
124
+
apiServer := api.NewServer(db, cfg.API, cfg.PLC, bundleManager)
125
+
go func() {
126
+
if err := apiServer.Start(); err != nil {
127
+
log.Fatal("API server error: %v", err)
128
+
}
129
+
}()
130
+
131
+
// Give the API server a moment to start
132
+
time.Sleep(100 * time.Millisecond)
133
+
log.Info("✓ API server started successfully")
134
+
log.Info("")
135
+
log.Info("🚀 ATScanner is running!")
136
+
log.Info(" API available at: http://%s:%d", cfg.API.Host, cfg.API.Port)
137
+
log.Info(" Press Ctrl+C to stop")
138
+
log.Info("")
139
+
140
+
// Start scheduler
141
+
scheduler.Start(ctx)
142
+
143
+
// Wait for interrupt
144
+
sigChan := make(chan os.Signal, 1)
145
+
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
146
+
<-sigChan
147
+
148
+
log.Info("")
149
+
log.Info("Shutting down gracefully...")
150
+
cancel()
151
+
152
+
log.Info("Stopping API server...")
153
+
apiServer.Shutdown(context.Background())
154
+
155
+
log.Info("Waiting for active tasks to complete...")
156
+
time.Sleep(2 * time.Second)
157
+
158
+
log.Info("✓ Shutdown complete. Goodbye!")
159
+
}
+168
cmd/import-labels/main.go
+168
cmd/import-labels/main.go
···
1
+
package main
2
+
3
+
import (
	"bufio"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/klauspost/compress/zstd"
	"gopkg.in/yaml.v3"
)
15
+
16
+
// Config mirrors only the subset of config.yaml this tool needs:
// the plc.bundle_dir key that locates the bundle directory.
type Config struct {
	PLC struct {
		// BundleDir is the PLC bundle directory; label files are written
		// to the "labels" subdirectory underneath it.
		BundleDir string `yaml:"bundle_dir"`
	} `yaml:"plc"`
}
21
+
22
+
var CONFIG_FILE = "config.yaml"
23
+
24
+
// ---------------------
25
+
26
+
func main() {
27
+
// Define a new flag for changing the directory
28
+
workDir := flag.String("C", ".", "Change to this directory before running (for finding config.yaml)")
29
+
flag.Usage = func() { // Custom usage message
30
+
fmt.Fprintf(os.Stderr, "Usage: ... | %s [-C /path/to/dir]\n", os.Args[0])
31
+
fmt.Fprintln(os.Stderr, "Reads sorted CSV from stdin and writes compressed bundle files.")
32
+
flag.PrintDefaults()
33
+
}
34
+
flag.Parse() // Parse all defined flags
35
+
36
+
// Change directory if the flag was used
37
+
if *workDir != "." {
38
+
fmt.Printf("Changing working directory to %s...\n", *workDir)
39
+
if err := os.Chdir(*workDir); err != nil {
40
+
fmt.Fprintf(os.Stderr, "Error changing directory to %s: %v\n", *workDir, err)
41
+
os.Exit(1)
42
+
}
43
+
}
44
+
45
+
// --- REMOVED UNUSED CODE ---
46
+
// The csvFilePath variable and NArg check were removed
47
+
// as the script now reads from stdin.
48
+
// ---------------------------
49
+
50
+
fmt.Println("========================================")
51
+
fmt.Println("PLC Operation Labels Import (Go STDIN)")
52
+
fmt.Println("========================================")
53
+
54
+
// 1. Read config (will now read from the new CWD)
55
+
fmt.Printf("Loading config from %s...\n", CONFIG_FILE)
56
+
configData, err := os.ReadFile(CONFIG_FILE)
57
+
if err != nil {
58
+
fmt.Fprintf(os.Stderr, "Error reading config file: %v\n", err)
59
+
os.Exit(1)
60
+
}
61
+
62
+
var config Config
63
+
if err := yaml.Unmarshal(configData, &config); err != nil {
64
+
fmt.Fprintf(os.Stderr, "Error parsing config.yaml: %v\n", err)
65
+
os.Exit(1)
66
+
}
67
+
68
+
if config.PLC.BundleDir == "" {
69
+
fmt.Fprintln(os.Stderr, "Error: Could not parse plc.bundle_dir from config.yaml")
70
+
os.Exit(1)
71
+
}
72
+
73
+
finalLabelsDir := filepath.Join(config.PLC.BundleDir, "labels")
74
+
if err := os.MkdirAll(finalLabelsDir, 0755); err != nil {
75
+
fmt.Fprintf(os.Stderr, "Error creating output directory: %v\n", err)
76
+
os.Exit(1)
77
+
}
78
+
79
+
fmt.Printf("Output Dir: %s\n", finalLabelsDir)
80
+
fmt.Println("Waiting for sorted data from stdin...")
81
+
82
+
// 2. Process sorted data from stdin
83
+
// This script *requires* the input to be sorted by bundle number.
84
+
85
+
var currentWriter *zstd.Encoder
86
+
var currentFile *os.File
87
+
var lastBundleKey string = ""
88
+
89
+
lineCount := 0
90
+
startTime := time.Now()
91
+
92
+
scanner := bufio.NewScanner(os.Stdin)
93
+
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
94
+
95
+
for scanner.Scan() {
96
+
line := scanner.Text()
97
+
lineCount++
98
+
99
+
parts := strings.SplitN(line, ",", 2)
100
+
if len(parts) < 1 {
101
+
continue // Skip empty/bad lines
102
+
}
103
+
104
+
bundleNumStr := parts[0]
105
+
bundleKey := fmt.Sprintf("%06s", bundleNumStr) // Pad with zeros
106
+
107
+
// If the bundle key is new, close the old writer and open a new one.
108
+
if bundleKey != lastBundleKey {
109
+
// Close the previous writer/file
110
+
if currentWriter != nil {
111
+
if err := currentWriter.Close(); err != nil {
112
+
fmt.Fprintf(os.Stderr, "Error closing writer for %s: %v\n", lastBundleKey, err)
113
+
}
114
+
currentFile.Close()
115
+
}
116
+
117
+
// Start the new one
118
+
fmt.Printf(" -> Writing bundle %s\n", bundleKey)
119
+
outPath := filepath.Join(finalLabelsDir, fmt.Sprintf("%s.csv.zst", bundleKey))
120
+
121
+
file, err := os.Create(outPath)
122
+
if err != nil {
123
+
fmt.Fprintf(os.Stderr, "Error creating file %s: %v\n", outPath, err)
124
+
os.Exit(1)
125
+
}
126
+
currentFile = file
127
+
128
+
writer, err := zstd.NewWriter(file)
129
+
if err != nil {
130
+
fmt.Fprintf(os.Stderr, "Error creating zstd writer: %v\n", err)
131
+
os.Exit(1)
132
+
}
133
+
currentWriter = writer
134
+
lastBundleKey = bundleKey
135
+
}
136
+
137
+
// Write the line to the currently active writer
138
+
if _, err := currentWriter.Write([]byte(line + "\n")); err != nil {
139
+
fmt.Fprintf(os.Stderr, "Error writing line: %v\n", err)
140
+
}
141
+
142
+
// Progress update
143
+
if lineCount%100000 == 0 {
144
+
elapsed := time.Since(startTime).Seconds()
145
+
rate := float64(lineCount) / elapsed
146
+
fmt.Printf(" ... processed %d lines (%.0f lines/sec)\n", lineCount, rate)
147
+
}
148
+
}
149
+
150
+
// 3. Close the very last writer
151
+
if currentWriter != nil {
152
+
if err := currentWriter.Close(); err != nil {
153
+
fmt.Fprintf(os.Stderr, "Error closing final writer: %v\n", err)
154
+
}
155
+
currentFile.Close()
156
+
}
157
+
158
+
if err := scanner.Err(); err != nil {
159
+
fmt.Fprintf(os.Stderr, "Error reading stdin: %v\n", err)
160
+
}
161
+
162
+
totalTime := time.Since(startTime)
163
+
fmt.Println("\n========================================")
164
+
fmt.Println("Import Summary")
165
+
fmt.Println("========================================")
166
+
fmt.Printf("✓ Import completed in %v\n", totalTime)
167
+
fmt.Printf("Total lines processed: %d\n", lineCount)
168
+
}
+2
-3
go.mod
+2
-3
go.mod
···
8
8
gopkg.in/yaml.v3 v3.0.1
9
9
)
10
10
11
-
require github.com/klauspost/compress v1.18.1 // indirect
11
+
require github.com/klauspost/compress v1.18.1
12
12
13
13
require (
14
-
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
15
-
github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7
16
14
github.com/gorilla/handlers v1.5.2
17
15
github.com/jackc/pgx/v5 v5.7.6
16
+
tangled.org/atscan.net/plcbundle v0.3.6
18
17
)
19
18
20
19
require (
+2
-4
go.sum
+2
-4
go.sum
···
1
-
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
2
-
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
3
-
github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7 h1:u5mCzLGQPSThUPjQnAn64xs3ZWuPltKpua1M+bMxtww=
4
-
github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7/go.mod h1:vqyqs+zyaxFYtIp6I4+zSQD76oiylnGenzD7ZeA4cxs=
5
1
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
6
2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7
3
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
···
49
45
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
50
46
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
51
47
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
48
+
tangled.org/atscan.net/plcbundle v0.3.6 h1:8eSOxEwHRRT7cLhOTUxut80fYLAi+jodR9UTshofIvY=
49
+
tangled.org/atscan.net/plcbundle v0.3.6/go.mod h1:XUzSi6wmqAECCLThmuBTzYV5mEd0q/J1wE45cagoBqs=
+39
-36
internal/api/handlers.go
+39
-36
internal/api/handlers.go
···
15
15
"github.com/atscan/atscand/internal/monitor"
16
16
"github.com/atscan/atscand/internal/plc"
17
17
"github.com/atscan/atscand/internal/storage"
18
-
"github.com/atscan/plcbundle"
19
18
"github.com/gorilla/mux"
19
+
"tangled.org/atscan.net/plcbundle"
20
20
)
21
21
22
22
// ===== RESPONSE HELPERS =====
···
75
75
76
76
// ===== FORMATTING HELPERS =====
77
77
78
-
func formatBundleResponse(bundle *plcbundle.BundleMetadata) map[string]interface{} {
79
-
return map[string]interface{}{
80
-
"plc_bundle_number": bundle.BundleNumber,
81
-
"start_time": bundle.StartTime,
82
-
"end_time": bundle.EndTime,
83
-
"operation_count": plc.BUNDLE_SIZE,
84
-
"did_count": bundle.DIDCount, // Use DIDCount instead of len(DIDs)
85
-
"hash": bundle.Hash,
86
-
"compressed_hash": bundle.CompressedHash,
87
-
"compressed_size": bundle.CompressedSize,
88
-
"uncompressed_size": bundle.UncompressedSize,
89
-
"compression_ratio": float64(bundle.UncompressedSize) / float64(bundle.CompressedSize),
90
-
"cursor": bundle.Cursor,
91
-
"prev_bundle_hash": bundle.PrevBundleHash,
92
-
"created_at": bundle.CreatedAt,
93
-
}
94
-
}
95
-
96
78
func formatEndpointResponse(ep *storage.Endpoint) map[string]interface{} {
97
79
response := map[string]interface{}{
98
80
"id": ep.ID,
···
266
248
"endpoint": pds.Endpoint,
267
249
"discovered_at": pds.DiscoveredAt,
268
250
"status": statusToString(pds.Status),
251
+
"valid": pds.Valid, // NEW
269
252
}
270
253
271
254
// Add server_did if available
···
748
731
"end_time": meta.EndTime,
749
732
"operation_count": meta.OperationCount,
750
733
"did_count": meta.DIDCount,
751
-
"hash": meta.Hash,
734
+
"hash": meta.Hash, // Chain hash (primary)
735
+
"content_hash": meta.ContentHash, // Content hash
736
+
"parent": meta.Parent, // Parent chain hash
752
737
"compressed_hash": meta.CompressedHash,
753
738
"compressed_size": meta.CompressedSize,
754
739
"uncompressed_size": meta.UncompressedSize,
755
740
"compression_ratio": float64(meta.UncompressedSize) / float64(meta.CompressedSize),
756
741
"cursor": meta.Cursor,
757
-
"prev_bundle_hash": meta.PrevBundleHash,
758
742
"created_at": meta.CreatedAt,
759
743
}
760
744
}
···
780
764
"is_upcoming": true,
781
765
"status": "filling",
782
766
"operation_count": count,
767
+
"did_count": stats["did_count"],
783
768
"target_operation_count": 10000,
784
769
"progress_percent": float64(count) / 100.0,
785
770
"operations_needed": 10000 - count,
···
806
791
// Get previous bundle info
807
792
if bundleNum > 1 {
808
793
if prevBundle, err := s.bundleManager.GetBundleMetadata(bundleNum - 1); err == nil {
809
-
result["prev_bundle_hash"] = prevBundle.Hash
794
+
result["parent"] = prevBundle.Hash // Parent chain hash
810
795
result["cursor"] = prevBundle.EndTime.Format(time.RFC3339Nano)
811
796
}
812
797
}
···
1005
990
1006
991
response := make([]map[string]interface{}, len(bundles))
1007
992
for i, bundle := range bundles {
1008
-
response[i] = formatBundleResponse(bundle)
993
+
response[i] = formatBundleMetadata(bundle)
1009
994
}
1010
995
1011
996
resp.json(response)
···
1022
1007
lastBundle := stats["last_bundle"].(int64)
1023
1008
1024
1009
resp.json(map[string]interface{}{
1025
-
"plc_bundle_count": bundleCount,
1026
-
"last_bundle_number": lastBundle,
1027
-
"total_compressed_size": totalSize,
1028
-
"total_compressed_size_mb": float64(totalSize) / 1024 / 1024,
1029
-
"total_compressed_size_gb": float64(totalSize) / 1024 / 1024 / 1024,
1030
-
"total_uncompressed_size": totalUncompressedSize,
1031
-
"total_uncompressed_size_mb": float64(totalUncompressedSize) / 1024 / 1024,
1032
-
"total_uncompressed_size_gb": float64(totalUncompressedSize) / 1024 / 1024 / 1024,
1033
-
"overall_compression_ratio": float64(totalUncompressedSize) / float64(totalSize),
1010
+
"plc_bundle_count": bundleCount,
1011
+
"last_bundle_number": lastBundle,
1012
+
"total_compressed_size": totalSize,
1013
+
"total_uncompressed_size": totalUncompressedSize,
1014
+
"overall_compression_ratio": float64(totalUncompressedSize) / float64(totalSize),
1034
1015
})
1035
1016
}
1036
1017
···
1145
1126
break
1146
1127
}
1147
1128
1148
-
if bundle.PrevBundleHash != prevBundle.Hash {
1129
+
if bundle.Parent != prevBundle.Hash {
1149
1130
valid = false
1150
1131
brokenAt = i
1151
-
errorMsg = fmt.Sprintf("Chain broken: bundle %06d prev_hash doesn't match bundle %06d hash", i, i-1)
1132
+
errorMsg = fmt.Sprintf("Chain broken: bundle %06d parent doesn't match bundle %06d hash", i, i-1)
1152
1133
break
1153
1134
}
1154
1135
}
···
1191
1172
"chain_start_time": firstBundle.StartTime,
1192
1173
"chain_end_time": lastBundleData.EndTime,
1193
1174
"chain_head_hash": lastBundleData.Hash,
1194
-
"first_prev_hash": firstBundle.PrevBundleHash,
1195
-
"last_prev_hash": lastBundleData.PrevBundleHash,
1175
+
"first_parent": firstBundle.Parent,
1176
+
"last_parent": lastBundleData.Parent,
1196
1177
})
1197
1178
}
1198
1179
···
1499
1480
"tables": tableSizes,
1500
1481
"indexes": indexSizes,
1501
1482
"retrievedAt": time.Now().UTC(),
1483
+
})
1484
+
}
1485
+
1486
+
func (s *Server) handleGetBundleLabels(w http.ResponseWriter, r *http.Request) {
1487
+
resp := newResponse(w)
1488
+
1489
+
bundleNum, err := getBundleNumber(r)
1490
+
if err != nil {
1491
+
resp.error("invalid bundle number", http.StatusBadRequest)
1492
+
return
1493
+
}
1494
+
1495
+
labels, err := s.bundleManager.GetBundleLabels(r.Context(), bundleNum)
1496
+
if err != nil {
1497
+
resp.error(err.Error(), http.StatusInternalServerError)
1498
+
return
1499
+
}
1500
+
1501
+
resp.json(map[string]interface{}{
1502
+
"bundle": bundleNum,
1503
+
"count": len(labels),
1504
+
"labels": labels,
1502
1505
})
1503
1506
}
1504
1507
+1
internal/api/server.go
+1
internal/api/server.go
···
84
84
api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET")
85
85
api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET")
86
86
api.HandleFunc("/plc/bundles/{number}/download", s.handleDownloadPLCBundle).Methods("GET")
87
+
api.HandleFunc("/plc/bundles/{number}/labels", s.handleGetBundleLabels).Methods("GET")
87
88
88
89
// PLC history/metrics
89
90
api.HandleFunc("/plc/history", s.handleGetPLCHistory).Methods("GET")
+44
-45
internal/pds/client.go
+44
-45
internal/pds/client.go
···
84
84
}
85
85
86
86
// DescribeServer fetches com.atproto.server.describeServer
87
-
func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, error) {
87
+
// Returns: description, responseTime, usedIP, error
88
+
func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, time.Duration, string, error) {
89
+
startTime := time.Now()
88
90
url := fmt.Sprintf("%s/xrpc/com.atproto.server.describeServer", endpoint)
89
91
90
-
//fmt.Println(url)
92
+
// Track which IP was used
93
+
var usedIP string
94
+
transport := &http.Transport{
95
+
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
96
+
conn, err := (&net.Dialer{
97
+
Timeout: 30 * time.Second,
98
+
KeepAlive: 30 * time.Second,
99
+
}).DialContext(ctx, network, addr)
100
+
101
+
if err == nil && conn != nil {
102
+
if remoteAddr := conn.RemoteAddr(); remoteAddr != nil {
103
+
if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok {
104
+
usedIP = tcpAddr.IP.String()
105
+
}
106
+
}
107
+
}
108
+
return conn, err
109
+
},
110
+
}
111
+
112
+
client := &http.Client{
113
+
Timeout: c.httpClient.Timeout,
114
+
Transport: transport,
115
+
}
91
116
92
117
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
93
118
if err != nil {
94
-
return nil, err
119
+
return nil, 0, "", err
95
120
}
96
121
97
-
resp, err := c.httpClient.Do(req)
122
+
resp, err := client.Do(req)
123
+
responseTime := time.Since(startTime)
124
+
98
125
if err != nil {
99
-
return nil, err
126
+
return nil, responseTime, usedIP, err
100
127
}
101
128
defer resp.Body.Close()
102
129
103
130
if resp.StatusCode != http.StatusOK {
104
-
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
131
+
return nil, responseTime, usedIP, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
105
132
}
106
133
107
134
var desc ServerDescription
108
135
if err := json.NewDecoder(resp.Body).Decode(&desc); err != nil {
109
-
return nil, err
136
+
return nil, responseTime, usedIP, err
110
137
}
111
138
112
-
return &desc, nil
139
+
return &desc, responseTime, usedIP, nil
113
140
}
114
141
115
142
// CheckHealth performs a basic health check, ensuring the endpoint returns JSON with a "version"
116
-
// Returns: available, responseTime, version, usedIP, error
117
-
func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, string, error) {
143
+
// Returns: available, responseTime, version, error
144
+
func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, error) {
118
145
startTime := time.Now()
119
146
120
147
url := fmt.Sprintf("%s/xrpc/_health", endpoint)
121
148
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
122
149
if err != nil {
123
-
return false, 0, "", "", err
124
-
}
125
-
126
-
// Create a custom dialer to track which IP was actually used
127
-
var usedIP string
128
-
transport := &http.Transport{
129
-
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
130
-
conn, err := (&net.Dialer{
131
-
Timeout: 30 * time.Second,
132
-
KeepAlive: 30 * time.Second,
133
-
}).DialContext(ctx, network, addr)
134
-
135
-
if err == nil && conn != nil {
136
-
if remoteAddr := conn.RemoteAddr(); remoteAddr != nil {
137
-
// Extract IP from "ip:port" format
138
-
if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok {
139
-
usedIP = tcpAddr.IP.String()
140
-
}
141
-
}
142
-
}
143
-
144
-
return conn, err
145
-
},
150
+
return false, 0, "", err
146
151
}
147
152
148
-
// Create a client with our custom transport
149
-
client := &http.Client{
150
-
Timeout: c.httpClient.Timeout,
151
-
Transport: transport,
152
-
}
153
-
154
-
resp, err := client.Do(req)
153
+
resp, err := c.httpClient.Do(req)
155
154
duration := time.Since(startTime)
156
155
157
156
if err != nil {
158
-
return false, duration, "", usedIP, err
157
+
return false, duration, "", err
159
158
}
160
159
defer resp.Body.Close()
161
160
162
161
if resp.StatusCode != http.StatusOK {
163
-
return false, duration, "", usedIP, fmt.Errorf("health check returned status %d", resp.StatusCode)
162
+
return false, duration, "", fmt.Errorf("health check returned status %d", resp.StatusCode)
164
163
}
165
164
166
165
// Decode the JSON response and check for "version"
···
169
168
}
170
169
171
170
if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err != nil {
172
-
return false, duration, "", usedIP, fmt.Errorf("failed to decode health JSON: %w", err)
171
+
return false, duration, "", fmt.Errorf("failed to decode health JSON: %w", err)
173
172
}
174
173
175
174
if healthResponse.Version == "" {
176
-
return false, duration, "", usedIP, fmt.Errorf("health JSON response missing 'version' field")
175
+
return false, duration, "", fmt.Errorf("health JSON response missing 'version' field")
177
176
}
178
177
179
178
// All checks passed
180
-
return true, duration, healthResponse.Version, usedIP, nil
179
+
return true, duration, healthResponse.Version, nil
181
180
}
+31
-27
internal/pds/scanner.go
+31
-27
internal/pds/scanner.go
···
8
8
"sync/atomic"
9
9
"time"
10
10
11
-
"github.com/acarl005/stripansi"
12
11
"github.com/atscan/atscand/internal/config"
13
12
"github.com/atscan/atscand/internal/ipinfo"
14
13
"github.com/atscan/atscand/internal/log"
···
40
39
servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{
41
40
Type: "pds",
42
41
OnlyStale: true,
42
+
OnlyValid: true,
43
43
RecheckInterval: s.config.RecheckInterval,
44
44
})
45
45
if err != nil {
···
127
127
// STEP 1: Resolve IPs (both IPv4 and IPv6)
128
128
ips, err := ipinfo.ExtractIPsFromEndpoint(ep.Endpoint)
129
129
if err != nil {
130
-
// Mark as offline due to DNS failure
131
130
s.saveScanResult(ctx, ep.ID, &ScanResult{
132
131
Status: storage.EndpointStatusOffline,
133
132
ErrorMessage: fmt.Sprintf("DNS resolution failed: %v", err),
···
146
145
go s.updateIPInfoIfNeeded(ctx, ips.IPv6)
147
146
}
148
147
149
-
// STEP 2: Health check (now returns which IP was used)
150
-
available, responseTime, version, usedIP, err := s.client.CheckHealth(ctx, ep.Endpoint)
151
-
if err != nil || !available {
152
-
errMsg := "health check failed"
153
-
if err != nil {
154
-
errMsg = err.Error()
155
-
}
148
+
// STEP 2: Call describeServer (primary health check + metadata)
149
+
desc, descResponseTime, usedIP, err := s.client.DescribeServer(ctx, ep.Endpoint)
150
+
if err != nil {
156
151
s.saveScanResult(ctx, ep.ID, &ScanResult{
157
152
Status: storage.EndpointStatusOffline,
158
-
ResponseTime: responseTime,
159
-
ErrorMessage: errMsg,
160
-
UsedIP: usedIP, // Save even if failed
153
+
ResponseTime: descResponseTime,
154
+
ErrorMessage: fmt.Sprintf("describeServer failed: %v", err),
155
+
UsedIP: usedIP,
161
156
})
162
157
return
163
158
}
164
159
165
-
// STEP 3: Fetch PDS-specific data
166
-
desc, err := s.client.DescribeServer(ctx, ep.Endpoint)
167
-
if err != nil {
168
-
log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(ep.Endpoint), err)
169
-
} else if desc != nil && desc.DID != "" {
160
+
// Update server DID immediately
161
+
if desc.DID != "" {
170
162
s.db.UpdateEndpointServerDID(ctx, ep.ID, desc.DID)
171
163
}
172
164
173
-
// Fetch repos with full info
165
+
// STEP 3: Call _health to get version
166
+
available, healthResponseTime, version, err := s.client.CheckHealth(ctx, ep.Endpoint)
167
+
if err != nil || !available {
168
+
log.Verbose("Warning: _health check failed for %s: %v", ep.Endpoint, err)
169
+
// Server is online (describeServer worked) but _health failed
170
+
// Continue with empty version
171
+
version = ""
172
+
}
173
+
174
+
// Calculate average response time from both calls
175
+
avgResponseTime := descResponseTime
176
+
if available {
177
+
avgResponseTime = (descResponseTime + healthResponseTime) / 2
178
+
}
179
+
180
+
// STEP 4: Fetch repos
174
181
repoList, err := s.client.ListRepos(ctx, ep.Endpoint)
175
182
if err != nil {
176
183
log.Verbose("Warning: failed to list repos for %s: %v", ep.Endpoint, err)
177
184
repoList = []Repo{}
178
185
}
179
186
180
-
// Convert to DIDs for backward compatibility
187
+
// Convert to DIDs
181
188
dids := make([]string, len(repoList))
182
189
for i, repo := range repoList {
183
190
dids[i] = repo.DID
184
191
}
185
192
186
-
// STEP 4: SAVE scan result
193
+
// STEP 5: SAVE scan result
187
194
s.saveScanResult(ctx, ep.ID, &ScanResult{
188
195
Status: storage.EndpointStatusOnline,
189
-
ResponseTime: responseTime,
196
+
ResponseTime: avgResponseTime,
190
197
Description: desc,
191
198
DIDs: dids,
192
199
Version: version,
193
-
UsedIP: usedIP, // NEW: Save which IP was used
200
+
UsedIP: usedIP, // Only from describeServer
194
201
})
195
202
196
-
// Save repos in batches (only tracks changes)
203
+
// STEP 6: Save repos in batches (only tracks changes)
197
204
if len(repoList) > 0 {
198
-
batchSize := 200000
205
+
batchSize := 100_000
199
206
200
207
log.Verbose("Processing %d repos for %s (tracking changes only)", len(repoList), ep.Endpoint)
201
208
···
235
242
236
243
log.Verbose("✓ Processed %d repos for %s", len(repoList), ep.Endpoint)
237
244
}
238
-
239
-
// IP info fetch already started at the beginning (step 1.5)
240
-
// It will complete in the background
241
245
}
242
246
243
247
func (s *Scanner) saveScanResult(ctx context.Context, endpointID int64, result *ScanResult) {
+136
-1
internal/plc/manager.go
+136
-1
internal/plc/manager.go
···
2
2
3
3
import (
4
4
"context"
5
+
"encoding/csv"
5
6
"fmt"
6
7
"io"
8
+
"os"
9
+
"path/filepath"
7
10
"sort"
11
+
"strconv"
12
+
"strings"
8
13
"time"
9
14
10
15
"github.com/atscan/atscand/internal/log"
11
16
"github.com/atscan/atscand/internal/storage"
12
-
plcbundle "github.com/atscan/plcbundle"
17
+
"github.com/klauspost/compress/zstd"
18
+
plcbundle "tangled.org/atscan.net/plcbundle"
13
19
)
14
20
15
21
// BundleManager wraps the library's manager with database integration
···
385
391
386
392
return history, nil
387
393
}
394
+
395
+
// GetBundleLabels reads labels from a compressed CSV file for a specific bundle
396
+
func (bm *BundleManager) GetBundleLabels(ctx context.Context, bundleNum int) ([]*PLCOpLabel, error) {
397
+
// Define the path to the labels file
398
+
labelsDir := filepath.Join(bm.bundleDir, "labels")
399
+
labelsFile := filepath.Join(labelsDir, fmt.Sprintf("%06d.csv.zst", bundleNum))
400
+
401
+
// Check if file exists
402
+
if _, err := os.Stat(labelsFile); os.IsNotExist(err) {
403
+
log.Verbose("No labels file found for bundle %d at %s", bundleNum, labelsFile)
404
+
// Return empty, not an error
405
+
return []*PLCOpLabel{}, nil
406
+
}
407
+
408
+
// Open the Zstd-compressed file
409
+
file, err := os.Open(labelsFile)
410
+
if err != nil {
411
+
return nil, fmt.Errorf("failed to open labels file: %w", err)
412
+
}
413
+
defer file.Close()
414
+
415
+
// Create a Zstd reader
416
+
zstdReader, err := zstd.NewReader(file)
417
+
if err != nil {
418
+
return nil, fmt.Errorf("failed to create zstd reader: %w", err)
419
+
}
420
+
defer zstdReader.Close()
421
+
422
+
// Create a CSV reader
423
+
csvReader := csv.NewReader(zstdReader)
424
+
// We skipped the header, so no header read needed
425
+
// Set FieldsPerRecord to 7 for validation
426
+
//csvReader.FieldsPerRecord = 7
427
+
428
+
var labels []*PLCOpLabel
429
+
430
+
// Read all records
431
+
for {
432
+
// Check for context cancellation
433
+
if err := ctx.Err(); err != nil {
434
+
return nil, err
435
+
}
436
+
437
+
record, err := csvReader.Read()
438
+
if err == io.EOF {
439
+
break // End of file
440
+
}
441
+
if err != nil {
442
+
log.Error("Error reading CSV record in %s: %v", labelsFile, err)
443
+
continue // Skip bad line
444
+
}
445
+
446
+
// Parse the CSV record (which is []string)
447
+
label, err := parseLabelRecord(record)
448
+
if err != nil {
449
+
log.Error("Error parsing CSV data for bundle %d: %v", bundleNum, err)
450
+
continue // Skip bad data
451
+
}
452
+
453
+
labels = append(labels, label)
454
+
}
455
+
456
+
return labels, nil
457
+
}
458
+
459
+
// parseLabelRecord converts a new format CSV record into a PLCOpLabel struct
460
+
func parseLabelRecord(record []string) (*PLCOpLabel, error) {
461
+
// New format: 0:bundle, 1:position, 2:cid(short), 3:size, 4:confidence, 5:labels
462
+
if len(record) != 6 {
463
+
err := fmt.Errorf("invalid record length: expected 6, got %d", len(record))
464
+
// --- ADDED LOG ---
465
+
log.Warn("Skipping malformed CSV line: %v (data: %s)", err, strings.Join(record, ","))
466
+
// ---
467
+
return nil, err
468
+
}
469
+
470
+
// 0:bundle
471
+
bundle, err := strconv.Atoi(record[0])
472
+
if err != nil {
473
+
// --- ADDED LOG ---
474
+
log.Warn("Skipping malformed CSV line: 'bundle' column: %v (data: %s)", err, strings.Join(record, ","))
475
+
// ---
476
+
return nil, fmt.Errorf("parsing 'bundle': %w", err)
477
+
}
478
+
479
+
// 1:position
480
+
position, err := strconv.Atoi(record[1])
481
+
if err != nil {
482
+
// --- ADDED LOG ---
483
+
log.Warn("Skipping malformed CSV line: 'position' column: %v (data: %s)", err, strings.Join(record, ","))
484
+
// ---
485
+
return nil, fmt.Errorf("parsing 'position': %w", err)
486
+
}
487
+
488
+
// 2:cid(short)
489
+
shortCID := record[2]
490
+
491
+
// 3:size
492
+
size, err := strconv.Atoi(record[3])
493
+
if err != nil {
494
+
// --- ADDED LOG ---
495
+
log.Warn("Skipping malformed CSV line: 'size' column: %v (data: %s)", err, strings.Join(record, ","))
496
+
// ---
497
+
return nil, fmt.Errorf("parsing 'size': %w", err)
498
+
}
499
+
500
+
// 4:confidence
501
+
confidence, err := strconv.ParseFloat(record[4], 64)
502
+
if err != nil {
503
+
// --- ADDED LOG ---
504
+
log.Warn("Skipping malformed CSV line: 'confidence' column: %v (data: %s)", err, strings.Join(record, ","))
505
+
// ---
506
+
return nil, fmt.Errorf("parsing 'confidence': %w", err)
507
+
}
508
+
509
+
// 5:labels
510
+
detectors := strings.Split(record[5], ";")
511
+
512
+
label := &PLCOpLabel{
513
+
Bundle: bundle,
514
+
Position: position,
515
+
CID: shortCID,
516
+
Size: size,
517
+
Confidence: confidence,
518
+
Detectors: detectors,
519
+
}
520
+
521
+
return label, nil
522
+
}
+2
internal/plc/scanner.go
+2
internal/plc/scanner.go
···
190
190
}
191
191
192
192
func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error {
193
+
valid := validateEndpoint(endpoint)
193
194
return s.db.UpsertEndpoint(ctx, &storage.Endpoint{
194
195
EndpointType: epType,
195
196
Endpoint: endpoint,
196
197
DiscoveredAt: discoveredAt,
197
198
LastChecked: time.Time{},
198
199
Status: storage.EndpointStatusUnknown,
200
+
Valid: valid,
199
201
})
200
202
}
201
203
+60
-1
internal/plc/types.go
+60
-1
internal/plc/types.go
···
1
1
package plc
2
2
3
3
import (
4
-
plclib "github.com/atscan/plcbundle/plc"
4
+
"net/url"
5
+
"strings"
6
+
7
+
plclib "tangled.org/atscan.net/plcbundle/plc"
5
8
)
6
9
7
10
// Re-export library types
···
28
31
Type string
29
32
Endpoint string
30
33
}
34
+
35
+
// PLCOpLabel holds metadata from the label CSV file.
// One record describes a single labeled PLC operation, addressed by its
// bundle number and position within that bundle (see parseLabelRecord).
type PLCOpLabel struct {
	Bundle     int      `json:"bundle"`     // bundle number the operation belongs to
	Position   int      `json:"position"`   // position of the operation inside the bundle
	CID        string   `json:"cid"`        // shortened CID of the operation
	Size       int      `json:"size"`       // size value from the CSV (units not visible here — TODO confirm bytes)
	Confidence float64  `json:"confidence"` // detector confidence score
	Detectors  []string `json:"detectors"`  // detector names, split from a ';'-separated CSV column
}
44
+
45
+
// validateEndpoint reports whether endpoint has the canonical form
// "https://<host>": the https scheme, a non-empty host, and no trailing
// slash, path, query string, or fragment.
func validateEndpoint(endpoint string) bool {
	// Reject empty values and anything ending in "/" before parsing.
	if endpoint == "" || strings.HasSuffix(endpoint, "/") {
		return false
	}

	parsed, err := url.Parse(endpoint)
	if err != nil {
		return false
	}

	switch {
	case parsed.Scheme != "https":
		return false // https only
	case parsed.Host == "":
		return false // must name a host
	case parsed.Path != "" && parsed.Path != "/":
		return false // bare origin only, no path component
	case parsed.RawQuery != "" || parsed.Fragment != "":
		return false // no query parameters or fragments
	}

	return true
}
+24
-17
internal/storage/postgres.go
+24
-17
internal/storage/postgres.go
···
84
84
ip TEXT,
85
85
ipv6 TEXT,
86
86
ip_resolved_at TIMESTAMP,
87
+
valid BOOLEAN DEFAULT true,
87
88
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
88
89
UNIQUE(endpoint_type, endpoint)
89
90
);
···
95
96
CREATE INDEX IF NOT EXISTS idx_endpoints_ipv6 ON endpoints(ipv6);
96
97
CREATE INDEX IF NOT EXISTS idx_endpoints_server_did ON endpoints(server_did);
97
98
CREATE INDEX IF NOT EXISTS idx_endpoints_server_did_type_discovered ON endpoints(server_did, endpoint_type, discovered_at);
99
+
CREATE INDEX IF NOT EXISTS idx_endpoints_valid ON endpoints(valid);
98
100
99
101
-- IP infos table (IP as PRIMARY KEY)
100
102
CREATE TABLE IF NOT EXISTS ip_infos (
···
208
210
209
211
func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error {
210
212
query := `
211
-
INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at)
212
-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
213
+
INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at, valid)
214
+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
213
215
ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET
214
216
last_checked = EXCLUDED.last_checked,
215
217
status = EXCLUDED.status,
···
225
227
WHEN (EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '') OR (EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '') THEN EXCLUDED.ip_resolved_at
226
228
ELSE endpoints.ip_resolved_at
227
229
END,
230
+
valid = EXCLUDED.valid,
228
231
updated_at = CURRENT_TIMESTAMP
229
232
RETURNING id
230
233
`
231
234
err := p.db.QueryRowContext(ctx, query,
232
235
endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt,
233
-
endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPv6, endpoint.IPResolvedAt).Scan(&endpoint.ID)
236
+
endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPv6, endpoint.IPResolvedAt, endpoint.Valid).Scan(&endpoint.ID)
234
237
return err
235
238
}
236
239
···
251
254
func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) {
252
255
query := `
253
256
SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status,
254
-
ip, ipv6, ip_resolved_at, updated_at
257
+
ip, ipv6, ip_resolved_at, valid, updated_at
255
258
FROM endpoints
256
259
WHERE endpoint = $1 AND endpoint_type = $2
257
260
`
···
262
265
263
266
err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(
264
267
&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
265
-
&ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.UpdatedAt,
268
+
&ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.Valid, &ep.UpdatedAt,
266
269
)
267
270
if err != nil {
268
271
return nil, err
···
288
291
query := `
289
292
SELECT DISTINCT ON (COALESCE(server_did, id::text))
290
293
id, endpoint_type, endpoint, server_did, discovered_at, last_checked, status,
291
-
ip, ipv6, ip_resolved_at, updated_at
294
+
ip, ipv6, ip_resolved_at, valid, updated_at
292
295
FROM endpoints
293
296
WHERE 1=1
294
297
`
···
300
303
query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx)
301
304
args = append(args, filter.Type)
302
305
argIdx++
306
+
}
307
+
308
+
// NEW: Filter by valid flag
309
+
if filter.OnlyValid {
310
+
query += fmt.Sprintf(" AND valid = true", argIdx)
303
311
}
304
312
if filter.Status != "" {
305
313
statusInt := EndpointStatusUnknown
···
566
574
last_checked,
567
575
status,
568
576
ip,
569
-
ipv6
577
+
ipv6,
578
+
valid
570
579
FROM endpoints
571
580
WHERE endpoint_type = 'pds'
572
581
ORDER BY COALESCE(server_did, id::text), discovered_at ASC
573
582
)
574
583
SELECT
575
-
e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip, e.ipv6,
584
+
e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip, e.ipv6, e.valid,
576
585
latest.user_count, latest.response_time, latest.version, latest.scanned_at,
577
586
i.city, i.country, i.country_code, i.asn, i.asn_org,
578
587
i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy,
···
643
652
var scannedAt sql.NullTime
644
653
645
654
err := rows.Scan(
646
-
&item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, &ipv6,
655
+
&item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, &ipv6, &item.Valid,
647
656
&userCount, &responseTime, &version, &scannedAt,
648
657
&city, &country, &countryCode, &asn, &asnOrg,
649
658
&isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy,
···
705
714
706
715
func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) {
707
716
query := `
708
-
WITH target_endpoint AS MATERIALIZED ( -- MATERIALIZED fence for optimization
717
+
WITH target_endpoint AS MATERIALIZED (
709
718
SELECT
710
719
e.id,
711
720
e.endpoint,
···
714
723
e.last_checked,
715
724
e.status,
716
725
e.ip,
717
-
e.ipv6
726
+
e.ipv6,
727
+
e.valid
718
728
FROM endpoints e
719
729
WHERE e.endpoint = $1
720
730
AND e.endpoint_type = 'pds'
721
-
LIMIT 1 -- Early termination since we expect exactly 1 row
731
+
LIMIT 1
722
732
)
723
733
SELECT
724
734
te.id,
···
729
739
te.status,
730
740
te.ip,
731
741
te.ipv6,
742
+
te.valid,
732
743
latest.user_count,
733
744
latest.response_time,
734
745
latest.version,
···
738
749
i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy,
739
750
i.latitude, i.longitude,
740
751
i.raw_data,
741
-
-- Inline aliases aggregation (avoid second CTE)
742
752
COALESCE(
743
753
ARRAY(
744
754
SELECT e2.endpoint
···
751
761
),
752
762
ARRAY[]::text[]
753
763
) as aliases,
754
-
-- Inline first_discovered_at (avoid aggregation)
755
764
CASE
756
765
WHEN te.server_did IS NOT NULL THEN (
757
766
SELECT MIN(e3.discovered_at)
···
792
801
var firstDiscoveredAt sql.NullTime
793
802
794
803
err := p.db.QueryRowContext(ctx, query, endpoint).Scan(
795
-
&detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, &ipv6,
804
+
&detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, &ipv6, &detail.Valid,
796
805
&userCount, &responseTime, &version, &serverInfoJSON, &scannedAt,
797
806
&city, &country, &countryCode, &asn, &asnOrg,
798
807
&isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy,
···
819
828
// Set aliases and is_primary
820
829
detail.Aliases = aliases
821
830
if serverDID.Valid && serverDID.String != "" && firstDiscoveredAt.Valid {
822
-
// Has server_did - check if this is the first discovered
823
831
detail.IsPrimary = detail.DiscoveredAt.Equal(firstDiscoveredAt.Time) ||
824
832
detail.DiscoveredAt.Before(firstDiscoveredAt.Time)
825
833
} else {
826
-
// No server_did means unique server
827
834
detail.IsPrimary = true
828
835
}
829
836
+7
-4
internal/storage/types.go
+7
-4
internal/storage/types.go
···
26
26
LastChecked time.Time
27
27
Status int
28
28
IP string
29
-
IPv6 string // NEW
29
+
IPv6 string
30
30
IPResolvedAt time.Time
31
+
Valid bool
31
32
UpdatedAt time.Time
32
33
}
33
34
···
76
77
77
78
// EndpointFilter for querying endpoints
type EndpointFilter struct {
	Type            string        // endpoint type, e.g. "pds", "labeler"
	Status          string        // status name to filter by; empty means any
	MinUserCount    int64         // minimum user count threshold
	OnlyStale       bool          // only endpoints due for a recheck
	OnlyValid       bool          // only endpoints whose URL passed validation (valid = true)
	RecheckInterval time.Duration // staleness window used with OnlyStale
	Random          bool          // return results in random order
	Limit           int           // max rows to return; 0 presumably means no limit — TODO confirm in query builder
	Offset          int           // rows to skip (pagination)
}
···
213
215
LastChecked time.Time
214
216
Status int
215
217
IP string
216
-
IPv6 string // NEW
218
+
IPv6 string
219
+
Valid bool // NEW
217
220
218
221
// From latest endpoint_scans (via JOIN)
219
222
LatestScan *struct {
+113
utils/import-labels.js
+113
utils/import-labels.js
···
1
+
import { file, write } from "bun";
import { join } from "path";
import { mkdir } from "fs/promises";
import { init, compress } from "@bokuweb/zstd-wasm";

// --- Configuration ---
const CSV_FILE = process.argv[2];
const CONFIG_FILE = "config.yaml";
const COMPRESSION_LEVEL = 5; // zstd level 1-22 (5 is a good balance)
// ---------------------

if (!CSV_FILE) {
  console.error("Usage: bun run utils/import-labels.js <path-to-csv-file>");
  process.exit(1);
}

console.log("========================================");
console.log("PLC Operation Labels Import (Bun + WASM)");
console.log("========================================");

// 1. Read and parse config
console.log(`Loading config from ${CONFIG_FILE}...`);
const configFile = await file(CONFIG_FILE).text();
const config = Bun.YAML.parse(configFile);
const bundleDir = config?.plc?.bundle_dir;

if (!bundleDir) {
  console.error("Error: Could not parse plc.bundle_dir from config.yaml");
  process.exit(1);
}

const FINAL_LABELS_DIR = join(bundleDir, "labels");
await mkdir(FINAL_LABELS_DIR, { recursive: true });

console.log(`CSV File: ${CSV_FILE}`);
console.log(`Output Dir: ${FINAL_LABELS_DIR}`);
console.log("");

// 2. Initialize Zstd WASM module
await init();

// --- Pass 1: Read entire file into memory and group by bundle ---
console.log("Pass 1/2: Reading and grouping all lines by bundle...");
console.warn("This will use a large amount of RAM!");

const startTime = Date.now();
const bundles = new Map(); // Map<string, string[]>
let lineCount = 0;

// addLine groups one CSV line under its zero-padded bundle key.
// Malformed lines (no comma) are logged and skipped; the header row is
// recognized only as the very first line.
function addLine(line) {
  lineCount++;

  if (lineCount === 1 && line.startsWith("bundle,")) {
    return; // Skip header
  }

  const firstCommaIndex = line.indexOf(",");
  if (firstCommaIndex === -1) {
    console.warn(`Skipping malformed line: ${line}`);
    return;
  }
  const bundleNumStr = line.substring(0, firstCommaIndex);
  const bundleKey = bundleNumStr.padStart(6, "0");

  // Add line to the correct bundle's array
  if (!bundles.has(bundleKey)) {
    bundles.set(bundleKey, []);
  }
  bundles.get(bundleKey).push(line);
}

const inputFile = file(CSV_FILE);
const fileStream = inputFile.stream();
const decoder = new TextDecoder();
let remainder = "";

for await (const chunk of fileStream) {
  // BUGFIX: { stream: true } keeps multi-byte UTF-8 sequences that span
  // chunk boundaries intact. Without it, each decode() call flushes the
  // decoder and a character split across two chunks is mangled into
  // replacement characters.
  const text = remainder + decoder.decode(chunk, { stream: true });
  const lines = text.split("\n");
  remainder = lines.pop() || "";

  for (const line of lines) {
    if (line === "") continue;
    addLine(line);
  }
}

// Flush the decoder and process the final line. BUGFIX: the previous
// version silently dropped `remainder`, losing the last record of any
// CSV file that does not end with a trailing newline.
remainder += decoder.decode();
if (remainder !== "") {
  addLine(remainder);
}

console.log(`Finished reading ${lineCount.toLocaleString()} lines.`);
console.log(`Found ${bundles.size} unique bundles.`);

// --- Pass 2: Compress and write each bundle ---
console.log("\nPass 2/2: Compressing and writing bundle files...");
let i = 0;
for (const [bundleKey, lines] of bundles.entries()) {
  i++;
  console.log(` (${i}/${bundles.size}) Compressing bundle ${bundleKey}...`);

  // Join all lines for this bundle into one big string
  const content = lines.join("\n");

  // Compress the string
  const compressedData = compress(Buffer.from(content), COMPRESSION_LEVEL);

  // Write the compressed data to the file
  const outPath = join(FINAL_LABELS_DIR, `${bundleKey}.csv.zst`);
  await write(outPath, compressedData);
}

// 3. Clean up
const totalTime = (Date.now() - startTime) / 1000;
console.log("\n========================================");
console.log("Import Summary");
console.log("========================================");
console.log(`✓ Import completed in ${totalTime.toFixed(2)} seconds.`);
console.log(`Total lines processed: ${lineCount.toLocaleString()}`);
console.log(`Label files are stored in: ${FINAL_LABELS_DIR}`);
+91
utils/import-labels.sh
+91
utils/import-labels.sh
···
1
+
#!/bin/bash
# import-labels-v4-sorted-pipe.sh
#
# Streams a labels CSV, sorts it by bundle number, and writes one
# zstd-compressed per-bundle CSV into <plc.bundle_dir>/labels/.

set -e

if [ $# -lt 1 ]; then
    echo "Usage: ./utils/import-labels-v4-sorted-pipe.sh <csv-file>"
    exit 1
fi

CSV_FILE="$1"
CONFIG_FILE="config.yaml"

[ ! -f "$CSV_FILE" ] && echo "Error: CSV file not found" && exit 1
[ ! -f "$CONFIG_FILE" ] && echo "Error: config.yaml not found" && exit 1

# Extract bundle directory path
BUNDLE_DIR=$(grep -A 5 "^plc:" "$CONFIG_FILE" | grep "bundle_dir:" | sed 's/.*bundle_dir: *"//' | sed 's/".*//' | head -1)

[ -z "$BUNDLE_DIR" ] && echo "Error: Could not parse plc.bundle_dir from config.yaml" && exit 1

FINAL_LABELS_DIR="$BUNDLE_DIR/labels"

echo "========================================"
echo "PLC Operation Labels Import (Sorted Pipe)"
echo "========================================"
echo "CSV File: $CSV_FILE"
echo "Output Dir: $FINAL_LABELS_DIR"
echo ""

# Ensure the final directory exists
mkdir -p "$FINAL_LABELS_DIR"

echo "Streaming, sorting, and compressing on the fly..."
# BUGFIX: these two messages are single-quoted. In the previous version
# they were double-quoted, so the backticks around pv/sort were command
# substitutions — the script actually EXECUTED `pv` (blocking on stdin)
# instead of printing its name.
echo 'This will take time. `pv` will show progress of the TAIL command.'
echo 'The `sort` command will run after `pv` is complete.'
echo ""

# Single-pass pipeline: strip the header, show progress, sort numerically
# by bundle (column 1), then fan out to one zstd stream per bundle.
tail -n +2 "$CSV_FILE" | \
pv -l -s $(tail -n +2 "$CSV_FILE" | wc -l) | \
sort -t, -k1,1n | \
awk -F',' -v final_dir="$FINAL_LABELS_DIR" '
# This awk script EXPECTS input sorted by bundle number (col 1)
BEGIN {
    # last_bundle_num tracks the bundle we are currently writing
    last_bundle_num = -1
    # cmd holds the current zstd pipe command
    cmd = ""
}
{
    current_bundle_num = $1

    # Check if the bundle number has changed
    if (current_bundle_num != last_bundle_num) {

        # If it changed, and we have an old pipe open, close it
        if (last_bundle_num != -1) {
            close(cmd)
        }

        # Create the new pipe command, writing to the final .zst file.
        # -f overwrites an existing file so re-imports do not abort.
        outfile = sprintf("%s/%06d.csv.zst", final_dir, current_bundle_num)
        cmd = "zstd -T0 -f -o " outfile

        # Update the tracker
        last_bundle_num = current_bundle_num

        # Print progress to stderr
        printf " -> Writing bundle %06d\n", current_bundle_num > "/dev/stderr"
    }

    # Print the current line ($0) to the open pipe.
    # The first time this runs for a bundle, it opens the pipe;
    # subsequent times, it writes to the already-open pipe.
    print $0 | cmd
}
# END block: close the very last pipe
END {
    if (last_bundle_num != -1) {
        close(cmd)
    }
    printf " Finished. Total lines: %d\n", NR > "/dev/stderr"
}'

echo ""
echo "========================================"
echo "Import Summary"
echo "========================================"
echo "✓ Import completed successfully!"
echo "Label files are stored in: $FINAL_LABELS_DIR"
+
echo "Label files are stored in: $FINAL_LABELS_DIR"