bundle labels

Changed files
+541 -1
cmd
import-labels
internal
utils
+168
cmd/import-labels/main.go
··· 1 + package main 2 + 3 + import ( 4 + "bufio" 5 + "flag" 6 + "fmt" 7 + "os" 8 + "path/filepath" 9 + "strings" 10 + "time" 11 + 12 + "github.com/klauspost/compress/zstd" 13 + "gopkg.in/yaml.v3" 14 + ) 15 + 16 + type Config struct { 17 + PLC struct { 18 + BundleDir string `yaml:"bundle_dir"` 19 + } `yaml:"plc"` 20 + } 21 + 22 + var CONFIG_FILE = "config.yaml" 23 + 24 + // --------------------- 25 + 26 + func main() { 27 + // Define a new flag for changing the directory 28 + workDir := flag.String("C", ".", "Change to this directory before running (for finding config.yaml)") 29 + flag.Usage = func() { // Custom usage message 30 + fmt.Fprintf(os.Stderr, "Usage: ... | %s [-C /path/to/dir]\n", os.Args[0]) 31 + fmt.Fprintln(os.Stderr, "Reads sorted CSV from stdin and writes compressed bundle files.") 32 + flag.PrintDefaults() 33 + } 34 + flag.Parse() // Parse all defined flags 35 + 36 + // Change directory if the flag was used 37 + if *workDir != "." { 38 + fmt.Printf("Changing working directory to %s...\n", *workDir) 39 + if err := os.Chdir(*workDir); err != nil { 40 + fmt.Fprintf(os.Stderr, "Error changing directory to %s: %v\n", *workDir, err) 41 + os.Exit(1) 42 + } 43 + } 44 + 45 + // --- REMOVED UNUSED CODE --- 46 + // The csvFilePath variable and NArg check were removed 47 + // as the script now reads from stdin. 48 + // --------------------------- 49 + 50 + fmt.Println("========================================") 51 + fmt.Println("PLC Operation Labels Import (Go STDIN)") 52 + fmt.Println("========================================") 53 + 54 + // 1. Read config (will now read from the new CWD) 55 + fmt.Printf("Loading config from %s...\n", CONFIG_FILE) 56 + configData, err := os.ReadFile(CONFIG_FILE) 57 + if err != nil { 58 + fmt.Fprintf(os.Stderr, "Error reading config file: %v\n", err) 59 + os.Exit(1) 60 + } 61 + 62 + var config Config 63 + if err := yaml.Unmarshal(configData, &config); err != nil { 64 + fmt.Fprintf(os.Stderr, "Error parsing config.yaml: %v\n", err) 65 + os.Exit(1) 66 + } 67 + 68 + if config.PLC.BundleDir == "" { 69 + fmt.Fprintln(os.Stderr, "Error: Could not parse plc.bundle_dir from config.yaml") 70 + os.Exit(1) 71 + } 72 + 73 + finalLabelsDir := filepath.Join(config.PLC.BundleDir, "labels") 74 + if err := os.MkdirAll(finalLabelsDir, 0755); err != nil { 75 + fmt.Fprintf(os.Stderr, "Error creating output directory: %v\n", err) 76 + os.Exit(1) 77 + } 78 + 79 + fmt.Printf("Output Dir: %s\n", finalLabelsDir) 80 + fmt.Println("Waiting for sorted data from stdin...") 81 + 82 + // 2. Process sorted data from stdin 83 + // This script *requires* the input to be sorted by bundle number. 84 + 85 + var currentWriter *zstd.Encoder 86 + var currentFile *os.File 87 + var lastBundleKey string = "" 88 + 89 + lineCount := 0 90 + startTime := time.Now() 91 + 92 + scanner := bufio.NewScanner(os.Stdin) 93 + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) 94 + 95 + for scanner.Scan() { 96 + line := scanner.Text() 97 + lineCount++ 98 + 99 + parts := strings.SplitN(line, ",", 2) 100 + if len(parts) < 1 { 101 + continue // Skip empty/bad lines 102 + } 103 + 104 + bundleNumStr := parts[0] 105 + bundleKey := fmt.Sprintf("%06s", bundleNumStr) // Pad with zeros 106 + 107 + // If the bundle key is new, close the old writer and open a new one. 108 + if bundleKey != lastBundleKey { 109 + // Close the previous writer/file 110 + if currentWriter != nil { 111 + if err := currentWriter.Close(); err != nil { 112 + fmt.Fprintf(os.Stderr, "Error closing writer for %s: %v\n", lastBundleKey, err) 113 + } 114 + currentFile.Close() 115 + } 116 + 117 + // Start the new one 118 + fmt.Printf(" -> Writing bundle %s\n", bundleKey) 119 + outPath := filepath.Join(finalLabelsDir, fmt.Sprintf("%s.csv.zst", bundleKey)) 120 + 121 + file, err := os.Create(outPath) 122 + if err != nil { 123 + fmt.Fprintf(os.Stderr, "Error creating file %s: %v\n", outPath, err) 124 + os.Exit(1) 125 + } 126 + currentFile = file 127 + 128 + writer, err := zstd.NewWriter(file) 129 + if err != nil { 130 + fmt.Fprintf(os.Stderr, "Error creating zstd writer: %v\n", err) 131 + os.Exit(1) 132 + } 133 + currentWriter = writer 134 + lastBundleKey = bundleKey 135 + } 136 + 137 + // Write the line to the currently active writer 138 + if _, err := currentWriter.Write([]byte(line + "\n")); err != nil { 139 + fmt.Fprintf(os.Stderr, "Error writing line: %v\n", err) 140 + } 141 + 142 + // Progress update 143 + if lineCount%100000 == 0 { 144 + elapsed := time.Since(startTime).Seconds() 145 + rate := float64(lineCount) / elapsed 146 + fmt.Printf(" ... processed %,d lines (%.0f lines/sec)\n", lineCount, rate) 147 + } 148 + } 149 + 150 + // 3. Close the very last writer 151 + if currentWriter != nil { 152 + if err := currentWriter.Close(); err != nil { 153 + fmt.Fprintf(os.Stderr, "Error closing final writer: %v\n", err) 154 + } 155 + currentFile.Close() 156 + } 157 + 158 + if err := scanner.Err(); err != nil { 159 + fmt.Fprintf(os.Stderr, "Error reading stdin: %v\n", err) 160 + } 161 + 162 + totalTime := time.Since(startTime) 163 + fmt.Println("\n========================================") 164 + fmt.Println("Import Summary") 165 + fmt.Println("========================================") 166 + fmt.Printf("✓ Import completed in %v\n", totalTime) 167 + fmt.Printf("Total lines processed: %,d\n", lineCount) 168 + }
+1 -1
go.mod
··· 8 8 gopkg.in/yaml.v3 v3.0.1 9 9 ) 10 10 11 - require github.com/klauspost/compress v1.18.1 // indirect 11 + require github.com/klauspost/compress v1.18.1 12 12 13 13 require ( 14 14 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
+22
internal/api/handlers.go
··· 1482 1482 }) 1483 1483 } 1484 1484 1485 + func (s *Server) handleGetBundleLabels(w http.ResponseWriter, r *http.Request) { 1486 + resp := newResponse(w) 1487 + 1488 + bundleNum, err := getBundleNumber(r) 1489 + if err != nil { 1490 + resp.error("invalid bundle number", http.StatusBadRequest) 1491 + return 1492 + } 1493 + 1494 + labels, err := s.bundleManager.GetBundleLabels(r.Context(), bundleNum) 1495 + if err != nil { 1496 + resp.error(err.Error(), http.StatusInternalServerError) 1497 + return 1498 + } 1499 + 1500 + resp.json(map[string]interface{}{ 1501 + "bundle": bundleNum, 1502 + "count": len(labels), 1503 + "labels": labels, 1504 + }) 1505 + } 1506 + 1485 1507 // ===== UTILITY FUNCTIONS ===== 1486 1508 1487 1509 func normalizeEndpoint(endpoint string) string {
+1
internal/api/server.go
··· 84 84 api.HandleFunc("/plc/bundles/{number}", s.handleGetPLCBundle).Methods("GET") 85 85 api.HandleFunc("/plc/bundles/{number}/dids", s.handleGetPLCBundleDIDs).Methods("GET") 86 86 api.HandleFunc("/plc/bundles/{number}/download", s.handleDownloadPLCBundle).Methods("GET") 87 + api.HandleFunc("/plc/bundles/{number}/labels", s.handleGetBundleLabels).Methods("GET") 87 88 88 89 // PLC history/metrics 89 90 api.HandleFunc("/plc/history", s.handleGetPLCHistory).Methods("GET")
+135
internal/plc/manager.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/csv" 5 6 "fmt" 6 7 "io" 8 + "os" 9 + "path/filepath" 7 10 "sort" 11 + "strconv" 12 + "strings" 8 13 "time" 9 14 10 15 "github.com/atscan/atscand/internal/log" 11 16 "github.com/atscan/atscand/internal/storage" 17 + "github.com/klauspost/compress/zstd" 12 18 plcbundle "tangled.org/atscan.net/plcbundle" 13 19 ) 14 20 ··· 385 391 386 392 return history, nil 387 393 } 394 + 395 + // GetBundleLabels reads labels from a compressed CSV file for a specific bundle 396 + func (bm *BundleManager) GetBundleLabels(ctx context.Context, bundleNum int) ([]*PLCOpLabel, error) { 397 + // Define the path to the labels file 398 + labelsDir := filepath.Join(bm.bundleDir, "labels") 399 + labelsFile := filepath.Join(labelsDir, fmt.Sprintf("%06d.csv.zst", bundleNum)) 400 + 401 + // Check if file exists 402 + if _, err := os.Stat(labelsFile); os.IsNotExist(err) { 403 + log.Verbose("No labels file found for bundle %d at %s", bundleNum, labelsFile) 404 + // Return empty, not an error 405 + return []*PLCOpLabel{}, nil 406 + } 407 + 408 + // Open the Zstd-compressed file 409 + file, err := os.Open(labelsFile) 410 + if err != nil { 411 + return nil, fmt.Errorf("failed to open labels file: %w", err) 412 + } 413 + defer file.Close() 414 + 415 + // Create a Zstd reader 416 + zstdReader, err := zstd.NewReader(file) 417 + if err != nil { 418 + return nil, fmt.Errorf("failed to create zstd reader: %w", err) 419 + } 420 + defer zstdReader.Close() 421 + 422 + // Create a CSV reader 423 + csvReader := csv.NewReader(zstdReader) 424 + // We skipped the header, so no header read needed 425 + // Set FieldsPerRecord to 7 for validation 426 + //csvReader.FieldsPerRecord = 7 427 + 428 + var labels []*PLCOpLabel 429 + 430 + // Read all records 431 + for { 432 + // Check for context cancellation 433 + if err := ctx.Err(); err != nil { 434 + return nil, err 435 + } 436 + 437 + record, err := csvReader.Read() 438 + if err == io.EOF { 439 + break // End of file 440 + } 441 + if err != nil { 442 + log.Error("Error reading CSV record in %s: %v", labelsFile, err) 443 + continue // Skip bad line 444 + } 445 + 446 + // Parse the CSV record (which is []string) 447 + label, err := parseLabelRecord(record) 448 + if err != nil { 449 + log.Error("Error parsing CSV data for bundle %d: %v", bundleNum, err) 450 + continue // Skip bad data 451 + } 452 + 453 + labels = append(labels, label) 454 + } 455 + 456 + return labels, nil 457 + } 458 + 459 + // parseLabelRecord converts a new format CSV record into a PLCOpLabel struct 460 + func parseLabelRecord(record []string) (*PLCOpLabel, error) { 461 + // New format: 0:bundle, 1:position, 2:cid(short), 3:size, 4:confidence, 5:labels 462 + if len(record) != 6 { 463 + err := fmt.Errorf("invalid record length: expected 6, got %d", len(record)) 464 + // --- ADDED LOG --- 465 + log.Warn("Skipping malformed CSV line: %v (data: %s)", err, strings.Join(record, ",")) 466 + // --- 467 + return nil, err 468 + } 469 + 470 + // 0:bundle 471 + bundle, err := strconv.Atoi(record[0]) 472 + if err != nil { 473 + // --- ADDED LOG --- 474 + log.Warn("Skipping malformed CSV line: 'bundle' column: %v (data: %s)", err, strings.Join(record, ",")) 475 + // --- 476 + return nil, fmt.Errorf("parsing 'bundle': %w", err) 477 + } 478 + 479 + // 1:position 480 + position, err := strconv.Atoi(record[1]) 481 + if err != nil { 482 + // --- ADDED LOG --- 483 + log.Warn("Skipping malformed CSV line: 'position' column: %v (data: %s)", err, strings.Join(record, ",")) 484 + // --- 485 + return nil, fmt.Errorf("parsing 'position': %w", err) 486 + } 487 + 488 + // 2:cid(short) 489 + shortCID := record[2] 490 + 491 + // 3:size 492 + size, err := strconv.Atoi(record[3]) 493 + if err != nil { 494 + // --- ADDED LOG --- 495 + log.Warn("Skipping malformed CSV line: 'size' column: %v (data: %s)", err, strings.Join(record, ",")) 496 + // --- 497 + return nil, fmt.Errorf("parsing 'size': %w", err) 498 + } 499 + 500 + // 4:confidence 501 + confidence, err := strconv.ParseFloat(record[4], 64) 502 + if err != nil { 503 + // --- ADDED LOG --- 504 + log.Warn("Skipping malformed CSV line: 'confidence' column: %v (data: %s)", err, strings.Join(record, ",")) 505 + // --- 506 + return nil, fmt.Errorf("parsing 'confidence': %w", err) 507 + } 508 + 509 + // 5:labels 510 + detectors := strings.Split(record[5], ";") 511 + 512 + label := &PLCOpLabel{ 513 + Bundle: bundle, 514 + Position: position, 515 + CID: shortCID, 516 + Size: size, 517 + Confidence: confidence, 518 + Detectors: detectors, 519 + } 520 + 521 + return label, nil 522 + }
+10
internal/plc/types.go
··· 28 28 Type string 29 29 Endpoint string 30 30 } 31 + 32 + // PLCOpLabel holds metadata from the label CSV file 33 + type PLCOpLabel struct { 34 + Bundle int `json:"bundle"` 35 + Position int `json:"position"` 36 + CID string `json:"cid"` 37 + Size int `json:"size"` 38 + Confidence float64 `json:"confidence"` 39 + Detectors []string `json:"detectors"` 40 + }
+113
utils/import-labels.js
··· 1 + import { file, write } from "bun"; 2 + import { join } from "path"; 3 + import { mkdir } from "fs/promises"; 4 + import { init, compress } from "@bokuweb/zstd-wasm"; 5 + 6 + // --- Configuration --- 7 + const CSV_FILE = process.argv[2]; 8 + const CONFIG_FILE = "config.yaml"; 9 + const COMPRESSION_LEVEL = 5; // zstd level 1-22 (5 is a good balance) 10 + // --------------------- 11 + 12 + if (!CSV_FILE) { 13 + console.error("Usage: bun run utils/import-labels.js <path-to-csv-file>"); 14 + process.exit(1); 15 + } 16 + 17 + console.log("========================================"); 18 + console.log("PLC Operation Labels Import (Bun + WASM)"); 19 + console.log("========================================"); 20 + 21 + // 1. Read and parse config 22 + console.log(`Loading config from ${CONFIG_FILE}...`); 23 + const configFile = await file(CONFIG_FILE).text(); 24 + const config = Bun.YAML.parse(configFile); 25 + const bundleDir = config?.plc?.bundle_dir; 26 + 27 + if (!bundleDir) { 28 + console.error("Error: Could not parse plc.bundle_dir from config.yaml"); 29 + process.exit(1); 30 + } 31 + 32 + const FINAL_LABELS_DIR = join(bundleDir, "labels"); 33 + await mkdir(FINAL_LABELS_DIR, { recursive: true }); 34 + 35 + console.log(`CSV File: ${CSV_FILE}`); 36 + console.log(`Output Dir: ${FINAL_LABELS_DIR}`); 37 + console.log(""); 38 + 39 + // 2. Initialize Zstd WASM module 40 + await init(); 41 + 42 + // --- Pass 1: Read entire file into memory and group by bundle --- 43 + console.log("Pass 1/2: Reading and grouping all lines by bundle..."); 44 + console.warn("This will use a large amount of RAM!"); 45 + 46 + const startTime = Date.now(); 47 + const bundles = new Map(); // Map<string, string[]> 48 + let lineCount = 0; 49 + 50 + const inputFile = file(CSV_FILE); 51 + const fileStream = inputFile.stream(); 52 + const decoder = new TextDecoder(); 53 + let remainder = ""; 54 + 55 + for await (const chunk of fileStream) { 56 + const text = remainder + decoder.decode(chunk); 57 + const lines = text.split("\n"); 58 + remainder = lines.pop() || ""; 59 + 60 + for (const line of lines) { 61 + if (line === "") continue; 62 + lineCount++; 63 + 64 + if (lineCount === 1 && line.startsWith("bundle,")) { 65 + continue; // Skip header 66 + } 67 + 68 + const firstCommaIndex = line.indexOf(","); 69 + if (firstCommaIndex === -1) { 70 + console.warn(`Skipping malformed line: ${line}`); 71 + continue; 72 + } 73 + const bundleNumStr = line.substring(0, firstCommaIndex); 74 + const bundleKey = bundleNumStr.padStart(6, "0"); 75 + 76 + // Add line to the correct bundle's array 77 + if (!bundles.has(bundleKey)) { 78 + bundles.set(bundleKey, []); 79 + } 80 + bundles.get(bundleKey).push(line); 81 + } 82 + } 83 + // Note: We ignore any final `remainder` as it's likely an empty line 84 + 85 + console.log(`Finished reading ${lineCount.toLocaleString()} lines.`); 86 + console.log(`Found ${bundles.size} unique bundles.`); 87 + 88 + // --- Pass 2: Compress and write each bundle --- 89 + console.log("\nPass 2/2: Compressing and writing bundle files..."); 90 + let i = 0; 91 + for (const [bundleKey, lines] of bundles.entries()) { 92 + i++; 93 + console.log(` (${i}/${bundles.size}) Compressing bundle ${bundleKey}...`); 94 + 95 + // Join all lines for this bundle into one big string 96 + const content = lines.join("\n"); 97 + 98 + // Compress the string 99 + const compressedData = compress(Buffer.from(content), COMPRESSION_LEVEL); 100 + 101 + // Write the compressed data to the file 102 + const outPath = join(FINAL_LABELS_DIR, `${bundleKey}.csv.zst`); 103 + await write(outPath, compressedData); 104 + } 105 + 106 + // 3. Clean up 107 + const totalTime = (Date.now() - startTime) / 1000; 108 + console.log("\n========================================"); 109 + console.log("Import Summary"); 110 + console.log("========================================"); 111 + console.log(`✓ Import completed in ${totalTime.toFixed(2)} seconds.`); 112 + console.log(`Total lines processed: ${lineCount.toLocaleString()}`); 113 + console.log(`Label files are stored in: ${FINAL_LABELS_DIR}`);
+91
utils/import-labels.sh
··· 1 + #!/bin/bash 2 + # import-labels-v4-sorted-pipe.sh 3 + 4 + set -e 5 + 6 + if [ $# -lt 1 ]; then 7 + echo "Usage: ./utils/import-labels-v4-sorted-pipe.sh <csv-file>" 8 + exit 1 9 + fi 10 + 11 + CSV_FILE="$1" 12 + CONFIG_FILE="config.yaml" 13 + 14 + [ ! -f "$CSV_FILE" ] && echo "Error: CSV file not found" && exit 1 15 + [ ! -f "$CONFIG_FILE" ] && echo "Error: config.yaml not found" && exit 1 16 + 17 + # Extract bundle directory path 18 + BUNDLE_DIR=$(grep -A 5 "^plc:" "$CONFIG_FILE" | grep "bundle_dir:" | sed 's/.*bundle_dir: *"//' | sed 's/".*//' | head -1) 19 + 20 + [ -z "$BUNDLE_DIR" ] && echo "Error: Could not parse plc.bundle_dir from config.yaml" && exit 1 21 + 22 + FINAL_LABELS_DIR="$BUNDLE_DIR/labels" 23 + 24 + echo "========================================" 25 + echo "PLC Operation Labels Import (Sorted Pipe)" 26 + echo "========================================" 27 + echo "CSV File: $CSV_FILE" 28 + echo "Output Dir: $FINAL_LABELS_DIR" 29 + echo "" 30 + 31 + # Ensure the final directory exists 32 + mkdir -p "$FINAL_LABELS_DIR" 33 + 34 + echo "Streaming, sorting, and compressing on the fly..." 35 + echo "This will take time. `pv` will show progress of the TAIL command." 36 + echo "The `sort` command will run after `pv` is complete." 37 + echo "" 38 + 39 + # This is the single-pass pipeline 40 + tail -n +2 "$CSV_FILE" | \ 41 + pv -l -s $(tail -n +2 "$CSV_FILE" | wc -l) | \ 42 + sort -t, -k1,1n | \ 43 + awk -F',' -v final_dir="$FINAL_LABELS_DIR" ' 44 + # This awk script EXPECTS input sorted by bundle number (col 1) 45 + BEGIN { 46 + # last_bundle_num tracks the bundle we are currently writing 47 + last_bundle_num = -1 48 + # cmd holds the current zstd pipe command 49 + cmd = "" 50 + } 51 + { 52 + current_bundle_num = $1 53 + 54 + # Check if the bundle number has changed 55 + if (current_bundle_num != last_bundle_num) { 56 + 57 + # If it changed, and we have an old pipe open, close it 58 + if (last_bundle_num != -1) { 59 + close(cmd) 60 + } 61 + 62 + # Create the new pipe command, writing to the final .zst file 63 + outfile = sprintf("%s/%06d.csv.zst", final_dir, current_bundle_num) 64 + cmd = "zstd -T0 -o " outfile 65 + 66 + # Update the tracker 67 + last_bundle_num = current_bundle_num 68 + 69 + # Print progress to stderr 70 + printf " -> Writing bundle %06d\n", current_bundle_num > "/dev/stderr" 71 + } 72 + 73 + # Print the current line ($0) to the open pipe 74 + # The first time this runs for a bundle, it opens the pipe 75 + # Subsequent times, it writes to the already-open pipe 76 + print $0 | cmd 77 + } 78 + # END block: close the very last pipe 79 + END { 80 + if (last_bundle_num != -1) { 81 + close(cmd) 82 + } 83 + printf " Finished. Total lines: %d\n", NR > "/dev/stderr" 84 + }' 85 + 86 + echo "" 87 + echo "========================================" 88 + echo "Import Summary" 89 + echo "========================================" 90 + echo "✓ Import completed successfully!" 91 + echo "Label files are stored in: $FINAL_LABELS_DIR"