PLC Bundle#
A Go library and CLI tool for managing DID PLC Directory bundles with transparent synchronization, compression, and verification.
Features#
- 📦 Bundle Management: Automatically organize PLC operations into compressed bundles (10,000 operations each)
- 🔄 Transparent Sync: Fetch and cache PLC operations with automatic deduplication
- 🗜️ Efficient Storage: Zstandard compression with configurable levels
- ✅ Integrity: SHA-256 hash verification and blockchain-like chain validation
- 🔍 Indexing: Fast bundle lookup and gap detection
- 📊 Export: Query operations by time range
Installation#
go get github.com/atscan/plcbundle
For the CLI tool:
go install github.com/atscan/plcbundle/cmd/plcbundle@latest
Quick Start (Library)#
package main
import (
"context"
"log"
"time"
plcbundle "github.com/atscan/plcbundle"
)
func main() {
// Create a bundle manager
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
// Fetch latest bundles
ctx := context.Background()
bundle, err := mgr.FetchNext(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("Fetched bundle %d with %d operations",
bundle.BundleNumber, len(bundle.Operations))
}
Library Usage#
1. Basic Setup#
import (
"context"
plcbundle "github.com/atscan/plcbundle"
)
// Create manager with defaults
mgr, err := plcbundle.New("./bundles", "https://plc.directory")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
2. Custom Configuration#
import (
"github.com/atscan/plcbundle/bundle"
"github.com/atscan/plcbundle/plc"
)
// Custom config
config := &bundle.Config{
BundleDir: "./my_bundles",
CompressionLevel: bundle.CompressionBest,
VerifyOnLoad: true,
Logger: myCustomLogger,
}
// Custom PLC client with rate limiting
plcClient := plc.NewClient("https://plc.directory",
plc.WithRateLimit(60, time.Minute), // 60 req/min
plc.WithTimeout(30*time.Second),
)
mgr, err := bundle.NewManager(config, plcClient)
3. Transparent Synchronization (Main Use Case)#
This is the primary pattern for keeping your local PLC mirror up-to-date:
package main
import (
"context"
"log"
"time"
plcbundle "github.com/atscan/plcbundle"
)
type PLCSync struct {
mgr *plcbundle.BundleManager
ctx context.Context
cancel context.CancelFunc
}
func NewPLCSync(bundleDir string) (*PLCSync, error) {
mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
sync := &PLCSync{
mgr: mgr,
ctx: ctx,
cancel: cancel,
}
return sync, nil
}
func (s *PLCSync) Start(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
log.Println("Starting PLC synchronization...")
for {
select {
case <-ticker.C:
if err := s.Update(); err != nil {
log.Printf("Update error: %v", err)
}
case <-s.ctx.Done():
return
}
}
}
func (s *PLCSync) Update() error {
log.Println("Checking for new bundles...")
for {
bundle, err := s.mgr.FetchNext(s.ctx)
if err != nil {
// Check if we're caught up
if isEndOfData(err) {
log.Println("✓ Up to date!")
return nil
}
return err
}
log.Printf("✓ Fetched bundle %06d (%d ops, %d DIDs)",
bundle.BundleNumber,
len(bundle.Operations),
bundle.DIDCount)
}
}
func (s *PLCSync) Stop() {
s.cancel()
s.mgr.Close()
}
func isEndOfData(err error) bool {
return err != nil &&
(strings.Contains(err.Error(), "insufficient operations") ||
strings.Contains(err.Error(), "caught up"))
}
// Usage
func main() {
sync, err := NewPLCSync("./plc_bundles")
if err != nil {
log.Fatal(err)
}
defer sync.Stop()
// Update every 5 minutes
sync.Start(5 * time.Minute)
}
4. Getting Bundles#
ctx := context.Background()
// Get all bundles
index := mgr.GetIndex()
bundles := index.GetBundles()
for _, meta := range bundles {
log.Printf("Bundle %06d: %d ops, %s to %s",
meta.BundleNumber,
meta.OperationCount,
meta.StartTime.Format(time.RFC3339),
meta.EndTime.Format(time.RFC3339))
}
// Load specific bundle
bundle, err := mgr.Load(ctx, 1)
if err != nil {
log.Fatal(err)
}
log.Printf("Loaded %d operations", len(bundle.Operations))
5. Getting Operations from Bundles#
// Load a bundle and iterate operations
bundle, err := mgr.Load(ctx, 1)
if err != nil {
log.Fatal(err)
}
for _, op := range bundle.Operations {
log.Printf("DID: %s, CID: %s, Time: %s",
op.DID,
op.CID,
op.CreatedAt.Format(time.RFC3339))
// Access operation data
if opType, ok := op.Operation["type"].(string); ok {
log.Printf(" Type: %s", opType)
}
}
6. Export Operations by Time Range#
// Export operations after a specific time
afterTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
operations, err := mgr.Export(ctx, afterTime, 5000)
if err != nil {
log.Fatal(err)
}
log.Printf("Exported %d operations", len(operations))
// Process operations
for _, op := range operations {
// Your processing logic
processOperation(op)
}
7. Periodic Update Pattern#
// Simple periodic updater
func runPeriodicUpdate(mgr *plcbundle.BundleManager, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
ctx := context.Background()
// Try to fetch next bundle
bundle, err := mgr.FetchNext(ctx)
if err != nil {
if strings.Contains(err.Error(), "insufficient operations") {
log.Println("Caught up!")
continue
}
log.Printf("Error: %v", err)
continue
}
log.Printf("New bundle %d: %d operations",
bundle.BundleNumber,
len(bundle.Operations))
// Process new operations
for _, op := range bundle.Operations {
handleOperation(op)
}
}
}
// Usage
go runPeriodicUpdate(mgr, 10*time.Minute)
8. Verify Integrity#
// Verify specific bundle
result, err := mgr.Verify(ctx, 1)
if err != nil {
log.Fatal(err)
}
if result.Valid {
log.Println("✓ Bundle is valid")
} else {
log.Printf("✗ Invalid: %v", result.Error)
}
// Verify entire chain
chainResult, err := mgr.VerifyChain(ctx)
if err != nil {
log.Fatal(err)
}
if chainResult.Valid {
log.Printf("✓ Chain verified: %d bundles", chainResult.ChainLength)
} else {
log.Printf("✗ Chain broken at bundle %d: %s",
chainResult.BrokenAt,
chainResult.Error)
}
9. Scan Directory (Re-index)#
// Scan directory and rebuild index from existing bundles
result, err := mgr.Scan()
if err != nil {
log.Fatal(err)
}
log.Printf("Scanned %d bundles", result.BundleCount)
if len(result.MissingGaps) > 0 {
log.Printf("Warning: Missing bundles: %v", result.MissingGaps)
}
10. Complete Example: Background Sync Service#
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
plcbundle "github.com/atscan/plcbundle"
)
type PLCService struct {
mgr *plcbundle.BundleManager
updateCh chan struct{}
stopCh chan struct{}
}
func NewPLCService(bundleDir string) (*PLCService, error) {
mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
if err != nil {
return nil, err
}
return &PLCService{
mgr: mgr,
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
}, nil
}
func (s *PLCService) Start() {
log.Println("Starting PLC service...")
// Initial scan
if _, err := s.mgr.Scan(); err != nil {
log.Printf("Scan warning: %v", err)
}
// Start update loop
go s.updateLoop()
// Periodic trigger
go s.periodicTrigger(5 * time.Minute)
}
func (s *PLCService) updateLoop() {
for {
select {
case <-s.updateCh:
s.fetchNewBundles()
case <-s.stopCh:
return
}
}
}
func (s *PLCService) periodicTrigger(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.TriggerUpdate()
case <-s.stopCh:
return
}
}
}
func (s *PLCService) TriggerUpdate() {
select {
case s.updateCh <- struct{}{}:
default:
// Update already in progress
}
}
func (s *PLCService) fetchNewBundles() {
ctx := context.Background()
fetched := 0
for {
bundle, err := s.mgr.FetchNext(ctx)
if err != nil {
if isEndOfData(err) {
if fetched > 0 {
log.Printf("✓ Fetched %d new bundles", fetched)
}
return
}
log.Printf("Fetch error: %v", err)
return
}
fetched++
log.Printf("Bundle %06d: %d operations",
bundle.BundleNumber,
len(bundle.Operations))
}
}
func (s *PLCService) GetBundles() []*plcbundle.BundleMetadata {
return s.mgr.GetIndex().GetBundles()
}
func (s *PLCService) GetOperations(bundleNum int) ([]plcbundle.PLCOperation, error) {
ctx := context.Background()
bundle, err := s.mgr.Load(ctx, bundleNum)
if err != nil {
return nil, err
}
return bundle.Operations, nil
}
func (s *PLCService) Stop() {
close(s.stopCh)
s.mgr.Close()
}
func main() {
service, err := NewPLCService("./plc_data")
if err != nil {
log.Fatal(err)
}
service.Start()
// Wait for interrupt
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
service.Stop()
}
CLI Tool Usage#
Fetch bundles#
# Fetch next bundle
plcbundle fetch
# Fetch specific number of bundles
plcbundle fetch -count 10
# Fetch all available bundles
plcbundle fetch -count 0
Scan directory#
# Scan and rebuild index
plcbundle scan
Verify integrity#
# Verify specific bundle
plcbundle verify -bundle 1
# Verify entire chain
plcbundle verify
# Verbose output
plcbundle verify -v
Show information#
# General info
plcbundle info
# Specific bundle info
plcbundle info -bundle 1
Export operations#
# Export operations to stdout (JSONL)
plcbundle export -count 1000 > operations.jsonl
# Export after specific time
plcbundle export -after "2024-01-01T00:00:00Z" -count 5000
Backfill#
# Fetch all bundles and stream to stdout
plcbundle backfill > all_operations.jsonl
# Start from specific bundle
plcbundle backfill -start 100 -end 200
API Reference#
Types#
type BundleManager struct { ... }
type Bundle struct {
BundleNumber int
StartTime time.Time
EndTime time.Time
Operations []PLCOperation
DIDCount int
Hash string
// ...
}
type PLCOperation struct {
DID string
Operation map[string]interface{}
CID string
CreatedAt time.Time
RawJSON []byte
}
Methods#
// Create
New(bundleDir, plcURL string) (*BundleManager, error)
// Sync
FetchNext(ctx) (*Bundle, error)
Export(ctx, afterTime, count) ([]PLCOperation, error)
// Query
Load(ctx, bundleNumber) (*Bundle, error)
GetIndex() *Index
GetInfo() map[string]interface{}
// Verify
Verify(ctx, bundleNumber) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)
// Manage
Scan() (*DirectoryScanResult, error)
Close()
Configuration#
type Config struct {
BundleDir string // Storage directory
CompressionLevel CompressionLevel // Compression level
VerifyOnLoad bool // Verify hashes when loading
Logger Logger // Custom logger
}
// Compression levels
const (
CompressionFastest // Fastest compression
CompressionDefault // Balanced
CompressionBetter // Better compression (default)
CompressionBest // Best compression
)
Bundle Format#
- File naming:
NNNNNN.jsonl.zst(e.g.,000001.jsonl.zst) - Size: 10,000 operations per bundle
- Compression: Zstandard
- Index:
plc_bundles.json(metadata and chain info)
Best Practices#
- Regular Updates: Run periodic updates (5-10 minutes) to stay synchronized
- Error Handling: Handle "insufficient operations" error as "caught up" signal
- Verification: Periodically verify chain integrity
- Rate Limiting: Default 90 req/min is safe for PLC directory
- Storage: Plan ~1-2 GB per million operations (compressed)
License#
MIT
Contributing#
Contributions welcome! Please open an issue or PR.