A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
1# Library Guide
2
3A practical guide to using plcbundle as a Go library in your applications.
4
5## Table of Contents
6
7- [Getting Started](#getting-started)
8- [Core Concepts](#core-concepts)
9- [Common Patterns](#common-patterns)
10- [Building Applications](#building-applications)
11- [Advanced Usage](#advanced-usage)
12- [Best Practices](#best-practices)
13- [API Reference](#api-reference)
14
15---
16
17## Getting Started
18
19### Installation
20
21```bash
22go get tangled.org/atscan.net/plcbundle
23```
24
25### Your First Program
26
27Create a simple program to fetch and display bundle information:
28
29```go
30package main
31
32import (
33 "context"
34 "log"
35
36 plcbundle "tangled.org/atscan.net/plcbundle"
37)
38
39func main() {
40 // Create a manager
41 mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
42 if err != nil {
43 log.Fatal(err)
44 }
45 defer mgr.Close()
46
47 // Get repository info
48 info := mgr.GetInfo()
49 log.Printf("Bundle directory: %s", info["bundle_dir"])
50
51 // Get index stats
52 index := mgr.GetIndex()
53 stats := index.GetStats()
54 log.Printf("Total bundles: %d", stats["bundle_count"])
55}
56```
57
58Run it:
59```bash
60go run main.go
61# 2025/01/15 10:30:00 Bundle directory: ./plc_data
62# 2025/01/15 10:30:00 Total bundles: 0
63```
64
65### Fetching Your First Bundle
66
67Let's fetch a bundle from the PLC directory:
68
69```go
70package main
71
72import (
73 "context"
74 "log"
75
76 plcbundle "tangled.org/atscan.net/plcbundle"
77)
78
79func main() {
80 mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
81 if err != nil {
82 log.Fatal(err)
83 }
84 defer mgr.Close()
85
86 ctx := context.Background()
87
88 // Fetch next bundle
89 log.Println("Fetching bundle...")
90 bundle, err := mgr.FetchNext(ctx)
91 if err != nil {
92 log.Fatal(err)
93 }
94
95 log.Printf("✓ Fetched bundle %d", bundle.BundleNumber)
96 log.Printf(" Operations: %d", len(bundle.Operations))
97 log.Printf(" Unique DIDs: %d", bundle.DIDCount)
98 log.Printf(" Time range: %s to %s",
99 bundle.StartTime.Format("2006-01-02"),
100 bundle.EndTime.Format("2006-01-02"))
101}
102```
103
104**What's happening here?**
105
1061. `plcbundle.New()` creates a manager that handles all bundle operations
1072. `FetchNext()` automatically:
108 - Fetches operations from PLC directory
109 - Creates a bundle when 10,000 operations are collected
110 - Saves the bundle to disk
111 - Updates the index
112 - Returns the bundle object
113
114### Reading Bundles
115
116Once you have bundles, you can load and read them:
117
118```go
119package main
120
121import (
122 "context"
123 "log"
124
125 plcbundle "tangled.org/atscan.net/plcbundle"
126)
127
128func main() {
129 mgr, err := plcbundle.New("./plc_data", "")
130 if err != nil {
131 log.Fatal(err)
132 }
133 defer mgr.Close()
134
135 ctx := context.Background()
136
137 // Load bundle 1
138 bundle, err := mgr.Load(ctx, 1)
139 if err != nil {
140 log.Fatal(err)
141 }
142
143 log.Printf("Bundle %d loaded", bundle.BundleNumber)
144
145 // Iterate through operations
146 for i, op := range bundle.Operations {
147 if i >= 5 {
148 break // Just show first 5
149 }
150 log.Printf("%d. DID: %s, CID: %s", i+1, op.DID, op.CID)
151 }
152}
153```
154
155---
156
157## Core Concepts
158
159### The Manager
160
161The `Manager` is your main entry point. It handles:
162- Bundle storage and retrieval
163- Index management
164- PLC directory synchronization
165- Verification
166- Mempool management
167
168**Creating a manager:**
169
170```go
171// Simple creation
172mgr, err := plcbundle.New("./bundles", "https://plc.directory")
173
174// Custom configuration
175config := plcbundle.DefaultConfig("./bundles")
176config.VerifyOnLoad = true
177config.AutoRebuild = true
178
179plcClient := plcbundle.NewPLCClient("https://plc.directory")
180mgr, err := plcbundle.NewManager(config, plcClient)
181```
182
183### Bundles
184
185A bundle contains exactly 10,000 operations:
186
187```go
188type Bundle struct {
189 BundleNumber int // Sequential number (1, 2, 3...)
190 StartTime time.Time // First operation timestamp
191 EndTime time.Time // Last operation timestamp
192 Operations []plc.PLCOperation // The 10,000 operations
193 DIDCount int // Unique DIDs in bundle
194 Hash string // Chain hash (includes history)
195 ContentHash string // This bundle's content hash
196 Parent string // Previous bundle's chain hash
197 CompressedSize int64 // File size on disk
198 UncompressedSize int64 // Original JSONL size
199}
200```
201
202### The Index
203
204The index tracks all bundles and their metadata:
205
206```go
207index := mgr.GetIndex()
208
209// Get all bundles
210bundles := index.GetBundles()
211for _, meta := range bundles {
212 log.Printf("Bundle %d: %s to %s",
213 meta.BundleNumber,
214 meta.StartTime.Format("2006-01-02"),
215 meta.EndTime.Format("2006-01-02"))
216}
217
218// Get specific bundle metadata
219meta, err := index.GetBundle(42)
220
221// Get last bundle
222lastBundle := index.GetLastBundle()
223```
224
225### Operations
226
227Each operation represents a DID PLC directory event:
228
229```go
230type PLCOperation struct {
231 DID string // The DID (did:plc:...)
232 Operation json.RawMessage // Raw JSON bytes (use GetOperationMap() to parse)
233 CID string // Content identifier
234 Nullified interface{} // nil, false, or CID string
235 CreatedAt time.Time // When it was created
236
237 // Internal fields (populated automatically)
238 RawJSON []byte // Original JSON line
239 ParsedOperation map[string]interface{} // Cached parsed data
240}
241
242// Accessing operation data:
243operation, err := op.GetOperationMap() // Parses Operation field (cached)
244if err != nil || operation == nil {
245 return
246}
247
248// Now you can access fields
249services := operation["services"].(map[string]interface{})
250
251// Check if operation was nullified
252if op.IsNullified() {
253 log.Printf("Operation %s was nullified by %s", op.CID, op.GetNullifyingCID())
254}
255```
256
257### Accessing Operation Data
258
259The `Operation` field uses lazy parsing for performance. Always parse it before accessing:
260
261```go
262// ❌ Wrong - won't compile
263services := op.Operation["services"]
264
265// ✅ Correct
266operation, err := op.GetOperationMap()
267if err != nil || operation == nil {
268 return
269}
270services, ok := operation["services"].(map[string]interface{})
271```
272
273The parsed data is cached, so repeated calls are fast:
274// First call: parses JSON
275data1, _ := op.GetOperationMap()
276
277// Second call: returns cached data (fast)
278data2, _ := op.GetOperationMap()
279
280---
281
282## Common Patterns
283
284### Pattern 1: Transparent Sync Service
285
286**Goal:** Keep a local PLC mirror continuously synchronized.
287
288This is the most common use case - maintaining an up-to-date copy of the PLC directory.
289
290```go
291package main
292
293import (
294 "context"
295 "log"
296 "os"
297 "os/signal"
298 "syscall"
299 "time"
300
301 plcbundle "tangled.org/atscan.net/plcbundle"
302)
303
304type SyncService struct {
305 mgr *plcbundle.Manager
306 interval time.Duration
307 stop chan struct{}
308}
309
310func NewSyncService(bundleDir string, interval time.Duration) (*SyncService, error) {
311 mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
312 if err != nil {
313 return nil, err
314 }
315
316 return &SyncService{
317 mgr: mgr,
318 interval: interval,
319 stop: make(chan struct{}),
320 }, nil
321}
322
323func (s *SyncService) Start() {
324 log.Println("Starting sync service...")
325
326 // Initial sync
327 s.sync()
328
329 // Periodic sync
330 ticker := time.NewTicker(s.interval)
331 defer ticker.Stop()
332
333 for {
334 select {
335 case <-ticker.C:
336 s.sync()
337 case <-s.stop:
338 log.Println("Sync service stopped")
339 return
340 }
341 }
342}
343
344func (s *SyncService) sync() {
345 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
346 defer cancel()
347
348 log.Println("Checking for new bundles...")
349
350 fetched := 0
351 for {
352 bundle, err := s.mgr.FetchNext(ctx)
353 if err != nil {
354 if isInsufficientOps(err) {
355 if fetched > 0 {
356 log.Printf("✓ Synced %d new bundles", fetched)
357 } else {
358 log.Println("✓ Up to date")
359 }
360 return
361 }
362 log.Printf("Error: %v", err)
363 return
364 }
365
366 fetched++
367 log.Printf("✓ Fetched bundle %d (%d ops, %d DIDs)",
368 bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount)
369 }
370}
371
372func (s *SyncService) Stop() {
373 close(s.stop)
374 s.mgr.Close()
375}
376
377func isInsufficientOps(err error) bool {
378 return err != nil &&
379 (strings.Contains(err.Error(), "insufficient operations") ||
380 strings.Contains(err.Error(), "no more available"))
381}
382
383func main() {
384 service, err := NewSyncService("./plc_data", 5*time.Minute)
385 if err != nil {
386 log.Fatal(err)
387 }
388
389 // Start service in background
390 go service.Start()
391
392 // Wait for interrupt
393 sigChan := make(chan os.Signal, 1)
394 signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
395 <-sigChan
396
397 log.Println("Shutting down...")
398 service.Stop()
399}
400```
401
402**Usage:**
403```bash
404go run main.go
405# Starting sync service...
406# Checking for new bundles...
407# ✓ Fetched bundle 8548 (10000 ops, 8234 DIDs)
408# ✓ Fetched bundle 8549 (10000 ops, 8156 DIDs)
409# ✓ Up to date
410# ... (repeats every 5 minutes)
411```
412
413### Pattern 2: Reading and Processing Operations
414
415**Goal:** Process all historical operations for analysis.
416
417```go
418package main
419
420import (
421 "context"
422 "log"
423
424 plcbundle "tangled.org/atscan.net/plcbundle"
425)
426
427type OperationProcessor struct {
428 mgr *plcbundle.Manager
429}
430
431func NewOperationProcessor(bundleDir string) (*OperationProcessor, error) {
432 mgr, err := plcbundle.New(bundleDir, "")
433 if err != nil {
434 return nil, err
435 }
436
437 return &OperationProcessor{mgr: mgr}, nil
438}
439
440func (p *OperationProcessor) ProcessAll() error {
441 ctx := context.Background()
442
443 index := p.mgr.GetIndex()
444 bundles := index.GetBundles()
445
446 log.Printf("Processing %d bundles...", len(bundles))
447
448 totalOps := 0
449 uniqueDIDs := make(map[string]bool)
450
451 for _, meta := range bundles {
452 // Load bundle
453 bundle, err := p.mgr.Load(ctx, meta.BundleNumber)
454 if err != nil {
455 return err
456 }
457
458 // Process operations
459 for _, op := range bundle.Operations {
460 totalOps++
461 uniqueDIDs[op.DID] = true
462
463 // Your processing logic here
464 p.processOperation(op)
465 }
466
467 if meta.BundleNumber % 100 == 0 {
468 log.Printf("Processed bundle %d...", meta.BundleNumber)
469 }
470 }
471
472 log.Printf("✓ Processed %d operations from %d unique DIDs",
473 totalOps, len(uniqueDIDs))
474
475 return nil
476}
477
478func (p *OperationProcessor) processOperation(op plcbundle.PLCOperation) {
479 // Parse Operation field on-demand
480 operation, err := op.GetOperationMap()
481 if err != nil || operation == nil {
482 return
483 }
484
485 // Example: Extract PDS endpoints
486 if services, ok := operation["services"].(map[string]interface{}); ok {
487 if pds, ok := services["atproto_pds"].(map[string]interface{}); ok {
488 if endpoint, ok := pds["endpoint"].(string); ok {
489 log.Printf("DID %s uses PDS: %s", op.DID, endpoint)
490 }
491 }
492 }
493}
494
495
496func main() {
497 processor, err := NewOperationProcessor("./plc_data")
498 if err != nil {
499 log.Fatal(err)
500 }
501
502 if err := processor.ProcessAll(); err != nil {
503 log.Fatal(err)
504 }
505}
506```
507
508### Pattern 3: Time-Based Queries
509
510**Goal:** Export operations from a specific time period.
511
512```go
513package main
514
515import (
516 "context"
517 "encoding/json"
518 "log"
519 "os"
520 "time"
521
522 plcbundle "tangled.org/atscan.net/plcbundle"
523)
524
525func exportOperationsSince(bundleDir string, since time.Time, limit int) error {
526 mgr, err := plcbundle.New(bundleDir, "")
527 if err != nil {
528 return err
529 }
530 defer mgr.Close()
531
532 ctx := context.Background()
533
534 // Export operations after timestamp
535 ops, err := mgr.Export(ctx, since, limit)
536 if err != nil {
537 return err
538 }
539
540 log.Printf("Exporting %d operations...", len(ops))
541
542 // Write as JSONL to stdout
543 encoder := json.NewEncoder(os.Stdout)
544 for _, op := range ops {
545 if err := encoder.Encode(op); err != nil {
546 return err
547 }
548 }
549
550 return nil
551}
552
553func main() {
554 // Export operations from the last 7 days
555 since := time.Now().AddDate(0, 0, -7)
556
557 if err := exportOperationsSince("./plc_data", since, 50000); err != nil {
558 log.Fatal(err)
559 }
560}
561```
562
563**Output to file:**
564```bash
565go run main.go > last_7_days.jsonl
566```
567
568### Pattern 4: Verification Service
569
570**Goal:** Periodically verify bundle integrity.
571
572```go
573package main
574
575import (
576 "context"
577 "log"
578 "time"
579
580 plcbundle "tangled.org/atscan.net/plcbundle"
581)
582
583type VerificationService struct {
584 mgr *plcbundle.Manager
585 interval time.Duration
586}
587
588func NewVerificationService(bundleDir string, interval time.Duration) (*VerificationService, error) {
589 mgr, err := plcbundle.New(bundleDir, "")
590 if err != nil {
591 return nil, err
592 }
593
594 return &VerificationService{
595 mgr: mgr,
596 interval: interval,
597 }, nil
598}
599
600func (v *VerificationService) Start() {
601 ticker := time.NewTicker(v.interval)
602 defer ticker.Stop()
603
604 // Verify immediately on start
605 v.verify()
606
607 for range ticker.C {
608 v.verify()
609 }
610}
611
612func (v *VerificationService) verify() {
613 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
614 defer cancel()
615
616 log.Println("Starting chain verification...")
617 start := time.Now()
618
619 result, err := v.mgr.VerifyChain(ctx)
620 if err != nil {
621 log.Printf("❌ Verification error: %v", err)
622 return
623 }
624
625 elapsed := time.Since(start)
626
627 if result.Valid {
628 log.Printf("✅ Chain verified: %d bundles, took %s",
629 result.ChainLength, elapsed.Round(time.Second))
630
631 // Get head hash
632 index := v.mgr.GetIndex()
633 if last := index.GetLastBundle(); last != nil {
634 log.Printf(" Head hash: %s...", last.Hash[:16])
635 }
636 } else {
637 log.Printf("❌ Chain broken at bundle %d: %s",
638 result.BrokenAt, result.Error)
639
640 // Alert or take action
641 v.handleBrokenChain(result)
642 }
643}
644
645func (v *VerificationService) handleBrokenChain(result *plcbundle.ChainVerificationResult) {
646 // Send alert, trigger re-sync, etc.
647 log.Printf("⚠️ ALERT: Chain integrity compromised!")
648 // TODO: Implement your alerting logic
649}
650
651func main() {
652 service, err := NewVerificationService("./plc_data", 24*time.Hour)
653 if err != nil {
654 log.Fatal(err)
655 }
656
657 log.Println("Verification service started (daily checks)")
658 service.Start()
659}
660```
661
662### Pattern 5: Custom HTTP API
663
664**Goal:** Build a custom API on top of your bundle archive.
665
666```go
667package main
668
669import (
670 "encoding/json"
671 "log"
672 "net/http"
673 "strconv"
674
675 plcbundle "tangled.org/atscan.net/plcbundle"
676)
677
678type API struct {
679 mgr *plcbundle.Manager
680}
681
682func NewAPI(bundleDir string) (*API, error) {
683 mgr, err := plcbundle.New(bundleDir, "")
684 if err != nil {
685 return nil, err
686 }
687
688 return &API{mgr: mgr}, nil
689}
690
691func (api *API) handleStats(w http.ResponseWriter, r *http.Request) {
692 index := api.mgr.GetIndex()
693 stats := index.GetStats()
694
695 response := map[string]interface{}{
696 "bundles": stats["bundle_count"],
697 "first": stats["first_bundle"],
698 "last": stats["last_bundle"],
699 "total_size": stats["total_size"],
700 "start_time": stats["start_time"],
701 "end_time": stats["end_time"],
702 "updated_at": stats["updated_at"],
703 }
704
705 w.Header().Set("Content-Type", "application/json")
706 json.NewEncoder(w).Encode(response)
707}
708
709func (api *API) handleOperations(w http.ResponseWriter, r *http.Request) {
710 bundleNumStr := r.URL.Query().Get("bundle")
711 if bundleNumStr == "" {
712 http.Error(w, "bundle parameter required", http.StatusBadRequest)
713 return
714 }
715
716 bundleNum, err := strconv.Atoi(bundleNumStr)
717 if err != nil {
718 http.Error(w, "invalid bundle number", http.StatusBadRequest)
719 return
720 }
721
722 ctx := r.Context()
723 bundle, err := api.mgr.Load(ctx, bundleNum)
724 if err != nil {
725 http.Error(w, err.Error(), http.StatusNotFound)
726 return
727 }
728
729 w.Header().Set("Content-Type", "application/x-ndjson")
730 encoder := json.NewEncoder(w)
731 for _, op := range bundle.Operations {
732 encoder.Encode(op)
733 }
734}
735
736func (api *API) handleDID(w http.ResponseWriter, r *http.Request) {
737 did := r.URL.Query().Get("did")
738 if did == "" {
739 http.Error(w, "did parameter required", http.StatusBadRequest)
740 return
741 }
742
743 ctx := r.Context()
744
745 // Search through bundles for this DID
746 var operations []plcbundle.PLCOperation
747
748 index := api.mgr.GetIndex()
749 bundles := index.GetBundles()
750
751 for _, meta := range bundles {
752 bundle, err := api.mgr.Load(ctx, meta.BundleNumber)
753 if err != nil {
754 continue
755 }
756
757 for _, op := range bundle.Operations {
758 if op.DID == did {
759 operations = append(operations, op)
760 }
761 }
762 }
763
764 w.Header().Set("Content-Type", "application/json")
765 json.NewEncoder(w).Encode(map[string]interface{}{
766 "did": did,
767 "operations": operations,
768 "count": len(operations),
769 })
770}
771
772func main() {
773 api, err := NewAPI("./plc_data")
774 if err != nil {
775 log.Fatal(err)
776 }
777
778 http.HandleFunc("/stats", api.handleStats)
779 http.HandleFunc("/operations", api.handleOperations)
780 http.HandleFunc("/did", api.handleDID)
781
782 log.Println("API listening on :8080")
783 log.Fatal(http.ListenAndServe(":8080", nil))
784}
785```
786
787**Usage:**
788```bash
789# Get stats
790curl http://localhost:8080/stats
791
792# Get operations from bundle 1
793curl http://localhost:8080/operations?bundle=1
794
795# Get all operations for a DID
796curl http://localhost:8080/did?did=did:plc:example123
797```
798
799---
800
801## Building Applications
802
803### Application 1: PDS Discovery Tool
804
805Find all PDS endpoints in the network:
806
807```go
808package main
809
810import (
811 "context"
812 "fmt"
813 "log"
814
815 plcbundle "tangled.org/atscan.net/plcbundle"
816)
817
818type PDSTracker struct {
819 mgr *plcbundle.Manager
820 endpoints map[string]int // endpoint -> count
821}
822
823func NewPDSTracker(bundleDir string) (*PDSTracker, error) {
824 mgr, err := plcbundle.New(bundleDir, "")
825 if err != nil {
826 return nil, err
827 }
828
829 return &PDSTracker{
830 mgr: mgr,
831 endpoints: make(map[string]int),
832 }, nil
833}
834
835func (pt *PDSTracker) Scan() error {
836 ctx := context.Background()
837
838 index := pt.mgr.GetIndex()
839 bundles := index.GetBundles()
840
841 log.Printf("Scanning %d bundles for PDS endpoints...", len(bundles))
842
843 for _, meta := range bundles {
844 bundle, err := pt.mgr.Load(ctx, meta.BundleNumber)
845 if err != nil {
846 return err
847 }
848
849 for _, op := range bundle.Operations {
850 if endpoint := pt.extractPDS(op); endpoint != "" {
851 pt.endpoints[endpoint]++
852 }
853 }
854 }
855
856 return nil
857}
858
859func (pt *PDSTracker) extractPDS(op plcbundle.PLCOperation) string {
860 // Parse Operation field on-demand
861 operation, err := op.GetOperationMap()
862 if err != nil || operation == nil {
863 return ""
864 }
865
866 services, ok := operation["services"].(map[string]interface{})
867 if !ok {
868 return ""
869 }
870
871 pds, ok := services["atproto_pds"].(map[string]interface{})
872 if !ok {
873 return ""
874 }
875
876 endpoint, ok := pds["endpoint"].(string)
877 if !ok {
878 return ""
879 }
880
881 return endpoint
882}
883
884
885func (pt *PDSTracker) PrintResults() {
886 log.Printf("\nFound %d unique PDS endpoints:\n", len(pt.endpoints))
887
888 // Sort by count
889 type endpointCount struct {
890 endpoint string
891 count int
892 }
893
894 var sorted []endpointCount
895 for endpoint, count := range pt.endpoints {
896 sorted = append(sorted, endpointCount{endpoint, count})
897 }
898
899 sort.Slice(sorted, func(i, j int) bool {
900 return sorted[i].count > sorted[j].count
901 })
902
903 // Print top 20
904 for i, ec := range sorted {
905 if i >= 20 {
906 break
907 }
908 fmt.Printf("%3d. %s (%d DIDs)\n", i+1, ec.endpoint, ec.count)
909 }
910}
911
912func main() {
913 tracker, err := NewPDSTracker("./plc_data")
914 if err != nil {
915 log.Fatal(err)
916 }
917
918 if err := tracker.Scan(); err != nil {
919 log.Fatal(err)
920 }
921
922 tracker.PrintResults()
923}
924```
925
926### Application 2: DID History Viewer
927
928View the complete history of a DID:
929
930```go
931package main
932
933import (
934 "context"
935 "encoding/json"
936 "fmt"
937 "log"
938 "os"
939
940 plcbundle "tangled.org/atscan.net/plcbundle"
941)
942
943type DIDHistory struct {
944 DID string `json:"did"`
945 Operations []plcbundle.PLCOperation `json:"operations"`
946 FirstSeen time.Time `json:"first_seen"`
947 LastSeen time.Time `json:"last_seen"`
948 OpCount int `json:"operation_count"`
949}
950
951func getDIDHistory(bundleDir, did string) (*DIDHistory, error) {
952 mgr, err := plcbundle.New(bundleDir, "")
953 if err != nil {
954 return nil, err
955 }
956 defer mgr.Close()
957
958 ctx := context.Background()
959
960 history := &DIDHistory{
961 DID: did,
962 Operations: make([]plcbundle.PLCOperation, 0),
963 }
964
965 index := mgr.GetIndex()
966 bundles := index.GetBundles()
967
968 log.Printf("Searching for DID %s...", did)
969
970 for _, meta := range bundles {
971 bundle, err := mgr.Load(ctx, meta.BundleNumber)
972 if err != nil {
973 continue
974 }
975
976 for _, op := range bundle.Operations {
977 if op.DID == did {
978 history.Operations = append(history.Operations, op)
979 }
980 }
981 }
982
983 if len(history.Operations) == 0 {
984 return nil, fmt.Errorf("DID not found")
985 }
986
987 // Set timestamps
988 history.FirstSeen = history.Operations[0].CreatedAt
989 history.LastSeen = history.Operations[len(history.Operations)-1].CreatedAt
990 history.OpCount = len(history.Operations)
991
992 return history, nil
993}
994
995func main() {
996 if len(os.Args) < 2 {
997 log.Fatal("Usage: did-history <did>")
998 }
999
1000 did := os.Args[1]
1001
1002 history, err := getDIDHistory("./plc_data", did)
1003 if err != nil {
1004 log.Fatal(err)
1005 }
1006
1007 // Print as JSON
1008 encoder := json.NewEncoder(os.Stdout)
1009 encoder.SetIndent("", " ")
1010 encoder.Encode(history)
1011}
1012```
1013
1014### Application 3: Real-time Monitor
1015
1016Monitor new operations as they arrive:
1017
1018```go
1019package main
1020
1021import (
1022 "context"
1023 "log"
1024 "time"
1025
1026 plcbundle "tangled.org/atscan.net/plcbundle"
1027)
1028
1029type Monitor struct {
1030 mgr *plcbundle.Manager
1031 lastSeen int // Last bundle number processed
1032 pollInterval time.Duration
1033}
1034
1035func NewMonitor(bundleDir string, pollInterval time.Duration) (*Monitor, error) {
1036 mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
1037 if err != nil {
1038 return nil, err
1039 }
1040
1041 // Get current position
1042 index := mgr.GetIndex()
1043 lastBundle := index.GetLastBundle()
1044 lastSeen := 0
1045 if lastBundle != nil {
1046 lastSeen = lastBundle.BundleNumber
1047 }
1048
1049 return &Monitor{
1050 mgr: mgr,
1051 lastSeen: lastSeen,
1052 pollInterval: pollInterval,
1053 }, nil
1054}
1055
1056func (m *Monitor) Start() {
1057 log.Println("Monitor started, watching for new bundles...")
1058
1059 ticker := time.NewTicker(m.pollInterval)
1060 defer ticker.Stop()
1061
1062 for range ticker.C {
1063 m.check()
1064 }
1065}
1066
1067func (m *Monitor) check() {
1068 ctx := context.Background()
1069
1070 // Try to fetch next bundle
1071 bundle, err := m.mgr.FetchNext(ctx)
1072 if err != nil {
1073 // Not an error if no new bundle available
1074 return
1075 }
1076
1077 // New bundle!
1078 log.Printf("🔔 New bundle: %d", bundle.BundleNumber)
1079 log.Printf(" Operations: %d", len(bundle.Operations))
1080 log.Printf(" DIDs: %d", bundle.DIDCount)
1081 log.Printf(" Time: %s", bundle.EndTime.Format("2006-01-02 15:04:05"))
1082
1083 // Process new operations
1084 m.processNewOperations(bundle)
1085
1086 m.lastSeen = bundle.BundleNumber
1087}
1088
1089func (m *Monitor) processNewOperations(bundle *plcbundle.Bundle) {
1090 for _, op := range bundle.Operations {
1091 // Check for interesting operations
1092 if op.IsNullified() {
1093 log.Printf(" ⚠️ Nullified: %s", op.DID)
1094 }
1095
1096 // Check for new DIDs (operation type "create")
1097 operation, err := op.GetOperationMap()
1098 if err == nil && operation != nil {
1099 if opType, ok := operation["type"].(string); ok && opType == "create" {
1100 log.Printf(" ➕ New DID: %s", op.DID)
1101 }
1102 }
1103 }
1104}
1105
1106func main() {
1107 monitor, err := NewMonitor("./plc_data", 30*time.Second)
1108 if err != nil {
1109 log.Fatal(err)
1110 }
1111
1112 monitor.Start()
1113}
1114```
1115
1116---
1117
1118## Advanced Usage
1119
1120### Custom Configuration
1121
1122Full control over bundle manager behavior:
1123
1124```go
1125package main
1126
1127import (
1128 "log"
1129 "runtime"
1130 "time"
1131
1132 "tangled.org/atscan.net/plcbundle/bundle"
1133 "tangled.org/atscan.net/plcbundle/plc"
1134 plcbundle "tangled.org/atscan.net/plcbundle"
1135)
1136
1137func main() {
1138 // Custom configuration
1139 config := &bundle.Config{
1140 BundleDir: "./my_bundles",
1141 VerifyOnLoad: true, // Verify hashes when loading
1142 AutoRebuild: true, // Auto-rebuild index if needed
1143 RebuildWorkers: runtime.NumCPU(), // Parallel workers for rebuild
1144 Logger: &MyCustomLogger{}, // Custom logger
1145
1146 // Progress callback for rebuild
1147 RebuildProgress: func(current, total int) {
1148 if current%100 == 0 {
1149 log.Printf("Rebuild: %d/%d (%.1f%%)",
1150 current, total, float64(current)/float64(total)*100)
1151 }
1152 },
1153 }
1154
1155 // Custom PLC client with rate limiting
1156 plcClient := plc.NewClient("https://plc.directory",
1157 plc.WithRateLimit(60, time.Minute), // 60 req/min
1158 plc.WithTimeout(30*time.Second), // 30s timeout
1159 plc.WithLogger(&MyCustomLogger{}), // Custom logger
1160 )
1161
1162 // Create manager
1163 mgr, err := bundle.NewManager(config, plcClient)
1164 if err != nil {
1165 log.Fatal(err)
1166 }
1167 defer mgr.Close()
1168
1169 log.Println("Manager created with custom configuration")
1170}
1171
1172// Custom logger implementation
1173type MyCustomLogger struct{}
1174
1175func (l *MyCustomLogger) Printf(format string, v ...interface{}) {
1176 // Add custom formatting, filtering, etc.
1177 log.Printf("[PLCBUNDLE] "+format, v...)
1178}
1179
1180func (l *MyCustomLogger) Println(v ...interface{}) {
1181 log.Println(append([]interface{}{"[PLCBUNDLE]"}, v...)...)
1182}
1183```
1184
1185### Streaming Data
1186
1187Stream bundle data without loading everything into memory:
1188
1189```go
1190package main
1191
1192import (
1193 "bufio"
1194 "context"
1195 "encoding/json"
1196 "io"
1197 "log"
1198
1199 plcbundle "tangled.org/atscan.net/plcbundle"
1200)
1201
1202func streamBundle(mgr *plcbundle.Manager, bundleNumber int) error {
1203 ctx := context.Background()
1204
1205 // Get decompressed stream
1206 reader, err := mgr.StreamDecompressed(ctx, bundleNumber)
1207 if err != nil {
1208 return err
1209 }
1210 defer reader.Close()
1211
1212 // Read line by line (JSONL)
1213 scanner := bufio.NewScanner(reader)
1214
1215 // Set buffer size for large lines
1216 buf := make([]byte, 0, 64*1024)
1217 scanner.Buffer(buf, 1024*1024)
1218
1219 lineNum := 0
1220 for scanner.Scan() {
1221 lineNum++
1222
1223 var op plcbundle.PLCOperation
1224 if err := json.Unmarshal(scanner.Bytes(), &op); err != nil {
1225 log.Printf("Warning: failed to parse line %d: %v", lineNum, err)
1226 continue
1227 }
1228
1229 // Process operation without storing all in memory
1230 processOperation(op)
1231 }
1232
1233 return scanner.Err()
1234}
1235
1236func processOperation(op plcbundle.PLCOperation) {
1237 // Your processing logic
1238 log.Printf("Processing: %s", op.DID)
1239}
1240
1241func main() {
1242 mgr, err := plcbundle.New("./plc_data", "")
1243 if err != nil {
1244 log.Fatal(err)
1245 }
1246 defer mgr.Close()
1247
1248 // Stream bundle 1
1249 if err := streamBundle(mgr, 1); err != nil {
1250 log.Fatal(err)
1251 }
1252}
1253```
1254
1255### Parallel Processing
1256
1257Process multiple bundles concurrently:
1258
1259```go
1260package main
1261
1262import (
1263 "context"
1264 "log"
1265 "sync"
1266
1267 plcbundle "tangled.org/atscan.net/plcbundle"
1268)
1269
1270func processParallel(mgr *plcbundle.Manager, workers int) error {
1271 ctx := context.Background()
1272
1273 index := mgr.GetIndex()
1274 bundles := index.GetBundles()
1275
1276 // Create job channel
1277 jobs := make(chan int, len(bundles))
1278 results := make(chan error, len(bundles))
1279
1280 // Start workers
1281 var wg sync.WaitGroup
1282 for w := 0; w < workers; w++ {
1283 wg.Add(1)
1284 go func() {
1285 defer wg.Done()
1286 for bundleNum := range jobs {
1287 if err := processBundle(ctx, mgr, bundleNum); err != nil {
1288 results <- err
1289 } else {
1290 results <- nil
1291 }
1292 }
1293 }()
1294 }
1295
1296 // Send jobs
1297 for _, meta := range bundles {
1298 jobs <- meta.BundleNumber
1299 }
1300 close(jobs)
1301
1302 // Wait for completion
1303 go func() {
1304 wg.Wait()
1305 close(results)
1306 }()
1307
1308 // Collect results
1309 errors := 0
1310 for err := range results {
1311 if err != nil {
1312 log.Printf("Error: %v", err)
1313 errors++
1314 }
1315 }
1316
1317 if errors > 0 {
1318 return fmt.Errorf("%d bundles failed processing", errors)
1319 }
1320
1321 return nil
1322}
1323
1324func processBundle(ctx context.Context, mgr *plcbundle.Manager, bundleNum int) error {
1325 bundle, err := mgr.Load(ctx, bundleNum)
1326 if err != nil {
1327 return err
1328 }
1329
1330 // Process operations
1331 for _, op := range bundle.Operations {
1332 // Your logic here
1333 _ = op
1334 }
1335
1336 log.Printf("Processed bundle %d", bundleNum)
1337 return nil
1338}
1339
1340func main() {
1341 mgr, err := plcbundle.New("./plc_data", "")
1342 if err != nil {
1343 log.Fatal(err)
1344 }
1345 defer mgr.Close()
1346
1347 // Process with 8 workers
1348 if err := processParallel(mgr, 8); err != nil {
1349 log.Fatal(err)
1350 }
1351}
1352```
1353
1354### Working with Mempool
1355
1356Access operations before they're bundled:
1357
1358```go
1359package main
1360
1361import (
1362 "log"
1363
1364 plcbundle "tangled.org/atscan.net/plcbundle"
1365)
1366
1367func main() {
1368 mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
1369 if err != nil {
1370 log.Fatal(err)
1371 }
1372 defer mgr.Close()
1373
1374 // Get mempool stats
1375 stats := mgr.GetMempoolStats()
1376
1377 count := stats["count"].(int)
1378 targetBundle := stats["target_bundle"].(int)
1379 canCreate := stats["can_create_bundle"].(bool)
1380
1381 log.Printf("Mempool status:")
1382 log.Printf(" Target bundle: %d", targetBundle)
1383 log.Printf(" Operations: %d/%d", count, plcbundle.BUNDLE_SIZE)
1384 log.Printf(" Ready: %v", canCreate)
1385
1386 if count > 0 {
1387 // Get mempool operations
1388 ops, err := mgr.GetMempoolOperations()
1389 if err != nil {
1390 log.Fatal(err)
1391 }
1392
1393 log.Printf("Latest unbundled operations:")
1394 for i, op := range ops {
1395 if i >= 5 {
1396 break
1397 }
1398 log.Printf(" %d. %s (%s)", i+1, op.DID, op.CreatedAt.Format("15:04:05"))
1399 }
1400 }
1401
1402 // Validate chronological order
1403 if err := mgr.ValidateMempool(); err != nil {
1404 log.Printf("⚠️ Mempool validation failed: %v", err)
1405 } else {
1406 log.Println("✓ Mempool validated")
1407 }
1408}
1409```
1410
1411---
1412
1413## Best Practices
1414
1415### 1. Always Close the Manager
1416
1417Use `defer` to ensure cleanup:
1418
1419```go
1420mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
1421if err != nil {
1422 return err
1423}
1424defer mgr.Close() // Always close!
1425```
1426
1427### 2. Handle Context Cancellation
1428
1429Support graceful shutdown:
1430
1431```go
1432ctx, cancel := context.WithCancel(context.Background())
1433defer cancel()
1434
1435// Listen for interrupt
1436sigChan := make(chan os.Signal, 1)
1437signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
1438
1439go func() {
1440 <-sigChan
1441 log.Println("Interrupt received, stopping...")
1442 cancel()
1443}()
1444
1445// Use context in operations
1446bundle, err := mgr.FetchNext(ctx)
1447if err == context.Canceled {
1448 log.Println("Operation cancelled gracefully")
1449 return nil
1450}
1451```
1452
1453### 3. Check Errors Properly
1454
1455Distinguish between different error types:
1456
1457```go
1458bundle, err := mgr.FetchNext(ctx)
1459if err != nil {
1460 // Check if it's just "caught up"
1461 if strings.Contains(err.Error(), "insufficient operations") {
1462 log.Println("No new bundles available (caught up)")
1463 return nil
1464 }
1465
1466 // Real error
1467 return fmt.Errorf("fetch failed: %w", err)
1468}
1469```
1470
1471### 4. Use Streaming for Large Datasets
1472
1473Don't load everything into memory:
1474
1475```go
1476// ❌ Bad: Loads all operations into memory
1477index := mgr.GetIndex()
1478var allOps []plcbundle.PLCOperation
1479for _, meta := range index.GetBundles() {
1480 bundle, _ := mgr.Load(ctx, meta.BundleNumber)
1481 allOps = append(allOps, bundle.Operations...)
1482}
1483
1484// ✅ Good: Process one bundle at a time
1485for _, meta := range index.GetBundles() {
1486 bundle, _ := mgr.Load(ctx, meta.BundleNumber)
1487 for _, op := range bundle.Operations {
1488 processOperation(op)
1489 }
1490}
1491```
1492
1493### 5. Enable Verification in Production
1494
1495```go
1496config := plcbundle.DefaultConfig("./plc_data")
1497config.VerifyOnLoad = true // Verify hashes when loading
1498
1499mgr, err := plcbundle.NewManager(config, plcClient)
1500```
1501
1502### 6. Log Appropriately
1503
1504Implement custom logger for production:
1505
1506```go
1507type ProductionLogger struct {
1508 logger *zap.Logger
1509}
1510
1511func (l *ProductionLogger) Printf(format string, v ...interface{}) {
1512 l.logger.Sugar().Infof(format, v...)
1513}
1514
1515func (l *ProductionLogger) Println(v ...interface{}) {
1516 l.logger.Sugar().Info(v...)
1517}
1518```
1519
1520### 7. Handle Rate Limits
1521
1522Configure PLC client appropriately:
1523
1524```go
1525// Production: Be conservative
1526plcClient := plc.NewClient("https://plc.directory",
1527 plc.WithRateLimit(60, time.Minute), // 60 req/min max
1528 plc.WithTimeout(60*time.Second),
1529)
1530
1531// Development: Can be more aggressive (but respectful)
1532plcClient := plc.NewClient("https://plc.directory",
1533 plc.WithRateLimit(90, time.Minute),
1534 plc.WithTimeout(30*time.Second),
1535)
1536```
1537
1538---
1539
1540## API Reference
1541
1542### Manager Methods
1543
1544```go
1545// Creation
1546New(bundleDir, plcURL string) (*Manager, error)
1547NewManager(config *Config, plcClient *PLCClient) (*Manager, error)
1548
1549// Lifecycle
1550Close()
1551
1552// Fetching
1553FetchNext(ctx) (*Bundle, error)
1554
1555// Loading
1556Load(ctx, bundleNumber int) (*Bundle, error)
1557
1558// Verification
1559Verify(ctx, bundleNumber int) (*VerificationResult, error)
1560VerifyChain(ctx) (*ChainVerificationResult, error)
1561
1562// Exporting
1563Export(ctx, afterTime time.Time, count int) ([]PLCOperation, error)
1564
1565// Streaming
1566StreamRaw(ctx, bundleNumber int) (io.ReadCloser, error)
1567StreamDecompressed(ctx, bundleNumber int) (io.ReadCloser, error)
1568
1569// Index
1570GetIndex() *Index
1571ScanBundle(path string, bundleNumber int) (*BundleMetadata, error)
1572Scan() (*DirectoryScanResult, error)
1573
1574// Mempool
1575GetMempoolStats() map[string]interface{}
1576GetMempoolOperations() ([]PLCOperation, error)
1577ValidateMempool() error
1578ClearMempool() error
1579
1580// Info
1581GetInfo() map[string]interface{}
1582IsBundleIndexed(bundleNumber int) bool
1583```
1584
1585### Index Methods
1586
1587```go
1588// Creation
1589NewIndex() *Index
1590LoadIndex(path string) (*Index, error)
1591
1592// Persistence
1593Save(path string) error
1594
1595// Queries
1596GetBundle(bundleNumber int) (*BundleMetadata, error)
1597GetLastBundle() *BundleMetadata
1598GetBundles() []*BundleMetadata
1599GetBundleRange(start, end int) []*BundleMetadata
1600
1601// Stats
1602Count() int
1603FindGaps() []int
1604GetStats() map[string]interface{}
1605```
1606
1607### Configuration Types
1608
1609```go
1610type Config struct {
1611 BundleDir string
1612 VerifyOnLoad bool
1613 AutoRebuild bool
1614 RebuildWorkers int
1615 RebuildProgress func(current, total int)
1616 Logger Logger
1617}
1618
1619type Logger interface {
1620 Printf(format string, v ...interface{})
1621 Println(v ...interface{})
1622}
1623```
1624
1625---
1626
1627## Troubleshooting
1628
1629### Bundle Not Found Error
1630
1631```go
1632bundle, err := mgr.Load(ctx, 999)
1633if err != nil {
1634 if strings.Contains(err.Error(), "not in index") {
1635 // Bundle doesn't exist
1636 log.Printf("Bundle 999 hasn't been fetched yet")
1637 }
1638}
1639```
1640
1641### Insufficient Operations Error
1642
1643```go
1644bundle, err := mgr.FetchNext(ctx)
1645if err != nil {
1646 if strings.Contains(err.Error(), "insufficient operations") {
1647 // Not enough operations for a complete bundle
1648 // Check mempool
1649 stats := mgr.GetMempoolStats()
1650 count := stats["count"].(int)
1651 log.Printf("Only %d operations available (need %d)", count, plcbundle.BUNDLE_SIZE)
1652 }
1653}
1654```
1655
1656### Memory Usage
1657
1658If processing large numbers of bundles:
1659
1660```go
1661// Force garbage collection between bundles
1662for _, meta := range index.GetBundles() {
1663 bundle, _ := mgr.Load(ctx, meta.BundleNumber)
1664 processBundle(bundle)
1665
1666 runtime.GC() // Help garbage collector
1667}
1668```
1669
1670---
1671
1672## Examples Repository
1673
1674Find complete, runnable examples at:
1675- https://github.com/plcbundle/examples
1676
1677Including:
1678- Complete sync service
1679- API server
1680- Analysis tools
1681- Monitoring services
1682