A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
# Library Guide

A practical guide to using plcbundle as a Go library in your applications.

## Table of Contents

- [Getting Started](#getting-started)
- [Core Concepts](#core-concepts)
- [Common Patterns](#common-patterns)
- [Building Applications](#building-applications)
- [Advanced Usage](#advanced-usage)
- [Best Practices](#best-practices)
- [API Reference](#api-reference)

---

## Getting Started

### Installation

```bash
go get tangled.org/atscan.net/plcbundle
```

### Your First Program

Create a simple program to fetch and display bundle information:

```go
package main

import (
	"log"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
	// Create a manager
	mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	// Get repository info
	info := mgr.GetInfo()
	log.Printf("Bundle directory: %s", info["bundle_dir"])

	// Get index stats
	index := mgr.GetIndex()
	stats := index.GetStats()
	log.Printf("Total bundles: %d", stats["bundle_count"])
}
```

Run it:

```bash
go run main.go
# 2025/01/15 10:30:00 Bundle directory: ./plc_data
# 2025/01/15 10:30:00 Total bundles: 0
```

### Fetching Your First Bundle

Let's fetch a bundle from the PLC directory:

```go
package main

import (
	"context"
	"log"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
	mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	ctx := context.Background()

	// Fetch next bundle
	log.Println("Fetching bundle...")
	bundle, err := mgr.FetchNext(ctx)
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("✓ Fetched bundle %d", bundle.BundleNumber)
	log.Printf("  Operations: %d", len(bundle.Operations))
	log.Printf("  Unique DIDs: %d", bundle.DIDCount)
	log.Printf("  Time range: %s to %s",
		bundle.StartTime.Format("2006-01-02"),
		bundle.EndTime.Format("2006-01-02"))
}
```

**What's happening here?**

1. `plcbundle.New()` creates a manager that handles all bundle operations
2. `FetchNext()` automatically:
   - Fetches operations from PLC directory
   - Creates a bundle when 10,000 operations are collected
   - Saves the bundle to disk
   - Updates the index
   - Returns the bundle object
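
The collect-then-bundle step in the list above can be pictured as a simple batcher. This is only an illustrative sketch using the standard library: the `batcher` type and its `add` method are hypothetical names, and plcbundle's real implementation (mempool, persistence, index updates) is not shown in this guide.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for the collect-then-bundle step.
// It is not part of plcbundle.
type batcher struct {
	size    int      // operations per bundle (10,000 in plcbundle)
	pending []string // operations waiting to be bundled
}

// add queues one operation and returns a full bundle once size is reached.
func (b *batcher) add(op string) ([]string, bool) {
	b.pending = append(b.pending, op)
	if len(b.pending) < b.size {
		return nil, false
	}
	bundle := b.pending
	b.pending = nil
	return bundle, true
}

func main() {
	b := &batcher{size: 3} // tiny size so the example is easy to follow
	for i := 1; i <= 7; i++ {
		if bundle, ok := b.add(fmt.Sprintf("op-%d", i)); ok {
			fmt.Println("bundle ready:", bundle)
		}
	}
	fmt.Println("still pending:", len(b.pending))
}
```

Operations that do not yet fill a bundle stay pending, which is why a fetch can legitimately end without producing a new bundle.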

### Reading Bundles

Once you have bundles, you can load and read them:

```go
package main

import (
	"context"
	"log"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
	mgr, err := plcbundle.New("./plc_data", "")
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	ctx := context.Background()

	// Load bundle 1
	bundle, err := mgr.Load(ctx, 1)
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("Bundle %d loaded", bundle.BundleNumber)

	// Iterate through operations
	for i, op := range bundle.Operations {
		if i >= 5 {
			break // Just show first 5
		}
		log.Printf("%d. DID: %s, CID: %s", i+1, op.DID, op.CID)
	}
}
```

---

## Core Concepts

### The Manager

The `Manager` is your main entry point. It handles:
- Bundle storage and retrieval
- Index management
- PLC directory synchronization
- Verification
- Mempool management

**Creating a manager:**

```go
// Simple creation
mgr, err := plcbundle.New("./bundles", "https://plc.directory")

// Alternatively, with custom configuration (use this instead of New above)
config := plcbundle.DefaultConfig("./bundles")
config.VerifyOnLoad = true
config.AutoRebuild = true

plcClient := plcbundle.NewPLCClient("https://plc.directory")
mgr, err := plcbundle.NewManager(config, plcClient)
```

### Bundles

A bundle contains exactly 10,000 operations:

```go
type Bundle struct {
	BundleNumber     int                // Sequential number (1, 2, 3...)
	StartTime        time.Time          // First operation timestamp
	EndTime          time.Time          // Last operation timestamp
	Operations       []plc.PLCOperation // The 10,000 operations
	DIDCount         int                // Unique DIDs in bundle
	Hash             string             // Chain hash (includes history)
	ContentHash      string             // This bundle's content hash
	Parent           string             // Previous bundle's chain hash
	CompressedSize   int64              // File size on disk
	UncompressedSize int64              // Original JSONL size
}
```
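
The `Hash`, `ContentHash`, and `Parent` fields describe a hash chain: each bundle's chain hash depends on its own content and on the previous bundle's chain hash, so changing any earlier bundle invalidates everything after it. The sketch below illustrates that idea with SHA-256. The `link` type, the helper names, and in particular the `parent + ":" + contentHash` formula are assumptions made for illustration; this guide does not state the exact formula plcbundle uses.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// link mirrors the three hash fields of Bundle, for illustration only.
type link struct {
	ContentHash string
	Parent      string
	Hash        string
}

// sha256Hex returns the hex-encoded SHA-256 digest of s.
func sha256Hex(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])
}

// chainHash is an ASSUMED formula: parent and content hash joined by ":".
func chainHash(parent, contentHash string) string {
	return sha256Hex(parent + ":" + contentHash)
}

// buildChain links the given payloads into a chain, first to last.
func buildChain(payloads []string) []link {
	chain := make([]link, 0, len(payloads))
	parent := ""
	for _, p := range payloads {
		content := sha256Hex(p)
		l := link{ContentHash: content, Parent: parent, Hash: chainHash(parent, content)}
		chain = append(chain, l)
		parent = l.Hash
	}
	return chain
}

// firstBroken returns the index of the first inconsistent link, or -1.
func firstBroken(chain []link) int {
	parent := ""
	for i, l := range chain {
		if l.Parent != parent || l.Hash != chainHash(l.Parent, l.ContentHash) {
			return i
		}
		parent = l.Hash
	}
	return -1
}

func main() {
	chain := buildChain([]string{"bundle 1", "bundle 2", "bundle 3"})
	fmt.Println("intact chain, first broken link:", firstBroken(chain))

	chain[1].ContentHash = sha256Hex("tampered")
	fmt.Println("after tampering, first broken link:", firstBroken(chain))
}
```

Because every link commits to its parent, comparing a single head hash with another mirror is enough to detect divergence anywhere in the history.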

### The Index

The index tracks all bundles and their metadata:

```go
index := mgr.GetIndex()

// Get all bundles
bundles := index.GetBundles()
for _, meta := range bundles {
	log.Printf("Bundle %d: %s to %s",
		meta.BundleNumber,
		meta.StartTime.Format("2006-01-02"),
		meta.EndTime.Format("2006-01-02"))
}

// Get specific bundle metadata
meta, err := index.GetBundle(42)

// Get last bundle
lastBundle := index.GetLastBundle()
```

### Operations

Each operation represents a DID PLC directory event:

```go
type PLCOperation struct {
	DID       string                 // The DID (did:plc:...)
	Operation map[string]interface{} // The operation data
	CID       string                 // Content identifier
	Nullified interface{}            // nil, false, or CID string
	CreatedAt time.Time              // When it was created
	RawJSON   []byte                 // Original JSON bytes
}

// Check if operation was nullified
if op.IsNullified() {
	log.Printf("Operation %s was nullified by %s", op.CID, op.GetNullifyingCID())
}
```
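
Because `Nullified` is an `interface{}` that may hold `nil`, `false`, or a CID string, any code that inspects it directly needs a type switch. The sketch below shows one way to interpret such a value. `nullifiedState` is a hypothetical helper, not the library's `IsNullified` implementation, and treating `true` as "nullified with unknown CID" is an assumption of this example.

```go
package main

import "fmt"

// nullifiedState interprets a value shaped like the Nullified field:
// nil or false means "not nullified"; a non-empty string is taken to be
// the CID of the nullifying operation. Treating true as nullified
// without a known CID is an assumption of this sketch.
func nullifiedState(v interface{}) (nullified bool, cid string) {
	switch x := v.(type) {
	case nil:
		return false, ""
	case bool:
		return x, ""
	case string:
		return x != "", x
	default:
		// Unknown shapes are treated as "not nullified".
		return false, ""
	}
}

func main() {
	for _, v := range []interface{}{nil, false, true, "bafyexamplecid"} {
		nullified, cid := nullifiedState(v)
		fmt.Printf("%#v -> nullified=%v cid=%q\n", v, nullified, cid)
	}
}
```

In application code, prefer the `IsNullified()` and `GetNullifyingCID()` methods shown above; the type switch only matters when you handle the raw field yourself.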

---

## Common Patterns

### Pattern 1: Transparent Sync Service

**Goal:** Keep a local PLC mirror continuously synchronized.

This is the most common use case: maintaining an up-to-date copy of the PLC directory.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

type SyncService struct {
	mgr      *plcbundle.Manager
	interval time.Duration
	stop     chan struct{}
}

func NewSyncService(bundleDir string, interval time.Duration) (*SyncService, error) {
	mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
	if err != nil {
		return nil, err
	}

	return &SyncService{
		mgr:      mgr,
		interval: interval,
		stop:     make(chan struct{}),
	}, nil
}

func (s *SyncService) Start() {
	log.Println("Starting sync service...")

	// Initial sync
	s.sync()

	// Periodic sync
	ticker := time.NewTicker(s.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.sync()
		case <-s.stop:
			log.Println("Sync service stopped")
			return
		}
	}
}

func (s *SyncService) sync() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	log.Println("Checking for new bundles...")

	fetched := 0
	for {
		bundle, err := s.mgr.FetchNext(ctx)
		if err != nil {
			if isInsufficientOps(err) {
				if fetched > 0 {
					log.Printf("✓ Synced %d new bundles", fetched)
				} else {
					log.Println("✓ Up to date")
				}
				return
			}
			log.Printf("Error: %v", err)
			return
		}

		fetched++
		log.Printf("✓ Fetched bundle %d (%d ops, %d DIDs)",
			bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount)
	}
}

func (s *SyncService) Stop() {
	close(s.stop)
	s.mgr.Close()
}

// isInsufficientOps reports whether err only means "caught up",
// i.e. there are not yet enough new operations to form a bundle.
func isInsufficientOps(err error) bool {
	return err != nil &&
		(strings.Contains(err.Error(), "insufficient operations") ||
			strings.Contains(err.Error(), "no more available"))
}

func main() {
	service, err := NewSyncService("./plc_data", 5*time.Minute)
	if err != nil {
		log.Fatal(err)
	}

	// Start service in background
	go service.Start()

	// Wait for interrupt
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	<-sigChan

	log.Println("Shutting down...")
	service.Stop()
}
```

**Usage:**

```bash
go run main.go
# Starting sync service...
# Checking for new bundles...
# ✓ Fetched bundle 8548 (10000 ops, 8234 DIDs)
# ✓ Fetched bundle 8549 (10000 ops, 8156 DIDs)
# ✓ Up to date
# ... (repeats every 5 minutes)
```

### Pattern 2: Reading and Processing Operations

**Goal:** Process all historical operations for analysis.

```go
package main

import (
	"context"
	"log"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

type OperationProcessor struct {
	mgr *plcbundle.Manager
}

func NewOperationProcessor(bundleDir string) (*OperationProcessor, error) {
	mgr, err := plcbundle.New(bundleDir, "")
	if err != nil {
		return nil, err
	}

	return &OperationProcessor{mgr: mgr}, nil
}

func (p *OperationProcessor) ProcessAll() error {
	ctx := context.Background()

	index := p.mgr.GetIndex()
	bundles := index.GetBundles()

	log.Printf("Processing %d bundles...", len(bundles))

	totalOps := 0
	uniqueDIDs := make(map[string]bool)

	for _, meta := range bundles {
		// Load bundle
		bundle, err := p.mgr.Load(ctx, meta.BundleNumber)
		if err != nil {
			return err
		}

		// Process operations
		for _, op := range bundle.Operations {
			totalOps++
			uniqueDIDs[op.DID] = true

			// Your processing logic here
			p.processOperation(op)
		}

		if meta.BundleNumber%100 == 0 {
			log.Printf("Processed bundle %d...", meta.BundleNumber)
		}
	}

	log.Printf("✓ Processed %d operations from %d unique DIDs",
		totalOps, len(uniqueDIDs))

	return nil
}

func (p *OperationProcessor) processOperation(op plcbundle.PLCOperation) {
	// Example: Extract PDS endpoints
	if services, ok := op.Operation["services"].(map[string]interface{}); ok {
		if pds, ok := services["atproto_pds"].(map[string]interface{}); ok {
			if endpoint, ok := pds["endpoint"].(string); ok {
				log.Printf("DID %s uses PDS: %s", op.DID, endpoint)
			}
		}
	}
}

func main() {
	processor, err := NewOperationProcessor("./plc_data")
	if err != nil {
		log.Fatal(err)
	}

	if err := processor.ProcessAll(); err != nil {
		log.Fatal(err)
	}
}
```

### Pattern 3: Time-Based Queries

**Goal:** Export operations from a specific time period.

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"time"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

func exportOperationsSince(bundleDir string, since time.Time, limit int) error {
	mgr, err := plcbundle.New(bundleDir, "")
	if err != nil {
		return err
	}
	defer mgr.Close()

	ctx := context.Background()

	// Export operations after timestamp
	ops, err := mgr.Export(ctx, since, limit)
	if err != nil {
		return err
	}

	log.Printf("Exporting %d operations...", len(ops))

	// Write as JSONL to stdout
	encoder := json.NewEncoder(os.Stdout)
	for _, op := range ops {
		if err := encoder.Encode(op); err != nil {
			return err
		}
	}

	return nil
}

func main() {
	// Export operations from the last 7 days
	since := time.Now().AddDate(0, 0, -7)

	if err := exportOperationsSince("./plc_data", since, 50000); err != nil {
		log.Fatal(err)
	}
}
```

**Output to file:**

```bash
go run main.go > last_7_days.jsonl
```
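
How `Export` chooses which bundles to read is not described in this guide. If you need to do a time-based selection yourself, the `StartTime` and `EndTime` metadata from the index is enough to skip bundles that cannot contain matching operations. The sketch below shows the idea with a hypothetical, trimmed-down `bundleMeta` type; `bundlesSince` and `day` are illustrative helpers, not library functions.

```go
package main

import (
	"fmt"
	"time"
)

// bundleMeta is a hypothetical, trimmed-down view of index metadata.
type bundleMeta struct {
	Number    int
	StartTime time.Time
	EndTime   time.Time
}

// day builds midnight UTC on the given day of January 2025.
func day(d int) time.Time {
	return time.Date(2025, time.January, d, 0, 0, 0, 0, time.UTC)
}

// bundlesSince returns the numbers of bundles that may contain
// operations created after since, i.e. whose EndTime is after since.
func bundlesSince(metas []bundleMeta, since time.Time) []int {
	var out []int
	for _, m := range metas {
		if m.EndTime.After(since) {
			out = append(out, m.Number)
		}
	}
	return out
}

func main() {
	metas := []bundleMeta{
		{Number: 1, StartTime: day(1), EndTime: day(3)},
		{Number: 2, StartTime: day(3), EndTime: day(6)},
		{Number: 3, StartTime: day(6), EndTime: day(9)},
	}
	// Bundle 1 ended before day 5, so only bundles 2 and 3 need loading.
	fmt.Println(bundlesSince(metas, day(5)))
}
```

The first selected bundle can still contain operations older than `since`, so individual operations need a `CreatedAt` check after loading.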

### Pattern 4: Verification Service

**Goal:** Periodically verify bundle integrity.

```go
package main

import (
	"context"
	"log"
	"time"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

type VerificationService struct {
	mgr      *plcbundle.Manager
	interval time.Duration
}

func NewVerificationService(bundleDir string, interval time.Duration) (*VerificationService, error) {
	mgr, err := plcbundle.New(bundleDir, "")
	if err != nil {
		return nil, err
	}

	return &VerificationService{
		mgr:      mgr,
		interval: interval,
	}, nil
}

func (v *VerificationService) Start() {
	ticker := time.NewTicker(v.interval)
	defer ticker.Stop()

	// Verify immediately on start
	v.verify()

	for range ticker.C {
		v.verify()
	}
}

func (v *VerificationService) verify() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
	defer cancel()

	log.Println("Starting chain verification...")
	start := time.Now()

	result, err := v.mgr.VerifyChain(ctx)
	if err != nil {
		log.Printf("❌ Verification error: %v", err)
		return
	}

	elapsed := time.Since(start)

	if result.Valid {
		log.Printf("✅ Chain verified: %d bundles, took %s",
			result.ChainLength, elapsed.Round(time.Second))

		// Get head hash
		index := v.mgr.GetIndex()
		if last := index.GetLastBundle(); last != nil {
			log.Printf("   Head hash: %s...", last.Hash[:16])
		}
	} else {
		log.Printf("❌ Chain broken at bundle %d: %s",
			result.BrokenAt, result.Error)

		// Alert or take action
		v.handleBrokenChain(result)
	}
}

func (v *VerificationService) handleBrokenChain(result *plcbundle.ChainVerificationResult) {
	// Send alert, trigger re-sync, etc.
	log.Printf("⚠️ ALERT: Chain integrity compromised!")
	// TODO: Implement your alerting logic
}

func main() {
	service, err := NewVerificationService("./plc_data", 24*time.Hour)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("Verification service started (daily checks)")
	service.Start()
}
```

### Pattern 5: Custom HTTP API

**Goal:** Build a custom API on top of your bundle archive.

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"strconv"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

type API struct {
	mgr *plcbundle.Manager
}

func NewAPI(bundleDir string) (*API, error) {
	mgr, err := plcbundle.New(bundleDir, "")
	if err != nil {
		return nil, err
	}

	return &API{mgr: mgr}, nil
}

func (api *API) handleStats(w http.ResponseWriter, r *http.Request) {
	index := api.mgr.GetIndex()
	stats := index.GetStats()

	response := map[string]interface{}{
		"bundles":    stats["bundle_count"],
		"first":      stats["first_bundle"],
		"last":       stats["last_bundle"],
		"total_size": stats["total_size"],
		"start_time": stats["start_time"],
		"end_time":   stats["end_time"],
		"updated_at": stats["updated_at"],
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}

func (api *API) handleOperations(w http.ResponseWriter, r *http.Request) {
	bundleNumStr := r.URL.Query().Get("bundle")
	if bundleNumStr == "" {
		http.Error(w, "bundle parameter required", http.StatusBadRequest)
		return
	}

	bundleNum, err := strconv.Atoi(bundleNumStr)
	if err != nil {
		http.Error(w, "invalid bundle number", http.StatusBadRequest)
		return
	}

	ctx := r.Context()
	bundle, err := api.mgr.Load(ctx, bundleNum)
	if err != nil {
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	w.Header().Set("Content-Type", "application/x-ndjson")
	encoder := json.NewEncoder(w)
	for _, op := range bundle.Operations {
		encoder.Encode(op)
	}
}

func (api *API) handleDID(w http.ResponseWriter, r *http.Request) {
	did := r.URL.Query().Get("did")
	if did == "" {
		http.Error(w, "did parameter required", http.StatusBadRequest)
		return
	}

	ctx := r.Context()

	// Search through bundles for this DID
	var operations []plcbundle.PLCOperation

	index := api.mgr.GetIndex()
	bundles := index.GetBundles()

	for _, meta := range bundles {
		bundle, err := api.mgr.Load(ctx, meta.BundleNumber)
		if err != nil {
			continue
		}

		for _, op := range bundle.Operations {
			if op.DID == did {
				operations = append(operations, op)
			}
		}
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"did":        did,
		"operations": operations,
		"count":      len(operations),
	})
}

func main() {
	api, err := NewAPI("./plc_data")
	if err != nil {
		log.Fatal(err)
	}

	http.HandleFunc("/stats", api.handleStats)
	http.HandleFunc("/operations", api.handleOperations)
	http.HandleFunc("/did", api.handleDID)

	log.Println("API listening on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```

**Usage:**

```bash
# Get stats
curl http://localhost:8080/stats

# Get operations from bundle 1 (quoted so the shell does not expand "?")
curl "http://localhost:8080/operations?bundle=1"

# Get all operations for a DID
curl "http://localhost:8080/did?did=did:plc:example123"
```
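
The `/did` handler in this pattern loads every bundle on each request, which means a full pass over the archive per lookup. One way to reduce that cost is to build a DID-to-bundle index once and then load only the bundles that contain the DID. The sketch below shows the data structure with standard-library types only; `op`, `didIndex`, and `addBundle` are hypothetical names and not part of plcbundle.

```go
package main

import "fmt"

// op is a hypothetical minimal operation record (DID only).
type op struct{ DID string }

// didIndex maps a DID to the bundle numbers that contain it.
type didIndex map[string][]int

// addBundle records every DID seen in one bundle, once per bundle.
func (idx didIndex) addBundle(number int, ops []op) {
	seen := make(map[string]bool)
	for _, o := range ops {
		if !seen[o.DID] {
			seen[o.DID] = true
			idx[o.DID] = append(idx[o.DID], number)
		}
	}
}

func main() {
	idx := didIndex{}
	idx.addBundle(1, []op{{"did:plc:alice"}, {"did:plc:bob"}, {"did:plc:alice"}})
	idx.addBundle(2, []op{{"did:plc:bob"}})
	idx.addBundle(3, []op{{"did:plc:alice"}})

	// Only these bundles need to be loaded to answer a /did request.
	fmt.Println(idx["did:plc:alice"])
}
```

The trade-off is memory: the index holds one entry per DID, so for a full mirror it may be worth persisting it or keeping it in an external store.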

---

## Building Applications

### Application 1: PDS Discovery Tool

Find all PDS endpoints in the network:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"sort"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

type PDSTracker struct {
	mgr       *plcbundle.Manager
	endpoints map[string]int // endpoint -> number of operations referencing it
}

func NewPDSTracker(bundleDir string) (*PDSTracker, error) {
	mgr, err := plcbundle.New(bundleDir, "")
	if err != nil {
		return nil, err
	}

	return &PDSTracker{
		mgr:       mgr,
		endpoints: make(map[string]int),
	}, nil
}

func (pt *PDSTracker) Scan() error {
	ctx := context.Background()

	index := pt.mgr.GetIndex()
	bundles := index.GetBundles()

	log.Printf("Scanning %d bundles for PDS endpoints...", len(bundles))

	for _, meta := range bundles {
		bundle, err := pt.mgr.Load(ctx, meta.BundleNumber)
		if err != nil {
			return err
		}

		for _, op := range bundle.Operations {
			if endpoint := pt.extractPDS(op); endpoint != "" {
				pt.endpoints[endpoint]++
			}
		}
	}

	return nil
}

func (pt *PDSTracker) extractPDS(op plcbundle.PLCOperation) string {
	services, ok := op.Operation["services"].(map[string]interface{})
	if !ok {
		return ""
	}

	pds, ok := services["atproto_pds"].(map[string]interface{})
	if !ok {
		return ""
	}

	endpoint, ok := pds["endpoint"].(string)
	if !ok {
		return ""
	}

	return endpoint
}

func (pt *PDSTracker) PrintResults() {
	log.Printf("\nFound %d unique PDS endpoints:\n", len(pt.endpoints))

	// Sort by count
	type endpointCount struct {
		endpoint string
		count    int
	}

	var sorted []endpointCount
	for endpoint, count := range pt.endpoints {
		sorted = append(sorted, endpointCount{endpoint, count})
	}

	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].count > sorted[j].count
	})

	// Print top 20. The count is per operation, so a DID that updated
	// its record several times is counted once per update.
	for i, ec := range sorted {
		if i >= 20 {
			break
		}
		fmt.Printf("%3d. %s (%d operations)\n", i+1, ec.endpoint, ec.count)
	}
}

func main() {
	tracker, err := NewPDSTracker("./plc_data")
	if err != nil {
		log.Fatal(err)
	}

	if err := tracker.Scan(); err != nil {
		log.Fatal(err)
	}

	tracker.PrintResults()
}
```

### Application 2: DID History Viewer

View the complete history of a DID:

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

type DIDHistory struct {
	DID        string                   `json:"did"`
	Operations []plcbundle.PLCOperation `json:"operations"`
	FirstSeen  time.Time                `json:"first_seen"`
	LastSeen   time.Time                `json:"last_seen"`
	OpCount    int                      `json:"operation_count"`
}

func getDIDHistory(bundleDir, did string) (*DIDHistory, error) {
	mgr, err := plcbundle.New(bundleDir, "")
	if err != nil {
		return nil, err
	}
	defer mgr.Close()

	ctx := context.Background()

	history := &DIDHistory{
		DID:        did,
		Operations: make([]plcbundle.PLCOperation, 0),
	}

	index := mgr.GetIndex()
	bundles := index.GetBundles()

	log.Printf("Searching for DID %s...", did)

	for _, meta := range bundles {
		bundle, err := mgr.Load(ctx, meta.BundleNumber)
		if err != nil {
			continue
		}

		for _, op := range bundle.Operations {
			if op.DID == did {
				history.Operations = append(history.Operations, op)
			}
		}
	}

	if len(history.Operations) == 0 {
		return nil, fmt.Errorf("DID not found")
	}

	// Set timestamps
	history.FirstSeen = history.Operations[0].CreatedAt
	history.LastSeen = history.Operations[len(history.Operations)-1].CreatedAt
	history.OpCount = len(history.Operations)

	return history, nil
}

func main() {
	if len(os.Args) < 2 {
		log.Fatal("Usage: did-history <did>")
	}

	did := os.Args[1]

	history, err := getDIDHistory("./plc_data", did)
	if err != nil {
		log.Fatal(err)
	}

	// Print as JSON
	encoder := json.NewEncoder(os.Stdout)
	encoder.SetIndent("", "  ")
	encoder.Encode(history)
}
```

### Application 3: Real-time Monitor

Monitor new operations as they arrive:

```go
package main

import (
	"context"
	"log"
	"time"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

type Monitor struct {
	mgr          *plcbundle.Manager
	lastSeen     int // Last bundle number processed
	pollInterval time.Duration
}

func NewMonitor(bundleDir string, pollInterval time.Duration) (*Monitor, error) {
	mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
	if err != nil {
		return nil, err
	}

	// Get current position
	index := mgr.GetIndex()
	lastBundle := index.GetLastBundle()
	lastSeen := 0
	if lastBundle != nil {
		lastSeen = lastBundle.BundleNumber
	}

	return &Monitor{
		mgr:          mgr,
		lastSeen:     lastSeen,
		pollInterval: pollInterval,
	}, nil
}

func (m *Monitor) Start() {
	log.Println("Monitor started, watching for new bundles...")

	ticker := time.NewTicker(m.pollInterval)
	defer ticker.Stop()

	for range ticker.C {
		m.check()
	}
}

func (m *Monitor) check() {
	ctx := context.Background()

	// Try to fetch next bundle
	bundle, err := m.mgr.FetchNext(ctx)
	if err != nil {
		// Not an error if no new bundle available
		return
	}

	// New bundle!
	log.Printf("🔔 New bundle: %d", bundle.BundleNumber)
	log.Printf("   Operations: %d", len(bundle.Operations))
	log.Printf("   DIDs: %d", bundle.DIDCount)
	log.Printf("   Time: %s", bundle.EndTime.Format("2006-01-02 15:04:05"))

	// Process new operations
	m.processNewOperations(bundle)

	m.lastSeen = bundle.BundleNumber
}

func (m *Monitor) processNewOperations(bundle *plcbundle.Bundle) {
	for _, op := range bundle.Operations {
		// Check for interesting operations
		if op.IsNullified() {
			log.Printf("   ⚠️ Nullified: %s", op.DID)
		}

		// Check for new DIDs (operation type "create")
		if opType, ok := op.Operation["type"].(string); ok && opType == "create" {
			log.Printf("   ➕ New DID: %s", op.DID)
		}
	}
}

func main() {
	monitor, err := NewMonitor("./plc_data", 30*time.Second)
	if err != nil {
		log.Fatal(err)
	}

	monitor.Start()
}
```

---

## Advanced Usage

### Custom Configuration

Full control over bundle manager behavior:

```go
package main

import (
	"log"
	"runtime"
	"time"

	"tangled.org/atscan.net/plcbundle/bundle"
	"tangled.org/atscan.net/plcbundle/plc"
)

func main() {
	// Custom configuration
	config := &bundle.Config{
		BundleDir:      "./my_bundles",
		VerifyOnLoad:   true,             // Verify hashes when loading
		AutoRebuild:    true,             // Auto-rebuild index if needed
		RebuildWorkers: runtime.NumCPU(), // Parallel workers for rebuild
		Logger:         &MyCustomLogger{}, // Custom logger

		// Progress callback for rebuild
		RebuildProgress: func(current, total int) {
			if current%100 == 0 {
				log.Printf("Rebuild: %d/%d (%.1f%%)",
					current, total, float64(current)/float64(total)*100)
			}
		},
	}

	// Custom PLC client with rate limiting
	plcClient := plc.NewClient("https://plc.directory",
		plc.WithRateLimit(60, time.Minute), // 60 req/min
		plc.WithTimeout(30*time.Second),    // 30s timeout
		plc.WithLogger(&MyCustomLogger{}),  // Custom logger
	)

	// Create manager
	mgr, err := bundle.NewManager(config, plcClient)
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	log.Println("Manager created with custom configuration")
}

// Custom logger implementation
type MyCustomLogger struct{}

func (l *MyCustomLogger) Printf(format string, v ...interface{}) {
	// Add custom formatting, filtering, etc.
	log.Printf("[PLCBUNDLE] "+format, v...)
}

func (l *MyCustomLogger) Println(v ...interface{}) {
	log.Println(append([]interface{}{"[PLCBUNDLE]"}, v...)...)
}
```

### Streaming Data

Stream bundle data without loading everything into memory:

```go
package main

import (
	"bufio"
	"context"
	"encoding/json"
	"log"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

func streamBundle(mgr *plcbundle.Manager, bundleNumber int) error {
	ctx := context.Background()

	// Get decompressed stream
	reader, err := mgr.StreamDecompressed(ctx, bundleNumber)
	if err != nil {
		return err
	}
	defer reader.Close()

	// Read line by line (JSONL)
	scanner := bufio.NewScanner(reader)

	// Start with a 64 KiB buffer and allow lines of up to 1 MiB
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	lineNum := 0
	for scanner.Scan() {
		lineNum++

		var op plcbundle.PLCOperation
		if err := json.Unmarshal(scanner.Bytes(), &op); err != nil {
			log.Printf("Warning: failed to parse line %d: %v", lineNum, err)
			continue
		}

		// Process operation without storing all in memory
		processOperation(op)
	}

	return scanner.Err()
}

func processOperation(op plcbundle.PLCOperation) {
	// Your processing logic
	log.Printf("Processing: %s", op.DID)
}

func main() {
	mgr, err := plcbundle.New("./plc_data", "")
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	// Stream bundle 1
	if err := streamBundle(mgr, 1); err != nil {
		log.Fatal(err)
	}
}
```

### Parallel Processing

Process multiple bundles concurrently:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

func processParallel(mgr *plcbundle.Manager, workers int) error {
	ctx := context.Background()

	index := mgr.GetIndex()
	bundles := index.GetBundles()

	// Buffered channels sized to the job count, so neither the job
	// sender nor the workers ever block on a full channel
	jobs := make(chan int, len(bundles))
	results := make(chan error, len(bundles))

	// Start workers
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for bundleNum := range jobs {
				// One result (nil on success) per bundle
				results <- processBundle(ctx, mgr, bundleNum)
			}
		}()
	}

	// Send jobs
	for _, meta := range bundles {
		jobs <- meta.BundleNumber
	}
	close(jobs)

	// Close results once all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results
	failed := 0
	for err := range results {
		if err != nil {
			log.Printf("Error: %v", err)
			failed++
		}
	}

	if failed > 0 {
		return fmt.Errorf("%d bundles failed processing", failed)
	}

	return nil
}

func processBundle(ctx context.Context, mgr *plcbundle.Manager, bundleNum int) error {
	bundle, err := mgr.Load(ctx, bundleNum)
	if err != nil {
		return err
	}

	// Process operations
	for _, op := range bundle.Operations {
		// Your logic here
		_ = op
	}

	log.Printf("Processed bundle %d", bundleNum)
	return nil
}

func main() {
	mgr, err := plcbundle.New("./plc_data", "")
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	// Process with 8 workers
	if err := processParallel(mgr, 8); err != nil {
		log.Fatal(err)
	}
}
```

### Working with Mempool

Access operations before they're bundled:

```go
package main

import (
	"log"

	plcbundle "tangled.org/atscan.net/plcbundle"
)

func main() {
	mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	// Get mempool stats
	stats := mgr.GetMempoolStats()

	count := stats["count"].(int)
	targetBundle := stats["target_bundle"].(int)
	canCreate := stats["can_create_bundle"].(bool)

	log.Printf("Mempool status:")
	log.Printf("  Target bundle: %d", targetBundle)
	log.Printf("  Operations: %d/%d", count, plcbundle.BUNDLE_SIZE)
	log.Printf("  Ready: %v", canCreate)

	if count > 0 {
		// Get mempool operations
		ops, err := mgr.GetMempoolOperations()
		if err != nil {
			log.Fatal(err)
		}

		log.Printf("Latest unbundled operations:")
		for i, op := range ops {
			if i >= 5 {
				break
			}
			log.Printf("  %d. %s (%s)", i+1, op.DID, op.CreatedAt.Format("15:04:05"))
		}
	}

	// Validate chronological order
	if err := mgr.ValidateMempool(); err != nil {
		log.Printf("⚠️ Mempool validation failed: %v", err)
	} else {
		log.Println("✓ Mempool validated")
	}
}
```
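
The comment in the example says `ValidateMempool` checks chronological order. What exactly the library verifies is not documented here, but the core of such a check is small: every timestamp must be no earlier than the one before it. The sketch below shows that idea on plain `time.Time` values; `firstOutOfOrder` and `series` are hypothetical helpers written for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// firstOutOfOrder returns the index of the first timestamp that is earlier
// than its predecessor, or -1 if the slice is in chronological order.
// Equal timestamps are accepted.
func firstOutOfOrder(times []time.Time) int {
	for i := 1; i < len(times); i++ {
		if times[i].Before(times[i-1]) {
			return i
		}
	}
	return -1
}

// series builds timestamps at the given second offsets from a fixed instant.
func series(offsets ...int) []time.Time {
	base := time.Unix(1700000000, 0)
	out := make([]time.Time, 0, len(offsets))
	for _, s := range offsets {
		out = append(out, base.Add(time.Duration(s)*time.Second))
	}
	return out
}

func main() {
	fmt.Println(firstOutOfOrder(series(0, 5, 5, 9))) // in order
	fmt.Println(firstOutOfOrder(series(0, 5, 3)))    // third entry goes back in time
}
```

Running the same kind of check over `CreatedAt` values before processing mempool operations gives an early signal that something upstream delivered data out of order.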

---

## Best Practices

### 1. Always Close the Manager

Use `defer` to ensure cleanup:

```go
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
	return err
}
defer mgr.Close() // Always close!
```

### 2. Handle Context Cancellation

Support graceful shutdown:

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Listen for interrupt
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

go func() {
	<-sigChan
	log.Println("Interrupt received, stopping...")
	cancel()
}()

// Use context in operations
bundle, err := mgr.FetchNext(ctx)
if errors.Is(err, context.Canceled) {
	// errors.Is also matches when the context error has been wrapped
	log.Println("Operation cancelled gracefully")
	return nil
}
```

### 3. Check Errors Properly

Distinguish between different error types:

```go
bundle, err := mgr.FetchNext(ctx)
if err != nil {
	// Check if it's just "caught up"
	if strings.Contains(err.Error(), "insufficient operations") {
		log.Println("No new bundles available (caught up)")
		return nil
	}

	// Real error
	return fmt.Errorf("fetch failed: %w", err)
}
```
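
When several call sites need the same distinction, it can help to put the checks in one place. The sketch below is a small classifier built only on the standard library. The message fragments are the ones matched elsewhere in this guide; the `classify` and `sampleErrors` names and the sample error texts are made up for the demonstration, and matching on message text remains brittle if the library changes its wording.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// classify sorts an error into "ok", "stopped", "caught-up", or "failed".
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		// Cancellation or timeout, even when wrapped by another error.
		return "stopped"
	case strings.Contains(err.Error(), "insufficient operations"),
		strings.Contains(err.Error(), "no more available"):
		// Not enough new operations yet: the mirror is up to date.
		return "caught-up"
	default:
		return "failed"
	}
}

// sampleErrors returns demo errors; the exact texts are invented.
func sampleErrors() []error {
	return []error{
		nil,
		errors.New("insufficient operations: 412/10000"),
		fmt.Errorf("fetch failed: %w", context.Canceled),
		errors.New("connection refused"),
	}
}

func main() {
	for _, err := range sampleErrors() {
		fmt.Printf("%v -> %s\n", err, classify(err))
	}
}
```

A sync loop can then switch on the result: keep going on "ok", sleep on "caught-up", exit quietly on "stopped", and log or alert on "failed".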

### 4. Use Streaming for Large Datasets

Don't load everything into memory:

```go
// ❌ Bad: Loads all operations into memory
index := mgr.GetIndex()
var allOps []plcbundle.PLCOperation
for _, meta := range index.GetBundles() {
	bundle, _ := mgr.Load(ctx, meta.BundleNumber)
	allOps = append(allOps, bundle.Operations...)
}

// ✅ Good: Process one bundle at a time
for _, meta := range index.GetBundles() {
	bundle, _ := mgr.Load(ctx, meta.BundleNumber)
	for _, op := range bundle.Operations {
		processOperation(op)
	}
}
```

### 5. Enable Verification in Production

Set `VerifyOnLoad` so that bundle hashes are checked every time a bundle is loaded:

```go
config := plcbundle.DefaultConfig("./plc_data")
config.VerifyOnLoad = true // Verify hashes when loading

mgr, err := plcbundle.NewManager(config, plcClient)
if err != nil {
	return err
}
```

### 6. Log Appropriately

Implement the `Logger` interface to route library output through your production logger, for example zap:

```go
type ProductionLogger struct {
	logger *zap.Logger
}

func (l *ProductionLogger) Printf(format string, v ...interface{}) {
	l.logger.Sugar().Infof(format, v...)
}

func (l *ProductionLogger) Println(v ...interface{}) {
	l.logger.Sugar().Info(v...)
}
```

### 7. Handle Rate Limits

Configure the PLC client's rate limit and timeout to suit the environment:

```go
// Production: Be conservative
plcClient := plc.NewClient("https://plc.directory",
	plc.WithRateLimit(60, time.Minute), // 60 req/min max
	plc.WithTimeout(60*time.Second),
)

// Development: Can be more aggressive (but respectful)
devClient := plc.NewClient("https://plc.directory",
	plc.WithRateLimit(90, time.Minute), // 90 req/min max
	plc.WithTimeout(30*time.Second),
)
```

---

## API Reference

### Manager Methods

```go
// Creation
New(bundleDir, plcURL string) (*Manager, error)
NewManager(config *Config, plcClient *PLCClient) (*Manager, error)

// Lifecycle
Close()

// Fetching
FetchNext(ctx) (*Bundle, error)

// Loading
Load(ctx, bundleNumber int) (*Bundle, error)

// Verification
Verify(ctx, bundleNumber int) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)

// Exporting
Export(ctx, afterTime time.Time, count int) ([]PLCOperation, error)

// Streaming
StreamRaw(ctx, bundleNumber int) (io.ReadCloser, error)
StreamDecompressed(ctx, bundleNumber int) (io.ReadCloser, error)

// Index
GetIndex() *Index
ScanBundle(path string, bundleNumber int) (*BundleMetadata, error)
Scan() (*DirectoryScanResult, error)

// Mempool
GetMempoolStats() map[string]interface{}
GetMempoolOperations() ([]PLCOperation, error)
ValidateMempool() error
ClearMempool() error

// Info
GetInfo() map[string]interface{}
IsBundleIndexed(bundleNumber int) bool
```

### Index Methods

```go
// Creation
NewIndex() *Index
LoadIndex(path string) (*Index, error)

// Persistence
Save(path string) error

// Queries
GetBundle(bundleNumber int) (*BundleMetadata, error)
GetLastBundle() *BundleMetadata
GetBundles() []*BundleMetadata
GetBundleRange(start, end int) []*BundleMetadata

// Stats
Count() int
FindGaps() []int
GetStats() map[string]interface{}
```

### Configuration Types

```go
type Config struct {
	BundleDir       string
	VerifyOnLoad    bool
	AutoRebuild     bool
	RebuildWorkers  int
	RebuildProgress func(current, total int)
	Logger          Logger
}

type Logger interface {
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
```

---

## Troubleshooting

### Bundle Not Found Error

```go
bundle, err := mgr.Load(ctx, 999)
if err != nil {
	if strings.Contains(err.Error(), "not in index") {
		// Bundle doesn't exist
		log.Printf("Bundle 999 hasn't been fetched yet")
	}
}
```

### Insufficient Operations Error

```go
bundle, err := mgr.FetchNext(ctx)
if err != nil {
	if strings.Contains(err.Error(), "insufficient operations") {
		// Not enough operations for a complete bundle; check the mempool
		stats := mgr.GetMempoolStats()
		// The comma-ok form avoids a panic if "count" is missing or not an int
		if count, ok := stats["count"].(int); ok {
			log.Printf("Only %d operations available (need %d)", count, plcbundle.BUNDLE_SIZE)
		}
	}
}
```
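
Since the stats methods return `map[string]interface{}`, a bare type assertion such as `stats["count"].(int)` panics when the key is missing or holds another type. The sketch below shows a defensive accessor; `intStat` is a hypothetical helper, and the set of accepted types is a guess at what such a map might hold.

```go
package main

import "fmt"

// intStat reads an integer from a loosely typed stats map without panicking.
// It accepts a few common numeric types, including float64, which is what
// encoding/json produces for numbers.
func intStat(stats map[string]interface{}, key string) (int, bool) {
	switch v := stats[key].(type) {
	case int:
		return v, true
	case int64:
		return int(v), true
	case float64:
		return int(v), true
	default:
		return 0, false // missing key or unexpected type
	}
}

func main() {
	stats := map[string]interface{}{"count": 120, "ratio": "n/a"} // hypothetical values
	fmt.Println(intStat(stats, "count"))   // 120 true
	fmt.Println(intStat(stats, "ratio"))   // 0 false
	fmt.Println(intStat(stats, "missing")) // 0 false
}
```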

### Memory Usage

When processing many bundles in one loop, forcing a collection between bundles can help keep peak memory down, at some cost in CPU time:

```go
// Force garbage collection between bundles
for _, meta := range index.GetBundles() {
	bundle, err := mgr.Load(ctx, meta.BundleNumber)
	if err != nil {
		return err
	}
	processBundle(bundle)

	runtime.GC() // Trigger a collection before loading the next bundle
}
```

---

## Examples Repository

Find complete, runnable examples at:
- https://github.com/plcbundle/examples

Including:
- Complete sync service
- API server
- Analysis tools
- Monitoring services