Library Guide#
A practical guide to using plcbundle as a Go library in your applications.
Table of Contents#
- Getting Started
- Core Concepts
- Common Patterns
- Building Applications
- Advanced Usage
- Best Practices
- API Reference
- Troubleshooting
- Examples Repository
Getting Started#
Installation#
go get tangled.org/atscan.net/plcbundle
Your First Program#
Create a simple program that opens a bundle directory and prints basic repository and index information:
package main
import (
"log"
plcbundle "tangled.org/atscan.net/plcbundle"
)
func main() {
// Create a manager
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
// Get repository info
info := mgr.GetInfo()
log.Printf("Bundle directory: %s", info["bundle_dir"])
// Get index stats
index := mgr.GetIndex()
stats := index.GetStats()
log.Printf("Total bundles: %d", stats["bundle_count"])
}
Run it:
go run main.go
# 2025/01/15 10:30:00 Bundle directory: ./plc_data
# 2025/01/15 10:30:00 Total bundles: 0
Fetching Your First Bundle#
Let's fetch a bundle from the PLC directory:
package main
import (
"context"
"log"
plcbundle "tangled.org/atscan.net/plcbundle"
)
func main() {
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
ctx := context.Background()
// Fetch next bundle
log.Println("Fetching bundle...")
bundle, err := mgr.FetchNext(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("✓ Fetched bundle %d", bundle.BundleNumber)
log.Printf(" Operations: %d", len(bundle.Operations))
log.Printf(" Unique DIDs: %d", bundle.DIDCount)
log.Printf(" Time range: %s to %s",
bundle.StartTime.Format("2006-01-02"),
bundle.EndTime.Format("2006-01-02"))
}
What's happening here?
- plcbundle.New() creates a manager that handles all bundle operations
- FetchNext() automatically:
  - Fetches operations from the PLC directory
  - Creates a bundle when 10,000 operations are collected
  - Saves the bundle to disk
  - Updates the index
  - Returns the bundle object
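The batching step can be pictured with a small, library-free sketch. This is a simplified model, not plcbundle's implementation: splitIntoBundles is a hypothetical helper, and operations are plain strings here. It shows only how a stream of operations yields full 10,000-operation bundles plus a remainder that has to wait for more data.

```go
package main

import "fmt"

// bundleSize mirrors the 10,000-operation bundle size described above.
const bundleSize = 10000

// splitIntoBundles is a hypothetical helper: it cuts ops into full bundles
// of the given size and returns the leftover operations that are not yet
// enough to form another bundle.
func splitIntoBundles(ops []string, size int) (bundles [][]string, pending []string) {
	for len(ops) >= size {
		bundles = append(bundles, ops[:size])
		ops = ops[size:]
	}
	return bundles, ops
}

func main() {
	// 25,000 made-up operations: enough for two bundles, with 5,000 left over.
	ops := make([]string, 25000)
	for i := range ops {
		ops[i] = fmt.Sprintf("op-%d", i)
	}
	bundles, pending := splitIntoBundles(ops, bundleSize)
	fmt.Printf("full bundles: %d, pending operations: %d\n", len(bundles), len(pending))
}
```

In the real library the leftover operations presumably correspond to what the guide later calls the mempool.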
Reading Bundles#
Once you have bundles, you can load and read them:
package main
import (
"context"
"log"
plcbundle "tangled.org/atscan.net/plcbundle"
)
func main() {
mgr, err := plcbundle.New("./plc_data", "")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
ctx := context.Background()
// Load bundle 1
bundle, err := mgr.Load(ctx, 1)
if err != nil {
log.Fatal(err)
}
log.Printf("Bundle %d loaded", bundle.BundleNumber)
// Iterate through operations
for i, op := range bundle.Operations {
if i >= 5 {
break // Just show first 5
}
log.Printf("%d. DID: %s, CID: %s", i+1, op.DID, op.CID)
}
}
Core Concepts#
The Manager#
The Manager is your main entry point. It handles:
- Bundle storage and retrieval
- Index management
- PLC directory synchronization
- Verification
- Mempool management
Creating a manager:
// Simple creation
mgr, err := plcbundle.New("./bundles", "https://plc.directory")
// Custom configuration
config := plcbundle.DefaultConfig("./bundles")
config.VerifyOnLoad = true
config.AutoRebuild = true
plcClient := plcbundle.NewPLCClient("https://plc.directory")
mgr, err := plcbundle.NewManager(config, plcClient)
Bundles#
A bundle contains exactly 10,000 operations:
type Bundle struct {
BundleNumber int // Sequential number (1, 2, 3...)
StartTime time.Time // First operation timestamp
EndTime time.Time // Last operation timestamp
Operations []plc.PLCOperation // The 10,000 operations
DIDCount int // Unique DIDs in bundle
Hash string // Chain hash (includes history)
ContentHash string // This bundle's content hash
Parent string // Previous bundle's chain hash
CompressedSize int64 // File size on disk
UncompressedSize int64 // Original JSONL size
}
The Index#
The index tracks all bundles and their metadata:
index := mgr.GetIndex()
// Get all bundles
bundles := index.GetBundles()
for _, meta := range bundles {
log.Printf("Bundle %d: %s to %s",
meta.BundleNumber,
meta.StartTime.Format("2006-01-02"),
meta.EndTime.Format("2006-01-02"))
}
// Get specific bundle metadata
meta, err := index.GetBundle(42)
// Get last bundle
lastBundle := index.GetLastBundle()
Operations#
Each operation represents a DID PLC directory event:
type PLCOperation struct {
DID string // The DID (did:plc:...)
Operation map[string]interface{} // The operation data
CID string // Content identifier
Nullified interface{} // nil, false, or CID string
CreatedAt time.Time // When it was created
RawJSON []byte // Original JSON bytes
}
// Check if operation was nullified
if op.IsNullified() {
log.Printf("Operation %s was nullified by %s", op.CID, op.GetNullifyingCID())
}
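Because Nullified is declared as interface{} and may hold nil, false, or a CID string, it can help to see how such a value might be interpreted by hand. The sketch below is an assumption about reasonable handling, not the library's IsNullified or GetNullifyingCID code; nullifiedInfo is a hypothetical name, and treating a bare true as "nullified, CID unknown" is a choice made for this example.

```go
package main

import "fmt"

// nullifiedInfo interprets a value shaped like the Nullified field:
// nil or false means "not nullified", true means "nullified, CID unknown",
// and a non-empty string is treated as the nullifying CID.
func nullifiedInfo(v interface{}) (nullified bool, byCID string) {
	switch x := v.(type) {
	case nil:
		return false, ""
	case bool:
		return x, ""
	case string:
		return x != "", x
	default:
		// Unknown shape: be conservative and report "not nullified".
		return false, ""
	}
}

func main() {
	// "bafyexamplecid" is a made-up placeholder, not a real CID.
	for _, v := range []interface{}{nil, false, true, "bafyexamplecid"} {
		n, cid := nullifiedInfo(v)
		fmt.Printf("%v -> nullified=%v cid=%q\n", v, n, cid)
	}
}
```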
Common Patterns#
Pattern 1: Transparent Sync Service#
Goal: Keep a local PLC mirror continuously synchronized.
This is the most common use case: maintaining an up-to-date copy of the PLC directory.
package main
import (
"context"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
plcbundle "tangled.org/atscan.net/plcbundle"
)
type SyncService struct {
mgr *plcbundle.Manager
interval time.Duration
stop chan struct{}
}
func NewSyncService(bundleDir string, interval time.Duration) (*SyncService, error) {
mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
if err != nil {
return nil, err
}
return &SyncService{
mgr: mgr,
interval: interval,
stop: make(chan struct{}),
}, nil
}
func (s *SyncService) Start() {
log.Println("Starting sync service...")
// Initial sync
s.sync()
// Periodic sync
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.sync()
case <-s.stop:
log.Println("Sync service stopped")
return
}
}
}
func (s *SyncService) sync() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
log.Println("Checking for new bundles...")
fetched := 0
for {
bundle, err := s.mgr.FetchNext(ctx)
if err != nil {
if isInsufficientOps(err) {
if fetched > 0 {
log.Printf("✓ Synced %d new bundles", fetched)
} else {
log.Println("✓ Up to date")
}
return
}
log.Printf("Error: %v", err)
return
}
fetched++
log.Printf("✓ Fetched bundle %d (%d ops, %d DIDs)",
bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount)
}
}
func (s *SyncService) Stop() {
close(s.stop)
s.mgr.Close()
}
func isInsufficientOps(err error) bool {
return err != nil &&
(strings.Contains(err.Error(), "insufficient operations") ||
strings.Contains(err.Error(), "no more available"))
}
func main() {
service, err := NewSyncService("./plc_data", 5*time.Minute)
if err != nil {
log.Fatal(err)
}
// Start service in background
go service.Start()
// Wait for interrupt
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down...")
service.Stop()
}
Usage:
go run main.go
# Starting sync service...
# Checking for new bundles...
# ✓ Fetched bundle 8548 (10000 ops, 8234 DIDs)
# ✓ Fetched bundle 8549 (10000 ops, 8156 DIDs)
# ✓ Up to date
# ... (repeats every 5 minutes)
Pattern 2: Reading and Processing Operations#
Goal: Process all historical operations for analysis.
package main
import (
"context"
"log"
plcbundle "tangled.org/atscan.net/plcbundle"
)
type OperationProcessor struct {
mgr *plcbundle.Manager
}
func NewOperationProcessor(bundleDir string) (*OperationProcessor, error) {
mgr, err := plcbundle.New(bundleDir, "")
if err != nil {
return nil, err
}
return &OperationProcessor{mgr: mgr}, nil
}
func (p *OperationProcessor) ProcessAll() error {
ctx := context.Background()
index := p.mgr.GetIndex()
bundles := index.GetBundles()
log.Printf("Processing %d bundles...", len(bundles))
totalOps := 0
uniqueDIDs := make(map[string]bool)
for _, meta := range bundles {
// Load bundle
bundle, err := p.mgr.Load(ctx, meta.BundleNumber)
if err != nil {
return err
}
// Process operations
for _, op := range bundle.Operations {
totalOps++
uniqueDIDs[op.DID] = true
// Your processing logic here
p.processOperation(op)
}
if meta.BundleNumber % 100 == 0 {
log.Printf("Processed bundle %d...", meta.BundleNumber)
}
}
log.Printf("✓ Processed %d operations from %d unique DIDs",
totalOps, len(uniqueDIDs))
return nil
}
func (p *OperationProcessor) processOperation(op plcbundle.PLCOperation) {
// Example: Extract PDS endpoints
if services, ok := op.Operation["services"].(map[string]interface{}); ok {
if pds, ok := services["atproto_pds"].(map[string]interface{}); ok {
if endpoint, ok := pds["endpoint"].(string); ok {
log.Printf("DID %s uses PDS: %s", op.DID, endpoint)
}
}
}
}
func main() {
processor, err := NewOperationProcessor("./plc_data")
if err != nil {
log.Fatal(err)
}
if err := processor.ProcessAll(); err != nil {
log.Fatal(err)
}
}
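The nested type assertions in processOperation can be tried without any bundle data. The following sketch runs the same services -> atproto_pds -> endpoint lookup against a hand-written JSON sample; the sample document and the pdsEndpoint helper name are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pdsEndpoint walks services -> atproto_pds -> endpoint and reports whether
// every level had the expected type.
func pdsEndpoint(operation map[string]interface{}) (string, bool) {
	services, ok := operation["services"].(map[string]interface{})
	if !ok {
		return "", false
	}
	pds, ok := services["atproto_pds"].(map[string]interface{})
	if !ok {
		return "", false
	}
	endpoint, ok := pds["endpoint"].(string)
	return endpoint, ok
}

func main() {
	// Made-up sample shaped like the data held in an operation's map.
	sample := `{"services":{"atproto_pds":{"endpoint":"https://pds.example.com"}}}`

	var operation map[string]interface{}
	if err := json.Unmarshal([]byte(sample), &operation); err != nil {
		panic(err)
	}
	if endpoint, ok := pdsEndpoint(operation); ok {
		fmt.Println("PDS endpoint:", endpoint)
	}
}
```

Returning a boolean alongside the string lets callers tell "no PDS entry" apart from an empty endpoint.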
Pattern 3: Time-Based Queries#
Goal: Export operations from a specific time period.
package main
import (
"context"
"encoding/json"
"log"
"os"
"time"
plcbundle "tangled.org/atscan.net/plcbundle"
)
func exportOperationsSince(bundleDir string, since time.Time, limit int) error {
mgr, err := plcbundle.New(bundleDir, "")
if err != nil {
return err
}
defer mgr.Close()
ctx := context.Background()
// Export operations after timestamp
ops, err := mgr.Export(ctx, since, limit)
if err != nil {
return err
}
log.Printf("Exporting %d operations...", len(ops))
// Write as JSONL to stdout
encoder := json.NewEncoder(os.Stdout)
for _, op := range ops {
if err := encoder.Encode(op); err != nil {
return err
}
}
return nil
}
func main() {
// Export operations from the last 7 days
since := time.Now().AddDate(0, 0, -7)
if err := exportOperationsSince("./plc_data", since, 50000); err != nil {
log.Fatal(err)
}
}
Output to file:
go run main.go > last_7_days.jsonl
Pattern 4: Verification Service#
Goal: Periodically verify bundle integrity.
package main
import (
"context"
"log"
"time"
plcbundle "tangled.org/atscan.net/plcbundle"
)
type VerificationService struct {
mgr *plcbundle.Manager
interval time.Duration
}
func NewVerificationService(bundleDir string, interval time.Duration) (*VerificationService, error) {
mgr, err := plcbundle.New(bundleDir, "")
if err != nil {
return nil, err
}
return &VerificationService{
mgr: mgr,
interval: interval,
}, nil
}
func (v *VerificationService) Start() {
ticker := time.NewTicker(v.interval)
defer ticker.Stop()
// Verify immediately on start
v.verify()
for range ticker.C {
v.verify()
}
}
func (v *VerificationService) verify() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
log.Println("Starting chain verification...")
start := time.Now()
result, err := v.mgr.VerifyChain(ctx)
if err != nil {
log.Printf("❌ Verification error: %v", err)
return
}
elapsed := time.Since(start)
if result.Valid {
log.Printf("✅ Chain verified: %d bundles, took %s",
result.ChainLength, elapsed.Round(time.Second))
// Get head hash
index := v.mgr.GetIndex()
if last := index.GetLastBundle(); last != nil {
log.Printf(" Head hash: %s...", last.Hash[:16])
}
} else {
log.Printf("❌ Chain broken at bundle %d: %s",
result.BrokenAt, result.Error)
// Alert or take action
v.handleBrokenChain(result)
}
}
func (v *VerificationService) handleBrokenChain(result *plcbundle.ChainVerificationResult) {
// Send alert, trigger re-sync, etc.
log.Printf("⚠️ ALERT: Chain integrity compromised!")
// TODO: Implement your alerting logic
}
func main() {
service, err := NewVerificationService("./plc_data", 24*time.Hour)
if err != nil {
log.Fatal(err)
}
log.Println("Verification service started (daily checks)")
service.Start()
}
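The result fields used above (Valid, ChainLength, BrokenAt) suggest that verification walks the chain bundle by bundle. As a rough illustration only, since VerifyChain's internals are not shown in this guide, the sketch below checks the linkage implied by the Hash and Parent fields: each bundle's Parent should equal the previous bundle's Hash. The meta type and firstBrokenLink function are hypothetical, and a full verification would presumably also recompute hashes from the bundle contents.

```go
package main

import "fmt"

// meta holds just the fields needed to check chain linkage.
type meta struct {
	BundleNumber int
	Hash         string // chain hash of this bundle
	Parent       string // chain hash of the previous bundle
}

// firstBrokenLink returns the number of the first bundle whose Parent does
// not match the previous bundle's Hash, or 0 if every link matches.
func firstBrokenLink(chain []meta) int {
	for i := 1; i < len(chain); i++ {
		if chain[i].Parent != chain[i-1].Hash {
			return chain[i].BundleNumber
		}
	}
	return 0
}

func main() {
	// Hash values are short made-up placeholders.
	chain := []meta{
		{BundleNumber: 1, Hash: "aaa", Parent: ""},
		{BundleNumber: 2, Hash: "bbb", Parent: "aaa"},
		{BundleNumber: 3, Hash: "ccc", Parent: "xxx"}, // does not point at "bbb"
	}
	if n := firstBrokenLink(chain); n != 0 {
		fmt.Printf("chain broken at bundle %d\n", n)
	} else {
		fmt.Println("all links match")
	}
}
```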
Pattern 5: Custom HTTP API#
Goal: Build a custom API on top of your bundle archive.
package main
import (
"encoding/json"
"log"
"net/http"
"strconv"
plcbundle "tangled.org/atscan.net/plcbundle"
)
type API struct {
mgr *plcbundle.Manager
}
func NewAPI(bundleDir string) (*API, error) {
mgr, err := plcbundle.New(bundleDir, "")
if err != nil {
return nil, err
}
return &API{mgr: mgr}, nil
}
func (api *API) handleStats(w http.ResponseWriter, r *http.Request) {
index := api.mgr.GetIndex()
stats := index.GetStats()
response := map[string]interface{}{
"bundles": stats["bundle_count"],
"first": stats["first_bundle"],
"last": stats["last_bundle"],
"total_size": stats["total_size"],
"start_time": stats["start_time"],
"end_time": stats["end_time"],
"updated_at": stats["updated_at"],
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
func (api *API) handleOperations(w http.ResponseWriter, r *http.Request) {
bundleNumStr := r.URL.Query().Get("bundle")
if bundleNumStr == "" {
http.Error(w, "bundle parameter required", http.StatusBadRequest)
return
}
bundleNum, err := strconv.Atoi(bundleNumStr)
if err != nil {
http.Error(w, "invalid bundle number", http.StatusBadRequest)
return
}
ctx := r.Context()
bundle, err := api.mgr.Load(ctx, bundleNum)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/x-ndjson")
encoder := json.NewEncoder(w)
for _, op := range bundle.Operations {
encoder.Encode(op)
}
}
func (api *API) handleDID(w http.ResponseWriter, r *http.Request) {
did := r.URL.Query().Get("did")
if did == "" {
http.Error(w, "did parameter required", http.StatusBadRequest)
return
}
ctx := r.Context()
// Search through bundles for this DID
var operations []plcbundle.PLCOperation
index := api.mgr.GetIndex()
bundles := index.GetBundles()
for _, meta := range bundles {
bundle, err := api.mgr.Load(ctx, meta.BundleNumber)
if err != nil {
continue
}
for _, op := range bundle.Operations {
if op.DID == did {
operations = append(operations, op)
}
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"did": did,
"operations": operations,
"count": len(operations),
})
}
func main() {
api, err := NewAPI("./plc_data")
if err != nil {
log.Fatal(err)
}
http.HandleFunc("/stats", api.handleStats)
http.HandleFunc("/operations", api.handleOperations)
http.HandleFunc("/did", api.handleDID)
log.Println("API listening on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
Usage:
# Get stats
curl http://localhost:8080/stats
# Get operations from bundle 1 (quote the URL so the shell does not interpret "?")
curl "http://localhost:8080/operations?bundle=1"
# Get all operations for a DID
curl "http://localhost:8080/did?did=did:plc:example123"
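Handlers like these can be exercised without starting a server by using net/http/httptest from the standard library. The sketch below re-creates only the query-parameter validation from handleOperations; bundleParam, handler, and statusFor are hypothetical names, and rejecting numbers below 1 is an added assumption based on bundles being numbered 1, 2, 3, and so on.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// bundleParam validates the "bundle" query parameter.
func bundleParam(r *http.Request) (int, error) {
	raw := r.URL.Query().Get("bundle")
	if raw == "" {
		return 0, fmt.Errorf("bundle parameter required")
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 1 {
		return 0, fmt.Errorf("invalid bundle number")
	}
	return n, nil
}

// handler answers 400 for bad input and 200 otherwise.
func handler(w http.ResponseWriter, r *http.Request) {
	n, err := bundleParam(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	fmt.Fprintf(w, "bundle %d requested\n", n)
}

// statusFor runs the handler in-process and returns the HTTP status code.
func statusFor(target string) int {
	rec := httptest.NewRecorder()
	handler(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Code
}

func main() {
	for _, target := range []string{
		"/operations?bundle=1",
		"/operations",
		"/operations?bundle=abc",
	} {
		fmt.Printf("%-26s -> %d\n", target, statusFor(target))
	}
}
```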
Building Applications#
Application 1: PDS Discovery Tool#
Find all PDS endpoints in the network:
package main
import (
"context"
"fmt"
"log"
"sort"
plcbundle "tangled.org/atscan.net/plcbundle"
)
type PDSTracker struct {
mgr *plcbundle.Manager
endpoints map[string]int // endpoint -> count
}
func NewPDSTracker(bundleDir string) (*PDSTracker, error) {
mgr, err := plcbundle.New(bundleDir, "")
if err != nil {
return nil, err
}
return &PDSTracker{
mgr: mgr,
endpoints: make(map[string]int),
}, nil
}
func (pt *PDSTracker) Scan() error {
ctx := context.Background()
index := pt.mgr.GetIndex()
bundles := index.GetBundles()
log.Printf("Scanning %d bundles for PDS endpoints...", len(bundles))
for _, meta := range bundles {
bundle, err := pt.mgr.Load(ctx, meta.BundleNumber)
if err != nil {
return err
}
for _, op := range bundle.Operations {
if endpoint := pt.extractPDS(op); endpoint != "" {
pt.endpoints[endpoint]++
}
}
}
return nil
}
func (pt *PDSTracker) extractPDS(op plcbundle.PLCOperation) string {
services, ok := op.Operation["services"].(map[string]interface{})
if !ok {
return ""
}
pds, ok := services["atproto_pds"].(map[string]interface{})
if !ok {
return ""
}
endpoint, ok := pds["endpoint"].(string)
if !ok {
return ""
}
return endpoint
}
func (pt *PDSTracker) PrintResults() {
log.Printf("\nFound %d unique PDS endpoints:\n", len(pt.endpoints))
// Sort by count
type endpointCount struct {
endpoint string
count int
}
var sorted []endpointCount
for endpoint, count := range pt.endpoints {
sorted = append(sorted, endpointCount{endpoint, count})
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].count > sorted[j].count
})
// Print top 20
for i, ec := range sorted {
if i >= 20 {
break
}
// The counter is incremented once per operation, so this is an operation count
fmt.Printf("%3d. %s (%d operations)\n", i+1, ec.endpoint, ec.count)
}
}
func main() {
tracker, err := NewPDSTracker("./plc_data")
if err != nil {
log.Fatal(err)
}
if err := tracker.Scan(); err != nil {
log.Fatal(err)
}
tracker.PrintResults()
}
Application 2: DID History Viewer#
View the complete history of a DID:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
plcbundle "tangled.org/atscan.net/plcbundle"
)
type DIDHistory struct {
DID string `json:"did"`
Operations []plcbundle.PLCOperation `json:"operations"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
OpCount int `json:"operation_count"`
}
func getDIDHistory(bundleDir, did string) (*DIDHistory, error) {
mgr, err := plcbundle.New(bundleDir, "")
if err != nil {
return nil, err
}
defer mgr.Close()
ctx := context.Background()
history := &DIDHistory{
DID: did,
Operations: make([]plcbundle.PLCOperation, 0),
}
index := mgr.GetIndex()
bundles := index.GetBundles()
log.Printf("Searching for DID %s...", did)
for _, meta := range bundles {
bundle, err := mgr.Load(ctx, meta.BundleNumber)
if err != nil {
continue
}
for _, op := range bundle.Operations {
if op.DID == did {
history.Operations = append(history.Operations, op)
}
}
}
if len(history.Operations) == 0 {
return nil, fmt.Errorf("DID not found")
}
// Set timestamps
history.FirstSeen = history.Operations[0].CreatedAt
history.LastSeen = history.Operations[len(history.Operations)-1].CreatedAt
history.OpCount = len(history.Operations)
return history, nil
}
func main() {
if len(os.Args) < 2 {
log.Fatal("Usage: did-history <did>")
}
did := os.Args[1]
history, err := getDIDHistory("./plc_data", did)
if err != nil {
log.Fatal(err)
}
// Print as JSON
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
encoder.Encode(history)
}
Application 3: Real-time Monitor#
Monitor new operations as they arrive:
package main
import (
"context"
"log"
"time"
plcbundle "tangled.org/atscan.net/plcbundle"
)
type Monitor struct {
mgr *plcbundle.Manager
lastSeen int // Last bundle number processed
pollInterval time.Duration
}
func NewMonitor(bundleDir string, pollInterval time.Duration) (*Monitor, error) {
mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
if err != nil {
return nil, err
}
// Get current position
index := mgr.GetIndex()
lastBundle := index.GetLastBundle()
lastSeen := 0
if lastBundle != nil {
lastSeen = lastBundle.BundleNumber
}
return &Monitor{
mgr: mgr,
lastSeen: lastSeen,
pollInterval: pollInterval,
}, nil
}
func (m *Monitor) Start() {
log.Println("Monitor started, watching for new bundles...")
ticker := time.NewTicker(m.pollInterval)
defer ticker.Stop()
for range ticker.C {
m.check()
}
}
func (m *Monitor) check() {
ctx := context.Background()
// Try to fetch next bundle
bundle, err := m.mgr.FetchNext(ctx)
if err != nil {
// Not an error if no new bundle available
return
}
// New bundle!
log.Printf("🔔 New bundle: %d", bundle.BundleNumber)
log.Printf(" Operations: %d", len(bundle.Operations))
log.Printf(" DIDs: %d", bundle.DIDCount)
log.Printf(" Time: %s", bundle.EndTime.Format("2006-01-02 15:04:05"))
// Process new operations
m.processNewOperations(bundle)
m.lastSeen = bundle.BundleNumber
}
func (m *Monitor) processNewOperations(bundle *plcbundle.Bundle) {
for _, op := range bundle.Operations {
// Check for interesting operations
if op.IsNullified() {
log.Printf(" ⚠️ Nullified: %s", op.DID)
}
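// Check for new DIDs: genesis operations have a null "prev" field.
// Type "create" is the legacy genesis format; newer ones use "plc_operation".
opType, _ := op.Operation["type"].(string)
prev, hasPrev := op.Operation["prev"]
if opType == "create" || (hasPrev && prev == nil) {
log.Printf(" ➕ New DID: %s", op.DID)
}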
}
}
func main() {
monitor, err := NewMonitor("./plc_data", 30*time.Second)
if err != nil {
log.Fatal(err)
}
monitor.Start()
}
Advanced Usage#
Custom Configuration#
Full control over bundle manager behavior:
package main
import (
"log"
"runtime"
"time"
"tangled.org/atscan.net/plcbundle/bundle"
"tangled.org/atscan.net/plcbundle/plc"
)
func main() {
// Custom configuration
config := &bundle.Config{
BundleDir: "./my_bundles",
VerifyOnLoad: true, // Verify hashes when loading
AutoRebuild: true, // Auto-rebuild index if needed
RebuildWorkers: runtime.NumCPU(), // Parallel workers for rebuild
Logger: &MyCustomLogger{}, // Custom logger
// Progress callback for rebuild
RebuildProgress: func(current, total int) {
if current%100 == 0 {
log.Printf("Rebuild: %d/%d (%.1f%%)",
current, total, float64(current)/float64(total)*100)
}
},
}
// Custom PLC client with rate limiting
plcClient := plc.NewClient("https://plc.directory",
plc.WithRateLimit(60, time.Minute), // 60 req/min
plc.WithTimeout(30*time.Second), // 30s timeout
plc.WithLogger(&MyCustomLogger{}), // Custom logger
)
// Create manager
mgr, err := bundle.NewManager(config, plcClient)
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
log.Println("Manager created with custom configuration")
}
// Custom logger implementation
type MyCustomLogger struct{}
func (l *MyCustomLogger) Printf(format string, v ...interface{}) {
// Add custom formatting, filtering, etc.
log.Printf("[PLCBUNDLE] "+format, v...)
}
func (l *MyCustomLogger) Println(v ...interface{}) {
log.Println(append([]interface{}{"[PLCBUNDLE]"}, v...)...)
}
Streaming Data#
Stream bundle data without loading everything into memory:
package main
import (
"bufio"
"context"
"encoding/json"
"log"
plcbundle "tangled.org/atscan.net/plcbundle"
)
func streamBundle(mgr *plcbundle.Manager, bundleNumber int) error {
ctx := context.Background()
// Get decompressed stream
reader, err := mgr.StreamDecompressed(ctx, bundleNumber)
if err != nil {
return err
}
defer reader.Close()
// Read line by line (JSONL)
scanner := bufio.NewScanner(reader)
// Set buffer size for large lines
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
lineNum := 0
for scanner.Scan() {
lineNum++
var op plcbundle.PLCOperation
if err := json.Unmarshal(scanner.Bytes(), &op); err != nil {
log.Printf("Warning: failed to parse line %d: %v", lineNum, err)
continue
}
// Process operation without storing all in memory
processOperation(op)
}
return scanner.Err()
}
func processOperation(op plcbundle.PLCOperation) {
// Your processing logic
log.Printf("Processing: %s", op.DID)
}
func main() {
mgr, err := plcbundle.New("./plc_data", "")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
// Stream bundle 1
if err := streamBundle(mgr, 1); err != nil {
log.Fatal(err)
}
}
Parallel Processing#
Process multiple bundles concurrently:
package main
import (
"context"
"fmt"
"log"
"sync"
plcbundle "tangled.org/atscan.net/plcbundle"
)
func processParallel(mgr *plcbundle.Manager, workers int) error {
ctx := context.Background()
index := mgr.GetIndex()
bundles := index.GetBundles()
// Create job channel
jobs := make(chan int, len(bundles))
results := make(chan error, len(bundles))
// Start workers
var wg sync.WaitGroup
for w := 0; w < workers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for bundleNum := range jobs {
if err := processBundle(ctx, mgr, bundleNum); err != nil {
results <- err
} else {
results <- nil
}
}
}()
}
// Send jobs
for _, meta := range bundles {
jobs <- meta.BundleNumber
}
close(jobs)
// Wait for completion
go func() {
wg.Wait()
close(results)
}()
// Collect results
errors := 0
for err := range results {
if err != nil {
log.Printf("Error: %v", err)
errors++
}
}
if errors > 0 {
return fmt.Errorf("%d bundles failed processing", errors)
}
return nil
}
func processBundle(ctx context.Context, mgr *plcbundle.Manager, bundleNum int) error {
bundle, err := mgr.Load(ctx, bundleNum)
if err != nil {
return err
}
// Process operations
for _, op := range bundle.Operations {
// Your logic here
_ = op
}
log.Printf("Processed bundle %d", bundleNum)
return nil
}
func main() {
mgr, err := plcbundle.New("./plc_data", "")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
// Process with 8 workers
if err := processParallel(mgr, 8); err != nil {
log.Fatal(err)
}
}
Working with Mempool#
Access operations before they're bundled:
package main
import (
"log"
plcbundle "tangled.org/atscan.net/plcbundle"
)
func main() {
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
// Get mempool stats
stats := mgr.GetMempoolStats()
count := stats["count"].(int)
targetBundle := stats["target_bundle"].(int)
canCreate := stats["can_create_bundle"].(bool)
log.Printf("Mempool status:")
log.Printf(" Target bundle: %d", targetBundle)
log.Printf(" Operations: %d/%d", count, plcbundle.BUNDLE_SIZE)
log.Printf(" Ready: %v", canCreate)
if count > 0 {
// Get mempool operations
ops, err := mgr.GetMempoolOperations()
if err != nil {
log.Fatal(err)
}
log.Printf("Latest unbundled operations:")
for i, op := range ops {
if i >= 5 {
break
}
log.Printf(" %d. %s (%s)", i+1, op.DID, op.CreatedAt.Format("15:04:05"))
}
}
// Validate chronological order
if err := mgr.ValidateMempool(); err != nil {
log.Printf("⚠️ Mempool validation failed: %v", err)
} else {
log.Println("✓ Mempool validated")
}
}
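The direct type assertions on the stats map above panic if a key is missing or holds a different numeric type. Since the exact value types in the map are not documented in this guide, a defensive reader can be useful. The following sketch uses a hypothetical intStat helper with made-up sample values.

```go
package main

import "fmt"

// intStat reads an integer-like value from a stats map without panicking.
// It reports false when the key is missing or the value is not numeric.
func intStat(stats map[string]interface{}, key string) (int, bool) {
	switch v := stats[key].(type) {
	case int:
		return v, true
	case int64:
		return int(v), true
	case float64: // e.g. after a JSON round trip
		return int(v), true
	default:
		return 0, false
	}
}

func main() {
	// Made-up values shaped like the mempool stats map.
	stats := map[string]interface{}{"count": 1234, "can_create_bundle": false}

	if count, ok := intStat(stats, "count"); ok {
		fmt.Println("mempool operations:", count)
	}
	if _, ok := intStat(stats, "target_bundle"); !ok {
		fmt.Println("target_bundle missing or not numeric")
	}
}
```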
Best Practices#
1. Always Close the Manager#
Use defer to ensure cleanup:
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
return err
}
defer mgr.Close() // Always close!
2. Handle Context Cancellation#
Support graceful shutdown:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Listen for interrupt
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("Interrupt received, stopping...")
cancel()
}()
// Use context in operations
bundle, err := mgr.FetchNext(ctx)
if errors.Is(err, context.Canceled) { // also matches wrapped errors
log.Println("Operation cancelled gracefully")
return nil
}
3. Check Errors Properly#
Distinguish between different error types:
bundle, err := mgr.FetchNext(ctx)
if err != nil {
// Check if it's just "caught up"
if strings.Contains(err.Error(), "insufficient operations") {
log.Println("No new bundles available (caught up)")
return nil
}
// Real error
return fmt.Errorf("fetch failed: %w", err)
}
4. Use Streaming for Large Datasets#
Don't load everything into memory:
// ❌ Bad: Loads all operations into memory
index := mgr.GetIndex()
var allOps []plcbundle.PLCOperation
for _, meta := range index.GetBundles() {
bundle, _ := mgr.Load(ctx, meta.BundleNumber)
allOps = append(allOps, bundle.Operations...)
}
// ✅ Good: Process one bundle at a time
for _, meta := range index.GetBundles() {
bundle, err := mgr.Load(ctx, meta.BundleNumber)
if err != nil {
return err // don't ignore load errors; bundle is not usable here
}
for _, op := range bundle.Operations {
processOperation(op)
}
}
5. Enable Verification in Production#
config := plcbundle.DefaultConfig("./plc_data")
config.VerifyOnLoad = true // Verify hashes when loading
mgr, err := plcbundle.NewManager(config, plcClient)
6. Log Appropriately#
Implement a custom logger for production:
type ProductionLogger struct {
logger *zap.Logger
}
func (l *ProductionLogger) Printf(format string, v ...interface{}) {
l.logger.Sugar().Infof(format, v...)
}
func (l *ProductionLogger) Println(v ...interface{}) {
l.logger.Sugar().Info(v...)
}
7. Handle Rate Limits#
Configure the PLC client appropriately:
// Production: Be conservative
plcClient := plc.NewClient("https://plc.directory",
plc.WithRateLimit(60, time.Minute), // 60 req/min max
plc.WithTimeout(60*time.Second),
)
// Development: Can be more aggressive (but respectful)
plcClient := plc.NewClient("https://plc.directory",
plc.WithRateLimit(90, time.Minute),
plc.WithTimeout(30*time.Second),
)
API Reference#
Manager Methods#
// Creation
New(bundleDir, plcURL string) (*Manager, error)
NewManager(config *Config, plcClient *PLCClient) (*Manager, error)
// Lifecycle
Close()
// Fetching
FetchNext(ctx) (*Bundle, error)
// Loading
Load(ctx, bundleNumber int) (*Bundle, error)
// Verification
Verify(ctx, bundleNumber int) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)
// Exporting
Export(ctx, afterTime time.Time, count int) ([]PLCOperation, error)
// Streaming
StreamRaw(ctx, bundleNumber int) (io.ReadCloser, error)
StreamDecompressed(ctx, bundleNumber int) (io.ReadCloser, error)
// Index
GetIndex() *Index
ScanBundle(path string, bundleNumber int) (*BundleMetadata, error)
Scan() (*DirectoryScanResult, error)
// Mempool
GetMempoolStats() map[string]interface{}
GetMempoolOperations() ([]PLCOperation, error)
ValidateMempool() error
ClearMempool() error
// Info
GetInfo() map[string]interface{}
IsBundleIndexed(bundleNumber int) bool
Index Methods#
// Creation
NewIndex() *Index
LoadIndex(path string) (*Index, error)
// Persistence
Save(path string) error
// Queries
GetBundle(bundleNumber int) (*BundleMetadata, error)
GetLastBundle() *BundleMetadata
GetBundles() []*BundleMetadata
GetBundleRange(start, end int) []*BundleMetadata
// Stats
Count() int
FindGaps() []int
GetStats() map[string]interface{}
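The name FindGaps suggests that it reports bundle numbers missing from an otherwise sequential index; its exact semantics are not described in this guide. Under that assumption, a gap search over a list of bundle numbers could look like the following sketch, where findGaps is a hypothetical standalone function.

```go
package main

import (
	"fmt"
	"sort"
)

// findGaps returns the numbers missing between the smallest and largest
// entries of numbers. The input slice is not modified.
func findGaps(numbers []int) []int {
	if len(numbers) == 0 {
		return nil
	}
	sorted := append([]int(nil), numbers...) // copy before sorting
	sort.Ints(sorted)

	var gaps []int
	for i := 1; i < len(sorted); i++ {
		for n := sorted[i-1] + 1; n < sorted[i]; n++ {
			gaps = append(gaps, n)
		}
	}
	return gaps
}

func main() {
	fmt.Println(findGaps([]int{1, 2, 4, 7})) // [3 5 6]
}
```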
Configuration Types#
type Config struct {
BundleDir string
VerifyOnLoad bool
AutoRebuild bool
RebuildWorkers int
RebuildProgress func(current, total int)
Logger Logger
}
type Logger interface {
Printf(format string, v ...interface{})
Println(v ...interface{})
}
Troubleshooting#
Bundle Not Found Error#
bundle, err := mgr.Load(ctx, 999)
if err != nil {
if strings.Contains(err.Error(), "not in index") {
// Bundle doesn't exist
log.Printf("Bundle 999 hasn't been fetched yet")
}
}
Insufficient Operations Error#
bundle, err := mgr.FetchNext(ctx)
if err != nil {
if strings.Contains(err.Error(), "insufficient operations") {
// Not enough operations for a complete bundle
// Check mempool
stats := mgr.GetMempoolStats()
count := stats["count"].(int)
log.Printf("Only %d operations available (need %d)", count, plcbundle.BUNDLE_SIZE)
}
}
Memory Usage#
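If memory grows while processing large numbers of bundles, process one bundle at a time and, if needed, trigger garbage collection between bundles:
// Force garbage collection between bundles
for _, meta := range index.GetBundles() {
bundle, err := mgr.Load(ctx, meta.BundleNumber)
if err != nil {
log.Printf("Skipping bundle %d: %v", meta.BundleNumber, err)
continue
}
processBundle(bundle)
runtime.GC() // Help garbage collector (usually optional; it blocks while collecting)
}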
Examples Repository#
Find complete, runnable examples at:
Including:
- Complete sync service
- API server
- Analysis tools
- Monitoring services