Compare changes


+1 -1
.gitignore
··· 6 6 plc_cache\.tmp/* 7 7 plc_bundles* 8 8 config.yaml 9 - atscand 9 + /atscand
+1 -1
Makefile
··· 32 32 $(GORUN) cmd/atscand/main.go -verbose 33 33 34 34 update-plcbundle: 35 - GOPROXY=direct go get -u github.com/atscan/plcbundle@latest 35 + GOPROXY=direct go get -u tangled.org/atscan.net/plcbundle@latest 36 36 37 37 # Show help 38 38 help:
+159
cmd/atscand/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "flag" 6 + "fmt" 7 + "os" 8 + "os/signal" 9 + "syscall" 10 + "time" 11 + 12 + "github.com/atscan/atscand/internal/api" 13 + "github.com/atscan/atscand/internal/config" 14 + "github.com/atscan/atscand/internal/log" 15 + "github.com/atscan/atscand/internal/pds" 16 + "github.com/atscan/atscand/internal/plc" 17 + "github.com/atscan/atscand/internal/storage" 18 + "github.com/atscan/atscand/internal/worker" 19 + ) 20 + 21 + const VERSION = "1.0.0" 22 + 23 + func main() { 24 + configPath := flag.String("config", "config.yaml", "path to config file") 25 + verbose := flag.Bool("verbose", false, "enable verbose logging") 26 + flag.Parse() 27 + 28 + // Load configuration 29 + cfg, err := config.Load(*configPath) 30 + if err != nil { 31 + fmt.Fprintf(os.Stderr, "Failed to load config: %v\n", err) 32 + os.Exit(1) 33 + } 34 + 35 + // Override verbose setting if flag is provided 36 + if *verbose { 37 + cfg.API.Verbose = true 38 + } 39 + 40 + // Initialize logger 41 + log.Init(cfg.API.Verbose) 42 + 43 + // Print banner 44 + log.Banner(VERSION) 45 + 46 + // Print configuration summary 47 + log.PrintConfig(map[string]string{ 48 + "Database Type": cfg.Database.Type, 49 + "Database Path": cfg.Database.Path, // Will be auto-redacted 50 + "PLC Directory": cfg.PLC.DirectoryURL, 51 + "PLC Scan Interval": cfg.PLC.ScanInterval.String(), 52 + "PLC Bundle Dir": cfg.PLC.BundleDir, 53 + "PLC Cache": fmt.Sprintf("%v", cfg.PLC.UseCache), 54 + "PLC Index DIDs": fmt.Sprintf("%v", cfg.PLC.IndexDIDs), 55 + "PDS Scan Interval": cfg.PDS.ScanInterval.String(), 56 + "PDS Workers": fmt.Sprintf("%d", cfg.PDS.Workers), 57 + "PDS Timeout": cfg.PDS.Timeout.String(), 58 + "API Host": cfg.API.Host, 59 + "API Port": fmt.Sprintf("%d", cfg.API.Port), 60 + "Verbose Logging": fmt.Sprintf("%v", cfg.API.Verbose), 61 + }) 62 + 63 + // Initialize database using factory pattern 64 + db, err := storage.NewDatabase(cfg.Database.Type, cfg.Database.Path) 65 + if err != nil { 66 + log.Fatal("Failed to initialize database: %v", err) 67 + } 68 + defer func() { 69 + log.Info("Closing database connection...") 70 + db.Close() 71 + }() 72 + 73 + // Set scan retention from config 74 + if cfg.PDS.ScanRetention > 0 { 75 + db.SetScanRetention(cfg.PDS.ScanRetention) 76 + log.Verbose("Scan retention set to %d scans per endpoint", cfg.PDS.ScanRetention) 77 + } 78 + 79 + // Run migrations 80 + if err := db.Migrate(); err != nil { 81 + log.Fatal("Failed to run migrations: %v", err) 82 + } 83 + 84 + ctx, cancel := context.WithCancel(context.Background()) 85 + defer cancel() 86 + 87 + // Initialize workers 88 + log.Info("Initializing scanners...") 89 + 90 + bundleManager, err := plc.NewBundleManager(cfg.PLC.BundleDir, cfg.PLC.DirectoryURL, db, cfg.PLC.IndexDIDs) 91 + if err != nil { 92 + log.Fatal("Failed to create bundle manager: %v", err) 93 + } 94 + defer bundleManager.Close() 95 + log.Verbose("✓ Bundle manager initialized (shared)") 96 + 97 + plcScanner := plc.NewScanner(db, cfg.PLC, bundleManager) 98 + defer plcScanner.Close() 99 + log.Verbose("✓ PLC scanner initialized") 100 + 101 + pdsScanner := pds.NewScanner(db, cfg.PDS) 102 + log.Verbose("✓ PDS scanner initialized") 103 + 104 + scheduler := worker.NewScheduler() 105 + 106 + // Schedule PLC directory scan 107 + scheduler.AddJob("plc_scan", cfg.PLC.ScanInterval, func() { 108 + if err := plcScanner.Scan(ctx); err != nil { 109 + log.Error("PLC scan error: %v", err) 110 + } 111 + }) 112 + log.Verbose("✓ PLC scan job scheduled (interval: %s)", cfg.PLC.ScanInterval) 113 + 114 + // Schedule PDS availability checks 115 + scheduler.AddJob("pds_scan", cfg.PDS.ScanInterval, func() { 116 + if err := pdsScanner.ScanAll(ctx); err != nil { 117 + log.Error("PDS scan error: %v", err) 118 + } 119 + }) 120 + log.Verbose("✓ PDS scan job scheduled (interval: %s)", cfg.PDS.ScanInterval) 121 + 122 + // Start API server 123 + log.Info("Starting API server on %s:%d...", cfg.API.Host, cfg.API.Port) 124 + apiServer := api.NewServer(db, cfg.API, cfg.PLC, bundleManager) 125 + go func() { 126 + if err := apiServer.Start(); err != nil { 127 + log.Fatal("API server error: %v", err) 128 + } 129 + }() 130 + 131 + // Give the API server a moment to start 132 + time.Sleep(100 * time.Millisecond) 133 + log.Info("✓ API server started successfully") 134 + log.Info("") 135 + log.Info("🚀 ATScanner is running!") 136 + log.Info(" API available at: http://%s:%d", cfg.API.Host, cfg.API.Port) 137 + log.Info(" Press Ctrl+C to stop") 138 + log.Info("") 139 + 140 + // Start scheduler 141 + scheduler.Start(ctx) 142 + 143 + // Wait for interrupt 144 + sigChan := make(chan os.Signal, 1) 145 + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 146 + <-sigChan 147 + 148 + log.Info("") 149 + log.Info("Shutting down gracefully...") 150 + cancel() 151 + 152 + log.Info("Stopping API server...") 153 + apiServer.Shutdown(context.Background()) 154 + 155 + log.Info("Waiting for active tasks to complete...") 156 + time.Sleep(2 * time.Second) 157 + 158 + log.Info("✓ Shutdown complete. Goodbye!") 159 + }
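Note: `internal/worker` is not part of this diff. From the call sites above (`NewScheduler()`, `AddJob(name, interval, fn)`, and a `Start(ctx)` that returns before the signal wait), a minimal ticker-based sketch of the contract `main.go` assumes might look like this; the details are inferred, not confirmed by this change:

```go
package worker

import (
	"context"
	"time"
)

type job struct {
	name     string
	interval time.Duration
	fn       func()
}

// Scheduler runs registered jobs at fixed intervals until ctx is cancelled.
// Sketch only: the real internal/worker implementation may differ.
type Scheduler struct {
	jobs []job
}

func NewScheduler() *Scheduler { return &Scheduler{} }

// AddJob registers fn to run every interval.
func (s *Scheduler) AddJob(name string, interval time.Duration, fn func()) {
	s.jobs = append(s.jobs, job{name: name, interval: interval, fn: fn})
}

// Start launches one goroutine per job and returns immediately,
// matching how main.go calls it before blocking on signals.
func (s *Scheduler) Start(ctx context.Context) {
	for _, j := range s.jobs {
		go func(j job) {
			ticker := time.NewTicker(j.interval)
			defer ticker.Stop()
			j.fn() // run once up front, then on every tick
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					j.fn()
				}
			}
		}(j)
	}
}
```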
+168
cmd/import-labels/main.go
··· 1 + package main 2 + 3 + import ( 4 + "bufio" 5 + "flag" 6 + "fmt" 7 + "os" 8 + "path/filepath" 9 + "strings" 10 + "time" 11 + 12 + "github.com/klauspost/compress/zstd" 13 + "gopkg.in/yaml.v3" 14 + ) 15 + 16 + type Config struct { 17 + PLC struct { 18 + BundleDir string `yaml:"bundle_dir"` 19 + } `yaml:"plc"` 20 + } 21 + 22 + var CONFIG_FILE = "config.yaml" 23 + 24 + // --------------------- 25 + 26 + func main() { 27 + // Define a new flag for changing the directory 28 + workDir := flag.String("C", ".", "Change to this directory before running (for finding config.yaml)") 29 + flag.Usage = func() { // Custom usage message 30 + fmt.Fprintf(os.Stderr, "Usage: ... | %s [-C /path/to/dir]\n", os.Args[0]) 31 + fmt.Fprintln(os.Stderr, "Reads sorted CSV from stdin and writes compressed bundle files.") 32 + flag.PrintDefaults() 33 + } 34 + flag.Parse() // Parse all defined flags 35 + 36 + // Change directory if the flag was used 37 + if *workDir != "." { 38 + fmt.Printf("Changing working directory to %s...\n", *workDir) 39 + if err := os.Chdir(*workDir); err != nil { 40 + fmt.Fprintf(os.Stderr, "Error changing directory to %s: %v\n", *workDir, err) 41 + os.Exit(1) 42 + } 43 + } 44 + 45 + // --- REMOVED UNUSED CODE --- 46 + // The csvFilePath variable and NArg check were removed 47 + // as the script now reads from stdin. 48 + // --------------------------- 49 + 50 + fmt.Println("========================================") 51 + fmt.Println("PLC Operation Labels Import (Go STDIN)") 52 + fmt.Println("========================================") 53 + 54 + // 1. Read config (will now read from the new CWD) 55 + fmt.Printf("Loading config from %s...\n", CONFIG_FILE) 56 + configData, err := os.ReadFile(CONFIG_FILE) 57 + if err != nil { 58 + fmt.Fprintf(os.Stderr, "Error reading config file: %v\n", err) 59 + os.Exit(1) 60 + } 61 + 62 + var config Config 63 + if err := yaml.Unmarshal(configData, &config); err != nil { 64 + fmt.Fprintf(os.Stderr, "Error parsing config.yaml: %v\n", err) 65 + os.Exit(1) 66 + } 67 + 68 + if config.PLC.BundleDir == "" { 69 + fmt.Fprintln(os.Stderr, "Error: Could not parse plc.bundle_dir from config.yaml") 70 + os.Exit(1) 71 + } 72 + 73 + finalLabelsDir := filepath.Join(config.PLC.BundleDir, "labels") 74 + if err := os.MkdirAll(finalLabelsDir, 0755); err != nil { 75 + fmt.Fprintf(os.Stderr, "Error creating output directory: %v\n", err) 76 + os.Exit(1) 77 + } 78 + 79 + fmt.Printf("Output Dir: %s\n", finalLabelsDir) 80 + fmt.Println("Waiting for sorted data from stdin...") 81 + 82 + // 2. Process sorted data from stdin 83 + // This script *requires* the input to be sorted by bundle number. 84 + 85 + var currentWriter *zstd.Encoder 86 + var currentFile *os.File 87 + var lastBundleKey string = "" 88 + 89 + lineCount := 0 90 + startTime := time.Now() 91 + 92 + scanner := bufio.NewScanner(os.Stdin) 93 + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) 94 + 95 + for scanner.Scan() { 96 + line := scanner.Text() 97 + lineCount++ 98 + 99 + parts := strings.SplitN(line, ",", 2) 100 + if len(parts) < 2 { 101 + continue // Skip empty/bad lines (no comma means no bundle column) 102 + } 103 + 104 + bundleNumStr := parts[0] 105 + bundleKey := fmt.Sprintf("%06s", bundleNumStr) // Pad with zeros 106 + 107 + // If the bundle key is new, close the old writer and open a new one. 108 + if bundleKey != lastBundleKey { 109 + // Close the previous writer/file 110 + if currentWriter != nil { 111 + if err := currentWriter.Close(); err != nil { 112 + fmt.Fprintf(os.Stderr, "Error closing writer for %s: %v\n", lastBundleKey, err) 113 + } 114 + currentFile.Close() 115 + } 116 + 117 + // Start the new one 118 + fmt.Printf(" -> Writing bundle %s\n", bundleKey) 119 + outPath := filepath.Join(finalLabelsDir, fmt.Sprintf("%s.csv.zst", bundleKey)) 120 + 121 + file, err := os.Create(outPath) 122 + if err != nil { 123 + fmt.Fprintf(os.Stderr, "Error creating file %s: %v\n", outPath, err) 124 + os.Exit(1) 125 + } 126 + currentFile = file 127 + 128 + writer, err := zstd.NewWriter(file) 129 + if err != nil { 130 + fmt.Fprintf(os.Stderr, "Error creating zstd writer: %v\n", err) 131 + os.Exit(1) 132 + } 133 + currentWriter = writer 134 + lastBundleKey = bundleKey 135 + } 136 + 137 + // Write the line to the currently active writer 138 + if _, err := currentWriter.Write([]byte(line + "\n")); err != nil { 139 + fmt.Fprintf(os.Stderr, "Error writing line: %v\n", err) 140 + } 141 + 142 + // Progress update 143 + if lineCount%100000 == 0 { 144 + elapsed := time.Since(startTime).Seconds() 145 + rate := float64(lineCount) / elapsed 146 + fmt.Printf(" ... processed %d lines (%.0f lines/sec)\n", lineCount, rate) 147 + } 148 + } 149 + 150 + // 3. Close the very last writer 151 + if currentWriter != nil { 152 + if err := currentWriter.Close(); err != nil { 153 + fmt.Fprintf(os.Stderr, "Error closing final writer: %v\n", err) 154 + } 155 + currentFile.Close() 156 + } 157 + 158 + if err := scanner.Err(); err != nil { 159 + fmt.Fprintf(os.Stderr, "Error reading stdin: %v\n", err) 160 + } 161 + 162 + totalTime := time.Since(startTime) 163 + fmt.Println("\n========================================") 164 + fmt.Println("Import Summary") 165 + fmt.Println("========================================") 166 + fmt.Printf("✓ Import completed in %v\n", totalTime) 167 + fmt.Printf("Total lines processed: %d\n", lineCount) 168 + }
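Note: this importer expects stdin already sorted by bundle number, e.g. `tail -n +2 labels.csv | sort -t, -k1,1n | go run cmd/import-labels/main.go`, so each `NNNNNN.csv.zst` is written exactly once. A quick way to sanity-check an output file is a zstd round-trip with the same `klauspost/compress` package; a minimal sketch, with a hypothetical bundle path:

```go
package main

import (
	"bufio"
	"fmt"
	"os"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Hypothetical importer output for bundle 42.
	f, err := os.Open("plc_bundles/labels/000042.csv.zst")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	dec, err := zstd.NewReader(f)
	if err != nil {
		panic(err)
	}
	defer dec.Close()

	// Count decompressed lines; each should begin with the bundle
	// number that the filename encodes.
	sc := bufio.NewScanner(dec)
	lines := 0
	for sc.Scan() {
		lines++
	}
	if err := sc.Err(); err != nil {
		panic(err)
	}
	fmt.Printf("bundle 000042: %d label lines\n", lines)
}
```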
+2 -3
go.mod
··· 8 8 gopkg.in/yaml.v3 v3.0.1 9 9 ) 10 10 11 - require github.com/klauspost/compress v1.18.1 // indirect 11 + require github.com/klauspost/compress v1.18.1 12 12 13 13 require ( 14 - github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d 15 - github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7 16 14 github.com/gorilla/handlers v1.5.2 17 15 github.com/jackc/pgx/v5 v5.7.6 16 + tangled.org/atscan.net/plcbundle v0.3.6 18 17 ) 19 18 20 19 require (
+2 -4
go.sum
··· 1 - github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= 2 - github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= 3 - github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7 h1:u5mCzLGQPSThUPjQnAn64xs3ZWuPltKpua1M+bMxtww= 4 - github.com/atscan/plcbundle v0.0.0-20251027220105-866aef8771a7/go.mod h1:vqyqs+zyaxFYtIp6I4+zSQD76oiylnGenzD7ZeA4cxs= 5 1 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= 6 2 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 7 3 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= ··· 49 45 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 50 46 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 51 47 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 48 + tangled.org/atscan.net/plcbundle v0.3.6 h1:8eSOxEwHRRT7cLhOTUxut80fYLAi+jodR9UTshofIvY= 49 + tangled.org/atscan.net/plcbundle v0.3.6/go.mod h1:XUzSi6wmqAECCLThmuBTzYV5mEd0q/J1wE45cagoBqs=
+39 -36
internal/api/handlers.go
··· 15 15 "github.com/atscan/atscand/internal/monitor" 16 16 "github.com/atscan/atscand/internal/plc" 17 17 "github.com/atscan/atscand/internal/storage" 18 - "github.com/atscan/plcbundle" 19 18 "github.com/gorilla/mux" 19 + "tangled.org/atscan.net/plcbundle" 20 20 ) 21 21 22 22 // ===== RESPONSE HELPERS ===== ··· 75 75 76 76 // ===== FORMATTING HELPERS ===== 77 77 78 - func formatBundleResponse(bundle *plcbundle.BundleMetadata) map[string]interface{} { 79 - return map[string]interface{}{ 80 - "plc_bundle_number": bundle.BundleNumber, 81 - "start_time": bundle.StartTime, 82 - "end_time": bundle.EndTime, 83 - "operation_count": plc.BUNDLE_SIZE, 84 - "did_count": bundle.DIDCount, // Use DIDCount instead of len(DIDs) 85 - "hash": bundle.Hash, 86 - "compressed_hash": bundle.CompressedHash, 87 - "compressed_size": bundle.CompressedSize, 88 - "uncompressed_size": bundle.UncompressedSize, 89 - "compression_ratio": float64(bundle.UncompressedSize) / float64(bundle.CompressedSize), 90 - "cursor": bundle.Cursor, 91 - "prev_bundle_hash": bundle.PrevBundleHash, 92 - "created_at": bundle.CreatedAt, 93 - } 94 - } 95 - 96 78 func formatEndpointResponse(ep *storage.Endpoint) map[string]interface{} { 97 79 response := map[string]interface{}{ 98 80 "id": ep.ID, ··· 266 248 "endpoint": pds.Endpoint, 267 249 "discovered_at": pds.DiscoveredAt, 268 250 "status": statusToString(pds.Status), 251 + "valid": pds.Valid, // NEW 269 252 } 270 253 271 254 // Add server_did if available ··· 748 731 "end_time": meta.EndTime, 749 732 "operation_count": meta.OperationCount, 750 733 "did_count": meta.DIDCount, 751 - "hash": meta.Hash, 734 + "hash": meta.Hash, // Chain hash (primary) 735 + "content_hash": meta.ContentHash, // Content hash 736 + "parent": meta.Parent, // Parent chain hash 752 737 "compressed_hash": meta.CompressedHash, 753 738 "compressed_size": meta.CompressedSize, 754 739 "uncompressed_size": meta.UncompressedSize, 755 740 "compression_ratio": float64(meta.UncompressedSize) / float64(meta.CompressedSize), 756 741 "cursor": meta.Cursor, 757 - "prev_bundle_hash": meta.PrevBundleHash, 758 742 "created_at": meta.CreatedAt, 759 743 } 760 744 } ··· 780 764 "is_upcoming": true, 781 765 "status": "filling", 782 766 "operation_count": count, 767 + "did_count": stats["did_count"], 783 768 "target_operation_count": 10000, 784 769 "progress_percent": float64(count) / 100.0, 785 770 "operations_needed": 10000 - count, ··· 806 791 // Get previous bundle info 807 792 if bundleNum > 1 { 808 793 if prevBundle, err := s.bundleManager.GetBundleMetadata(bundleNum - 1); err == nil { 809 - result["prev_bundle_hash"] = prevBundle.Hash 794 + result["parent"] = prevBundle.Hash // Parent chain hash 810 795 result["cursor"] = prevBundle.EndTime.Format(time.RFC3339Nano) 811 796 } 812 797 } ··· 1005 990 1006 991 response := make([]map[string]interface{}, len(bundles)) 1007 992 for i, bundle := range bundles { 1008 - response[i] = formatBundleResponse(bundle) 993 + response[i] = formatBundleMetadata(bundle) 1009 994 } 1010 995 1011 996 resp.json(response) ··· 1022 1007 lastBundle := stats["last_bundle"].(int64) 1023 1008 1024 1009 resp.json(map[string]interface{}{ 1025 - "plc_bundle_count": bundleCount, 1026 - "last_bundle_number": lastBundle, 1027 - "total_compressed_size": totalSize, 1028 - "total_compressed_size_mb": float64(totalSize) / 1024 / 1024, 1029 - "total_compressed_size_gb": float64(totalSize) / 1024 / 1024 / 1024, 1030 - "total_uncompressed_size": totalUncompressedSize, 1031 - "total_uncompressed_size_mb": 
float64(totalUncompressedSize) / 1024 / 1024, 1032 - "total_uncompressed_size_gb": float64(totalUncompressedSize) / 1024 / 1024 / 1024, 1033 - "overall_compression_ratio": float64(totalUncompressedSize) / float64(totalSize), 1010 + "plc_bundle_count": bundleCount, 1011 + "last_bundle_number": lastBundle, 1012 + "total_compressed_size": totalSize, 1013 + "total_uncompressed_size": totalUncompressedSize, 1014 + "overall_compression_ratio": float64(totalUncompressedSize) / float64(totalSize), 1034 1015 }) 1035 1016 } 1036 1017 ··· 1145 1126 break 1146 1127 } 1147 1128 1148 - if bundle.PrevBundleHash != prevBundle.Hash { 1129 + if bundle.Parent != prevBundle.Hash { 1149 1130 valid = false 1150 1131 brokenAt = i 1151 - errorMsg = fmt.Sprintf("Chain broken: bundle %06d prev_hash doesn't match bundle %06d hash", i, i-1) 1132 + errorMsg = fmt.Sprintf("Chain broken: bundle %06d parent doesn't match bundle %06d hash", i, i-1) 1152 1133 break 1153 1134 } 1154 1135 } ··· 1191 1172 "chain_start_time": firstBundle.StartTime, 1192 1173 "chain_end_time": lastBundleData.EndTime, 1193 1174 "chain_head_hash": lastBundleData.Hash, 1194 - "first_prev_hash": firstBundle.PrevBundleHash, 1195 - "last_prev_hash": lastBundleData.PrevBundleHash, 1175 + "first_parent": firstBundle.Parent, 1176 + "last_parent": lastBundleData.Parent, 1196 1177 }) 1197 1178 } 1198 1179 ··· 1499 1480 "tables": tableSizes, 1500 1481 "indexes": indexSizes, 1501 1482 "retrievedAt": time.Now().UTC(), 1483 + }) 1484 + } 1485 + 1486 + func (s *Server) handleGetBundleLabels(w http.ResponseWriter, r *http.Request) { 1487 + resp := newResponse(w) 1488 + 1489 + bundleNum, err := getBundleNumber(r) 1490 + if err != nil { 1491 + resp.error("invalid bundle number", http.StatusBadRequest) 1492 + return 1493 + } 1494 + 1495 + labels, err := s.bundleManager.GetBundleLabels(r.Context(), bundleNum) 1496 + if err != nil { 1497 + resp.error(err.Error(), http.StatusInternalServerError) 1498 + return 1499 + } 1500 + 1501 + resp.json(map[string]interface{}{ 1502 + "bundle": bundleNum, 1503 + "count": len(labels), 1504 + "labels": labels, 1502 1505 }) 1503 1506 } 1504 1507
+1
internal/api/server.go
··· 84 84 api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET") 85 85 api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET") 86 86 api.HandleFunc("/plc/bundles/{number}/download", s.handleDownloadPLCBundle).Methods("GET") 87 + api.HandleFunc("/plc/bundles/{number}/labels", s.handleGetBundleLabels).Methods("GET") 87 88 88 89 // PLC history/metrics 89 90 api.HandleFunc("/plc/history", s.handleGetPLCHistory).Methods("GET")
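Note: for reference, a small client for the new labels route. The JSON field names follow the `PLCOpLabel` struct tags and the `handleGetBundleLabels` response shape; the base URL and `/api` prefix are assumptions, so adjust them to your deployment:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type labelsResponse struct {
	Bundle int `json:"bundle"`
	Count  int `json:"count"`
	Labels []struct {
		Bundle     int      `json:"bundle"`
		Position   int      `json:"position"`
		CID        string   `json:"cid"`
		Size       int      `json:"size"`
		Confidence float64  `json:"confidence"`
		Detectors  []string `json:"detectors"`
	} `json:"labels"`
}

func main() {
	// Assumed local instance and route prefix; both may differ.
	resp, err := http.Get("http://localhost:8080/api/plc/bundles/42/labels")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var lr labelsResponse
	if err := json.NewDecoder(resp.Body).Decode(&lr); err != nil {
		panic(err)
	}
	fmt.Printf("bundle %d has %d labels\n", lr.Bundle, lr.Count)
}
```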
+44 -45
internal/pds/client.go
··· 84 84 } 85 85 86 86 // DescribeServer fetches com.atproto.server.describeServer 87 - func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, error) { 87 + // Returns: description, responseTime, usedIP, error 88 + func (c *Client) DescribeServer(ctx context.Context, endpoint string) (*ServerDescription, time.Duration, string, error) { 89 + startTime := time.Now() 88 90 url := fmt.Sprintf("%s/xrpc/com.atproto.server.describeServer", endpoint) 89 91 90 - //fmt.Println(url) 92 + // Track which IP was used 93 + var usedIP string 94 + transport := &http.Transport{ 95 + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { 96 + conn, err := (&net.Dialer{ 97 + Timeout: 30 * time.Second, 98 + KeepAlive: 30 * time.Second, 99 + }).DialContext(ctx, network, addr) 100 + 101 + if err == nil && conn != nil { 102 + if remoteAddr := conn.RemoteAddr(); remoteAddr != nil { 103 + if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok { 104 + usedIP = tcpAddr.IP.String() 105 + } 106 + } 107 + } 108 + return conn, err 109 + }, 110 + } 111 + 112 + client := &http.Client{ 113 + Timeout: c.httpClient.Timeout, 114 + Transport: transport, 115 + } 91 116 92 117 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 93 118 if err != nil { 94 - return nil, err 119 + return nil, 0, "", err 95 120 } 96 121 97 - resp, err := c.httpClient.Do(req) 122 + resp, err := client.Do(req) 123 + responseTime := time.Since(startTime) 124 + 98 125 if err != nil { 99 - return nil, err 126 + return nil, responseTime, usedIP, err 100 127 } 101 128 defer resp.Body.Close() 102 129 103 130 if resp.StatusCode != http.StatusOK { 104 - return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 131 + return nil, responseTime, usedIP, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 105 132 } 106 133 107 134 var desc ServerDescription 108 135 if err := json.NewDecoder(resp.Body).Decode(&desc); err != nil { 109 - return nil, err 136 + return nil, responseTime, usedIP, err 110 137 } 111 138 112 - return &desc, nil 139 + return &desc, responseTime, usedIP, nil 113 140 } 114 141 115 142 // CheckHealth performs a basic health check, ensuring the endpoint returns JSON with a "version" 116 - // Returns: available, responseTime, version, usedIP, error 117 - func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, string, error) { 143 + // Returns: available, responseTime, version, error 144 + func (c *Client) CheckHealth(ctx context.Context, endpoint string) (bool, time.Duration, string, error) { 118 145 startTime := time.Now() 119 146 120 147 url := fmt.Sprintf("%s/xrpc/_health", endpoint) 121 148 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 122 149 if err != nil { 123 - return false, 0, "", "", err 124 - } 125 - 126 - // Create a custom dialer to track which IP was actually used 127 - var usedIP string 128 - transport := &http.Transport{ 129 - DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { 130 - conn, err := (&net.Dialer{ 131 - Timeout: 30 * time.Second, 132 - KeepAlive: 30 * time.Second, 133 - }).DialContext(ctx, network, addr) 134 - 135 - if err == nil && conn != nil { 136 - if remoteAddr := conn.RemoteAddr(); remoteAddr != nil { 137 - // Extract IP from "ip:port" format 138 - if tcpAddr, ok := remoteAddr.(*net.TCPAddr); ok { 139 - usedIP = tcpAddr.IP.String() 140 - } 141 - } 142 - } 143 - 144 - return conn, err 145 - }, 150 + return false, 0, "", err 146 151 } 147 152 148 - // Create a client with our custom transport 149 - client := &http.Client{ 150 - Timeout: c.httpClient.Timeout, 151 - Transport: transport, 152 - } 153 - 154 - resp, err := client.Do(req) 153 + resp, err := c.httpClient.Do(req) 155 154 duration := time.Since(startTime) 156 155 157 156 if err != nil { 158 - return false, duration, "", usedIP, err 157 + return false, duration, "", err 159 158 } 160 159 defer resp.Body.Close() 161 160 162 161 if resp.StatusCode != http.StatusOK { 163 - return false, duration, "", usedIP, fmt.Errorf("health check returned status %d", resp.StatusCode) 162 + return false, duration, "", fmt.Errorf("health check returned status %d", resp.StatusCode) 164 163 } 165 164 166 165 // Decode the JSON response and check for "version" ··· 169 168 } 170 169 171 170 if err := json.NewDecoder(resp.Body).Decode(&healthResponse); err != nil { 172 - return false, duration, "", usedIP, fmt.Errorf("failed to decode health JSON: %w", err) 171 + return false, duration, "", fmt.Errorf("failed to decode health JSON: %w", err) 173 172 } 174 173 175 174 if healthResponse.Version == "" { 176 - return false, duration, "", usedIP, fmt.Errorf("health JSON response missing 'version' field") 175 + return false, duration, "", fmt.Errorf("health JSON response missing 'version' field") 177 176 } 178 177 179 178 // All checks passed 180 - return true, duration, healthResponse.Version, usedIP, nil 179 + return true, duration, healthResponse.Version, nil 181 180 }
+31 -27
internal/pds/scanner.go
··· 8 8 "sync/atomic" 9 9 "time" 10 10 11 - "github.com/acarl005/stripansi" 12 11 "github.com/atscan/atscand/internal/config" 13 12 "github.com/atscan/atscand/internal/ipinfo" 14 13 "github.com/atscan/atscand/internal/log" ··· 40 39 servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{ 41 40 Type: "pds", 42 41 OnlyStale: true, 42 + OnlyValid: true, 43 43 RecheckInterval: s.config.RecheckInterval, 44 44 }) 45 45 if err != nil { ··· 127 127 // STEP 1: Resolve IPs (both IPv4 and IPv6) 128 128 ips, err := ipinfo.ExtractIPsFromEndpoint(ep.Endpoint) 129 129 if err != nil { 130 - // Mark as offline due to DNS failure 131 130 s.saveScanResult(ctx, ep.ID, &ScanResult{ 132 131 Status: storage.EndpointStatusOffline, 133 132 ErrorMessage: fmt.Sprintf("DNS resolution failed: %v", err), ··· 146 145 go s.updateIPInfoIfNeeded(ctx, ips.IPv6) 147 146 } 148 147 149 - // STEP 2: Health check (now returns which IP was used) 150 - available, responseTime, version, usedIP, err := s.client.CheckHealth(ctx, ep.Endpoint) 151 - if err != nil || !available { 152 - errMsg := "health check failed" 153 - if err != nil { 154 - errMsg = err.Error() 155 - } 148 + // STEP 2: Call describeServer (primary health check + metadata) 149 + desc, descResponseTime, usedIP, err := s.client.DescribeServer(ctx, ep.Endpoint) 150 + if err != nil { 156 151 s.saveScanResult(ctx, ep.ID, &ScanResult{ 157 152 Status: storage.EndpointStatusOffline, 158 - ResponseTime: responseTime, 159 - ErrorMessage: errMsg, 160 - UsedIP: usedIP, // Save even if failed 153 + ResponseTime: descResponseTime, 154 + ErrorMessage: fmt.Sprintf("describeServer failed: %v", err), 155 + UsedIP: usedIP, 161 156 }) 162 157 return 163 158 } 164 159 165 - // STEP 3: Fetch PDS-specific data 166 - desc, err := s.client.DescribeServer(ctx, ep.Endpoint) 167 - if err != nil { 168 - log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(ep.Endpoint), err) 169 - } else if desc != nil && desc.DID != "" { 160 + // Update server DID immediately 161 + if desc.DID != "" { 170 162 s.db.UpdateEndpointServerDID(ctx, ep.ID, desc.DID) 171 163 } 172 164 173 - // Fetch repos with full info 165 + // STEP 3: Call _health to get version 166 + available, healthResponseTime, version, err := s.client.CheckHealth(ctx, ep.Endpoint) 167 + if err != nil || !available { 168 + log.Verbose("Warning: _health check failed for %s: %v", ep.Endpoint, err) 169 + // Server is online (describeServer worked) but _health failed 170 + // Continue with empty version 171 + version = "" 172 + } 173 + 174 + // Calculate average response time from both calls 175 + avgResponseTime := descResponseTime 176 + if available { 177 + avgResponseTime = (descResponseTime + healthResponseTime) / 2 178 + } 179 + 180 + // STEP 4: Fetch repos 174 181 repoList, err := s.client.ListRepos(ctx, ep.Endpoint) 175 182 if err != nil { 176 183 log.Verbose("Warning: failed to list repos for %s: %v", ep.Endpoint, err) 177 184 repoList = []Repo{} 178 185 } 179 186 180 - // Convert to DIDs for backward compatibility 187 + // Convert to DIDs 181 188 dids := make([]string, len(repoList)) 182 189 for i, repo := range repoList { 183 190 dids[i] = repo.DID 184 191 } 185 192 186 - // STEP 4: SAVE scan result 193 + // STEP 5: SAVE scan result 187 194 s.saveScanResult(ctx, ep.ID, &ScanResult{ 188 195 Status: storage.EndpointStatusOnline, 189 - ResponseTime: responseTime, 196 + ResponseTime: avgResponseTime, 190 197 Description: desc, 191 198 DIDs: dids, 192 199 Version: version, 193 - UsedIP: usedIP, // NEW: Save which IP 
was used 200 + UsedIP: usedIP, // Only from describeServer 194 201 }) 195 202 196 - // Save repos in batches (only tracks changes) 203 + // STEP 6: Save repos in batches (only tracks changes) 197 204 if len(repoList) > 0 { 198 - batchSize := 200000 205 + batchSize := 100_000 199 206 200 207 log.Verbose("Processing %d repos for %s (tracking changes only)", len(repoList), ep.Endpoint) 201 208 ··· 235 242 236 243 log.Verbose("✓ Processed %d repos for %s", len(repoList), ep.Endpoint) 237 244 } 238 - 239 - // IP info fetch already started at the beginning (step 1.5) 240 - // It will complete in the background 241 245 } 242 246 243 247 func (s *Scanner) saveScanResult(ctx context.Context, endpointID int64, result *ScanResult) {
+136 -1
internal/plc/manager.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/csv" 5 6 "fmt" 6 7 "io" 8 + "os" 9 + "path/filepath" 7 10 "sort" 11 + "strconv" 12 + "strings" 8 13 "time" 9 14 10 15 "github.com/atscan/atscand/internal/log" 11 16 "github.com/atscan/atscand/internal/storage" 12 - plcbundle "github.com/atscan/plcbundle" 17 + "github.com/klauspost/compress/zstd" 18 + plcbundle "tangled.org/atscan.net/plcbundle" 13 19 ) 14 20 15 21 // BundleManager wraps the library's manager with database integration ··· 385 391 386 392 return history, nil 387 393 } 394 + 395 + // GetBundleLabels reads labels from a compressed CSV file for a specific bundle 396 + func (bm *BundleManager) GetBundleLabels(ctx context.Context, bundleNum int) ([]*PLCOpLabel, error) { 397 + // Define the path to the labels file 398 + labelsDir := filepath.Join(bm.bundleDir, "labels") 399 + labelsFile := filepath.Join(labelsDir, fmt.Sprintf("%06d.csv.zst", bundleNum)) 400 + 401 + // Check if file exists 402 + if _, err := os.Stat(labelsFile); os.IsNotExist(err) { 403 + log.Verbose("No labels file found for bundle %d at %s", bundleNum, labelsFile) 404 + // Return empty, not an error 405 + return []*PLCOpLabel{}, nil 406 + } 407 + 408 + // Open the Zstd-compressed file 409 + file, err := os.Open(labelsFile) 410 + if err != nil { 411 + return nil, fmt.Errorf("failed to open labels file: %w", err) 412 + } 413 + defer file.Close() 414 + 415 + // Create a Zstd reader 416 + zstdReader, err := zstd.NewReader(file) 417 + if err != nil { 418 + return nil, fmt.Errorf("failed to create zstd reader: %w", err) 419 + } 420 + defer zstdReader.Close() 421 + 422 + // Create a CSV reader 423 + csvReader := csv.NewReader(zstdReader) 424 + // Input files carry no header row (it is stripped at import time) 425 + // Optionally set FieldsPerRecord to 6 to validate column counts: 426 + //csvReader.FieldsPerRecord = 6 427 + 428 + var labels []*PLCOpLabel 429 + 430 + // Read all records 431 + for { 432 + // Check for context cancellation 433 + if err := ctx.Err(); err != nil { 434 + return nil, err 435 + } 436 + 437 + record, err := csvReader.Read() 438 + if err == io.EOF { 439 + break // End of file 440 + } 441 + if err != nil { 442 + log.Error("Error reading CSV record in %s: %v", labelsFile, err) 443 + continue // Skip bad line 444 + } 445 + 446 + // Parse the CSV record (which is []string) 447 + label, err := parseLabelRecord(record) 448 + if err != nil { 449 + log.Error("Error parsing CSV data for bundle %d: %v", bundleNum, err) 450 + continue // Skip bad data 451 + } 452 + 453 + labels = append(labels, label) 454 + } 455 + 456 + return labels, nil 457 + } 458 + 459 + // parseLabelRecord converts a new-format CSV record into a PLCOpLabel struct 460 + func parseLabelRecord(record []string) (*PLCOpLabel, error) { 461 + // New format: 0:bundle, 1:position, 2:cid(short), 3:size, 4:confidence, 5:labels 462 + if len(record) != 6 { 463 + err := fmt.Errorf("invalid record length: expected 6, got %d", len(record)) 464 + // --- ADDED LOG --- 465 + log.Warn("Skipping malformed CSV line: %v (data: %s)", err, strings.Join(record, ",")) 466 + // --- 467 + return nil, err 468 + } 469 + 470 + // 0:bundle 471 + bundle, err := strconv.Atoi(record[0]) 472 + if err != nil { 473 + // --- ADDED LOG --- 474 + log.Warn("Skipping malformed CSV line: 'bundle' column: %v (data: %s)", err, strings.Join(record, ",")) 475 + // --- 476 + return nil, fmt.Errorf("parsing 'bundle': %w", err) 477 + } 478 + 479 + // 1:position 480 + position, err := strconv.Atoi(record[1]) 481 + if err != nil { 482 + // --- ADDED LOG --- 483 + log.Warn("Skipping malformed CSV line: 'position' column: %v (data: %s)", err, strings.Join(record, ",")) 484 + // --- 485 + return nil, fmt.Errorf("parsing 'position': %w", err) 486 + } 487 + 488 + // 2:cid(short) 489 + shortCID := record[2] 490 + 491 + // 3:size 492 + size, err := strconv.Atoi(record[3]) 493 + if err != nil { 494 + // --- ADDED LOG --- 495 + log.Warn("Skipping malformed CSV line: 'size' column: %v (data: %s)", err, strings.Join(record, ",")) 496 + // --- 497 + return nil, fmt.Errorf("parsing 'size': %w", err) 498 + } 499 + 500 + // 4:confidence 501 + confidence, err := strconv.ParseFloat(record[4], 64) 502 + if err != nil { 503 + // --- ADDED LOG --- 504 + log.Warn("Skipping malformed CSV line: 'confidence' column: %v (data: %s)", err, strings.Join(record, ",")) 505 + // --- 506 + return nil, fmt.Errorf("parsing 'confidence': %w", err) 507 + } 508 + 509 + // 5:labels 510 + detectors := strings.Split(record[5], ";") 511 + 512 + label := &PLCOpLabel{ 513 + Bundle: bundle, 514 + Position: position, 515 + CID: shortCID, 516 + Size: size, 517 + Confidence: confidence, 518 + Detectors: detectors, 519 + } 520 + 521 + return label, nil 522 + }
+2
internal/plc/scanner.go
··· 190 190 } 191 191 192 192 func (s *Scanner) storeEndpoint(ctx context.Context, epType, endpoint string, discoveredAt time.Time) error { 193 + valid := validateEndpoint(endpoint) 193 194 return s.db.UpsertEndpoint(ctx, &storage.Endpoint{ 194 195 EndpointType: epType, 195 196 Endpoint: endpoint, 196 197 DiscoveredAt: discoveredAt, 197 198 LastChecked: time.Time{}, 198 199 Status: storage.EndpointStatusUnknown, 200 + Valid: valid, 199 201 }) 200 202 } 201 203
+60 -1
internal/plc/types.go
··· 1 1 package plc 2 2 3 3 import ( 4 - plclib "github.com/atscan/plcbundle/plc" 4 + "net/url" 5 + "strings" 6 + 7 + plclib "tangled.org/atscan.net/plcbundle/plc" 5 8 ) 6 9 7 10 // Re-export library types ··· 28 31 Type string 29 32 Endpoint string 30 33 } 34 + 35 + // PLCOpLabel holds metadata from the label CSV file 36 + type PLCOpLabel struct { 37 + Bundle int `json:"bundle"` 38 + Position int `json:"position"` 39 + CID string `json:"cid"` 40 + Size int `json:"size"` 41 + Confidence float64 `json:"confidence"` 42 + Detectors []string `json:"detectors"` 43 + } 44 + 45 + // validateEndpoint checks if endpoint is in correct format: https://<domain> 46 + func validateEndpoint(endpoint string) bool { 47 + // Must not be empty 48 + if endpoint == "" { 49 + return false 50 + } 51 + 52 + // Must not have trailing slash 53 + if strings.HasSuffix(endpoint, "/") { 54 + return false 55 + } 56 + 57 + // Parse URL 58 + u, err := url.Parse(endpoint) 59 + if err != nil { 60 + return false 61 + } 62 + 63 + // Must use https scheme 64 + if u.Scheme != "https" { 65 + return false 66 + } 67 + 68 + // Must have a host 69 + if u.Host == "" { 70 + return false 71 + } 72 + 73 + // Must not have path (except empty) 74 + if u.Path != "" && u.Path != "/" { 75 + return false 76 + } 77 + 78 + // Must not have query parameters 79 + if u.RawQuery != "" { 80 + return false 81 + } 82 + 83 + // Must not have fragment 84 + if u.Fragment != "" { 85 + return false 86 + } 87 + 88 + return true 89 + }
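Note: the rules in `validateEndpoint` are easy to spot-check with a table-driven test; the hostnames are examples only:

```go
package plc

import "testing"

func TestValidateEndpoint(t *testing.T) {
	cases := map[string]bool{
		"https://pds.example.com":      true,
		"https://pds.example.com/":     false, // trailing slash
		"http://pds.example.com":       false, // not https
		"https://pds.example.com/xrpc": false, // has a path
		"https://pds.example.com?x=1":  false, // query parameters
		"https://pds.example.com#frag": false, // fragment
		"":                             false, // empty
	}
	for endpoint, want := range cases {
		if got := validateEndpoint(endpoint); got != want {
			t.Errorf("validateEndpoint(%q) = %v, want %v", endpoint, got, want)
		}
	}
}
```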
+24 -17
internal/storage/postgres.go
··· 84 84 ip TEXT, 85 85 ipv6 TEXT, 86 86 ip_resolved_at TIMESTAMP, 87 + valid BOOLEAN DEFAULT true, 87 88 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 88 89 UNIQUE(endpoint_type, endpoint) 89 90 ); ··· 95 96 CREATE INDEX IF NOT EXISTS idx_endpoints_ipv6 ON endpoints(ipv6); 96 97 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did ON endpoints(server_did); 97 98 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did_type_discovered ON endpoints(server_did, endpoint_type, discovered_at); 99 + CREATE INDEX IF NOT EXISTS idx_endpoints_valid ON endpoints(valid); 98 100 99 101 -- IP infos table (IP as PRIMARY KEY) 100 102 CREATE TABLE IF NOT EXISTS ip_infos ( ··· 208 210 209 211 func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error { 210 212 query := ` 211 - INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at) 212 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 213 + INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at, valid) 214 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 213 215 ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET 214 216 last_checked = EXCLUDED.last_checked, 215 217 status = EXCLUDED.status, ··· 225 227 WHEN (EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '') OR (EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '') THEN EXCLUDED.ip_resolved_at 226 228 ELSE endpoints.ip_resolved_at 227 229 END, 230 + valid = EXCLUDED.valid, 228 231 updated_at = CURRENT_TIMESTAMP 229 232 RETURNING id 230 233 ` 231 234 err := p.db.QueryRowContext(ctx, query, 232 235 endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt, 233 236 endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPv6, endpoint.IPResolvedAt, endpoint.Valid).Scan(&endpoint.ID) 234 237 return err 235 238 } 236 239 ··· 251 254 func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) { 252 255 query := ` 253 256 SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, 254 257 ip, ipv6, ip_resolved_at, valid, updated_at 255 258 FROM endpoints 256 259 WHERE endpoint = $1 AND endpoint_type = $2 257 260 ` ··· 262 265 263 266 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan( 264 267 &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 265 268 &ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.Valid, &ep.UpdatedAt, 266 269 ) 267 270 if err != nil { 268 271 return nil, err ··· 288 291 query := ` 289 292 SELECT DISTINCT ON (COALESCE(server_did, id::text)) 290 293 id, endpoint_type, endpoint, server_did, discovered_at, last_checked, status, 291 294 ip, ipv6, ip_resolved_at, valid, updated_at 292 295 FROM endpoints 293 296 WHERE 1=1 294 297 ` ··· 300 303 query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx) 301 304 args = append(args, filter.Type) 302 305 argIdx++ 306 + } 307 + 308 + // NEW: Filter by valid flag 309 + if filter.OnlyValid { 310 + query += " AND valid = true" 303 311 } 304 312 if filter.Status != "" { 305 313 statusInt := EndpointStatusUnknown ··· 566 574 last_checked, 567 575 status, 568 576 ip, 569 577 ipv6, 578 + valid 570 579 FROM endpoints 571 580 WHERE endpoint_type = 'pds' 572 581 ORDER BY COALESCE(server_did, id::text), discovered_at ASC 573 582 ) 574 583 SELECT 575 584 e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip, e.ipv6, e.valid, 576 585 latest.user_count, latest.response_time, latest.version, latest.scanned_at, 577 586 i.city, i.country, i.country_code, i.asn, i.asn_org, 578 587 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, ··· 643 652 var scannedAt sql.NullTime 644 653 645 654 err := rows.Scan( 646 655 &item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, &ipv6, &item.Valid, 647 656 &userCount, &responseTime, &version, &scannedAt, 648 657 &city, &country, &countryCode, &asn, &asnOrg, 649 658 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, ··· 705 714 706 715 func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) { 707 716 query := ` 708 717 WITH target_endpoint AS MATERIALIZED ( 709 718 SELECT 710 719 e.id, 711 720 e.endpoint, ··· 714 723 e.last_checked, 715 724 e.status, 716 725 e.ip, 717 726 e.ipv6, 727 + e.valid 718 728 FROM endpoints e 719 729 WHERE e.endpoint = $1 720 730 AND e.endpoint_type = 'pds' 721 731 LIMIT 1 722 732 ) 723 733 SELECT 724 734 te.id, ··· 729 739 te.status, 730 740 te.ip, 731 741 te.ipv6, 742 + te.valid, 732 743 latest.user_count, 733 744 latest.response_time, 734 745 latest.version, ··· 738 749 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, 739 750 i.latitude, i.longitude, 740 751 i.raw_data, 741 - -- Inline aliases aggregation (avoid second CTE) 742 752 COALESCE( 743 753 ARRAY( 744 754 SELECT e2.endpoint ··· 751 761 ), 752 762 ARRAY[]::text[] 753 763 ) as aliases, 754 - -- Inline first_discovered_at (avoid aggregation) 755 764 CASE 756 765 WHEN te.server_did IS NOT NULL THEN ( 757 766 SELECT MIN(e3.discovered_at) ··· 792 801 var firstDiscoveredAt sql.NullTime 793 802 794 803 err := p.db.QueryRowContext(ctx, query, endpoint).Scan( 795 804 &detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, &ipv6, &detail.Valid, 796 805 &userCount, &responseTime, &version, &serverInfoJSON, &scannedAt, 797 806 &city, &country, &countryCode, &asn, &asnOrg, 798 807 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, ··· 819 828 // Set aliases and is_primary 820 829 detail.Aliases = aliases 821 830 if serverDID.Valid && serverDID.String != "" && firstDiscoveredAt.Valid { 822 831 detail.IsPrimary = detail.DiscoveredAt.Equal(firstDiscoveredAt.Time) || 823 832 detail.DiscoveredAt.Before(firstDiscoveredAt.Time) 824 833 } else { 825 834 detail.IsPrimary = true 826 835 } 829 836
+7 -4
internal/storage/types.go
··· 26 26 LastChecked time.Time 27 27 Status int 28 28 IP string 29 - IPv6 string // NEW 29 + IPv6 string 30 30 IPResolvedAt time.Time 31 + Valid bool 31 32 UpdatedAt time.Time 32 33 } 33 34 ··· 76 77 77 78 // EndpointFilter for querying endpoints 78 79 type EndpointFilter struct { 79 - Type string // "pds", "labeler", etc. 80 + Type string 80 81 Status string 81 82 MinUserCount int64 82 83 OnlyStale bool 84 + OnlyValid bool 83 85 RecheckInterval time.Duration 84 - Random bool // NEW: Return results in random order 86 + Random bool 85 87 Limit int 86 88 Offset int 87 89 } ··· 213 215 LastChecked time.Time 214 216 Status int 215 217 IP string 216 - IPv6 string // NEW 218 + IPv6 string 219 + Valid bool // NEW 217 220 218 221 // From latest endpoint_scans (via JOIN) 219 222 LatestScan *struct {
+113
utils/import-labels.js
··· 1 + import { file, write } from "bun"; 2 + import { join } from "path"; 3 + import { mkdir } from "fs/promises"; 4 + import { init, compress } from "@bokuweb/zstd-wasm"; 5 + 6 + // --- Configuration --- 7 + const CSV_FILE = process.argv[2]; 8 + const CONFIG_FILE = "config.yaml"; 9 + const COMPRESSION_LEVEL = 5; // zstd level 1-22 (5 is a good balance) 10 + // --------------------- 11 + 12 + if (!CSV_FILE) { 13 + console.error("Usage: bun run utils/import-labels.js <path-to-csv-file>"); 14 + process.exit(1); 15 + } 16 + 17 + console.log("========================================"); 18 + console.log("PLC Operation Labels Import (Bun + WASM)"); 19 + console.log("========================================"); 20 + 21 + // 1. Read and parse config 22 + console.log(`Loading config from ${CONFIG_FILE}...`); 23 + const configFile = await file(CONFIG_FILE).text(); 24 + const config = Bun.YAML.parse(configFile); 25 + const bundleDir = config?.plc?.bundle_dir; 26 + 27 + if (!bundleDir) { 28 + console.error("Error: Could not parse plc.bundle_dir from config.yaml"); 29 + process.exit(1); 30 + } 31 + 32 + const FINAL_LABELS_DIR = join(bundleDir, "labels"); 33 + await mkdir(FINAL_LABELS_DIR, { recursive: true }); 34 + 35 + console.log(`CSV File: ${CSV_FILE}`); 36 + console.log(`Output Dir: ${FINAL_LABELS_DIR}`); 37 + console.log(""); 38 + 39 + // 2. Initialize Zstd WASM module 40 + await init(); 41 + 42 + // --- Pass 1: Read entire file into memory and group by bundle --- 43 + console.log("Pass 1/2: Reading and grouping all lines by bundle..."); 44 + console.warn("This will use a large amount of RAM!"); 45 + 46 + const startTime = Date.now(); 47 + const bundles = new Map(); // Map<string, string[]> 48 + let lineCount = 0; 49 + 50 + const inputFile = file(CSV_FILE); 51 + const fileStream = inputFile.stream(); 52 + const decoder = new TextDecoder(); 53 + let remainder = ""; 54 + 55 + for await (const chunk of fileStream) { 56 + const text = remainder + decoder.decode(chunk); 57 + const lines = text.split("\n"); 58 + remainder = lines.pop() || ""; 59 + 60 + for (const line of lines) { 61 + if (line === "") continue; 62 + lineCount++; 63 + 64 + if (lineCount === 1 && line.startsWith("bundle,")) { 65 + continue; // Skip header 66 + } 67 + 68 + const firstCommaIndex = line.indexOf(","); 69 + if (firstCommaIndex === -1) { 70 + console.warn(`Skipping malformed line: ${line}`); 71 + continue; 72 + } 73 + const bundleNumStr = line.substring(0, firstCommaIndex); 74 + const bundleKey = bundleNumStr.padStart(6, "0"); 75 + 76 + // Add line to the correct bundle's array 77 + if (!bundles.has(bundleKey)) { 78 + bundles.set(bundleKey, []); 79 + } 80 + bundles.get(bundleKey).push(line); 81 + } 82 + } 83 + // Note: We ignore any final `remainder` as it's likely an empty line 84 + 85 + console.log(`Finished reading ${lineCount.toLocaleString()} lines.`); 86 + console.log(`Found ${bundles.size} unique bundles.`); 87 + 88 + // --- Pass 2: Compress and write each bundle --- 89 + console.log("\nPass 2/2: Compressing and writing bundle files..."); 90 + let i = 0; 91 + for (const [bundleKey, lines] of bundles.entries()) { 92 + i++; 93 + console.log(` (${i}/${bundles.size}) Compressing bundle ${bundleKey}...`); 94 + 95 + // Join all lines for this bundle into one big string 96 + const content = lines.join("\n"); 97 + 98 + // Compress the string 99 + const compressedData = compress(Buffer.from(content), COMPRESSION_LEVEL); 100 + 101 + // Write the compressed data to the file 102 + const outPath = join(FINAL_LABELS_DIR, `${bundleKey}.csv.zst`); 103 + await write(outPath, compressedData); 104 + } 105 + 106 + // 3. Clean up 107 + const totalTime = (Date.now() - startTime) / 1000; 108 + console.log("\n========================================"); 109 + console.log("Import Summary"); 110 + console.log("========================================"); 111 + console.log(`✓ Import completed in ${totalTime.toFixed(2)} seconds.`); 112 + console.log(`Total lines processed: ${lineCount.toLocaleString()}`); 113 + console.log(`Label files are stored in: ${FINAL_LABELS_DIR}`);
+91
utils/import-labels.sh
··· 1 + #!/bin/bash 2 + # import-labels.sh 3 + 4 + set -e 5 + 6 + if [ $# -lt 1 ]; then 7 + echo "Usage: ./utils/import-labels.sh <csv-file>" 8 + exit 1 9 + fi 10 + 11 + CSV_FILE="$1" 12 + CONFIG_FILE="config.yaml" 13 + 14 + [ ! -f "$CSV_FILE" ] && echo "Error: CSV file not found" && exit 1 15 + [ ! -f "$CONFIG_FILE" ] && echo "Error: config.yaml not found" && exit 1 16 + 17 + # Extract bundle directory path 18 + BUNDLE_DIR=$(grep -A 5 "^plc:" "$CONFIG_FILE" | grep "bundle_dir:" | sed 's/.*bundle_dir: *"//' | sed 's/".*//' | head -1) 19 + 20 + [ -z "$BUNDLE_DIR" ] && echo "Error: Could not parse plc.bundle_dir from config.yaml" && exit 1 21 + 22 + FINAL_LABELS_DIR="$BUNDLE_DIR/labels" 23 + 24 + echo "========================================" 25 + echo "PLC Operation Labels Import (Sorted Pipe)" 26 + echo "========================================" 27 + echo "CSV File: $CSV_FILE" 28 + echo "Output Dir: $FINAL_LABELS_DIR" 29 + echo "" 30 + 31 + # Ensure the final directory exists 32 + mkdir -p "$FINAL_LABELS_DIR" 33 + 34 + echo "Streaming, sorting, and compressing on the fly..." 35 + echo "This will take time. pv will show progress of the tail command." 36 + echo "The sort command will run after pv is complete." 37 + echo "" 38 + 39 + # This is the single-pass pipeline 40 + tail -n +2 "$CSV_FILE" | \ 41 + pv -l -s $(tail -n +2 "$CSV_FILE" | wc -l) | \ 42 + sort -t, -k1,1n | \ 43 + awk -F',' -v final_dir="$FINAL_LABELS_DIR" ' 44 + # This awk script EXPECTS input sorted by bundle number (col 1) 45 + BEGIN { 46 + # last_bundle_num tracks the bundle we are currently writing 47 + last_bundle_num = -1 48 + # cmd holds the current zstd pipe command 49 + cmd = "" 50 + } 51 + { 52 + current_bundle_num = $1 53 + 54 + # Check if the bundle number has changed 55 + if (current_bundle_num != last_bundle_num) { 56 + 57 + # If it changed, and we have an old pipe open, close it 58 + if (last_bundle_num != -1) { 59 + close(cmd) 60 + } 61 + 62 + # Create the new pipe command, writing to the final .zst file 63 + # (-f overwrites stale output from a previous run without prompting) 64 + outfile = sprintf("%s/%06d.csv.zst", final_dir, current_bundle_num) 65 + cmd = "zstd -f -T0 -o " outfile 66 + 67 + # Update the tracker 68 + last_bundle_num = current_bundle_num 69 + 70 + # Print progress to stderr 71 + printf " -> Writing bundle %06d\n", current_bundle_num > "/dev/stderr" 72 + } 73 + 74 + # Print the current line ($0) to the open pipe 75 + # The first time this runs for a bundle, it opens the pipe 76 + # Subsequent times, it writes to the already-open pipe 77 + print $0 | cmd 78 + } 79 + # END block: close the very last pipe 80 + END { 81 + if (last_bundle_num != -1) { 82 + close(cmd) 83 + } 84 + printf " Finished. Total lines: %d\n", NR > "/dev/stderr" 85 + }' 86 + 87 + echo "" 88 + echo "========================================" 89 + echo "Import Summary" 90 + echo "========================================" 91 + echo "✓ Import completed successfully!" 92 + echo "Label files are stored in: $FINAL_LABELS_DIR"