A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
at did-resolver 1682 lines 38 kB view raw view rendered
1# Library Guide 2 3A practical guide to using plcbundle as a Go library in your applications. 4 5## Table of Contents 6 7- [Getting Started](#getting-started) 8- [Core Concepts](#core-concepts) 9- [Common Patterns](#common-patterns) 10- [Building Applications](#building-applications) 11- [Advanced Usage](#advanced-usage) 12- [Best Practices](#best-practices) 13- [API Reference](#api-reference) 14 15--- 16 17## Getting Started 18 19### Installation 20 21```bash 22go get tangled.org/atscan.net/plcbundle 23``` 24 25### Your First Program 26 27Create a simple program that opens a bundle repository and displays basic information about it: 28 29```go 30package main 31 32import ( 34 "log" 35 36 plcbundle "tangled.org/atscan.net/plcbundle" 37) 38 39func main() { 40 // Create a manager 41 mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 42 if err != nil { 43 log.Fatal(err) 44 } 45 defer mgr.Close() 46 47 // Get repository info 48 info := mgr.GetInfo() 49 log.Printf("Bundle directory: %s", info["bundle_dir"]) 50 51 // Get index stats 52 index := mgr.GetIndex() 53 stats := index.GetStats() 54 log.Printf("Total bundles: %d", stats["bundle_count"]) 55} 56``` 57 58Run it: 59```bash 60go run main.go 61# 2025/01/15 10:30:00 Bundle directory: ./plc_data 62# 2025/01/15 10:30:00 Total bundles: 0 63``` 64 65### Fetching Your First Bundle 66 67Let's fetch a bundle from the PLC directory: 68 69```go 70package main 71 72import ( 73 "context" 74 "log" 75 76 plcbundle "tangled.org/atscan.net/plcbundle" 77) 78 79func main() { 80 mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 81 if err != nil { 82 log.Fatal(err) 83 } 84 defer mgr.Close() 85 86 ctx := context.Background() 87 88 // Fetch next bundle 89 log.Println("Fetching bundle...") 90 bundle, err := mgr.FetchNext(ctx) 91 if err != nil { 92 log.Fatal(err) 93 } 94 95 log.Printf("✓ Fetched bundle %d", bundle.BundleNumber) 96 log.Printf(" Operations: %d", len(bundle.Operations)) 97 log.Printf(" Unique DIDs: %d", 
bundle.DIDCount) 98 log.Printf(" Time range: %s to %s", 99 bundle.StartTime.Format("2006-01-02"), 100 bundle.EndTime.Format("2006-01-02")) 101} 102``` 103 104**What's happening here?** 105 1061. `plcbundle.New()` creates a manager that handles all bundle operations 1072. `FetchNext()` automatically: 108 - Fetches operations from PLC directory 109 - Creates a bundle when 10,000 operations are collected 110 - Saves the bundle to disk 111 - Updates the index 112 - Returns the bundle object 113 114### Reading Bundles 115 116Once you have bundles, you can load and read them: 117 118```go 119package main 120 121import ( 122 "context" 123 "log" 124 125 plcbundle "tangled.org/atscan.net/plcbundle" 126) 127 128func main() { 129 mgr, err := plcbundle.New("./plc_data", "") 130 if err != nil { 131 log.Fatal(err) 132 } 133 defer mgr.Close() 134 135 ctx := context.Background() 136 137 // Load bundle 1 138 bundle, err := mgr.Load(ctx, 1) 139 if err != nil { 140 log.Fatal(err) 141 } 142 143 log.Printf("Bundle %d loaded", bundle.BundleNumber) 144 145 // Iterate through operations 146 for i, op := range bundle.Operations { 147 if i >= 5 { 148 break // Just show first 5 149 } 150 log.Printf("%d. DID: %s, CID: %s", i+1, op.DID, op.CID) 151 } 152} 153``` 154 155--- 156 157## Core Concepts 158 159### The Manager 160 161The `Manager` is your main entry point. 
It handles: 162- Bundle storage and retrieval 163- Index management 164- PLC directory synchronization 165- Verification 166- Mempool management 167 168**Creating a manager:** 169 170```go 171// Simple creation 172mgr, err := plcbundle.New("./bundles", "https://plc.directory") 173 174// Custom configuration 175config := plcbundle.DefaultConfig("./bundles") 176config.VerifyOnLoad = true 177config.AutoRebuild = true 178 179plcClient := plcbundle.NewPLCClient("https://plc.directory") 180mgr, err := plcbundle.NewManager(config, plcClient) 181``` 182 183### Bundles 184 185A bundle contains exactly 10,000 operations: 186 187```go 188type Bundle struct { 189 BundleNumber int // Sequential number (1, 2, 3...) 190 StartTime time.Time // First operation timestamp 191 EndTime time.Time // Last operation timestamp 192 Operations []plc.PLCOperation // The 10,000 operations 193 DIDCount int // Unique DIDs in bundle 194 Hash string // Chain hash (includes history) 195 ContentHash string // This bundle's content hash 196 Parent string // Previous bundle's chain hash 197 CompressedSize int64 // File size on disk 198 UncompressedSize int64 // Original JSONL size 199} 200``` 201 202### The Index 203 204The index tracks all bundles and their metadata: 205 206```go 207index := mgr.GetIndex() 208 209// Get all bundles 210bundles := index.GetBundles() 211for _, meta := range bundles { 212 log.Printf("Bundle %d: %s to %s", 213 meta.BundleNumber, 214 meta.StartTime.Format("2006-01-02"), 215 meta.EndTime.Format("2006-01-02")) 216} 217 218// Get specific bundle metadata 219meta, err := index.GetBundle(42) 220 221// Get last bundle 222lastBundle := index.GetLastBundle() 223``` 224 225### Operations 226 227Each operation represents a DID PLC directory event: 228 229```go 230type PLCOperation struct { 231 DID string // The DID (did:plc:...) 
232 Operation json.RawMessage // Raw JSON bytes (use GetOperationMap() to parse) 233 CID string // Content identifier 234 Nullified interface{} // nil, false, or CID string 235 CreatedAt time.Time // When it was created 236 237 // Internal fields (populated automatically) 238 RawJSON []byte // Original JSON line 239 ParsedOperation map[string]interface{} // Cached parsed data 240} 241 242// Accessing operation data: 243operation, err := op.GetOperationMap() // Parses Operation field (cached) 244if err != nil || operation == nil { 245 return 246} 247 248// Now you can access fields (use the two-value assertion so a missing key doesn't panic) 249services, ok := operation["services"].(map[string]interface{}) 250 251// Check if operation was nullified 252if op.IsNullified() { 253 log.Printf("Operation %s was nullified by %s", op.CID, op.GetNullifyingCID()) 254} 255``` 256 257### Accessing Operation Data 258 259The `Operation` field holds raw JSON bytes and is parsed lazily for performance. Parse it with `GetOperationMap()` before accessing its fields: 260 261```go 262// ❌ Wrong - won't compile (Operation is json.RawMessage, not a map) 263services := op.Operation["services"] 264 265// ✅ Correct 266operation, err := op.GetOperationMap() 267if err != nil || operation == nil { 268 return 269} 270services, ok := operation["services"].(map[string]interface{}) 271``` 272 273The parsed data is cached, so repeated calls are fast: 274```go 275// First call: parses JSON 276data1, _ := op.GetOperationMap() 277// Second call: returns cached data (fast) 278data2, _ := op.GetOperationMap() 279``` 280--- 281 282## Common Patterns 283 284### Pattern 1: Transparent Sync Service 285 286**Goal:** Keep a local PLC mirror continuously synchronized. 287 288This is the most common use case: maintaining an up-to-date copy of the PLC directory.
289 290```go 291package main 292 293import ( 294 "context" 295 "log" 296 "os" 297 "os/signal" 298 "strings" 299 "syscall" 300 "time" 301 302 plcbundle "tangled.org/atscan.net/plcbundle" 303) 304type SyncService struct { 305 mgr *plcbundle.Manager 306 interval time.Duration 307 stop chan struct{} 308} 309 310func NewSyncService(bundleDir string, interval time.Duration) (*SyncService, error) { 311 mgr, err := plcbundle.New(bundleDir, "https://plc.directory") 312 if err != nil { 313 return nil, err 314 } 315 316 return &SyncService{ 317 mgr: mgr, 318 interval: interval, 319 stop: make(chan struct{}), 320 }, nil 321} 322 323func (s *SyncService) Start() { 324 log.Println("Starting sync service...") 325 326 // Initial sync 327 s.sync() 328 329 // Periodic sync 330 ticker := time.NewTicker(s.interval) 331 defer ticker.Stop() 332 333 for { 334 select { 335 case <-ticker.C: 336 s.sync() 337 case <-s.stop: 338 log.Println("Sync service stopped") 339 return 340 } 341 } 342} 343 344func (s *SyncService) sync() { 345 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) 346 defer cancel() 347 348 log.Println("Checking for new bundles...") 349 350 fetched := 0 351 for { 352 bundle, err := s.mgr.FetchNext(ctx) 353 if err != nil { 354 if isInsufficientOps(err) { 355 if fetched > 0 { 356 log.Printf("✓ Synced %d new bundles", fetched) 357 } else { 358 log.Println("✓ Up to date") 359 } 360 return 361 } 362 log.Printf("Error: %v", err) 363 return 364 } 365 366 fetched++ 367 log.Printf("✓ Fetched bundle %d (%d ops, %d DIDs)", 368 bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount) 369 } 370} 371 372func (s *SyncService) Stop() { 373 close(s.stop) 374 s.mgr.Close() 375} 376 377func isInsufficientOps(err error) bool { 378 return err != nil && 379 (strings.Contains(err.Error(), "insufficient operations") || 380 strings.Contains(err.Error(), "no more available")) 381} 382 383func main() { 384 service, err := NewSyncService("./plc_data", 5*time.Minute) 385 if err != nil
{ 386 log.Fatal(err) 387 } 388 389 // Start service in background 390 go service.Start() 391 392 // Wait for interrupt 393 sigChan := make(chan os.Signal, 1) 394 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 395 <-sigChan 396 397 log.Println("Shutting down...") 398 service.Stop() 399} 400``` 401 402**Usage:** 403```bash 404go run main.go 405# Starting sync service... 406# Checking for new bundles... 407# ✓ Fetched bundle 8548 (10000 ops, 8234 DIDs) 408# ✓ Fetched bundle 8549 (10000 ops, 8156 DIDs) 409# ✓ Up to date 410# ... (repeats every 5 minutes) 411``` 412 413### Pattern 2: Reading and Processing Operations 414 415**Goal:** Process all historical operations for analysis. 416 417```go 418package main 419 420import ( 421 "context" 422 "log" 423 424 plcbundle "tangled.org/atscan.net/plcbundle" 425) 426 427type OperationProcessor struct { 428 mgr *plcbundle.Manager 429} 430 431func NewOperationProcessor(bundleDir string) (*OperationProcessor, error) { 432 mgr, err := plcbundle.New(bundleDir, "") 433 if err != nil { 434 return nil, err 435 } 436 437 return &OperationProcessor{mgr: mgr}, nil 438} 439 440func (p *OperationProcessor) ProcessAll() error { 441 ctx := context.Background() 442 443 index := p.mgr.GetIndex() 444 bundles := index.GetBundles() 445 446 log.Printf("Processing %d bundles...", len(bundles)) 447 448 totalOps := 0 449 uniqueDIDs := make(map[string]bool) 450 451 for _, meta := range bundles { 452 // Load bundle 453 bundle, err := p.mgr.Load(ctx, meta.BundleNumber) 454 if err != nil { 455 return err 456 } 457 458 // Process operations 459 for _, op := range bundle.Operations { 460 totalOps++ 461 uniqueDIDs[op.DID] = true 462 463 // Your processing logic here 464 p.processOperation(op) 465 } 466 467 if meta.BundleNumber % 100 == 0 { 468 log.Printf("Processed bundle %d...", meta.BundleNumber) 469 } 470 } 471 472 log.Printf("✓ Processed %d operations from %d unique DIDs", 473 totalOps, len(uniqueDIDs)) 474 475 return nil 476} 477 478func (p 
*OperationProcessor) processOperation(op plcbundle.PLCOperation) { 479 // Parse Operation field on-demand 480 operation, err := op.GetOperationMap() 481 if err != nil || operation == nil { 482 return 483 } 484 485 // Example: Extract PDS endpoints 486 if services, ok := operation["services"].(map[string]interface{}); ok { 487 if pds, ok := services["atproto_pds"].(map[string]interface{}); ok { 488 if endpoint, ok := pds["endpoint"].(string); ok { 489 log.Printf("DID %s uses PDS: %s", op.DID, endpoint) 490 } 491 } 492 } 493} 494 495 496func main() { 497 processor, err := NewOperationProcessor("./plc_data") 498 if err != nil { 499 log.Fatal(err) 500 } 501 502 if err := processor.ProcessAll(); err != nil { 503 log.Fatal(err) 504 } 505} 506``` 507 508### Pattern 3: Time-Based Queries 509 510**Goal:** Export operations from a specific time period. 511 512```go 513package main 514 515import ( 516 "context" 517 "encoding/json" 518 "log" 519 "os" 520 "time" 521 522 plcbundle "tangled.org/atscan.net/plcbundle" 523) 524 525func exportOperationsSince(bundleDir string, since time.Time, limit int) error { 526 mgr, err := plcbundle.New(bundleDir, "") 527 if err != nil { 528 return err 529 } 530 defer mgr.Close() 531 532 ctx := context.Background() 533 534 // Export operations after timestamp 535 ops, err := mgr.Export(ctx, since, limit) 536 if err != nil { 537 return err 538 } 539 540 log.Printf("Exporting %d operations...", len(ops)) 541 542 // Write as JSONL to stdout 543 encoder := json.NewEncoder(os.Stdout) 544 for _, op := range ops { 545 if err := encoder.Encode(op); err != nil { 546 return err 547 } 548 } 549 550 return nil 551} 552 553func main() { 554 // Export operations from the last 7 days 555 since := time.Now().AddDate(0, 0, -7) 556 557 if err := exportOperationsSince("./plc_data", since, 50000); err != nil { 558 log.Fatal(err) 559 } 560} 561``` 562 563**Output to file:** 564```bash 565go run main.go > last_7_days.jsonl 566``` 567 568### Pattern 4: Verification 
Service 569 570**Goal:** Periodically verify bundle integrity. 571 572```go 573package main 574 575import ( 576 "context" 577 "log" 578 "time" 579 580 plcbundle "tangled.org/atscan.net/plcbundle" 581) 582 583type VerificationService struct { 584 mgr *plcbundle.Manager 585 interval time.Duration 586} 587 588func NewVerificationService(bundleDir string, interval time.Duration) (*VerificationService, error) { 589 mgr, err := plcbundle.New(bundleDir, "") 590 if err != nil { 591 return nil, err 592 } 593 594 return &VerificationService{ 595 mgr: mgr, 596 interval: interval, 597 }, nil 598} 599 600func (v *VerificationService) Start() { 601 ticker := time.NewTicker(v.interval) 602 defer ticker.Stop() 603 604 // Verify immediately on start 605 v.verify() 606 607 for range ticker.C { 608 v.verify() 609 } 610} 611 612func (v *VerificationService) verify() { 613 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) 614 defer cancel() 615 616 log.Println("Starting chain verification...") 617 start := time.Now() 618 619 result, err := v.mgr.VerifyChain(ctx) 620 if err != nil { 621 log.Printf("❌ Verification error: %v", err) 622 return 623 } 624 625 elapsed := time.Since(start) 626 627 if result.Valid { 628 log.Printf("✅ Chain verified: %d bundles, took %s", 629 result.ChainLength, elapsed.Round(time.Second)) 630 631 // Get head hash 632 index := v.mgr.GetIndex() 633 if last := index.GetLastBundle(); last != nil { 634 log.Printf(" Head hash: %s...", last.Hash[:16]) 635 } 636 } else { 637 log.Printf("❌ Chain broken at bundle %d: %s", 638 result.BrokenAt, result.Error) 639 640 // Alert or take action 641 v.handleBrokenChain(result) 642 } 643} 644 645func (v *VerificationService) handleBrokenChain(result *plcbundle.ChainVerificationResult) { 646 // Send alert, trigger re-sync, etc. 
647 log.Printf("⚠️ ALERT: Chain integrity compromised!") 648 // TODO: Implement your alerting logic 649} 650 651func main() { 652 service, err := NewVerificationService("./plc_data", 24*time.Hour) 653 if err != nil { 654 log.Fatal(err) 655 } 656 657 log.Println("Verification service started (daily checks)") 658 service.Start() 659} 660``` 661 662### Pattern 5: Custom HTTP API 663 664**Goal:** Build a custom API on top of your bundle archive. 665 666```go 667package main 668 669import ( 670 "encoding/json" 671 "log" 672 "net/http" 673 "strconv" 674 675 plcbundle "tangled.org/atscan.net/plcbundle" 676) 677 678type API struct { 679 mgr *plcbundle.Manager 680} 681 682func NewAPI(bundleDir string) (*API, error) { 683 mgr, err := plcbundle.New(bundleDir, "") 684 if err != nil { 685 return nil, err 686 } 687 688 return &API{mgr: mgr}, nil 689} 690 691func (api *API) handleStats(w http.ResponseWriter, r *http.Request) { 692 index := api.mgr.GetIndex() 693 stats := index.GetStats() 694 695 response := map[string]interface{}{ 696 "bundles": stats["bundle_count"], 697 "first": stats["first_bundle"], 698 "last": stats["last_bundle"], 699 "total_size": stats["total_size"], 700 "start_time": stats["start_time"], 701 "end_time": stats["end_time"], 702 "updated_at": stats["updated_at"], 703 } 704 705 w.Header().Set("Content-Type", "application/json") 706 json.NewEncoder(w).Encode(response) 707} 708 709func (api *API) handleOperations(w http.ResponseWriter, r *http.Request) { 710 bundleNumStr := r.URL.Query().Get("bundle") 711 if bundleNumStr == "" { 712 http.Error(w, "bundle parameter required", http.StatusBadRequest) 713 return 714 } 715 716 bundleNum, err := strconv.Atoi(bundleNumStr) 717 if err != nil { 718 http.Error(w, "invalid bundle number", http.StatusBadRequest) 719 return 720 } 721 722 ctx := r.Context() 723 bundle, err := api.mgr.Load(ctx, bundleNum) 724 if err != nil { 725 http.Error(w, err.Error(), http.StatusNotFound) 726 return 727 } 728 729 
w.Header().Set("Content-Type", "application/x-ndjson") 730 encoder := json.NewEncoder(w) 731 for _, op := range bundle.Operations { 732 encoder.Encode(op) 733 } 734} 735 736func (api *API) handleDID(w http.ResponseWriter, r *http.Request) { 737 did := r.URL.Query().Get("did") 738 if did == "" { 739 http.Error(w, "did parameter required", http.StatusBadRequest) 740 return 741 } 742 743 ctx := r.Context() 744 745 // Search through bundles for this DID (non-nil slice so JSON encodes [] rather than null) 746 operations := []plcbundle.PLCOperation{} 747 748 index := api.mgr.GetIndex() 749 bundles := index.GetBundles() 750 751 for _, meta := range bundles { 752 bundle, err := api.mgr.Load(ctx, meta.BundleNumber) 753 if err != nil { 754 continue 755 } 756 757 for _, op := range bundle.Operations { 758 if op.DID == did { 759 operations = append(operations, op) 760 } 761 } 762 } 763 764 w.Header().Set("Content-Type", "application/json") 765 json.NewEncoder(w).Encode(map[string]interface{}{ 766 "did": did, 767 "operations": operations, 768 "count": len(operations), 769 }) 770} 771 772func main() { 773 api, err := NewAPI("./plc_data") 774 if err != nil { 775 log.Fatal(err) 776 } 777 778 http.HandleFunc("/stats", api.handleStats) 779 http.HandleFunc("/operations", api.handleOperations) 780 http.HandleFunc("/did", api.handleDID) 781 782 log.Println("API listening on :8080") 783 log.Fatal(http.ListenAndServe(":8080", nil)) 784} 785``` 786 787**Usage:** 788```bash 789# Get stats 790curl http://localhost:8080/stats 791 792# Get operations from bundle 1 793curl "http://localhost:8080/operations?bundle=1" 794 795# Get all operations for a DID 796curl "http://localhost:8080/did?did=did:plc:example123" 797``` 798 799--- 800 801## Building Applications 802 803### Application 1: PDS Discovery Tool 804 805Find all PDS endpoints in the network: 806 807```go 808package main 809 810import ( 811 "context" 812 "fmt" 813 "log" 814 "sort" 815 816 plcbundle "tangled.org/atscan.net/plcbundle" 817) 818type PDSTracker struct { 819 mgr *plcbundle.Manager 820
endpoints map[string]int // endpoint -> count 821} 822 823func NewPDSTracker(bundleDir string) (*PDSTracker, error) { 824 mgr, err := plcbundle.New(bundleDir, "") 825 if err != nil { 826 return nil, err 827 } 828 829 return &PDSTracker{ 830 mgr: mgr, 831 endpoints: make(map[string]int), 832 }, nil 833} 834 835func (pt *PDSTracker) Scan() error { 836 ctx := context.Background() 837 838 index := pt.mgr.GetIndex() 839 bundles := index.GetBundles() 840 841 log.Printf("Scanning %d bundles for PDS endpoints...", len(bundles)) 842 843 for _, meta := range bundles { 844 bundle, err := pt.mgr.Load(ctx, meta.BundleNumber) 845 if err != nil { 846 return err 847 } 848 849 for _, op := range bundle.Operations { 850 if endpoint := pt.extractPDS(op); endpoint != "" { 851 pt.endpoints[endpoint]++ 852 } 853 } 854 } 855 856 return nil 857} 858 859func (pt *PDSTracker) extractPDS(op plcbundle.PLCOperation) string { 860 // Parse Operation field on-demand 861 operation, err := op.GetOperationMap() 862 if err != nil || operation == nil { 863 return "" 864 } 865 866 services, ok := operation["services"].(map[string]interface{}) 867 if !ok { 868 return "" 869 } 870 871 pds, ok := services["atproto_pds"].(map[string]interface{}) 872 if !ok { 873 return "" 874 } 875 876 endpoint, ok := pds["endpoint"].(string) 877 if !ok { 878 return "" 879 } 880 881 return endpoint 882} 883 884 885func (pt *PDSTracker) PrintResults() { 886 log.Printf("\nFound %d unique PDS endpoints:\n", len(pt.endpoints)) 887 888 // Sort by count 889 type endpointCount struct { 890 endpoint string 891 count int 892 } 893 894 var sorted []endpointCount 895 for endpoint, count := range pt.endpoints { 896 sorted = append(sorted, endpointCount{endpoint, count}) 897 } 898 899 sort.Slice(sorted, func(i, j int) bool { 900 return sorted[i].count > sorted[j].count 901 }) 902 903 // Print top 20 904 for i, ec := range sorted { 905 if i >= 20 { 906 break 907 } 908 fmt.Printf("%3d. 
%s (%d operations)\n", i+1, ec.endpoint, ec.count) 909 } 910} 911 912func main() { 913 tracker, err := NewPDSTracker("./plc_data") 914 if err != nil { 915 log.Fatal(err) 916 } 917 918 if err := tracker.Scan(); err != nil { 919 log.Fatal(err) 920 } 921 922 tracker.PrintResults() 923} 924``` 925 926### Application 2: DID History Viewer 927 928View the complete history of a DID: 929 930```go 931package main 932 933import ( 934 "context" 935 "encoding/json" 936 "fmt" 937 "log" 938 "os" 939 "time" 940 941 plcbundle "tangled.org/atscan.net/plcbundle" 942) 943type DIDHistory struct { 944 DID string `json:"did"` 945 Operations []plcbundle.PLCOperation `json:"operations"` 946 FirstSeen time.Time `json:"first_seen"` 947 LastSeen time.Time `json:"last_seen"` 948 OpCount int `json:"operation_count"` 949} 950 951func getDIDHistory(bundleDir, did string) (*DIDHistory, error) { 952 mgr, err := plcbundle.New(bundleDir, "") 953 if err != nil { 954 return nil, err 955 } 956 defer mgr.Close() 957 958 ctx := context.Background() 959 960 history := &DIDHistory{ 961 DID: did, 962 Operations: make([]plcbundle.PLCOperation, 0), 963 } 964 965 index := mgr.GetIndex() 966 bundles := index.GetBundles() 967 968 log.Printf("Searching for DID %s...", did) 969 970 for _, meta := range bundles { 971 bundle, err := mgr.Load(ctx, meta.BundleNumber) 972 if err != nil { 973 continue 974 } 975 976 for _, op := range bundle.Operations { 977 if op.DID == did { 978 history.Operations = append(history.Operations, op) 979 } 980 } 981 } 982 983 if len(history.Operations) == 0 { 984 return nil, fmt.Errorf("DID %s not found", did) 985 } 986 987 // Set timestamps 988 history.FirstSeen = history.Operations[0].CreatedAt 989 history.LastSeen = history.Operations[len(history.Operations)-1].CreatedAt 990 history.OpCount = len(history.Operations) 991 992 return history, nil 993} 994 995func main() { 996 if len(os.Args) < 2 { 997 log.Fatal("Usage: did-history <did>") 998 } 999 1000 did := os.Args[1] 1001 1002 history, err := 
getDIDHistory("./plc_data", did) 1003 if err != nil { 1004 log.Fatal(err) 1005 } 1006 1007 // Print as JSON 1008 encoder := json.NewEncoder(os.Stdout) 1009 encoder.SetIndent("", " ") 1010 encoder.Encode(history) 1011} 1012``` 1013 1014### Application 3: Real-time Monitor 1015 1016Monitor new operations as they arrive: 1017 1018```go 1019package main 1020 1021import ( 1022 "context" 1023 "log" 1024 "time" 1025 1026 plcbundle "tangled.org/atscan.net/plcbundle" 1027) 1028 1029type Monitor struct { 1030 mgr *plcbundle.Manager 1031 lastSeen int // Last bundle number processed 1032 pollInterval time.Duration 1033} 1034 1035func NewMonitor(bundleDir string, pollInterval time.Duration) (*Monitor, error) { 1036 mgr, err := plcbundle.New(bundleDir, "https://plc.directory") 1037 if err != nil { 1038 return nil, err 1039 } 1040 1041 // Get current position 1042 index := mgr.GetIndex() 1043 lastBundle := index.GetLastBundle() 1044 lastSeen := 0 1045 if lastBundle != nil { 1046 lastSeen = lastBundle.BundleNumber 1047 } 1048 1049 return &Monitor{ 1050 mgr: mgr, 1051 lastSeen: lastSeen, 1052 pollInterval: pollInterval, 1053 }, nil 1054} 1055 1056func (m *Monitor) Start() { 1057 log.Println("Monitor started, watching for new bundles...") 1058 1059 ticker := time.NewTicker(m.pollInterval) 1060 defer ticker.Stop() 1061 1062 for range ticker.C { 1063 m.check() 1064 } 1065} 1066 1067func (m *Monitor) check() { 1068 ctx := context.Background() 1069 1070 // Try to fetch next bundle 1071 bundle, err := m.mgr.FetchNext(ctx) 1072 if err != nil { 1073 // Not an error if no new bundle available 1074 return 1075 } 1076 1077 // New bundle! 
1078 log.Printf("🔔 New bundle: %d", bundle.BundleNumber) 1079 log.Printf(" Operations: %d", len(bundle.Operations)) 1080 log.Printf(" DIDs: %d", bundle.DIDCount) 1081 log.Printf(" Time: %s", bundle.EndTime.Format("2006-01-02 15:04:05")) 1082 1083 // Process new operations 1084 m.processNewOperations(bundle) 1085 1086 m.lastSeen = bundle.BundleNumber 1087} 1088 1089func (m *Monitor) processNewOperations(bundle *plcbundle.Bundle) { 1090 for _, op := range bundle.Operations { 1091 // Check for interesting operations 1092 if op.IsNullified() { 1093 log.Printf(" ⚠️ Nullified: %s", op.DID) 1094 } 1095 1096 // Check for new DIDs: a genesis operation has "prev": null (covers both "plc_operation" and legacy "create" types) 1097 operation, err := op.GetOperationMap() 1098 if err == nil && operation != nil { 1099 if prev, exists := operation["prev"]; exists && prev == nil { 1100 log.Printf(" ➕ New DID: %s", op.DID) 1101 } 1102 } 1103 } 1104} 1105 1106func main() { 1107 monitor, err := NewMonitor("./plc_data", 30*time.Second) 1108 if err != nil { 1109 log.Fatal(err) 1110 } 1111 1112 monitor.Start() 1113} 1114``` 1115 1116--- 1117 1118## Advanced Usage 1119 1120### Custom Configuration 1121 1122Full control over bundle manager behavior: 1123 1124```go 1125package main 1126 1127import ( 1128 "log" 1129 "runtime" 1130 "time" 1131 1132 "tangled.org/atscan.net/plcbundle/bundle" 1133 "tangled.org/atscan.net/plcbundle/plc" 1135) 1136 1137func main() { 1138 // Custom configuration 1139 config := &bundle.Config{ 1140 BundleDir: "./my_bundles", 1141 VerifyOnLoad: true, // Verify hashes when loading 1142 AutoRebuild: true, // Auto-rebuild index if needed 1143 RebuildWorkers: runtime.NumCPU(), // Parallel workers for rebuild 1144 Logger: &MyCustomLogger{}, // Custom logger 1145 1146 // Progress callback for rebuild 1147 RebuildProgress: func(current, total int) { 1148 if current%100 == 0 { 1149 log.Printf("Rebuild: %d/%d (%.1f%%)", 1150 current, total, float64(current)/float64(total)*100) 1151 } 
1152 }, 1153 } 1154 1155 // Custom PLC client with rate limiting 1156 plcClient := plc.NewClient("https://plc.directory", 1157 plc.WithRateLimit(60, time.Minute), // 60 req/min 1158 plc.WithTimeout(30*time.Second), // 30s timeout 1159 plc.WithLogger(&MyCustomLogger{}), // Custom logger 1160 ) 1161 1162 // Create manager 1163 mgr, err := bundle.NewManager(config, plcClient) 1164 if err != nil { 1165 log.Fatal(err) 1166 } 1167 defer mgr.Close() 1168 1169 log.Println("Manager created with custom configuration") 1170} 1171 1172// Custom logger implementation 1173type MyCustomLogger struct{} 1174 1175func (l *MyCustomLogger) Printf(format string, v ...interface{}) { 1176 // Add custom formatting, filtering, etc. 1177 log.Printf("[PLCBUNDLE] "+format, v...) 1178} 1179 1180func (l *MyCustomLogger) Println(v ...interface{}) { 1181 log.Println(append([]interface{}{"[PLCBUNDLE]"}, v...)...) 1182} 1183``` 1184 1185### Streaming Data 1186 1187Stream bundle data without loading everything into memory: 1188 1189```go 1190package main 1191 1192import ( 1193 "bufio" 1194 "context" 1195 "encoding/json" 1197 "log" 1198 1199 plcbundle "tangled.org/atscan.net/plcbundle" 1200) 1201 1202func streamBundle(mgr *plcbundle.Manager, bundleNumber int) error { 1203 ctx := context.Background() 1204 1205 // Get decompressed stream 1206 reader, err := mgr.StreamDecompressed(ctx, bundleNumber) 1207 if err != nil { 1208 return err 1209 } 1210 defer reader.Close() 1211 1212 // Read line by line (JSONL) 1213 scanner := bufio.NewScanner(reader) 1214 1215 // Set buffer size for large lines 1216 buf := make([]byte, 0, 64*1024) 1217 scanner.Buffer(buf, 1024*1024) 1218 1219 lineNum := 0 1220 for scanner.Scan() { 1221 lineNum++ 1222 1223 var op plcbundle.PLCOperation 1224 if err := json.Unmarshal(scanner.Bytes(), &op); err != nil { 1225 log.Printf("Warning: failed to parse line %d: %v", lineNum, err) 1226 continue 1227 } 1228 1229 // Process operation without storing all in memory 1230
processOperation(op) 1231 } 1232 1233 return scanner.Err() 1234} 1235 1236func processOperation(op plcbundle.PLCOperation) { 1237 // Your processing logic 1238 log.Printf("Processing: %s", op.DID) 1239} 1240 1241func main() { 1242 mgr, err := plcbundle.New("./plc_data", "") 1243 if err != nil { 1244 log.Fatal(err) 1245 } 1246 defer mgr.Close() 1247 1248 // Stream bundle 1 1249 if err := streamBundle(mgr, 1); err != nil { 1250 log.Fatal(err) 1251 } 1252} 1253``` 1254 1255### Parallel Processing 1256 1257Process multiple bundles concurrently: 1258 1259```go 1260package main 1261 1262import ( 1263 "context" 1264 "fmt" 1265 "log" 1266 "sync" 1267 1268 plcbundle "tangled.org/atscan.net/plcbundle" 1269) 1270func processParallel(mgr *plcbundle.Manager, workers int) error { 1271 ctx := context.Background() 1272 1273 index := mgr.GetIndex() 1274 bundles := index.GetBundles() 1275 1276 // Create job channel 1277 jobs := make(chan int, len(bundles)) 1278 results := make(chan error, len(bundles)) 1279 1280 // Start workers 1281 var wg sync.WaitGroup 1282 for w := 0; w < workers; w++ { 1283 wg.Add(1) 1284 go func() { 1285 defer wg.Done() 1286 for bundleNum := range jobs { 1287 if err := processBundle(ctx, mgr, bundleNum); err != nil { 1288 results <- err 1289 } else { 1290 results <- nil 1291 } 1292 } 1293 }() 1294 } 1295 1296 // Send jobs 1297 for _, meta := range bundles { 1298 jobs <- meta.BundleNumber 1299 } 1300 close(jobs) 1301 1302 // Wait for completion 1303 go func() { 1304 wg.Wait() 1305 close(results) 1306 }() 1307 1308 // Collect results 1309 failed := 0 1310 for err := range results { 1311 if err != nil { 1312 log.Printf("Error: %v", err) 1313 failed++ 1314 } 1315 } 1316 1317 if failed > 0 { 1318 return fmt.Errorf("%d bundles failed processing", failed) 1319 } 1320 1321 return nil 1322} 1323 1324func processBundle(ctx context.Context, mgr *plcbundle.Manager, bundleNum int) error { 1325 bundle, err := mgr.Load(ctx, bundleNum) 1326 if err != nil { 1327 return err 1328 }
1329 1330 // Process operations 1331 for _, op := range bundle.Operations { 1332 // Your logic here 1333 _ = op 1334 } 1335 1336 log.Printf("Processed bundle %d", bundleNum) 1337 return nil 1338} 1339 1340func main() { 1341 mgr, err := plcbundle.New("./plc_data", "") 1342 if err != nil { 1343 log.Fatal(err) 1344 } 1345 defer mgr.Close() 1346 1347 // Process with 8 workers 1348 if err := processParallel(mgr, 8); err != nil { 1349 log.Fatal(err) 1350 } 1351} 1352``` 1353 1354### Working with Mempool 1355 1356Access operations before they're bundled: 1357 1358```go 1359package main 1360 1361import ( 1362 "log" 1363 1364 plcbundle "tangled.org/atscan.net/plcbundle" 1365) 1366 1367func main() { 1368 mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 1369 if err != nil { 1370 log.Fatal(err) 1371 } 1372 defer mgr.Close() 1373 1374 // Get mempool stats 1375 stats := mgr.GetMempoolStats() 1376 1377 count := stats["count"].(int) 1378 targetBundle := stats["target_bundle"].(int) 1379 canCreate := stats["can_create_bundle"].(bool) 1380 1381 log.Printf("Mempool status:") 1382 log.Printf(" Target bundle: %d", targetBundle) 1383 log.Printf(" Operations: %d/%d", count, plcbundle.BUNDLE_SIZE) 1384 log.Printf(" Ready: %v", canCreate) 1385 1386 if count > 0 { 1387 // Get mempool operations 1388 ops, err := mgr.GetMempoolOperations() 1389 if err != nil { 1390 log.Fatal(err) 1391 } 1392 1393 log.Printf("Latest unbundled operations:") 1394 for i, op := range ops { 1395 if i >= 5 { 1396 break 1397 } 1398 log.Printf(" %d. %s (%s)", i+1, op.DID, op.CreatedAt.Format("15:04:05")) 1399 } 1400 } 1401 1402 // Validate chronological order 1403 if err := mgr.ValidateMempool(); err != nil { 1404 log.Printf("⚠️ Mempool validation failed: %v", err) 1405 } else { 1406 log.Println("✓ Mempool validated") 1407 } 1408} 1409``` 1410 1411--- 1412 1413## Best Practices 1414 1415### 1. 
Always Close the Manager 1416 1417Use `defer` to ensure cleanup: 1418 1419```go 1420mgr, err := plcbundle.New("./plc_data", "https://plc.directory") 1421if err != nil { 1422 return err 1423} 1424defer mgr.Close() // Always close! 1425``` 1426 1427### 2. Handle Context Cancellation 1428 1429Support graceful shutdown: 1430 1431```go 1432ctx, cancel := context.WithCancel(context.Background()) 1433defer cancel() 1434 1435// Listen for interrupt 1436sigChan := make(chan os.Signal, 1) 1437signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) 1438 1439go func() { 1440 <-sigChan 1441 log.Println("Interrupt received, stopping...") 1442 cancel() 1443}() 1444 1445// Use context in operations 1446bundle, err := mgr.FetchNext(ctx) 1447if err == context.Canceled { 1448 log.Println("Operation cancelled gracefully") 1449 return nil 1450} 1451``` 1452 1453### 3. Check Errors Properly 1454 1455Distinguish between different error types: 1456 1457```go 1458bundle, err := mgr.FetchNext(ctx) 1459if err != nil { 1460 // Check if it's just "caught up" 1461 if strings.Contains(err.Error(), "insufficient operations") { 1462 log.Println("No new bundles available (caught up)") 1463 return nil 1464 } 1465 1466 // Real error 1467 return fmt.Errorf("fetch failed: %w", err) 1468} 1469``` 1470 1471### 4. Use Streaming for Large Datasets 1472 1473Don't load everything into memory: 1474 1475```go 1476// ❌ Bad: Loads all operations into memory 1477index := mgr.GetIndex() 1478var allOps []plcbundle.PLCOperation 1479for _, meta := range index.GetBundles() { 1480 bundle, _ := mgr.Load(ctx, meta.BundleNumber) 1481 allOps = append(allOps, bundle.Operations...) 1482} 1483 1484// ✅ Good: Process one bundle at a time 1485for _, meta := range index.GetBundles() { 1486 bundle, _ := mgr.Load(ctx, meta.BundleNumber) 1487 for _, op := range bundle.Operations { 1488 processOperation(op) 1489 } 1490} 1491``` 1492 1493### 5. 
Enable Verification in Production 1494 1495```go 1496config := plcbundle.DefaultConfig("./plc_data") 1497config.VerifyOnLoad = true // Verify hashes when loading 1498 1499mgr, err := plcbundle.NewManager(config, plcClient) 1500``` 1501 1502### 6. Log Appropriately 1503 1504Implement a custom logger for production: 1505 1506```go 1507type ProductionLogger struct { 1508 logger *zap.Logger 1509} 1510 1511func (l *ProductionLogger) Printf(format string, v ...interface{}) { 1512 l.logger.Sugar().Infof(format, v...) 1513} 1514 1515func (l *ProductionLogger) Println(v ...interface{}) { 1516 l.logger.Sugar().Info(v...) 1517} 1518``` 1519 1520### 7. Handle Rate Limits 1521 1522Configure the PLC client's rate limit and timeout for your environment (pick one of the configurations below): 1523 1524```go 1525// Production: Be conservative 1526plcClient := plc.NewClient("https://plc.directory", 1527 plc.WithRateLimit(60, time.Minute), // 60 req/min max 1528 plc.WithTimeout(60*time.Second), 1529) 1530 1531// Development: Can be more aggressive (but respectful) 1532plcClient := plc.NewClient("https://plc.directory", 1533 plc.WithRateLimit(90, time.Minute), 1534 plc.WithTimeout(30*time.Second), 1535) 1536``` 1537 1538--- 1539 1540## API Reference 1541 1542### Manager Methods 1543 1544```go 1545// Creation 1546New(bundleDir, plcURL string) (*Manager, error) 1547NewManager(config *Config, plcClient *PLCClient) (*Manager, error) 1548 1549// Lifecycle 1550Close() 1551 1552// Fetching 1553FetchNext(ctx) (*Bundle, error) 1554 1555// Loading 1556Load(ctx, bundleNumber int) (*Bundle, error) 1557 1558// Verification 1559Verify(ctx, bundleNumber int) (*VerificationResult, error) 1560VerifyChain(ctx) (*ChainVerificationResult, error) 1561 1562// Exporting 1563Export(ctx, afterTime time.Time, count int) ([]PLCOperation, error) 1564 1565// Streaming 1566StreamRaw(ctx, bundleNumber int) (io.ReadCloser, error) 1567StreamDecompressed(ctx, bundleNumber int) (io.ReadCloser, error) 1568 1569// Index 1570GetIndex() *Index 1571ScanBundle(path string, bundleNumber int) 
(*BundleMetadata, error) 1572Scan() (*DirectoryScanResult, error) 1573 1574// Mempool 1575GetMempoolStats() map[string]interface{} 1576GetMempoolOperations() ([]PLCOperation, error) 1577ValidateMempool() error 1578ClearMempool() error 1579 1580// Info 1581GetInfo() map[string]interface{} 1582IsBundleIndexed(bundleNumber int) bool 1583``` 1584 1585### Index Methods 1586 1587```go 1588// Creation 1589NewIndex() *Index 1590LoadIndex(path string) (*Index, error) 1591 1592// Persistence 1593Save(path string) error 1594 1595// Queries 1596GetBundle(bundleNumber int) (*BundleMetadata, error) 1597GetLastBundle() *BundleMetadata 1598GetBundles() []*BundleMetadata 1599GetBundleRange(start, end int) []*BundleMetadata 1600 1601// Stats 1602Count() int 1603FindGaps() []int 1604GetStats() map[string]interface{} 1605``` 1606 1607### Configuration Types 1608 1609```go 1610type Config struct { 1611 BundleDir string 1612 VerifyOnLoad bool 1613 AutoRebuild bool 1614 RebuildWorkers int 1615 RebuildProgress func(current, total int) 1616 Logger Logger 1617} 1618 1619type Logger interface { 1620 Printf(format string, v ...interface{}) 1621 Println(v ...interface{}) 1622} 1623``` 1624 1625--- 1626 1627## Troubleshooting 1628 1629### Bundle Not Found Error 1630 1631```go 1632bundle, err := mgr.Load(ctx, 999) 1633if err != nil { 1634 if strings.Contains(err.Error(), "not in index") { 1635 // Bundle doesn't exist 1636 log.Printf("Bundle 999 hasn't been fetched yet") 1637 } 1638} 1639``` 1640 1641### Insufficient Operations Error 1642 1643```go 1644bundle, err := mgr.FetchNext(ctx) 1645if err != nil { 1646 if strings.Contains(err.Error(), "insufficient operations") { 1647 // Not enough operations for a complete bundle 1648 // Check mempool 1649 stats := mgr.GetMempoolStats() 1650 count := stats["count"].(int) 1651 log.Printf("Only %d operations available (need %d)", count, plcbundle.BUNDLE_SIZE) 1652 } 1653} 1654``` 1655 1656### Memory Usage 1657 1658If you are processing a large number of bundles:
1659 1660```go 1661// Force garbage collection between bundles 1662for _, meta := range index.GetBundles() { 1663 bundle, _ := mgr.Load(ctx, meta.BundleNumber) 1664 processBundle(bundle) 1665 1666 runtime.GC() // Help garbage collector 1667} 1668``` 1669 1670--- 1671 1672## Examples Repository 1673 1674Find complete, runnable examples at: 1675- https://github.com/plcbundle/examples 1676 1677Including: 1678- Complete sync service 1679- API server 1680- Analysis tools 1681- Monitoring services 1682