A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
# Library Guide

A practical guide to using plcbundle as a Go library in your applications.

## Table of Contents

- [Getting Started](#getting-started)
- [Core Concepts](#core-concepts)
- [Common Patterns](#common-patterns)
- [Building Applications](#building-applications)
- [Advanced Usage](#advanced-usage)
- [Best Practices](#best-practices)
- [API Reference](#api-reference)
- [Troubleshooting](#troubleshooting)

---

## Getting Started

### Installation

```bash
go get tangled.org/atscan.net/plcbundle
```

### Your First Program

Create a simple program that opens a bundle repository and displays basic information about it:

```go
package main

import (
    "log"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
    // Create a manager
    mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()

    // Get repository info
    info := mgr.GetInfo()
    log.Printf("Bundle directory: %s", info["bundle_dir"])

    // Get index stats
    index := mgr.GetIndex()
    stats := index.GetStats()
    log.Printf("Total bundles: %d", stats["bundle_count"])
}
```

Run it:
```bash
go run main.go
# 2025/01/15 10:30:00 Bundle directory: ./plc_data
# 2025/01/15 10:30:00 Total bundles: 0
```

### Fetching Your First Bundle

Let's fetch a bundle from the PLC directory:

```go
package main

import (
    "context"
    "log"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
    mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()

    ctx := context.Background()

    // Fetch next bundle
    log.Println("Fetching bundle...")
    bundle, err := mgr.FetchNext(ctx)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("✓ Fetched bundle %d", bundle.BundleNumber)
    log.Printf("  Operations: %d", len(bundle.Operations))
    log.Printf("  Unique DIDs: %d", bundle.DIDCount)
    log.Printf("  Time range: %s to %s",
        bundle.StartTime.Format("2006-01-02"),
        bundle.EndTime.Format("2006-01-02"))
}
```

**What's happening here?**

1. `plcbundle.New()` creates a manager that handles all bundle operations
2. `FetchNext()` automatically:
   - Fetches operations from the PLC directory
   - Creates a bundle when 10,000 operations are collected
   - Saves the bundle to disk
   - Updates the index
   - Returns the bundle object

### Reading Bundles

Once you have bundles, you can load and read them:

```go
package main

import (
    "context"
    "log"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
    // No PLC URL is needed when only reading local bundles
    mgr, err := plcbundle.New("./plc_data", "")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()

    ctx := context.Background()

    // Load bundle 1
    bundle, err := mgr.Load(ctx, 1)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Bundle %d loaded", bundle.BundleNumber)

    // Iterate through operations
    for i, op := range bundle.Operations {
        if i >= 5 {
            break // Just show first 5
        }
        log.Printf("%d. DID: %s, CID: %s", i+1, op.DID, op.CID)
    }
}
```

---

## Core Concepts

### The Manager

The `Manager` is your main entry point. It handles:
- Bundle storage and retrieval
- Index management
- PLC directory synchronization
- Verification
- Mempool management

**Creating a manager:**

```go
// Option 1: simple creation with default settings
mgr, err := plcbundle.New("./bundles", "https://plc.directory")

// Option 2 (alternative to Option 1): custom configuration
config := plcbundle.DefaultConfig("./bundles")
config.VerifyOnLoad = true
config.AutoRebuild = true

plcClient := plcbundle.NewPLCClient("https://plc.directory")
mgr, err = plcbundle.NewManager(config, plcClient)
```

### Bundles

A bundle contains exactly 10,000 operations:

```go
type Bundle struct {
    BundleNumber     int                // Sequential number (1, 2, 3...)
    StartTime        time.Time          // First operation timestamp
    EndTime          time.Time          // Last operation timestamp
    Operations       []plc.PLCOperation // The 10,000 operations
    DIDCount         int                // Unique DIDs in bundle
    Hash             string             // Chain hash (includes history)
    ContentHash      string             // This bundle's content hash
    Parent           string             // Previous bundle's chain hash
    CompressedSize   int64              // File size on disk
    UncompressedSize int64              // Original JSONL size
}
```

### The Index

The index tracks all bundles and their metadata:

```go
index := mgr.GetIndex()

// Get all bundles
bundles := index.GetBundles()
for _, meta := range bundles {
    log.Printf("Bundle %d: %s to %s",
        meta.BundleNumber,
        meta.StartTime.Format("2006-01-02"),
        meta.EndTime.Format("2006-01-02"))
}

// Get specific bundle metadata
meta, err := index.GetBundle(42)

// Get last bundle
lastBundle := index.GetLastBundle()
```

### Operations

Each operation is a single event from the PLC directory's log for one DID:

```go
type PLCOperation struct {
    DID       string                 // The DID (did:plc:...)
    Operation map[string]interface{} // The operation data
    CID       string                 // Content identifier
    Nullified interface{}            // nil, false, or CID string
    CreatedAt time.Time              // When it was created
    RawJSON   []byte                 // Original JSON bytes
}

// Check if operation was nullified
if op.IsNullified() {
    log.Printf("Operation %s was nullified by %s", op.CID, op.GetNullifyingCID())
}
```

---

## Common Patterns

### Pattern 1: Transparent Sync Service

**Goal:** Keep a local PLC mirror continuously synchronized.

This is the most common use case: maintaining an up-to-date copy of the PLC directory.

```go
package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

type SyncService struct {
    mgr      *plcbundle.Manager
    interval time.Duration
    stop     chan struct{}
}

func NewSyncService(bundleDir string, interval time.Duration) (*SyncService, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }

    return &SyncService{
        mgr:      mgr,
        interval: interval,
        stop:     make(chan struct{}),
    }, nil
}

func (s *SyncService) Start() {
    log.Println("Starting sync service...")

    // Initial sync
    s.sync()

    // Periodic sync
    ticker := time.NewTicker(s.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            s.sync()
        case <-s.stop:
            log.Println("Sync service stopped")
            return
        }
    }
}

func (s *SyncService) sync() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
    defer cancel()

    log.Println("Checking for new bundles...")

    fetched := 0
    for {
        bundle, err := s.mgr.FetchNext(ctx)
        if err != nil {
            if isInsufficientOps(err) {
                if fetched > 0 {
                    log.Printf("✓ Synced %d new bundles", fetched)
                } else {
                    log.Println("✓ Up to date")
                }
                return
            }
            log.Printf("Error: %v", err)
            return
        }

        fetched++
        log.Printf("✓ Fetched bundle %d (%d ops, %d DIDs)",
            bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount)
    }
}

func (s *SyncService) Stop() {
    close(s.stop)
    s.mgr.Close()
}

// isInsufficientOps reports whether err only means "caught up":
// there are not yet enough new operations to complete another bundle.
func isInsufficientOps(err error) bool {
    return err != nil &&
        (strings.Contains(err.Error(), "insufficient operations") ||
            strings.Contains(err.Error(), "no more available"))
}

func main() {
    service, err := NewSyncService("./plc_data", 5*time.Minute)
    if err != nil {
        log.Fatal(err)
    }

    // Start service in background
    go service.Start()

    // Wait for interrupt
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    <-sigChan

    log.Println("Shutting down...")
    service.Stop()
}
```

**Usage:**
```bash
go run main.go
# Starting sync service...
# Checking for new bundles...
# ✓ Fetched bundle 8548 (10000 ops, 8234 DIDs)
# ✓ Fetched bundle 8549 (10000 ops, 8156 DIDs)
# ✓ Synced 2 new bundles
# ... (repeats every 5 minutes)
```

### Pattern 2: Reading and Processing Operations

**Goal:** Process all historical operations for analysis.
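
Most of the per-operation work in a processor like the one that follows is walking the untyped `Operation` map. As a minimal sketch, that step can be isolated into a small, testable function. The helper name `pdsEndpoint` is hypothetical, and it assumes the `services.atproto_pds.endpoint` nesting used in this guide's examples. It takes a plain `map[string]interface{}` (what `encoding/json` produces), so it runs without plcbundle installed.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// pdsEndpoint is a hypothetical helper that extracts the PDS endpoint from a
// decoded PLC operation, assuming the services.atproto_pds.endpoint layout
// used in this guide. It returns false if any level is missing or mistyped.
func pdsEndpoint(op map[string]interface{}) (string, bool) {
    services, ok := op["services"].(map[string]interface{})
    if !ok {
        return "", false
    }
    pds, ok := services["atproto_pds"].(map[string]interface{})
    if !ok {
        return "", false
    }
    endpoint, ok := pds["endpoint"].(string)
    return endpoint, ok
}

func main() {
    // Illustrative operation body, not real directory data
    raw := []byte(`{
        "type": "plc_operation",
        "services": {
            "atproto_pds": {"type": "AtprotoPersonalDataServer", "endpoint": "https://pds.example.com"}
        }
    }`)

    var op map[string]interface{}
    if err := json.Unmarshal(raw, &op); err != nil {
        panic(err)
    }

    if endpoint, ok := pdsEndpoint(op); ok {
        fmt.Println("PDS:", endpoint)
    }

    // An operation without services (e.g. a tombstone) yields no endpoint
    _, ok := pdsEndpoint(map[string]interface{}{"type": "plc_tombstone"})
    fmt.Println("tombstone has endpoint:", ok)
}
```

Running it prints `PDS: https://pds.example.com` followed by `tombstone has endpoint: false`. Inside a real processor you would call it as `pdsEndpoint(op.Operation)`.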

```go
package main

import (
    "context"
    "log"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

type OperationProcessor struct {
    mgr *plcbundle.Manager
}

func NewOperationProcessor(bundleDir string) (*OperationProcessor, error) {
    mgr, err := plcbundle.New(bundleDir, "")
    if err != nil {
        return nil, err
    }

    return &OperationProcessor{mgr: mgr}, nil
}

func (p *OperationProcessor) ProcessAll() error {
    ctx := context.Background()

    index := p.mgr.GetIndex()
    bundles := index.GetBundles()

    log.Printf("Processing %d bundles...", len(bundles))

    totalOps := 0
    uniqueDIDs := make(map[string]bool)

    for _, meta := range bundles {
        // Load bundle
        bundle, err := p.mgr.Load(ctx, meta.BundleNumber)
        if err != nil {
            return err
        }

        // Process operations
        for _, op := range bundle.Operations {
            totalOps++
            uniqueDIDs[op.DID] = true

            // Your processing logic here
            p.processOperation(op)
        }

        if meta.BundleNumber%100 == 0 {
            log.Printf("Processed bundle %d...", meta.BundleNumber)
        }
    }

    log.Printf("✓ Processed %d operations from %d unique DIDs",
        totalOps, len(uniqueDIDs))

    return nil
}

func (p *OperationProcessor) processOperation(op plcbundle.PLCOperation) {
    // Example: Extract PDS endpoints
    if services, ok := op.Operation["services"].(map[string]interface{}); ok {
        if pds, ok := services["atproto_pds"].(map[string]interface{}); ok {
            if endpoint, ok := pds["endpoint"].(string); ok {
                log.Printf("DID %s uses PDS: %s", op.DID, endpoint)
            }
        }
    }
}

func main() {
    processor, err := NewOperationProcessor("./plc_data")
    if err != nil {
        log.Fatal(err)
    }
    defer processor.mgr.Close()

    if err := processor.ProcessAll(); err != nil {
        log.Fatal(err)
    }
}
```

### Pattern 3: Time-Based Queries

**Goal:** Export operations from a specific time period.

```go
package main

import (
    "context"
    "encoding/json"
    "log"
    "os"
    "time"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

func exportOperationsSince(bundleDir string, since time.Time, limit int) error {
    mgr, err := plcbundle.New(bundleDir, "")
    if err != nil {
        return err
    }
    defer mgr.Close()

    ctx := context.Background()

    // Export up to `limit` operations created after `since`
    ops, err := mgr.Export(ctx, since, limit)
    if err != nil {
        return err
    }

    log.Printf("Exporting %d operations...", len(ops))

    // Write as JSONL to stdout
    encoder := json.NewEncoder(os.Stdout)
    for _, op := range ops {
        if err := encoder.Encode(op); err != nil {
            return err
        }
    }

    return nil
}

func main() {
    // Export operations from the last 7 days (at most 50,000)
    since := time.Now().AddDate(0, 0, -7)

    if err := exportOperationsSince("./plc_data", since, 50000); err != nil {
        log.Fatal(err)
    }
}
```

**Output to file** (the `log` package writes to stderr, so only the JSONL ends up in the file):
```bash
go run main.go > last_7_days.jsonl
```

### Pattern 4: Verification Service

**Goal:** Periodically verify bundle integrity.
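
It helps to know what a chain check establishes. The `Bundle` fields `Hash`, `ContentHash`, and `Parent` form a hash chain: each bundle's chain hash commits to its parent's chain hash, so altering any earlier bundle breaks every later link. The sketch below models that idea with a hypothetical chaining rule, SHA-256 over the parent hash followed by the content hash. plcbundle's actual formula and encoding may differ, so treat it as an illustration of what `VerifyChain` checks, not a reimplementation. The names `link`, `chainHash`, `buildChain`, and `verifyChain` are all illustrative.

```go
package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// link mirrors the three chain-related Bundle fields from this guide.
type link struct {
    ContentHash string // hash of this bundle's content
    Parent      string // previous link's chain hash ("" for the first)
    Hash        string // chain hash
}

// chainHash is a HYPOTHETICAL chaining rule: SHA-256(parent || content).
// plcbundle's real formula may differ.
func chainHash(parent, content string) string {
    sum := sha256.Sum256([]byte(parent + content))
    return hex.EncodeToString(sum[:])
}

// buildChain links a sequence of content hashes together.
func buildChain(contents []string) []link {
    chain := make([]link, 0, len(contents))
    parent := ""
    for _, c := range contents {
        h := chainHash(parent, c)
        chain = append(chain, link{ContentHash: c, Parent: parent, Hash: h})
        parent = h
    }
    return chain
}

// verifyChain returns the 1-based position of the first broken link,
// or 0 if every link is consistent with its predecessor.
func verifyChain(chain []link) int {
    prev := ""
    for i, l := range chain {
        if l.Parent != prev || l.Hash != chainHash(l.Parent, l.ContentHash) {
            return i + 1
        }
        prev = l.Hash
    }
    return 0
}

func main() {
    chain := buildChain([]string{"content-1", "content-2", "content-3"})
    fmt.Println("broken at:", verifyChain(chain)) // 0 means valid

    // Tamper with bundle 2's content without recomputing its chain hash
    chain[1].ContentHash = "tampered"
    fmt.Println("broken at:", verifyChain(chain))
}
```

Running it prints `broken at: 0` and then `broken at: 2`. Recomputing bundle 2's chain hash to hide the edit would only move the break to bundle 3, whose `Parent` no longer matches. Under this model, and assuming a collision-resistant hash, a recorded head hash is enough to detect later tampering with any earlier bundle.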

```go
package main

import (
    "context"
    "log"
    "time"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

type VerificationService struct {
    mgr      *plcbundle.Manager
    interval time.Duration
}

func NewVerificationService(bundleDir string, interval time.Duration) (*VerificationService, error) {
    mgr, err := plcbundle.New(bundleDir, "")
    if err != nil {
        return nil, err
    }

    return &VerificationService{
        mgr:      mgr,
        interval: interval,
    }, nil
}

func (v *VerificationService) Start() {
    ticker := time.NewTicker(v.interval)
    defer ticker.Stop()

    // Verify immediately on start
    v.verify()

    for range ticker.C {
        v.verify()
    }
}

func (v *VerificationService) verify() {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
    defer cancel()

    log.Println("Starting chain verification...")
    start := time.Now()

    result, err := v.mgr.VerifyChain(ctx)
    if err != nil {
        log.Printf("❌ Verification error: %v", err)
        return
    }

    elapsed := time.Since(start)

    if result.Valid {
        log.Printf("✅ Chain verified: %d bundles, took %s",
            result.ChainLength, elapsed.Round(time.Second))

        // Get head hash
        index := v.mgr.GetIndex()
        if last := index.GetLastBundle(); last != nil {
            log.Printf("   Head hash: %s...", last.Hash[:16])
        }
    } else {
        log.Printf("❌ Chain broken at bundle %d: %s",
            result.BrokenAt, result.Error)

        // Alert or take action
        v.handleBrokenChain(result)
    }
}

func (v *VerificationService) handleBrokenChain(result *plcbundle.ChainVerificationResult) {
    // Send alert, trigger re-sync, etc.
    log.Printf("⚠️ ALERT: Chain integrity compromised!")
    // TODO: Implement your alerting logic
}

func main() {
    service, err := NewVerificationService("./plc_data", 24*time.Hour)
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Verification service started (daily checks)")
    service.Start()
}
```

### Pattern 5: Custom HTTP API

**Goal:** Build a custom API on top of your bundle archive.

```go
package main

import (
    "encoding/json"
    "log"
    "net/http"
    "strconv"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

type API struct {
    mgr *plcbundle.Manager
}

func NewAPI(bundleDir string) (*API, error) {
    mgr, err := plcbundle.New(bundleDir, "")
    if err != nil {
        return nil, err
    }

    return &API{mgr: mgr}, nil
}

func (api *API) handleStats(w http.ResponseWriter, r *http.Request) {
    index := api.mgr.GetIndex()
    stats := index.GetStats()

    response := map[string]interface{}{
        "bundles":    stats["bundle_count"],
        "first":      stats["first_bundle"],
        "last":       stats["last_bundle"],
        "total_size": stats["total_size"],
        "start_time": stats["start_time"],
        "end_time":   stats["end_time"],
        "updated_at": stats["updated_at"],
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

func (api *API) handleOperations(w http.ResponseWriter, r *http.Request) {
    bundleNumStr := r.URL.Query().Get("bundle")
    if bundleNumStr == "" {
        http.Error(w, "bundle parameter required", http.StatusBadRequest)
        return
    }

    bundleNum, err := strconv.Atoi(bundleNumStr)
    if err != nil {
        http.Error(w, "invalid bundle number", http.StatusBadRequest)
        return
    }

    ctx := r.Context()
    bundle, err := api.mgr.Load(ctx, bundleNum)
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }

    w.Header().Set("Content-Type", "application/x-ndjson")
    encoder := json.NewEncoder(w)
    for _, op := range bundle.Operations {
        encoder.Encode(op)
    }
}

func (api *API) handleDID(w http.ResponseWriter, r *http.Request) {
    did := r.URL.Query().Get("did")
    if did == "" {
        http.Error(w, "did parameter required", http.StatusBadRequest)
        return
    }

    ctx := r.Context()

    // Search through every bundle for this DID. This is a full scan on each
    // request, so it slows down as the archive grows; a production service
    // would typically keep a DID -> bundle index instead.
    var operations []plcbundle.PLCOperation

    index := api.mgr.GetIndex()
    bundles := index.GetBundles()

    for _, meta := range bundles {
        bundle, err := api.mgr.Load(ctx, meta.BundleNumber)
        if err != nil {
            continue
        }

        for _, op := range bundle.Operations {
            if op.DID == did {
                operations = append(operations, op)
            }
        }
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]interface{}{
        "did":        did,
        "operations": operations,
        "count":      len(operations),
    })
}

func main() {
    api, err := NewAPI("./plc_data")
    if err != nil {
        log.Fatal(err)
    }

    http.HandleFunc("/stats", api.handleStats)
    http.HandleFunc("/operations", api.handleOperations)
    http.HandleFunc("/did", api.handleDID)

    log.Println("API listening on :8080")
    log.Fatal(http.ListenAndServe(":8080", nil))
}
```

**Usage** (quote URLs containing `?` so shells such as zsh don't treat them as globs):
```bash
# Get stats
curl http://localhost:8080/stats

# Get operations from bundle 1
curl "http://localhost:8080/operations?bundle=1"

# Get all operations for a DID
curl "http://localhost:8080/did?did=did:plc:example123"
```

---

## Building Applications

### Application 1: PDS Discovery Tool

Find every PDS endpoint referenced in your archive and rank them by how often they appear:

```go
package main

import (
    "context"
    "fmt"
    "log"
    "sort"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

type PDSTracker struct {
    mgr       *plcbundle.Manager
    endpoints map[string]int // endpoint -> number of operations referencing it
}

func NewPDSTracker(bundleDir string) (*PDSTracker, error) {
    mgr, err := plcbundle.New(bundleDir, "")
    if err != nil {
        return nil, err
    }

    return &PDSTracker{
        mgr:       mgr,
        endpoints: make(map[string]int),
    }, nil
}

func (pt *PDSTracker) Scan() error {
    ctx := context.Background()

    index := pt.mgr.GetIndex()
    bundles := index.GetBundles()

    log.Printf("Scanning %d bundles for PDS endpoints...", len(bundles))

    for _, meta := range bundles {
        bundle, err := pt.mgr.Load(ctx, meta.BundleNumber)
        if err != nil {
            return err
        }

        for _, op := range bundle.Operations {
            if endpoint := pt.extractPDS(op); endpoint != "" {
                pt.endpoints[endpoint]++
            }
        }
    }

    return nil
}

func (pt *PDSTracker) extractPDS(op plcbundle.PLCOperation) string {
    services, ok := op.Operation["services"].(map[string]interface{})
    if !ok {
        return ""
    }

    pds, ok := services["atproto_pds"].(map[string]interface{})
    if !ok {
        return ""
    }

    endpoint, ok := pds["endpoint"].(string)
    if !ok {
        return ""
    }

    return endpoint
}

func (pt *PDSTracker) PrintResults() {
    log.Printf("\nFound %d unique PDS endpoints:\n", len(pt.endpoints))

    // Sort by count
    type endpointCount struct {
        endpoint string
        count    int
    }

    var sorted []endpointCount
    for endpoint, count := range pt.endpoints {
        sorted = append(sorted, endpointCount{endpoint, count})
    }

    sort.Slice(sorted, func(i, j int) bool {
        return sorted[i].count > sorted[j].count
    })

    // Print top 20
    for i, ec := range sorted {
        if i >= 20 {
            break
        }
        fmt.Printf("%3d. %s (%d operations)\n", i+1, ec.endpoint, ec.count)
    }
}

func main() {
    tracker, err := NewPDSTracker("./plc_data")
    if err != nil {
        log.Fatal(err)
    }

    if err := tracker.Scan(); err != nil {
        log.Fatal(err)
    }

    tracker.PrintResults()
}
```

### Application 2: DID History Viewer

View the complete history of a DID:

```go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

type DIDHistory struct {
    DID        string                   `json:"did"`
    Operations []plcbundle.PLCOperation `json:"operations"`
    FirstSeen  time.Time                `json:"first_seen"`
    LastSeen   time.Time                `json:"last_seen"`
    OpCount    int                      `json:"operation_count"`
}

func getDIDHistory(bundleDir, did string) (*DIDHistory, error) {
    mgr, err := plcbundle.New(bundleDir, "")
    if err != nil {
        return nil, err
    }
    defer mgr.Close()

    ctx := context.Background()

    history := &DIDHistory{
        DID:        did,
        Operations: make([]plcbundle.PLCOperation, 0),
    }

    index := mgr.GetIndex()
    bundles := index.GetBundles()

    log.Printf("Searching for DID %s...", did)

    for _, meta := range bundles {
        bundle, err := mgr.Load(ctx, meta.BundleNumber)
        if err != nil {
            continue
        }

        for _, op := range bundle.Operations {
            if op.DID == did {
                history.Operations = append(history.Operations, op)
            }
        }
    }

    if len(history.Operations) == 0 {
        return nil, fmt.Errorf("DID %s not found in local bundles", did)
    }

    // Set timestamps (assumes bundles were scanned in order, so the
    // collected operations are chronological)
    history.FirstSeen = history.Operations[0].CreatedAt
    history.LastSeen = history.Operations[len(history.Operations)-1].CreatedAt
    history.OpCount = len(history.Operations)

    return history, nil
}

func main() {
    if len(os.Args) < 2 {
        log.Fatal("Usage: did-history <did>")
    }

    did := os.Args[1]

    history, err := getDIDHistory("./plc_data", did)
    if err != nil {
        log.Fatal(err)
    }

    // Print as JSON
    encoder := json.NewEncoder(os.Stdout)
    encoder.SetIndent("", "  ")
    encoder.Encode(history)
}
```

### Application 3: Real-time Monitor

Watch for newly completed bundles and report interesting operations in them:

```go
package main

import (
    "context"
    "log"
    "time"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

type Monitor struct {
    mgr          *plcbundle.Manager
    lastSeen     int // Last bundle number processed
    pollInterval time.Duration
}

func NewMonitor(bundleDir string, pollInterval time.Duration) (*Monitor, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }

    // Get current position
    index := mgr.GetIndex()
    lastBundle := index.GetLastBundle()
    lastSeen := 0
    if lastBundle != nil {
        lastSeen = lastBundle.BundleNumber
    }

    return &Monitor{
        mgr:          mgr,
        lastSeen:     lastSeen,
        pollInterval: pollInterval,
    }, nil
}

func (m *Monitor) Start() {
    log.Println("Monitor started, watching for new bundles...")

    ticker := time.NewTicker(m.pollInterval)
    defer ticker.Stop()

    for range ticker.C {
        m.check()
    }
}

func (m *Monitor) check() {
    ctx := context.Background()

    // Try to fetch next bundle
    bundle, err := m.mgr.FetchNext(ctx)
    if err != nil {
        // Usually this only means no complete bundle is available yet;
        // see Pattern 1 for telling that apart from real failures
        return
    }

    // New bundle!
    log.Printf("🔔 New bundle: %d", bundle.BundleNumber)
    log.Printf("   Operations: %d", len(bundle.Operations))
    log.Printf("   DIDs: %d", bundle.DIDCount)
    log.Printf("   Time: %s", bundle.EndTime.Format("2006-01-02 15:04:05"))

    // Process new operations
    m.processNewOperations(bundle)

    m.lastSeen = bundle.BundleNumber
}

func (m *Monitor) processNewOperations(bundle *plcbundle.Bundle) {
    for _, op := range bundle.Operations {
        // Check for interesting operations
        if op.IsNullified() {
            log.Printf("   ⚠️ Nullified: %s", op.DID)
        }

        // Check for new DIDs: a genesis operation has a null (or absent)
        // "prev". This covers both legacy "create" operations and
        // "plc_operation" genesis operations.
        if op.Operation["prev"] == nil {
            log.Printf("   ➕ New DID: %s", op.DID)
        }
    }
}

func main() {
    monitor, err := NewMonitor("./plc_data", 30*time.Second)
    if err != nil {
        log.Fatal(err)
    }

    monitor.Start()
}
```

---

## Advanced Usage

### Custom Configuration

Full control over bundle manager behavior:

```go
package main

import (
    "log"
    "runtime"
    "time"

    "tangled.org/atscan.net/plcbundle/bundle"
    "tangled.org/atscan.net/plcbundle/plc"
)

func main() {
    // Custom configuration
    config := &bundle.Config{
        BundleDir:      "./my_bundles",
        VerifyOnLoad:   true,              // Verify hashes when loading
        AutoRebuild:    true,              // Auto-rebuild index if needed
        RebuildWorkers: runtime.NumCPU(),  // Parallel workers for rebuild
        Logger:         &MyCustomLogger{}, // Custom logger

        // Progress callback for rebuild
        RebuildProgress: func(current, total int) {
            if current%100 == 0 {
                log.Printf("Rebuild: %d/%d (%.1f%%)",
                    current, total, float64(current)/float64(total)*100)
            }
        },
    }

    // Custom PLC client with rate limiting
    plcClient := plc.NewClient("https://plc.directory",
        plc.WithRateLimit(60, time.Minute), // 60 req/min
        plc.WithTimeout(30*time.Second),    // 30s timeout
        plc.WithLogger(&MyCustomLogger{}),  // Custom logger
    )

    // Create manager
    mgr, err := bundle.NewManager(config, plcClient)
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()

    log.Println("Manager created with custom configuration")
}

// Custom logger implementation
type MyCustomLogger struct{}

func (l *MyCustomLogger) Printf(format string, v ...interface{}) {
    // Add custom formatting, filtering, etc.
    log.Printf("[PLCBUNDLE] "+format, v...)
}

func (l *MyCustomLogger) Println(v ...interface{}) {
    log.Println(append([]interface{}{"[PLCBUNDLE]"}, v...)...)
}
```

### Streaming Data

Stream a bundle's decompressed JSONL without loading all of its operations into memory at once:

```go
package main

import (
    "bufio"
    "context"
    "encoding/json"
    "log"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

func streamBundle(mgr *plcbundle.Manager, bundleNumber int) error {
    ctx := context.Background()

    // Get decompressed stream
    reader, err := mgr.StreamDecompressed(ctx, bundleNumber)
    if err != nil {
        return err
    }
    defer reader.Close()

    // Read line by line (JSONL)
    scanner := bufio.NewScanner(reader)

    // Allow lines up to 1 MiB (the default limit is 64 KiB)
    buf := make([]byte, 0, 64*1024)
    scanner.Buffer(buf, 1024*1024)

    lineNum := 0
    for scanner.Scan() {
        lineNum++

        var op plcbundle.PLCOperation
        if err := json.Unmarshal(scanner.Bytes(), &op); err != nil {
            log.Printf("Warning: failed to parse line %d: %v", lineNum, err)
            continue
        }

        // Process operation without storing all in memory
        processOperation(op)
    }

    return scanner.Err()
}

func processOperation(op plcbundle.PLCOperation) {
    // Your processing logic
    log.Printf("Processing: %s", op.DID)
}

func main() {
    mgr, err := plcbundle.New("./plc_data", "")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()

    // Stream bundle 1
    if err := streamBundle(mgr, 1); err != nil {
        log.Fatal(err)
    }
}
```

### Parallel Processing

Process multiple bundles concurrently:

```go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

func processParallel(mgr *plcbundle.Manager, workers int) error {
    ctx := context.Background()

    index := mgr.GetIndex()
    bundles := index.GetBundles()

    // Buffered channels sized so neither senders nor workers ever block
    jobs := make(chan int, len(bundles))
    results := make(chan error, len(bundles))

    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for bundleNum := range jobs {
                results <- processBundle(ctx, mgr, bundleNum)
            }
        }()
    }

    // Send jobs
    for _, meta := range bundles {
        jobs <- meta.BundleNumber
    }
    close(jobs)

    // Close results once all workers have finished
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    failed := 0
    for err := range results {
        if err != nil {
            log.Printf("Error: %v", err)
            failed++
        }
    }

    if failed > 0 {
        return fmt.Errorf("%d bundles failed processing", failed)
    }

    return nil
}

func processBundle(ctx context.Context, mgr *plcbundle.Manager, bundleNum int) error {
    bundle, err := mgr.Load(ctx, bundleNum)
    if err != nil {
        return err
    }

    // Process operations
    for _, op := range bundle.Operations {
        // Your logic here
        _ = op
    }

    log.Printf("Processed bundle %d", bundleNum)
    return nil
}

func main() {
    mgr, err := plcbundle.New("./plc_data", "")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()

    // Process with 8 workers
    if err := processParallel(mgr, 8); err != nil {
        log.Fatal(err)
    }
}
```

### Working with Mempool

Inspect operations that have been fetched but not yet packed into a bundle:

```go
package main

import (
    "log"

    plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
    mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()

    // Get mempool stats
    stats := mgr.GetMempoolStats()

    count := stats["count"].(int)
    targetBundle := stats["target_bundle"].(int)
    canCreate := stats["can_create_bundle"].(bool)

    log.Printf("Mempool status:")
    log.Printf("  Target bundle: %d", targetBundle)
    log.Printf("  Operations: %d/%d", count, plcbundle.BUNDLE_SIZE)
    log.Printf("  Ready: %v", canCreate)

    if count > 0 {
        // Get mempool operations
        ops, err := mgr.GetMempoolOperations()
        if err != nil {
            log.Fatal(err)
        }

        log.Printf("First unbundled operations:")
        for i, op := range ops {
            if i >= 5 {
                break
            }
            log.Printf("  %d. %s (%s)", i+1, op.DID, op.CreatedAt.Format("15:04:05"))
        }
    }

    // Validate chronological order
    if err := mgr.ValidateMempool(); err != nil {
        log.Printf("⚠️ Mempool validation failed: %v", err)
    } else {
        log.Println("✓ Mempool validated")
    }
}
```

---

## Best Practices

### 1. Always Close the Manager

Use `defer` to ensure cleanup:

```go
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
    return err
}
defer mgr.Close() // Always close!
```

### 2. Handle Context Cancellation

Support graceful shutdown:

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Listen for interrupt
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

go func() {
    <-sigChan
    log.Println("Interrupt received, stopping...")
    cancel()
}()

// Use context in operations; errors.Is also matches wrapped errors
bundle, err := mgr.FetchNext(ctx)
if errors.Is(err, context.Canceled) {
    log.Println("Operation cancelled gracefully")
    return nil
}
```

### 3. Check Errors Properly

Distinguish between different error types:

```go
bundle, err := mgr.FetchNext(ctx)
if err != nil {
    // Check if it's just "caught up"
    if strings.Contains(err.Error(), "insufficient operations") {
        log.Println("No new bundles available (caught up)")
        return nil
    }

    // Real error
    return fmt.Errorf("fetch failed: %w", err)
}
```

### 4. Use Streaming for Large Datasets

Don't accumulate the whole archive in memory:

```go
// ❌ Bad: Loads all operations into memory
index := mgr.GetIndex()
var allOps []plcbundle.PLCOperation
for _, meta := range index.GetBundles() {
    bundle, err := mgr.Load(ctx, meta.BundleNumber)
    if err != nil {
        return err
    }
    allOps = append(allOps, bundle.Operations...)
}

// ✅ Good: Process one bundle at a time
for _, meta := range index.GetBundles() {
    bundle, err := mgr.Load(ctx, meta.BundleNumber)
    if err != nil {
        return err
    }
    for _, op := range bundle.Operations {
        processOperation(op)
    }
}
```

For even lower memory use, read each bundle with `StreamDecompressed` as shown in [Streaming Data](#streaming-data).

### 5. Enable Verification in Production

```go
config := plcbundle.DefaultConfig("./plc_data")
config.VerifyOnLoad = true // Verify hashes when loading

plcClient := plcbundle.NewPLCClient("https://plc.directory")
mgr, err := plcbundle.NewManager(config, plcClient)
```

### 6. Log Appropriately

Implement a custom logger for production, for example one backed by zap:

```go
type ProductionLogger struct {
    logger *zap.Logger
}

func (l *ProductionLogger) Printf(format string, v ...interface{}) {
    l.logger.Sugar().Infof(format, v...)
}

func (l *ProductionLogger) Println(v ...interface{}) {
    l.logger.Sugar().Info(v...)
}
```

### 7. Handle Rate Limits

Configure the PLC client to stay within the directory's rate limits:

```go
// Production: Be conservative
prodClient := plc.NewClient("https://plc.directory",
    plc.WithRateLimit(60, time.Minute), // 60 req/min max
    plc.WithTimeout(60*time.Second),
)

// Development: Can be more aggressive (but respectful)
devClient := plc.NewClient("https://plc.directory",
    plc.WithRateLimit(90, time.Minute),
    plc.WithTimeout(30*time.Second),
)
```

---

## API Reference

### Manager Methods

```go
// Creation
New(bundleDir, plcURL string) (*Manager, error)
NewManager(config *Config, plcClient *PLCClient) (*Manager, error)

// Lifecycle
Close()

// Fetching
FetchNext(ctx context.Context) (*Bundle, error)

// Loading
Load(ctx context.Context, bundleNumber int) (*Bundle, error)

// Verification
Verify(ctx context.Context, bundleNumber int) (*VerificationResult, error)
VerifyChain(ctx context.Context) (*ChainVerificationResult, error)

// Exporting
Export(ctx context.Context, afterTime time.Time, count int) ([]PLCOperation, error)

// Streaming
StreamRaw(ctx context.Context, bundleNumber int) (io.ReadCloser, error)
StreamDecompressed(ctx context.Context, bundleNumber int) (io.ReadCloser, error)

// Index
GetIndex() *Index
ScanBundle(path string, bundleNumber int) (*BundleMetadata, error)
Scan() (*DirectoryScanResult, error)

// Mempool
GetMempoolStats() map[string]interface{}
GetMempoolOperations() ([]PLCOperation, error)
ValidateMempool() error
ClearMempool() error

// Info
GetInfo() map[string]interface{}
IsBundleIndexed(bundleNumber int) bool
```

### Index Methods

```go
// Creation
NewIndex() *Index
LoadIndex(path string) (*Index, error)

// Persistence
Save(path string) error

// Queries
GetBundle(bundleNumber int) (*BundleMetadata, error)
GetLastBundle() *BundleMetadata
GetBundles() []*BundleMetadata
GetBundleRange(start, end int) []*BundleMetadata

// Stats
Count() int
FindGaps() []int
GetStats() map[string]interface{}
```

### Configuration Types

```go
type Config struct {
    BundleDir       string
    VerifyOnLoad    bool
    AutoRebuild     bool
    RebuildWorkers  int
    RebuildProgress func(current, total int)
    Logger          Logger
}

type Logger interface {
    Printf(format string, v ...interface{})
    Println(v ...interface{})
}
```

---

## Troubleshooting

### Bundle Not Found Error

```go
bundle, err := mgr.Load(ctx, 999)
if err != nil {
    if strings.Contains(err.Error(), "not in index") {
        // Bundle isn't in the local index
        log.Printf("Bundle 999 hasn't been fetched yet")
    }
}
```

### Insufficient Operations Error

```go
bundle, err := mgr.FetchNext(ctx)
if err != nil {
    if strings.Contains(err.Error(), "insufficient operations") {
        // Not enough operations for a complete bundle
        // Check mempool
        stats := mgr.GetMempoolStats()
        count := stats["count"].(int)
        log.Printf("Only %d operations available (need %d)", count, plcbundle.BUNDLE_SIZE)
    }
}
```

### Memory Usage

If processing large numbers of bundles:

```go
// Force garbage collection between bundles
for _, meta := range index.GetBundles() {
    bundle, err := mgr.Load(ctx, meta.BundleNumber)
    if err != nil {
        return err
    }
    processBundle(bundle)

    runtime.GC() // Optional: the runtime normally collects on its own
}
```

---

## Examples Repository

Find complete, runnable examples at:
- https://github.com/plcbundle/examples

Examples include:
- Complete sync service
- API server
- Analysis tools
- Monitoring services