A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

chainhash

+718 -265
+17
SECURITY
··· 1 + # Security Model 2 + 3 + ## Trust Model 4 + 5 + PLC Bundle creates an **immutable, cryptographically-chained** archive of PLC directory operations. However, its security ultimately depends on external verification. 6 + 7 + ### What the Chain Provides 8 + 9 + ✅ **Tamper Evidence**: Any modification breaks the chain 10 + ✅ **Integrity Verification**: Detect corruption or tampering 11 + ✅ **Reproducibility**: Anyone can recreate bundles from PLC 12 + ✅ **Transparency**: All operations are publicly auditable 13 + 14 + ### What the Chain Does NOT Provide 15 + 16 + ❌ **Standalone Trust**: The chain alone is not proof of authenticity 17 + ❌ **Protection Against Total Replacement**: Someone controlling all bundles can rewrite history
+246 -18
bundle/manager.go
··· 10 10 "sort" 11 11 "strconv" 12 12 "strings" 13 + "sync" 13 14 "time" 14 15 15 16 "github.com/atscan/plcbundle/plc" ··· 173 174 return bundle, nil 174 175 } 175 176 176 - // SaveBundle saves a bundle to disk and updates the index 177 177 func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle) error { 178 178 if err := bundle.ValidateForSave(); err != nil { 179 179 return fmt.Errorf("bundle validation failed: %w", err) ··· 194 194 bundle.CompressedSize = compressedSize 195 195 bundle.CreatedAt = time.Now().UTC() 196 196 197 + // Calculate chain hash 198 + prevBundle := m.index.GetLastBundle() 199 + if prevBundle != nil { 200 + bundle.PrevChainHash = prevBundle.ChainHash 201 + bundle.PrevBundleHash = prevBundle.Hash 202 + } 203 + 204 + bundle.ChainHash = m.operations.CalculateChainHash(bundle.PrevChainHash, bundle.Hash) 205 + 197 206 // Add to index 198 207 m.index.AddBundle(bundle.ToMetadata()) 199 208 ··· 202 211 return fmt.Errorf("failed to save index: %w", err) 203 212 } 204 213 205 - m.logger.Printf("Saved bundle %06d (hash: %s...)", bundle.BundleNumber, bundle.Hash[:16]) 214 + m.logger.Printf("Saved bundle %06d (hash: %s..., chain: %s...)", 215 + bundle.BundleNumber, bundle.Hash[:16], bundle.ChainHash[:16]) 206 216 207 217 // IMPORTANT: Clean up old mempool and create new one for next bundle 208 218 oldMempoolFile := m.mempool.GetFilename() ··· 473 483 474 484 result.ChainLength = len(bundles) 475 485 476 - // Verify each bundle 477 486 for i, meta := range bundles { 478 487 // Verify file hash 479 488 vr, err := m.VerifyBundle(ctx, meta.BundleNumber) 480 - if err != nil { 481 - result.Error = fmt.Sprintf("Failed to verify bundle %d: %v", meta.BundleNumber, err) 482 - result.BrokenAt = meta.BundleNumber 483 - return result, nil 484 - } 485 - 486 - if !vr.Valid { 489 + if err != nil || !vr.Valid { 487 490 result.Error = fmt.Sprintf("Bundle %d hash verification failed", meta.BundleNumber) 488 491 result.BrokenAt = meta.BundleNumber 489 492 return 
result, nil 490 493 } 491 494 492 - // Verify chain link (prev_bundle_hash) 495 + // Verify chain link 493 496 if i > 0 { 494 497 prevMeta := bundles[i-1] 498 + 499 + // Check prev_bundle_hash 495 500 if meta.PrevBundleHash != prevMeta.Hash { 496 501 result.Error = fmt.Sprintf("Chain broken at bundle %d: prev_hash mismatch", meta.BundleNumber) 497 502 result.BrokenAt = meta.BundleNumber 498 503 return result, nil 499 504 } 505 + 506 + // Check chain_hash (NEW - stronger verification) 507 + expectedChainHash := m.operations.CalculateChainHash(prevMeta.ChainHash, meta.Hash) 508 + if meta.ChainHash != expectedChainHash { 509 + result.Error = fmt.Sprintf("Chain broken at bundle %d: chain_hash mismatch", meta.BundleNumber) 510 + result.BrokenAt = meta.BundleNumber 511 + return result, nil 512 + } 500 513 } 501 514 502 515 result.VerifiedBundles = append(result.VerifiedBundles, meta.BundleNumber) ··· 589 602 compressedData, _ := os.ReadFile(path) 590 603 compressedHash := m.operations.Hash(compressedData) 591 604 592 - // Determine cursor (would need previous bundle's end_time in real scenario) 593 - cursor := "" 605 + // Get previous bundle's hashes for chain calculation 594 606 prevHash := "" 607 + prevChainHash := "" 595 608 if num > 1 && len(newMetadata) > 0 { 596 609 prevMeta := newMetadata[len(newMetadata)-1] 597 - cursor = prevMeta.EndTime.Format(time.RFC3339Nano) 598 610 prevHash = prevMeta.Hash 611 + prevChainHash = prevMeta.ChainHash 612 + } 613 + 614 + // Calculate chain hash 615 + chainHash := m.operations.CalculateChainHash(prevChainHash, uncompressedHash) 616 + 617 + // Determine cursor 618 + cursor := "" 619 + if num > 1 && prevHash != "" { 620 + cursor = ops[0].CreatedAt.Format(time.RFC3339Nano) 599 621 } 600 622 601 623 meta := &BundleMetadata{ ··· 605 627 OperationCount: len(ops), 606 628 DIDCount: len(dids), 607 629 Hash: uncompressedHash, 630 + ChainHash: chainHash, 608 631 CompressedHash: compressedHash, 609 632 CompressedSize: size, 610 633 
UncompressedSize: int64(len(jsonlData)), 611 634 Cursor: cursor, 612 635 PrevBundleHash: prevHash, 636 + PrevChainHash: prevChainHash, 613 637 CreatedAt: time.Now().UTC(), 614 638 } 615 639 ··· 635 659 return result, nil 636 660 } 637 661 662 + // ScanDirectoryParallel scans the bundle directory in parallel and rebuilds the index 663 + func (m *Manager) ScanDirectoryParallel(workers int, progressCallback func(current, total int)) (*DirectoryScanResult, error) { 664 + result := &DirectoryScanResult{ 665 + BundleDir: m.config.BundleDir, 666 + } 667 + 668 + m.logger.Printf("Scanning directory (parallel, %d workers): %s", workers, m.config.BundleDir) 669 + 670 + // Find all bundle files 671 + files, err := filepath.Glob(filepath.Join(m.config.BundleDir, "*.jsonl.zst")) 672 + if err != nil { 673 + return nil, fmt.Errorf("failed to scan directory: %w", err) 674 + } 675 + 676 + if len(files) == 0 { 677 + m.logger.Printf("No bundle files found") 678 + return result, nil 679 + } 680 + 681 + // Parse bundle numbers 682 + var bundleNumbers []int 683 + for _, file := range files { 684 + base := filepath.Base(file) 685 + numStr := strings.TrimSuffix(base, ".jsonl.zst") 686 + num, err := strconv.Atoi(numStr) 687 + if err != nil { 688 + m.logger.Printf("Warning: skipping invalid filename: %s", base) 689 + continue 690 + } 691 + bundleNumbers = append(bundleNumbers, num) 692 + } 693 + 694 + sort.Ints(bundleNumbers) 695 + 696 + result.BundleCount = len(bundleNumbers) 697 + if len(bundleNumbers) > 0 { 698 + result.FirstBundle = bundleNumbers[0] 699 + result.LastBundle = bundleNumbers[len(bundleNumbers)-1] 700 + } 701 + 702 + // Find gaps 703 + if len(bundleNumbers) > 1 { 704 + for i := result.FirstBundle; i <= result.LastBundle; i++ { 705 + found := false 706 + for _, num := range bundleNumbers { 707 + if num == i { 708 + found = true 709 + break 710 + } 711 + } 712 + if !found { 713 + result.MissingGaps = append(result.MissingGaps, i) 714 + } 715 + } 716 + } 717 + 718 + 
m.logger.Printf("Found %d bundles (gaps: %d)", result.BundleCount, len(result.MissingGaps)) 719 + 720 + // Process bundles in parallel 721 + type bundleResult struct { 722 + index int 723 + meta *BundleMetadata 724 + err error 725 + } 726 + 727 + jobs := make(chan int, len(bundleNumbers)) 728 + results := make(chan bundleResult, len(bundleNumbers)) 729 + 730 + // Start workers 731 + var wg sync.WaitGroup 732 + for w := 0; w < workers; w++ { 733 + wg.Add(1) 734 + go func() { 735 + defer wg.Done() 736 + for num := range jobs { 737 + path := filepath.Join(m.config.BundleDir, fmt.Sprintf("%06d.jsonl.zst", num)) 738 + 739 + // Load and process bundle 740 + ops, err := m.operations.LoadBundle(path) 741 + if err != nil { 742 + results <- bundleResult{index: num, err: err} 743 + continue 744 + } 745 + 746 + // Calculate metadata (without chain hash yet) 747 + meta, err := m.calculateBundleMetadataFast(num, path, ops) 748 + if err != nil { 749 + results <- bundleResult{index: num, err: err} 750 + continue 751 + } 752 + 753 + results <- bundleResult{index: num, meta: meta} 754 + } 755 + }() 756 + } 757 + 758 + // Send jobs 759 + for _, num := range bundleNumbers { 760 + jobs <- num 761 + } 762 + close(jobs) 763 + 764 + // Wait for all workers to finish 765 + go func() { 766 + wg.Wait() 767 + close(results) 768 + }() 769 + 770 + // Collect results (in a map first, then sort) 771 + metadataMap := make(map[int]*BundleMetadata) 772 + var totalSize int64 773 + var totalUncompressed int64 // NEW 774 + processed := 0 775 + 776 + for result := range results { 777 + processed++ 778 + 779 + // Update progress 780 + if progressCallback != nil { 781 + progressCallback(processed, len(bundleNumbers)) 782 + } 783 + 784 + if result.err != nil { 785 + m.logger.Printf("Warning: failed to process bundle %d: %v", result.index, result.err) 786 + continue 787 + } 788 + metadataMap[result.index] = result.meta 789 + totalSize += result.meta.CompressedSize 790 + totalUncompressed += 
result.meta.UncompressedSize // NEW 791 + } 792 + 793 + // Build ordered metadata slice and calculate chain hashes 794 + var newMetadata []*BundleMetadata 795 + var prevChainHash string 796 + 797 + for _, num := range bundleNumbers { 798 + meta, ok := metadataMap[num] 799 + if !ok { 800 + continue // Skip failed bundles 801 + } 802 + 803 + // Now calculate chain hash (must be done sequentially) 804 + meta.ChainHash = m.operations.CalculateChainHash(prevChainHash, meta.Hash) 805 + meta.PrevChainHash = prevChainHash 806 + 807 + // Update prev hashes for next iteration 808 + if len(newMetadata) > 0 { 809 + meta.PrevBundleHash = newMetadata[len(newMetadata)-1].Hash 810 + } 811 + 812 + newMetadata = append(newMetadata, meta) 813 + prevChainHash = meta.ChainHash 814 + } 815 + 816 + result.TotalSize = totalSize 817 + result.TotalUncompressed = totalUncompressed // NEW 818 + 819 + // Rebuild index 820 + m.index.Rebuild(newMetadata) 821 + 822 + // Save index 823 + if err := m.SaveIndex(); err != nil { 824 + return nil, fmt.Errorf("failed to save index: %w", err) 825 + } 826 + 827 + result.IndexUpdated = true 828 + 829 + m.logger.Printf("Index rebuilt with %d bundles", len(newMetadata)) 830 + 831 + return result, nil 832 + } 833 + 834 + // calculateBundleMetadataFast calculates metadata quickly (optimized for parallel processing) 835 + func (m *Manager) calculateBundleMetadataFast(bundleNumber int, path string, operations []plc.PLCOperation) (*BundleMetadata, error) { 836 + // Calculate hashes efficiently (read file once) 837 + compressedHash, compressedSize, uncompressedHash, uncompressedSize, err := m.operations.CalculateFileHashes(path) 838 + if err != nil { 839 + return nil, err 840 + } 841 + 842 + // Extract unique DIDs (this is fast) 843 + dids := m.operations.ExtractUniqueDIDs(operations) 844 + 845 + return &BundleMetadata{ 846 + BundleNumber: bundleNumber, 847 + StartTime: operations[0].CreatedAt, 848 + EndTime: operations[len(operations)-1].CreatedAt, 849 + 
OperationCount: len(operations), 850 + DIDCount: len(dids), 851 + Hash: uncompressedHash, 852 + CompressedHash: compressedHash, 853 + CompressedSize: compressedSize, 854 + UncompressedSize: uncompressedSize, 855 + CreatedAt: time.Now().UTC(), 856 + }, nil 857 + } 858 + 638 859 // GetInfo returns information about the bundle manager 639 860 func (m *Manager) GetInfo() map[string]interface{} { 640 861 stats := m.index.GetStats() ··· 706 927 return nil, fmt.Errorf("bundle is empty") 707 928 } 708 929 709 - // Get previous bundle hash from index 930 + // Get previous bundle hashes from index 710 931 prevHash := "" 932 + prevChainHash := "" 711 933 if bundleNumber > 1 { 712 934 if prevMeta, err := m.index.GetBundle(bundleNumber - 1); err == nil { 713 935 prevHash = prevMeta.Hash 936 + prevChainHash = prevMeta.ChainHash 714 937 } 715 938 } 716 939 717 - // Calculate metadata 718 - meta, err := m.calculateBundleMetadata(bundleNumber, path, operations, prevHash) 940 + // Calculate metadata (including chain hash) 941 + meta, err := m.calculateBundleMetadata(bundleNumber, path, operations, prevHash, prevChainHash) 719 942 if err != nil { 720 943 return nil, fmt.Errorf("failed to calculate metadata: %w", err) 721 944 } ··· 742 965 } 743 966 744 967 // calculateBundleMetadata calculates metadata for a bundle (internal helper) 745 - func (m *Manager) calculateBundleMetadata(bundleNumber int, path string, operations []plc.PLCOperation, prevBundleHash string) (*BundleMetadata, error) { 968 + func (m *Manager) calculateBundleMetadata(bundleNumber int, path string, operations []plc.PLCOperation, prevBundleHash string, prevChainHash string) (*BundleMetadata, error) { 746 969 // Get file info 747 970 info, err := os.Stat(path) 748 971 if err != nil { ··· 763 986 } 764 987 compressedHash := m.operations.Hash(compressedData) 765 988 989 + // Calculate chain hash 990 + chainHash := m.operations.CalculateChainHash(prevChainHash, uncompressedHash) 991 + 766 992 // Determine cursor 767 993 
cursor := "" 768 994 if bundleNumber > 1 && prevBundleHash != "" { ··· 776 1002 OperationCount: len(operations), 777 1003 DIDCount: len(dids), 778 1004 Hash: uncompressedHash, 1005 + ChainHash: chainHash, 779 1006 CompressedHash: compressedHash, 780 1007 CompressedSize: info.Size(), 781 1008 UncompressedSize: uncompressedSize, 782 1009 Cursor: cursor, 783 1010 PrevBundleHash: prevBundleHash, 1011 + PrevChainHash: prevChainHash, 784 1012 CreatedAt: time.Now().UTC(), 785 1013 }, nil 786 1014 }
+36
bundle/operations.go
··· 385 385 dr.decoder.Close() 386 386 return dr.file.Close() 387 387 } 388 + 389 + func (op *Operations) CalculateChainHash(prevChainHash string, contentHash string) string { 390 + var data string 391 + if prevChainHash == "" { 392 + // Genesis bundle 393 + data = "plcbundle:genesis:" + contentHash 394 + } else { 395 + data = prevChainHash + ":" + contentHash 396 + } 397 + return op.Hash([]byte(data)) 398 + } 399 + 400 + // CalculateFileHashes calculates both content and compressed hashes efficiently 401 + func (op *Operations) CalculateFileHashes(path string) (compressedHash string, compressedSize int64, uncompressedHash string, uncompressedSize int64, err error) { 402 + // Read compressed file 403 + compressedData, err := os.ReadFile(path) 404 + if err != nil { 405 + return "", 0, "", 0, fmt.Errorf("failed to read file: %w", err) 406 + } 407 + 408 + // Calculate compressed hash 409 + compressedHash = op.Hash(compressedData) 410 + compressedSize = int64(len(compressedData)) 411 + 412 + // Decompress 413 + uncompressedData, err := op.decoder.DecodeAll(compressedData, nil) 414 + if err != nil { 415 + return "", 0, "", 0, fmt.Errorf("failed to decompress: %w", err) 416 + } 417 + 418 + // Calculate uncompressed hash 419 + uncompressedHash = op.Hash(uncompressedData) 420 + uncompressedSize = int64(len(uncompressedData)) 421 + 422 + return compressedHash, compressedSize, uncompressedHash, uncompressedSize, nil 423 + }
+42 -29
bundle/types.go
··· 15 15 16 16 // Bundle represents a PLC bundle 17 17 type Bundle struct { 18 - BundleNumber int `json:"bundle_number"` 19 - StartTime time.Time `json:"start_time"` 20 - EndTime time.Time `json:"end_time"` 21 - Operations []plc.PLCOperation `json:"-"` // Not serialized to JSON 22 - DIDCount int `json:"did_count"` 23 - Hash string `json:"hash"` 24 - CompressedHash string `json:"compressed_hash"` 25 - CompressedSize int64 `json:"compressed_size"` 26 - UncompressedSize int64 `json:"uncompressed_size"` 27 - Cursor string `json:"cursor"` 28 - PrevBundleHash string `json:"prev_bundle_hash,omitempty"` 29 - BoundaryCIDs []string `json:"boundary_cids,omitempty"` 30 - Compressed bool `json:"compressed"` 31 - CreatedAt time.Time `json:"created_at"` 18 + BundleNumber int `json:"bundle_number"` 19 + StartTime time.Time `json:"start_time"` 20 + EndTime time.Time `json:"end_time"` 21 + Operations []plc.PLCOperation `json:"-"` 22 + DIDCount int `json:"did_count"` 23 + 24 + Hash string `json:"hash"` 25 + ChainHash string `json:"chain_hash"` 26 + 27 + CompressedHash string `json:"compressed_hash"` 28 + CompressedSize int64 `json:"compressed_size"` 29 + UncompressedSize int64 `json:"uncompressed_size"` 30 + Cursor string `json:"cursor"` 31 + PrevBundleHash string `json:"prev_bundle_hash,omitempty"` 32 + PrevChainHash string `json:"prev_chain_hash,omitempty"` 33 + BoundaryCIDs []string `json:"boundary_cids,omitempty"` 34 + Compressed bool `json:"compressed"` 35 + CreatedAt time.Time `json:"created_at"` 32 36 } 33 37 34 38 // GetFilePath returns the file path for this bundle ··· 85 89 return nil 86 90 } 87 91 88 - // BundleMetadata contains metadata about a bundle (for index) 89 92 type BundleMetadata struct { 90 - BundleNumber int `json:"bundle_number"` 91 - StartTime time.Time `json:"start_time"` 92 - EndTime time.Time `json:"end_time"` 93 - OperationCount int `json:"operation_count"` 94 - DIDCount int `json:"did_count"` 95 - Hash string `json:"hash"` 93 + BundleNumber int 
`json:"bundle_number"` 94 + StartTime time.Time `json:"start_time"` 95 + EndTime time.Time `json:"end_time"` 96 + OperationCount int `json:"operation_count"` 97 + DIDCount int `json:"did_count"` 98 + 99 + // Content hash (operations only) 100 + Hash string `json:"hash"` 101 + 102 + // Cumulative chain hash (includes all previous bundles) 103 + ChainHash string `json:"chain_hash"` 104 + 105 + // References 106 + PrevBundleHash string `json:"prev_bundle_hash,omitempty"` 107 + PrevChainHash string `json:"prev_chain_hash,omitempty"` 108 + 96 109 CompressedHash string `json:"compressed_hash"` 97 110 CompressedSize int64 `json:"compressed_size"` 98 111 UncompressedSize int64 `json:"uncompressed_size"` 99 112 Cursor string `json:"cursor"` 100 - PrevBundleHash string `json:"prev_bundle_hash,omitempty"` 101 113 CreatedAt time.Time `json:"created_at"` 102 114 } 103 115 ··· 141 153 142 154 // DirectoryScanResult contains results from scanning a directory 143 155 type DirectoryScanResult struct { 144 - BundleDir string 145 - BundleCount int 146 - FirstBundle int 147 - LastBundle int 148 - MissingGaps []int 149 - TotalSize int64 150 - IndexUpdated bool 156 + BundleDir string 157 + BundleCount int 158 + FirstBundle int 159 + LastBundle int 160 + MissingGaps []int 161 + TotalSize int64 // Compressed size 162 + TotalUncompressed int64 // Uncompressed size (NEW) 163 + IndexUpdated bool 151 164 } 152 165 153 166 // Logger interface for bundle operations
+1 -1
cmd/plcbundle/info.go
··· 35 35 fmt.Printf("\n") 36 36 fmt.Printf("Get started:\n") 37 37 fmt.Printf(" plcbundle fetch # Fetch bundles from PLC\n") 38 - fmt.Printf(" plcbundle scan # Scan existing bundle files\n") 38 + fmt.Printf(" plcbundle rebuild # Rebuild index from existing files\n") 39 39 fmt.Printf("\n") 40 40 return 41 41 }
+95 -44
cmd/plcbundle/main.go
··· 7 7 "net/http" 8 8 "os" 9 9 "path/filepath" 10 + "runtime" 10 11 "runtime/debug" 11 12 "sort" 12 13 "strings" ··· 60 61 switch command { 61 62 case "fetch": 62 63 cmdFetch() 63 - case "scan": 64 - cmdScan() 64 + case "rebuild": 65 + cmdRebuild() 65 66 case "verify": 66 67 cmdVerify() 67 68 case "info": ··· 88 89 } 89 90 90 91 func printUsage() { 91 - fmt.Println(`plcbundle - PLC Bundle Management Tool 92 + fmt.Printf(`plcbundle %s - DID PLC Bundle Management Tool 92 93 93 94 Usage: 94 95 plcbundle <command> [options] 95 96 96 97 Commands: 97 98 fetch Fetch next bundle from PLC directory 98 - scan Scan current directory for bundles (incremental) 99 + rebuild Rebuild index from existing bundle files 99 100 verify Verify bundle integrity 100 101 info Show bundle information 101 102 export Export operations from bundles ··· 105 106 compare Compare local index with target index 106 107 version Show version 107 108 108 - The tool works with the current directory. 109 - Bundle directory is detected by presence of .jsonl.zst files or plc_bundles.json.`) 109 + Security Model: 110 + Bundles are cryptographically chained but require external verification: 111 + - Verify against original PLC directory 112 + - Compare with multiple independent mirrors 113 + - Check published root and head hashes 114 + - Anyone can reproduce bundles from PLC directory`, version) 110 115 } 111 116 112 117 // getManager creates or opens a bundle manager in the detected directory ··· 258 263 return false 259 264 } 260 265 261 - func cmdScan() { 262 - fs := flag.NewFlagSet("scan", flag.ExitOnError) 266 + func cmdRebuild() { 267 + fs := flag.NewFlagSet("rebuild", flag.ExitOnError) 268 + verbose := fs.Bool("v", false, "verbose output") 269 + workers := fs.Int("workers", 4, "number of parallel workers (0 = CPU count)") 270 + noProgress := fs.Bool("no-progress", false, "disable progress bar") 263 271 fs.Parse(os.Args[2:]) 264 272 273 + // Auto-detect CPU count 274 + if *workers == 0 { 275 + *workers 
= runtime.NumCPU() 276 + } 277 + 265 278 mgr, dir, err := getManager("") 266 279 if err != nil { 267 280 fmt.Fprintf(os.Stderr, "Error: %v\n", err) ··· 269 282 } 270 283 defer mgr.Close() 271 284 272 - fmt.Printf("Scanning: %s\n", dir) 285 + fmt.Printf("Rebuilding index from: %s\n", dir) 286 + fmt.Printf("Using %d workers\n", *workers) 273 287 274 288 // Find all bundle files 275 289 files, err := filepath.Glob(filepath.Join(dir, "*.jsonl.zst")) ··· 283 297 return 284 298 } 285 299 286 - // Parse and sort bundle numbers 287 - var bundleNumbers []int 288 - for _, file := range files { 289 - base := filepath.Base(file) 290 - var num int 291 - if _, err := fmt.Sscanf(base, "%06d.jsonl.zst", &num); err == nil { 292 - bundleNumbers = append(bundleNumbers, num) 300 + fmt.Printf("Found %d bundle files\n", len(files)) 301 + fmt.Printf("\n") 302 + 303 + start := time.Now() 304 + 305 + // Create progress bar 306 + var progress *ProgressBar 307 + var progressCallback func(int, int) 308 + 309 + if !*noProgress { 310 + progress = NewProgressBar(len(files)) 311 + progressCallback = func(current, total int) { 312 + progress.Set(current) 313 + } 314 + fmt.Println("Processing bundles:") 315 + } 316 + 317 + // Use parallel scan 318 + result, err := mgr.ScanDirectoryParallel(*workers, progressCallback) 319 + if err != nil { 320 + if progress != nil { 321 + progress.Finish() 293 322 } 323 + fmt.Fprintf(os.Stderr, "Error: %v\n", err) 324 + os.Exit(1) 294 325 } 295 326 296 - // Sort 297 - sort.Ints(bundleNumbers) 327 + // Finish progress bar 328 + if progress != nil { 329 + progress.Finish() 330 + } 331 + 332 + elapsed := time.Since(start) 333 + 334 + fmt.Printf("\n") 335 + fmt.Printf("✓ Index rebuilt in %s\n", elapsed.Round(time.Millisecond)) 336 + fmt.Printf(" Total bundles: %d\n", result.BundleCount) 337 + fmt.Printf(" Compressed size: %s\n", formatBytes(result.TotalSize)) 338 + fmt.Printf(" Uncompressed size: %s\n", formatBytes(result.TotalUncompressed)) 298 339 299 - skippedCount 
:= 0 300 - newCount := 0 340 + // Calculate compression ratio 341 + if result.TotalUncompressed > 0 { 342 + ratio := float64(result.TotalUncompressed) / float64(result.TotalSize) 343 + fmt.Printf(" Compression ratio: %.2fx\n", ratio) 344 + } 301 345 302 - fmt.Printf("Found %d bundle files\n", len(bundleNumbers)) 346 + fmt.Printf(" Average speed: %.1f bundles/sec\n", float64(result.BundleCount)/elapsed.Seconds()) 347 + 348 + if elapsed.Seconds() > 0 { 349 + compressedThroughput := float64(result.TotalSize) / elapsed.Seconds() / (1024 * 1024) 350 + uncompressedThroughput := float64(result.TotalUncompressed) / elapsed.Seconds() / (1024 * 1024) 351 + fmt.Printf(" Throughput (compressed): %.1f MB/s\n", compressedThroughput) 352 + fmt.Printf(" Throughput (uncompressed): %.1f MB/s\n", uncompressedThroughput) 353 + } 303 354 304 - // Process each bundle incrementally 305 - for _, num := range bundleNumbers { 306 - // Skip if already indexed 307 - if mgr.IsBundleIndexed(num) { 308 - skippedCount++ 309 - continue 310 - } 355 + fmt.Printf(" Index file: %s\n", filepath.Join(dir, bundle.INDEX_FILE)) 311 356 312 - fmt.Printf(" Processing bundle %06d...", num) 357 + if len(result.MissingGaps) > 0 { 358 + fmt.Printf(" ⚠️ Missing gaps: %d bundles\n", len(result.MissingGaps)) 359 + } 313 360 314 - path := filepath.Join(dir, fmt.Sprintf("%06d.jsonl.zst", num)) 361 + // Verify chain if requested 362 + if *verbose { 363 + fmt.Printf("\n") 364 + fmt.Printf("Verifying chain integrity...\n") 315 365 316 - // ✅ Use Manager - single point of entry 317 - meta, err := mgr.ScanAndIndexBundle(path, num) 366 + ctx := context.Background() 367 + verifyResult, err := mgr.VerifyChain(ctx) 318 368 if err != nil { 319 - fmt.Printf(" ERROR: %v\n", err) 320 - continue 321 - } 369 + fmt.Printf(" ⚠️ Verification error: %v\n", err) 370 + } else if verifyResult.Valid { 371 + fmt.Printf(" ✓ Chain is valid (%d bundles verified)\n", len(verifyResult.VerifiedBundles)) 322 372 323 - newCount++ 324 - fmt.Printf(" 
✓ (%d ops, %d DIDs)\n", meta.OperationCount, meta.DIDCount) 373 + // Show head hash 374 + index := mgr.GetIndex() 375 + if lastMeta := index.GetLastBundle(); lastMeta != nil { 376 + fmt.Printf(" Chain head: %s...\n", lastMeta.ChainHash[:16]) 377 + } 378 + } else { 379 + fmt.Printf(" ✗ Chain verification failed\n") 380 + fmt.Printf(" Broken at: bundle %06d\n", verifyResult.BrokenAt) 381 + fmt.Printf(" Error: %s\n", verifyResult.Error) 382 + } 325 383 } 326 - 327 - fmt.Printf("\n") 328 - fmt.Printf("✓ Scan complete\n") 329 - fmt.Printf(" Total bundles: %d\n", len(bundleNumbers)) 330 - fmt.Printf(" Already indexed: %d\n", skippedCount) 331 - fmt.Printf(" Newly scanned: %d\n", newCount) 332 - fmt.Printf(" Index: %s\n", filepath.Join(dir, bundle.INDEX_FILE)) 333 384 } 334 385 335 386 func cmdVerify() {
+108
cmd/plcbundle/progress.go
··· 1 + package main 2 + 3 + import ( 4 + "fmt" 5 + "strings" 6 + "sync" 7 + "time" 8 + ) 9 + 10 + // ProgressBar shows progress of an operation 11 + type ProgressBar struct { 12 + total int 13 + current int 14 + startTime time.Time 15 + mu sync.Mutex 16 + width int 17 + lastPrint time.Time 18 + } 19 + 20 + // NewProgressBar creates a new progress bar 21 + func NewProgressBar(total int) *ProgressBar { 22 + return &ProgressBar{ 23 + total: total, 24 + current: 0, 25 + startTime: time.Now(), 26 + width: 40, 27 + lastPrint: time.Now(), 28 + } 29 + } 30 + 31 + // Increment increases the progress by 1 32 + func (pb *ProgressBar) Increment() { 33 + pb.mu.Lock() 34 + defer pb.mu.Unlock() 35 + pb.current++ 36 + pb.print() 37 + } 38 + 39 + // Set sets the current progress 40 + func (pb *ProgressBar) Set(current int) { 41 + pb.mu.Lock() 42 + defer pb.mu.Unlock() 43 + pb.current = current 44 + pb.print() 45 + } 46 + 47 + // Finish completes the progress bar 48 + func (pb *ProgressBar) Finish() { 49 + pb.mu.Lock() 50 + defer pb.mu.Unlock() 51 + pb.current = pb.total 52 + pb.print() 53 + fmt.Println() // New line after completion 54 + } 55 + 56 + // print renders the progress bar (must be called with lock held) 57 + func (pb *ProgressBar) print() { 58 + // Rate limit updates (max 10 per second) 59 + if time.Since(pb.lastPrint) < 100*time.Millisecond && pb.current < pb.total { 60 + return 61 + } 62 + pb.lastPrint = time.Now() 63 + 64 + // Calculate percentage 65 + percent := float64(pb.current) / float64(pb.total) * 100 66 + if pb.total == 0 { 67 + percent = 0 68 + } 69 + 70 + // Calculate bar 71 + filled := int(float64(pb.width) * float64(pb.current) / float64(pb.total)) 72 + if filled > pb.width { 73 + filled = pb.width 74 + } 75 + bar := strings.Repeat("█", filled) + strings.Repeat("░", pb.width-filled) 76 + 77 + // Calculate speed and ETA 78 + elapsed := time.Since(pb.startTime) 79 + speed := float64(pb.current) / elapsed.Seconds() 80 + remaining := pb.total - pb.current 81 
+ var eta time.Duration 82 + if speed > 0 { 83 + eta = time.Duration(float64(remaining)/speed) * time.Second 84 + } 85 + 86 + // Print progress bar 87 + fmt.Printf("\r [%s] %6.2f%% | %d/%d bundles | %.1f/s | ETA: %s ", 88 + bar, 89 + percent, 90 + pb.current, 91 + pb.total, 92 + speed, 93 + formatETA(eta)) 94 + } 95 + 96 + // formatETA formats the ETA duration 97 + func formatETA(d time.Duration) string { 98 + if d == 0 { 99 + return "calculating..." 100 + } 101 + if d < time.Minute { 102 + return fmt.Sprintf("%ds", int(d.Seconds())) 103 + } 104 + if d < time.Hour { 105 + return fmt.Sprintf("%dm %ds", int(d.Minutes()), int(d.Seconds())%60) 106 + } 107 + return fmt.Sprintf("%dh %dm", int(d.Hours()), int(d.Minutes())%60) 108 + }
+173 -173
cmd/plcbundle/server.go
··· 24 24 }, 25 25 } 26 26 27 + func handleRoot(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, syncMode bool, wsEnabled bool) { 28 + w.Header().Set("Content-Type", "text/plain; charset=utf-8") 29 + 30 + index := mgr.GetIndex() 31 + stats := index.GetStats() 32 + bundleCount := stats["bundle_count"].(int) 33 + 34 + baseURL := getBaseURL(r) 35 + wsURL := getWSURL(r) 36 + 37 + fmt.Fprintf(w, ` 38 + 39 + ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 40 + ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 41 + ⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⣽⣥⡈⠧⣂⢧⢾⠕⠞⠡⠊⠁⣐⠉⠀⠉⢍⠀⠉⠌⡉⠀⠂⠁⠱⠉⠁⢝⠻⠎⣬⢌⡌⣬⣡⣀⣢⣄⡄⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀ 42 + ⠀⠀⠀⠀⠀⠀⠀⢀⢸⣿⣿⢿⣾⣯⣑⢄⡂⠀⠄⠂⠀⠀⢀⠀⠀⠐⠀⠀⠀⠀⠀⠀⠀⠀⠄⠐⠀⠀⠀⠀⣄⠭⠂⠈⠜⣩⣿⢝⠃⠀⠁⠀⠀⠀⠀⠀⠀⠀⠀ 43 + ⠀⠀⠀⠀⠀⠀⠀⢀⣻⡟⠏⠀⠚⠈⠚⡉⡝⢶⣱⢤⣅⠈⠀⠄⠀⠀⠀⠀⠀⠠⠀⠀⡂⠐⣤⢕⡪⢼⣈⡹⡇⠏⠏⠋⠅⢃⣪⡏⡇⡍⠀⠀⠀⠀⠀⠀⠀⠀⠀ 44 + ⠀⠀⠀⠀⠀⠀⠀⠀⠺⣻⡄⠀⠀⠀⢠⠌⠃⠐⠉⢡⠱⠧⠝⡯⣮⢶⣴⣤⡆⢐⣣⢅⣮⡟⠦⠍⠉⠀⠁⠐⠀⠀⠀⠄⠐⠡⣽⡸⣎⢁⠀⠀⠀⠀⠀⠀⠀⠀⠀ 45 + ⠀⠀⠀⠀⠀⠀⠀⢈⡻⣧⠀⠁⠐⠀⠀⠀⠀⠀⠀⠊⠀⠕⢀⡉⠈⡫⠽⡿⡟⠿⠟⠁⠀⠀⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠬⠥⣋⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 46 + ⠀⠀⠀⠀⠀⠀⠀⡀⣾⡍⠕⡀⠀⠀⠀⠄⠠⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠥⣤⢌⠀⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠄⢀⠀⢝⢞⣫⡆⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀ 47 + ⠀⠀⠀⠀⠀⠀⠀⠀⣽⡶⡄⠐⡀⠀⠀⠀⠀⠀⠀⢀⠀⠄⠀⠀⠀⠄⠁⠇⣷⡆⠀⠀⠀⢀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡸⢝⣮⠍⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 48 + ⠀⠀⠀⠀⠀⠀⢀⠀⢾⣷⠀⠠⡀⠀⠀⠀⠀⢀⠀⠀⠀⠀⠀⠁⡁⠀⠀⣾⡥⠖⠀⠀⠀⠂⠀⠀⠀⠀⠀⠁⠀⡀⠁⠀⠀⠻⢳⣻⢄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 49 + ⠀⠀⠀⠀⠀⠀⠀⠀⣞⡙⠨⣀⠠⠄⠀⠂⠀⠀⠀⠈⢀⠀⠀⠀⠀⠀⠤⢚⢢⣟⠀⠀⠀⠀⡐⠀⠀⡀⠀⠀⠀⠀⠁⠈⠌⠊⣯⣮⡏⠡⠂⠀⠀⠀⠀⠀⠀⠀⠀ 50 + ⠀⠀⠀⠀⠀⠀⠀⠀⣻⡟⡄⡡⣄⠀⠠⠀⠀⡅⠀⠐⠀⡀⠀⡀⠀⠄⠈⠃⠳⠪⠤⠀⠀⠀⠀⡀⠀⠂⠀⠀⠀⠁⠈⢠⣠⠒⠻⣻⡧⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 51 + ⠀⠀⠀⠀⠀⠀⠀⠀⠪⡎⠠⢌⠑⡀⠂⠀⠄⠠⠀⠠⠀⠁⡀⠠⠠⡀⣀⠜⢏⡅⠀⠀⡀⠁⠀⠀⠁⠁⠐⠄⡀⢀⠂⠀⠄⢑⣿⣿⣿⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 52 + ⠀⠀⠀⠀⠀⠀⠀⠀⠼⣻⠧⣣⣀⠐⠨⠁⠕⢈⢀⢀⡁⠀⠈⠠⢀⠀⠐⠜⣽⡗⡤⠀⠂⠀⠠⠀⢂⠠⠀⠁⠄⠀⠔⠀⠑⣨⣿⢯⠋⡅⠀⠀⠀⠀⠀⠀⠀⠀⠀ 53 + ⠀⠀⠀⠀⠀⠀⠀⠀⡚⣷⣭⠎⢃⡗⠄⡄⢀⠁⠀⠅⢀⢅⡀⠠⠀⢠⡀⡩⠷⢇⠀⡀⠄⡠⠤⠆⣀⡀⠄⠉⣠⠃⠴⠀⠈⢁⣿⡛⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 54 + ⠀⠀⠀⠀⠀⠀⠀⠘⡬⡿⣿⡏⡻⡯⠌⢁⢛⠠⠓⠐⠐⠐⠌⠃⠋⠂⡢⢰⣈⢏⣰⠂⠈⠀⠠⠒⠡⠌⠫⠭⠩⠢⡬⠆⠿⢷⢿⡽⡧⠉⠊⠀⠀⠀⠀⠀⠀⠀⠀ 55 + ⠀⠀⠀⠀⠀⠀⠀⠀⠺⣷⣺⣗⣿⡶⡎⡅⣣⢎⠠⡅⣢⡖⠴⠬⡈⠂⡨⢡⠾⣣⣢⠀⠀⡹⠄⡄⠄⡇⣰⡖⡊⠔⢹⣄⣿⣭⣵⣿⢷⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 56 + ⠀⠀⠀⠀⠀⠀⠀⠀⠩⣿⣿⣲⣿⣷⣟⣼⠟⣬⢉⡠⣪⢜⣂⣁⠥⠓⠚⡁⢶⣷⣠⠂⡄⡢⣀⡐⠧⢆⣒⡲⡳⡫⢟⡃⢪⡧⣟⡟⣯⠐⠀⠀⠀⠀⠀⠀⠀⠀⠀ 57 + ⠀⠀⠀⠀⠀⠀⠀⠀⢺⠟⢿⢟⢻⡗⡮⡿⣲⢷⣆⣏⣇⡧⣄⢖⠾⡷⣿⣤⢳⢷⣣⣦⡜⠗⣭⢂⠩⣹⢿⡲⢎⡧⣕⣖⣓⣽⡿⡖⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 58 + ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠂⠂⠏⠿⢻⣥⡪⢽⣳⣳⣥⡶⣫⣍⢐⣥⣻⣾⡻⣅⢭⡴⢭⣿⠕⣧⡭⣞⣻⣣⣻⢿⠟⠛⠙⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 59 + ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠋⠫⠯⣍⢻⣿⣿⣷⣕⣵⣹⣽⣿⣷⣇⡏⣿⡿⣍⡝⠵⠯⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 60 + ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 61 + ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 62 + 63 + plcbundle server (%s) 64 + 65 + `, version) 66 + 67 + fmt.Fprintf(w, "What is PLC 
Bundle?\n") 68 + fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━━\n") 69 + fmt.Fprintf(w, "plcbundle archives AT Protocol's DID PLC Directory operations into\n") 70 + fmt.Fprintf(w, "immutable, cryptographically-chained bundles of 10,000 operations.\n") 71 + fmt.Fprintf(w, "Each bundle is compressed (zstd), hashed (SHA-256), and linked to\n") 72 + fmt.Fprintf(w, "the previous bundle, creating a verifiable chain of DID operations.\n\n") 73 + fmt.Fprintf(w, "More info: https://github.com/atscan/plcbundle\n\n") 74 + 75 + fmt.Fprintf(w, "Server Stats\n") 76 + fmt.Fprintf(w, "━━━━━━━━━━━━\n") 77 + fmt.Fprintf(w, " Bundle count: %d\n", bundleCount) 78 + fmt.Fprintf(w, " Sync mode: %v\n", syncMode) 79 + fmt.Fprintf(w, " WebSocket: %v\n", wsEnabled) 80 + 81 + if bundleCount > 0 { 82 + firstBundle := stats["first_bundle"].(int) 83 + lastBundle := stats["last_bundle"].(int) 84 + totalSize := stats["total_size"].(int64) 85 + 86 + fmt.Fprintf(w, " Range: %06d - %06d\n", firstBundle, lastBundle) 87 + fmt.Fprintf(w, " Total size: %.2f MB\n", float64(totalSize)/(1024*1024)) 88 + fmt.Fprintf(w, " Updated: %s\n", stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05")) 89 + 90 + if gaps, ok := stats["gaps"].(int); ok && gaps > 0 { 91 + fmt.Fprintf(w, " ⚠ Gaps: %d missing bundles\n", gaps) 92 + } 93 + 94 + // Get first and last bundle metadata for hashes 95 + firstMeta, err := index.GetBundle(firstBundle) 96 + if err == nil { 97 + fmt.Fprintf(w, "\n Root: %s\n", firstMeta.Hash) 98 + } 99 + 100 + lastMeta, err := index.GetBundle(lastBundle) 101 + if err == nil { 102 + fmt.Fprintf(w, " Head: %s\n", lastMeta.Hash) 103 + } 104 + } 105 + 106 + // Show mempool stats if sync mode 107 + if syncMode { 108 + mempoolStats := mgr.GetMempoolStats() 109 + count := mempoolStats["count"].(int) 110 + targetBundle := mempoolStats["target_bundle"].(int) 111 + canCreate := mempoolStats["can_create_bundle"].(bool) 112 + 113 + fmt.Fprintf(w, "\nMempool Stats\n") 114 + fmt.Fprintf(w, "━━━━━━━━━━━━━\n") 115 + 
fmt.Fprintf(w, " Target bundle: %06d\n", targetBundle) 116 + fmt.Fprintf(w, " Operations: %d / %d\n", count, bundle.BUNDLE_SIZE) 117 + fmt.Fprintf(w, " Can create bundle: %v\n", canCreate) 118 + 119 + if count > 0 { 120 + progress := float64(count) / float64(bundle.BUNDLE_SIZE) * 100 121 + fmt.Fprintf(w, " Progress: %.1f%%\n", progress) 122 + 123 + // ASCII Progress bar 124 + barWidth := 50 125 + filled := int(float64(barWidth) * float64(count) / float64(bundle.BUNDLE_SIZE)) 126 + if filled > barWidth { 127 + filled = barWidth 128 + } 129 + bar := strings.Repeat("█", filled) + strings.Repeat("░", barWidth-filled) 130 + fmt.Fprintf(w, " [%s]\n", bar) 131 + 132 + if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 133 + fmt.Fprintf(w, " First op: %s\n", firstTime.Format("2006-01-02 15:04:05")) 134 + } 135 + if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 136 + fmt.Fprintf(w, " Last op: %s\n", lastTime.Format("2006-01-02 15:04:05")) 137 + } 138 + } else { 139 + fmt.Fprintf(w, " (empty)\n") 140 + } 141 + } 142 + 143 + fmt.Fprintf(w, "\nAPI Endpoints\n") 144 + fmt.Fprintf(w, "━━━━━━━━━━━━━\n") 145 + fmt.Fprintf(w, " GET / This info page\n") 146 + fmt.Fprintf(w, " GET /index.json Full bundle index\n") 147 + fmt.Fprintf(w, " GET /bundle/:number Bundle metadata (JSON)\n") 148 + fmt.Fprintf(w, " GET /data/:number Raw bundle (zstd compressed)\n") 149 + fmt.Fprintf(w, " GET /jsonl/:number Decompressed JSONL stream\n") 150 + 151 + if wsEnabled { 152 + fmt.Fprintf(w, "\nWebSocket Endpoints\n") 153 + fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━\n") 154 + fmt.Fprintf(w, " WS /ws?cursor=N Live stream all records from cursor N\n") 155 + fmt.Fprintf(w, " Streams all bundles, then mempool\n") 156 + fmt.Fprintf(w, " Continues streaming new operations live\n") 157 + fmt.Fprintf(w, " Connection stays open until client closes\n") 158 + fmt.Fprintf(w, " Cursor: global record number (0-based)\n") 159 + fmt.Fprintf(w, " Example: 88410345 = bundle 8841, pos 345\n") 160 + } 
161 + 162 + if syncMode { 163 + fmt.Fprintf(w, "\nSync Endpoints\n") 164 + fmt.Fprintf(w, "━━━━━━━━━━━━━━\n") 165 + fmt.Fprintf(w, " GET /sync Sync status & mempool info (JSON)\n") 166 + fmt.Fprintf(w, " GET /sync/mempool Mempool operations (JSONL)\n") 167 + } 168 + 169 + fmt.Fprintf(w, "\nExamples\n") 170 + fmt.Fprintf(w, "━━━━━━━━\n") 171 + fmt.Fprintf(w, " # Get bundle metadata\n") 172 + fmt.Fprintf(w, " curl %s/bundle/1\n\n", baseURL) 173 + fmt.Fprintf(w, " # Download compressed bundle 42\n") 174 + fmt.Fprintf(w, " curl %s/data/42 -o 000042.jsonl.zst\n\n", baseURL) 175 + fmt.Fprintf(w, " # Stream decompressed operations from bundle 42\n") 176 + fmt.Fprintf(w, " curl %s/jsonl/1\n\n", baseURL) 177 + 178 + if wsEnabled { 179 + fmt.Fprintf(w, " # Stream all operations via WebSocket (from beginning)\n") 180 + fmt.Fprintf(w, " websocat %s/ws\n\n", wsURL) 181 + fmt.Fprintf(w, " # Stream from cursor 10000\n") 182 + fmt.Fprintf(w, " websocat '%s/ws?cursor=10000'\n\n", wsURL) 183 + fmt.Fprintf(w, " # Stream and save to file\n") 184 + fmt.Fprintf(w, " websocat %s/ws > all_operations.jsonl\n\n", wsURL) 185 + fmt.Fprintf(w, " # Stream with jq for pretty printing\n") 186 + fmt.Fprintf(w, " websocat %s/ws | jq .\n\n", wsURL) 187 + } 188 + 189 + if syncMode { 190 + fmt.Fprintf(w, " # Get sync status\n") 191 + fmt.Fprintf(w, " curl %s/sync\n\n", baseURL) 192 + fmt.Fprintf(w, " # Get mempool operations\n") 193 + fmt.Fprintf(w, " curl %s/sync/mempool\n\n", baseURL) 194 + } 195 + 196 + fmt.Fprintf(w, "\n────────────────────────────────────────────────────────────────\n") 197 + fmt.Fprintf(w, "plcbundle %s | https://github.com/atscan/plcbundle\n", version) 198 + } 199 + 27 200 // getScheme returns the appropriate HTTP scheme (http or https) 28 201 func getScheme(r *http.Request) string { 29 202 // Check if TLS is active ··· 374 547 } 375 548 376 549 return nil 377 - } 378 - 379 - func handleRoot(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, syncMode bool, wsEnabled 
bool) { 380 - w.Header().Set("Content-Type", "text/plain; charset=utf-8") 381 - 382 - index := mgr.GetIndex() 383 - stats := index.GetStats() 384 - bundleCount := stats["bundle_count"].(int) 385 - 386 - baseURL := getBaseURL(r) 387 - wsURL := getWSURL(r) 388 - 389 - fmt.Fprintf(w, ` 390 - 391 - ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 392 - ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 393 - ⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⣽⣥⡈⠧⣂⢧⢾⠕⠞⠡⠊⠁⣐⠉⠀⠉⢍⠀⠉⠌⡉⠀⠂⠁⠱⠉⠁⢝⠻⠎⣬⢌⡌⣬⣡⣀⣢⣄⡄⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀ 394 - ⠀⠀⠀⠀⠀⠀⠀⢀⢸⣿⣿⢿⣾⣯⣑⢄⡂⠀⠄⠂⠀⠀⢀⠀⠀⠐⠀⠀⠀⠀⠀⠀⠀⠀⠄⠐⠀⠀⠀⠀⣄⠭⠂⠈⠜⣩⣿⢝⠃⠀⠁⠀⠀⠀⠀⠀⠀⠀⠀ 395 - ⠀⠀⠀⠀⠀⠀⠀⢀⣻⡟⠏⠀⠚⠈⠚⡉⡝⢶⣱⢤⣅⠈⠀⠄⠀⠀⠀⠀⠀⠠⠀⠀⡂⠐⣤⢕⡪⢼⣈⡹⡇⠏⠏⠋⠅⢃⣪⡏⡇⡍⠀⠀⠀⠀⠀⠀⠀⠀⠀ 396 - ⠀⠀⠀⠀⠀⠀⠀⠀⠺⣻⡄⠀⠀⠀⢠⠌⠃⠐⠉⢡⠱⠧⠝⡯⣮⢶⣴⣤⡆⢐⣣⢅⣮⡟⠦⠍⠉⠀⠁⠐⠀⠀⠀⠄⠐⠡⣽⡸⣎⢁⠀⠀⠀⠀⠀⠀⠀⠀⠀ 397 - ⠀⠀⠀⠀⠀⠀⠀⢈⡻⣧⠀⠁⠐⠀⠀⠀⠀⠀⠀⠊⠀⠕⢀⡉⠈⡫⠽⡿⡟⠿⠟⠁⠀⠀⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠬⠥⣋⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 398 - ⠀⠀⠀⠀⠀⠀⠀⡀⣾⡍⠕⡀⠀⠀⠀⠄⠠⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠥⣤⢌⠀⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠄⢀⠀⢝⢞⣫⡆⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀ 399 - ⠀⠀⠀⠀⠀⠀⠀⠀⣽⡶⡄⠐⡀⠀⠀⠀⠀⠀⠀⢀⠀⠄⠀⠀⠀⠄⠁⠇⣷⡆⠀⠀⠀⢀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡸⢝⣮⠍⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 400 - ⠀⠀⠀⠀⠀⠀⢀⠀⢾⣷⠀⠠⡀⠀⠀⠀⠀⢀⠀⠀⠀⠀⠀⠁⡁⠀⠀⣾⡥⠖⠀⠀⠀⠂⠀⠀⠀⠀⠀⠁⠀⡀⠁⠀⠀⠻⢳⣻⢄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 401 - ⠀⠀⠀⠀⠀⠀⠀⠀⣞⡙⠨⣀⠠⠄⠀⠂⠀⠀⠀⠈⢀⠀⠀⠀⠀⠀⠤⢚⢢⣟⠀⠀⠀⠀⡐⠀⠀⡀⠀⠀⠀⠀⠁⠈⠌⠊⣯⣮⡏⠡⠂⠀⠀⠀⠀⠀⠀⠀⠀ 402 - ⠀⠀⠀⠀⠀⠀⠀⠀⣻⡟⡄⡡⣄⠀⠠⠀⠀⡅⠀⠐⠀⡀⠀⡀⠀⠄⠈⠃⠳⠪⠤⠀⠀⠀⠀⡀⠀⠂⠀⠀⠀⠁⠈⢠⣠⠒⠻⣻⡧⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 403 - ⠀⠀⠀⠀⠀⠀⠀⠀⠪⡎⠠⢌⠑⡀⠂⠀⠄⠠⠀⠠⠀⠁⡀⠠⠠⡀⣀⠜⢏⡅⠀⠀⡀⠁⠀⠀⠁⠁⠐⠄⡀⢀⠂⠀⠄⢑⣿⣿⣿⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 404 - ⠀⠀⠀⠀⠀⠀⠀⠀⠼⣻⠧⣣⣀⠐⠨⠁⠕⢈⢀⢀⡁⠀⠈⠠⢀⠀⠐⠜⣽⡗⡤⠀⠂⠀⠠⠀⢂⠠⠀⠁⠄⠀⠔⠀⠑⣨⣿⢯⠋⡅⠀⠀⠀⠀⠀⠀⠀⠀⠀ 405 - ⠀⠀⠀⠀⠀⠀⠀⠀⡚⣷⣭⠎⢃⡗⠄⡄⢀⠁⠀⠅⢀⢅⡀⠠⠀⢠⡀⡩⠷⢇⠀⡀⠄⡠⠤⠆⣀⡀⠄⠉⣠⠃⠴⠀⠈⢁⣿⡛⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 406 - ⠀⠀⠀⠀⠀⠀⠀⠘⡬⡿⣿⡏⡻⡯⠌⢁⢛⠠⠓⠐⠐⠐⠌⠃⠋⠂⡢⢰⣈⢏⣰⠂⠈⠀⠠⠒⠡⠌⠫⠭⠩⠢⡬⠆⠿⢷⢿⡽⡧⠉⠊⠀⠀⠀⠀⠀⠀⠀⠀ 407 - ⠀⠀⠀⠀⠀⠀⠀⠀⠺⣷⣺⣗⣿⡶⡎⡅⣣⢎⠠⡅⣢⡖⠴⠬⡈⠂⡨⢡⠾⣣⣢⠀⠀⡹⠄⡄⠄⡇⣰⡖⡊⠔⢹⣄⣿⣭⣵⣿⢷⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 408 - ⠀⠀⠀⠀⠀⠀⠀⠀⠩⣿⣿⣲⣿⣷⣟⣼⠟⣬⢉⡠⣪⢜⣂⣁⠥⠓⠚⡁⢶⣷⣠⠂⡄⡢⣀⡐⠧⢆⣒⡲⡳⡫⢟⡃⢪⡧⣟⡟⣯⠐⠀⠀⠀⠀⠀⠀⠀⠀⠀ 409 - ⠀⠀⠀⠀⠀⠀⠀⠀⢺⠟⢿⢟⢻⡗⡮⡿⣲⢷⣆⣏⣇⡧⣄⢖⠾⡷⣿⣤⢳⢷⣣⣦⡜⠗⣭⢂⠩⣹⢿⡲⢎⡧⣕⣖⣓⣽⡿⡖⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 410 - ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠂⠂⠏⠿⢻⣥⡪⢽⣳⣳⣥⡶⣫⣍⢐⣥⣻⣾⡻⣅⢭⡴⢭⣿⠕⣧⡭⣞⣻⣣⣻⢿⠟⠛⠙⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 411 - ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠋⠫⠯⣍⢻⣿⣿⣷⣕⣵⣹⣽⣿⣷⣇⡏⣿⡿⣍⡝⠵⠯⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 412 - ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 413 - ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ 414 - 415 - plcbundle server (%s) 416 - 417 - `, version) 418 - 419 - fmt.Fprintf(w, "What is PLC Bundle?\n") 420 - fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━━\n") 421 - fmt.Fprintf(w, "plcbundle 
archives AT Protocol's PLC directory operations into\n") 422 - fmt.Fprintf(w, "immutable, cryptographically-chained bundles of 10,000 operations.\n") 423 - fmt.Fprintf(w, "Each bundle is compressed (zstd), hashed (SHA-256), and linked to\n") 424 - fmt.Fprintf(w, "the previous bundle, creating a verifiable chain of DID operations.\n\n") 425 - fmt.Fprintf(w, "More info: https://github.com/atscan/plcbundle\n\n") 426 - 427 - fmt.Fprintf(w, "Server Stats\n") 428 - fmt.Fprintf(w, "━━━━━━━━━━━━\n") 429 - fmt.Fprintf(w, " Bundle count: %d\n", bundleCount) 430 - fmt.Fprintf(w, " Sync mode: %v\n", syncMode) 431 - fmt.Fprintf(w, " WebSocket: %v\n", wsEnabled) 432 - 433 - if bundleCount > 0 { 434 - firstBundle := stats["first_bundle"].(int) 435 - lastBundle := stats["last_bundle"].(int) 436 - totalSize := stats["total_size"].(int64) 437 - 438 - fmt.Fprintf(w, " Range: %06d - %06d\n", firstBundle, lastBundle) 439 - fmt.Fprintf(w, " Total size: %.2f MB\n", float64(totalSize)/(1024*1024)) 440 - fmt.Fprintf(w, " Updated: %s\n", stats["updated_at"].(time.Time).Format("2006-01-02 15:04:05")) 441 - 442 - if gaps, ok := stats["gaps"].(int); ok && gaps > 0 { 443 - fmt.Fprintf(w, " ⚠ Gaps: %d missing bundles\n", gaps) 444 - } 445 - 446 - // Get first and last bundle metadata for hashes 447 - firstMeta, err := index.GetBundle(firstBundle) 448 - if err == nil { 449 - fmt.Fprintf(w, "\n Root: %s\n", firstMeta.Hash) 450 - } 451 - 452 - lastMeta, err := index.GetBundle(lastBundle) 453 - if err == nil { 454 - fmt.Fprintf(w, " Head: %s\n", lastMeta.Hash) 455 - } 456 - } 457 - 458 - // Show mempool stats if sync mode 459 - if syncMode { 460 - mempoolStats := mgr.GetMempoolStats() 461 - count := mempoolStats["count"].(int) 462 - targetBundle := mempoolStats["target_bundle"].(int) 463 - canCreate := mempoolStats["can_create_bundle"].(bool) 464 - 465 - fmt.Fprintf(w, "\nMempool Stats\n") 466 - fmt.Fprintf(w, "━━━━━━━━━━━━━\n") 467 - fmt.Fprintf(w, " Target bundle: %06d\n", targetBundle) 468 - 
fmt.Fprintf(w, " Operations: %d / %d\n", count, bundle.BUNDLE_SIZE) 469 - fmt.Fprintf(w, " Can create bundle: %v\n", canCreate) 470 - 471 - if count > 0 { 472 - progress := float64(count) / float64(bundle.BUNDLE_SIZE) * 100 473 - fmt.Fprintf(w, " Progress: %.1f%%\n", progress) 474 - 475 - // ASCII Progress bar 476 - barWidth := 50 477 - filled := int(float64(barWidth) * float64(count) / float64(bundle.BUNDLE_SIZE)) 478 - if filled > barWidth { 479 - filled = barWidth 480 - } 481 - bar := strings.Repeat("█", filled) + strings.Repeat("░", barWidth-filled) 482 - fmt.Fprintf(w, " [%s]\n", bar) 483 - 484 - if firstTime, ok := mempoolStats["first_time"].(time.Time); ok { 485 - fmt.Fprintf(w, " First op: %s\n", firstTime.Format("2006-01-02 15:04:05")) 486 - } 487 - if lastTime, ok := mempoolStats["last_time"].(time.Time); ok { 488 - fmt.Fprintf(w, " Last op: %s\n", lastTime.Format("2006-01-02 15:04:05")) 489 - } 490 - } else { 491 - fmt.Fprintf(w, " (empty)\n") 492 - } 493 - } 494 - 495 - fmt.Fprintf(w, "\nAPI Endpoints\n") 496 - fmt.Fprintf(w, "━━━━━━━━━━━━━\n") 497 - fmt.Fprintf(w, " GET / This info page\n") 498 - fmt.Fprintf(w, " GET /index.json Full bundle index\n") 499 - fmt.Fprintf(w, " GET /bundle/:number Bundle metadata (JSON)\n") 500 - fmt.Fprintf(w, " GET /data/:number Raw bundle (zstd compressed)\n") 501 - fmt.Fprintf(w, " GET /jsonl/:number Decompressed JSONL stream\n") 502 - 503 - if wsEnabled { 504 - fmt.Fprintf(w, "\nWebSocket Endpoints\n") 505 - fmt.Fprintf(w, "━━━━━━━━━━━━━━━━━━━\n") 506 - fmt.Fprintf(w, " WS /ws?cursor=N Live stream all records from cursor N\n") 507 - fmt.Fprintf(w, " Streams all bundles, then mempool\n") 508 - fmt.Fprintf(w, " Continues streaming new operations live\n") 509 - fmt.Fprintf(w, " Connection stays open until client closes\n") 510 - fmt.Fprintf(w, " Cursor: global record number (0-based)\n") 511 - fmt.Fprintf(w, " Example: 88410345 = bundle 8841, pos 345\n") 512 - } 513 - 514 - if syncMode { 515 - fmt.Fprintf(w, "\nSync 
Endpoints\n") 516 - fmt.Fprintf(w, "━━━━━━━━━━━━━━\n") 517 - fmt.Fprintf(w, " GET /sync Sync status & mempool info (JSON)\n") 518 - fmt.Fprintf(w, " GET /sync/mempool Mempool operations (JSONL)\n") 519 - } 520 - 521 - fmt.Fprintf(w, "\nExamples\n") 522 - fmt.Fprintf(w, "━━━━━━━━\n") 523 - fmt.Fprintf(w, " # Get bundle metadata\n") 524 - fmt.Fprintf(w, " curl %s/bundle/1\n\n", baseURL) 525 - fmt.Fprintf(w, " # Download compressed bundle 42\n") 526 - fmt.Fprintf(w, " curl %s/data/42 -o 000042.jsonl.zst\n\n", baseURL) 527 - fmt.Fprintf(w, " # Stream decompressed operations from bundle 42\n") 528 - fmt.Fprintf(w, " curl %s/jsonl/1\n\n", baseURL) 529 - 530 - if wsEnabled { 531 - fmt.Fprintf(w, " # Stream all operations via WebSocket (from beginning)\n") 532 - fmt.Fprintf(w, " websocat %s/ws\n\n", wsURL) 533 - fmt.Fprintf(w, " # Stream from cursor 10000\n") 534 - fmt.Fprintf(w, " websocat '%s/ws?cursor=10000'\n\n", wsURL) 535 - fmt.Fprintf(w, " # Stream and save to file\n") 536 - fmt.Fprintf(w, " websocat %s/ws > all_operations.jsonl\n\n", wsURL) 537 - fmt.Fprintf(w, " # Stream with jq for pretty printing\n") 538 - fmt.Fprintf(w, " websocat %s/ws | jq .\n\n", wsURL) 539 - } 540 - 541 - if syncMode { 542 - fmt.Fprintf(w, " # Get sync status\n") 543 - fmt.Fprintf(w, " curl %s/sync\n\n", baseURL) 544 - fmt.Fprintf(w, " # Get mempool operations\n") 545 - fmt.Fprintf(w, " curl %s/sync/mempool\n\n", baseURL) 546 - } 547 - 548 - fmt.Fprintf(w, "\n────────────────────────────────────────────────────────────────\n") 549 - fmt.Fprintf(w, "plcbundle %s | https://github.com/atscan/plcbundle\n", version) 550 550 } 551 551 552 552 // handleSync returns sync status and mempool info as JSON