PLC Bundle#
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⣽⣥⡈⠧⣂⢧⢾⠕⠞⠡⠊⠁⣐⠉⠀⠉⢍⠀⠉⠌⡉⠀⠂⠁⠱⠉⠁⢝⠻⠎⣬⢌⡌⣬⣡⣀⣢⣄⡄⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⢸⣿⣿⢿⣾⣯⣑⢄⡂⠀⠄⠂⠀⠀⢀⠀⠀⠐⠀⠀⠀⠀⠀⠀⠀⠀⠄⠐⠀⠀⠀⠀⣄⠭⠂⠈⠜⣩⣿⢝⠃⠀⠁⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⣻⡟⠏⠀⠚⠈⠚⡉⡝⢶⣱⢤⣅⠈⠀⠄⠀⠀⠀⠀⠀⠠⠀⠀⡂⠐⣤⢕⡪⢼⣈⡹⡇⠏⠏⠋⠅⢃⣪⡏⡇⡍⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣻⡄⠀⠀⠀⢠⠌⠃⠐⠉⢡⠱⠧⠝⡯⣮⢶⣴⣤⡆⢐⣣⢅⣮⡟⠦⠍⠉⠀⠁⠐⠀⠀⠀⠄⠐⠡⣽⡸⣎⢁⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢈⡻⣧⠀⠁⠐⠀⠀⠀⠀⠀⠀⠊⠀⠕⢀⡉⠈⡫⠽⡿⡟⠿⠟⠁⠀⠀⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠬⠥⣋⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⡀⣾⡍⠕⡀⠀⠀⠀⠄⠠⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠥⣤⢌⠀⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠄⢀⠀⢝⢞⣫⡆⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣽⡶⡄⠐⡀⠀⠀⠀⠀⠀⠀⢀⠀⠄⠀⠀⠀⠄⠁⠇⣷⡆⠀⠀⠀⢀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡸⢝⣮⠍⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⢀⠀⢾⣷⠀⠠⡀⠀⠀⠀⠀⢀⠀⠀⠀⠀⠀⠁⡁⠀⠀⣾⡥⠖⠀⠀⠀⠂⠀⠀⠀⠀⠀⠁⠀⡀⠁⠀⠀⠻⢳⣻⢄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣞⡙⠨⣀⠠⠄⠀⠂⠀⠀⠀⠈⢀⠀⠀⠀⠀⠀⠤⢚⢢⣟⠀⠀⠀⠀⡐⠀⠀⡀⠀⠀⠀⠀⠁⠈⠌⠊⣯⣮⡏⠡⠂⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣻⡟⡄⡡⣄⠀⠠⠀⠀⡅⠀⠐⠀⡀⠀⡀⠀⠄⠈⠃⠳⠪⠤⠀⠀⠀⠀⡀⠀⠂⠀⠀⠀⠁⠈⢠⣠⠒⠻⣻⡧⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠪⡎⠠⢌⠑⡀⠂⠀⠄⠠⠀⠠⠀⠁⡀⠠⠠⡀⣀⠜⢏⡅⠀⠀⡀⠁⠀⠀⠁⠁⠐⠄⡀⢀⠂⠀⠄⢑⣿⣿⣿⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠼⣻⠧⣣⣀⠐⠨⠁⠕⢈⢀⢀⡁⠀⠈⠠⢀⠀⠐⠜⣽⡗⡤⠀⠂⠀⠠⠀⢂⠠⠀⠁⠄⠀⠔⠀⠑⣨⣿⢯⠋⡅⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⡚⣷⣭⠎⢃⡗⠄⡄⢀⠁⠀⠅⢀⢅⡀⠠⠀⢠⡀⡩⠷⢇⠀⡀⠄⡠⠤⠆⣀⡀⠄⠉⣠⠃⠴⠀⠈⢁⣿⡛⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠘⡬⡿⣿⡏⡻⡯⠌⢁⢛⠠⠓⠐⠐⠐⠌⠃⠋⠂⡢⢰⣈⢏⣰⠂⠈⠀⠠⠒⠡⠌⠫⠭⠩⠢⡬⠆⠿⢷⢿⡽⡧⠉⠊⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣷⣺⣗⣿⡶⡎⡅⣣⢎⠠⡅⣢⡖⠴⠬⡈⠂⡨⢡⠾⣣⣢⠀⠀⡹⠄⡄⠄⡇⣰⡖⡊⠔⢹⣄⣿⣭⣵⣿⢷⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠩⣿⣿⣲⣿⣷⣟⣼⠟⣬⢉⡠⣪⢜⣂⣁⠥⠓⠚⡁⢶⣷⣠⠂⡄⡢⣀⡐⠧⢆⣒⡲⡳⡫⢟⡃⢪⡧⣟⡟⣯⠐⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢺⠟⢿⢟⢻⡗⡮⡿⣲⢷⣆⣏⣇⡧⣄⢖⠾⡷⣿⣤⢳⢷⣣⣦⡜⠗⣭⢂⠩⣹⢿⡲⢎⡧⣕⣖⣓⣽⡿⡖⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠂⠂⠏⠿⢻⣥⡪⢽⣳⣳⣥⡶⣫⣍⢐⣥⣻⣾⡻⣅⢭⡴⢭⣿⠕⣧⡭⣞⣻⣣⣻⢿⠟⠛⠙⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠋⠫⠯⣍⢻⣿⣿⣷⣕⣵⣹⣽⣿⣷⣇⡏⣿⡿⣍⡝⠵⠯⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⚠️ Preview Version - Do Not Use in Production!
This project and the plcbundle specification are currently unstable and under heavy development. Things can break at any time, and bundle hashes or data formats may change. Do not use this for production systems. Please wait for the 1.0 release.
A Go library and CLI tool for managing DID PLC Directory bundles with transparent synchronization, compression, and verification.
Features#
- 📦 Bundle Management: Automatically organize PLC operations into compressed bundles (10,000 operations each)
- 🔄 Transparent Sync: Fetch and cache PLC operations with automatic deduplication
- 🗜️ Efficient Storage: Zstandard compression with configurable levels
- ✅ Integrity: SHA-256 hash verification and blockchain-like chain validation
- 🔍 Indexing: Fast bundle lookup and gap detection
- 📊 Export: Query operations by time range
Installation#
go get github.com/atscan/plcbundle
For the CLI tool:
go install github.com/atscan/plcbundle/cmd/plcbundle@latest
Quick Start (Library)#
package main

import (
	"context"
	"log"

	plcbundle "github.com/atscan/plcbundle"
)

func main() {
	// Create a bundle manager
	mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
	if err != nil {
		log.Fatal(err)
	}
	defer mgr.Close()

	// Fetch the next bundle
	ctx := context.Background()
	bundle, err := mgr.FetchNext(ctx)
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("Fetched bundle %d with %d operations",
		bundle.BundleNumber, len(bundle.Operations))
}
Library Usage#
1. Basic Setup#
import (
	"log"

	plcbundle "github.com/atscan/plcbundle"
)

// Create manager with defaults
mgr, err := plcbundle.New("./bundles", "https://plc.directory")
if err != nil {
	log.Fatal(err)
}
defer mgr.Close()
2. Custom Configuration#
import (
	"time"

	"github.com/atscan/plcbundle/bundle"
	"github.com/atscan/plcbundle/plc"
)

// Custom config
config := &bundle.Config{
	BundleDir:        "./my_bundles",
	CompressionLevel: bundle.CompressionBest,
	VerifyOnLoad:     true,
	Logger:           myCustomLogger,
}

// Custom PLC client with rate limiting
plcClient := plc.NewClient("https://plc.directory",
	plc.WithRateLimit(60, time.Minute), // 60 req/min
	plc.WithTimeout(30*time.Second),
)

mgr, err := bundle.NewManager(config, plcClient)
3. Transparent Synchronization (Main Use Case)#
This is the primary pattern for keeping your local PLC mirror up-to-date:
package main

import (
	"context"
	"log"
	"strings"
	"time"

	plcbundle "github.com/atscan/plcbundle"
)

type PLCSync struct {
	mgr    *plcbundle.BundleManager
	ctx    context.Context
	cancel context.CancelFunc
}

func NewPLCSync(bundleDir string) (*PLCSync, error) {
	mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	sync := &PLCSync{
		mgr:    mgr,
		ctx:    ctx,
		cancel: cancel,
	}
	return sync, nil
}

func (s *PLCSync) Start(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	log.Println("Starting PLC synchronization...")

	for {
		select {
		case <-ticker.C:
			if err := s.Update(); err != nil {
				log.Printf("Update error: %v", err)
			}
		case <-s.ctx.Done():
			return
		}
	}
}

func (s *PLCSync) Update() error {
	log.Println("Checking for new bundles...")

	for {
		bundle, err := s.mgr.FetchNext(s.ctx)
		if err != nil {
			// Check if we're caught up
			if isEndOfData(err) {
				log.Println("✓ Up to date!")
				return nil
			}
			return err
		}

		log.Printf("✓ Fetched bundle %06d (%d ops, %d DIDs)",
			bundle.BundleNumber,
			len(bundle.Operations),
			bundle.DIDCount)
	}
}

func (s *PLCSync) Stop() {
	s.cancel()
	s.mgr.Close()
}

func isEndOfData(err error) bool {
	return err != nil &&
		(strings.Contains(err.Error(), "insufficient operations") ||
			strings.Contains(err.Error(), "caught up"))
}

// Usage
func main() {
	sync, err := NewPLCSync("./plc_bundles")
	if err != nil {
		log.Fatal(err)
	}
	defer sync.Stop()

	// Update every 5 minutes
	sync.Start(5 * time.Minute)
}
4. Getting Bundles#
ctx := context.Background()

// Get all bundles
index := mgr.GetIndex()
bundles := index.GetBundles()

for _, meta := range bundles {
	log.Printf("Bundle %06d: %d ops, %s to %s",
		meta.BundleNumber,
		meta.OperationCount,
		meta.StartTime.Format(time.RFC3339),
		meta.EndTime.Format(time.RFC3339))
}

// Load specific bundle
bundle, err := mgr.Load(ctx, 1)
if err != nil {
	log.Fatal(err)
}

log.Printf("Loaded %d operations", len(bundle.Operations))
5. Getting Operations from Bundles#
// Load a bundle and iterate operations
bundle, err := mgr.Load(ctx, 1)
if err != nil {
	log.Fatal(err)
}

for _, op := range bundle.Operations {
	log.Printf("DID: %s, CID: %s, Time: %s",
		op.DID,
		op.CID,
		op.CreatedAt.Format(time.RFC3339))

	// Access operation data
	if opType, ok := op.Operation["type"].(string); ok {
		log.Printf("  Type: %s", opType)
	}
}
6. Export Operations by Time Range#
// Export operations after a specific time
afterTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

operations, err := mgr.Export(ctx, afterTime, 5000)
if err != nil {
	log.Fatal(err)
}

log.Printf("Exported %d operations", len(operations))

// Process operations
for _, op := range operations {
	// Your processing logic
	processOperation(op)
}
7. Periodic Update Pattern#
// Simple periodic updater
func runPeriodicUpdate(mgr *plcbundle.BundleManager, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for range ticker.C {
		ctx := context.Background()

		// Try to fetch next bundle
		bundle, err := mgr.FetchNext(ctx)
		if err != nil {
			if strings.Contains(err.Error(), "insufficient operations") {
				log.Println("Caught up!")
				continue
			}
			log.Printf("Error: %v", err)
			continue
		}

		log.Printf("New bundle %d: %d operations",
			bundle.BundleNumber,
			len(bundle.Operations))

		// Process new operations
		for _, op := range bundle.Operations {
			handleOperation(op)
		}
	}
}

// Usage
go runPeriodicUpdate(mgr, 10*time.Minute)
8. Verify Integrity#
// Verify specific bundle
result, err := mgr.Verify(ctx, 1)
if err != nil {
	log.Fatal(err)
}

if result.Valid {
	log.Println("✓ Bundle is valid")
} else {
	log.Printf("✗ Invalid: %v", result.Error)
}

// Verify entire chain
chainResult, err := mgr.VerifyChain(ctx)
if err != nil {
	log.Fatal(err)
}

if chainResult.Valid {
	log.Printf("✓ Chain verified: %d bundles", chainResult.ChainLength)
} else {
	log.Printf("✗ Chain broken at bundle %d: %s",
		chainResult.BrokenAt,
		chainResult.Error)
}
9. Scan Directory (Re-index)#
// Scan directory and rebuild index from existing bundles
result, err := mgr.Scan()
if err != nil {
	log.Fatal(err)
}

log.Printf("Scanned %d bundles", result.BundleCount)

if len(result.MissingGaps) > 0 {
	log.Printf("Warning: Missing bundles: %v", result.MissingGaps)
}
10. Complete Example: Background Sync Service#
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	plcbundle "github.com/atscan/plcbundle"
)

type PLCService struct {
	mgr      *plcbundle.BundleManager
	updateCh chan struct{}
	stopCh   chan struct{}
}

func NewPLCService(bundleDir string) (*PLCService, error) {
	mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
	if err != nil {
		return nil, err
	}

	return &PLCService{
		mgr:      mgr,
		updateCh: make(chan struct{}, 1),
		stopCh:   make(chan struct{}),
	}, nil
}

func (s *PLCService) Start() {
	log.Println("Starting PLC service...")

	// Initial scan
	if _, err := s.mgr.Scan(); err != nil {
		log.Printf("Scan warning: %v", err)
	}

	// Start update loop
	go s.updateLoop()

	// Periodic trigger
	go s.periodicTrigger(5 * time.Minute)
}

func (s *PLCService) updateLoop() {
	for {
		select {
		case <-s.updateCh:
			s.fetchNewBundles()
		case <-s.stopCh:
			return
		}
	}
}

func (s *PLCService) periodicTrigger(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.TriggerUpdate()
		case <-s.stopCh:
			return
		}
	}
}

func (s *PLCService) TriggerUpdate() {
	select {
	case s.updateCh <- struct{}{}:
	default:
		// Update already in progress
	}
}

func (s *PLCService) fetchNewBundles() {
	ctx := context.Background()
	fetched := 0

	for {
		bundle, err := s.mgr.FetchNext(ctx)
		if err != nil {
			if isEndOfData(err) {
				if fetched > 0 {
					log.Printf("✓ Fetched %d new bundles", fetched)
				}
				return
			}
			log.Printf("Fetch error: %v", err)
			return
		}

		fetched++
		log.Printf("Bundle %06d: %d operations",
			bundle.BundleNumber,
			len(bundle.Operations))
	}
}

func isEndOfData(err error) bool {
	return err != nil &&
		(strings.Contains(err.Error(), "insufficient operations") ||
			strings.Contains(err.Error(), "caught up"))
}

func (s *PLCService) GetBundles() []*plcbundle.BundleMetadata {
	return s.mgr.GetIndex().GetBundles()
}

func (s *PLCService) GetOperations(bundleNum int) ([]plcbundle.PLCOperation, error) {
	ctx := context.Background()

	bundle, err := s.mgr.Load(ctx, bundleNum)
	if err != nil {
		return nil, err
	}

	return bundle.Operations, nil
}

func (s *PLCService) Stop() {
	close(s.stopCh)
	s.mgr.Close()
}

func main() {
	service, err := NewPLCService("./plc_data")
	if err != nil {
		log.Fatal(err)
	}

	service.Start()

	// Wait for interrupt
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	<-sigCh

	log.Println("Shutting down...")
	service.Stop()
}
CLI Tool Usage#
Fetch bundles#
# Fetch next bundle
plcbundle fetch
# Fetch specific number of bundles
plcbundle fetch -count 10
# Fetch all available bundles
plcbundle fetch -count 0
Scan directory#
# Scan and rebuild index
plcbundle scan
Verify integrity#
# Verify specific bundle
plcbundle verify -bundle 1
# Verify entire chain
plcbundle verify
# Verbose output
plcbundle verify -v
Show information#
# General info
plcbundle info
# Specific bundle info
plcbundle info -bundle 1
Export operations#
# Export operations to stdout (JSONL)
plcbundle export -count 1000 > operations.jsonl
# Export after specific time
plcbundle export -after "2024-01-01T00:00:00Z" -count 5000
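Because exports are plain JSONL, they compose with standard Unix tooling. A small sketch (jq is an assumption here, not part of plcbundle) that extracts the distinct DIDs from export-style output:

```shell
# Simulate a few exported operations (one JSON object per line),
# then pull out the distinct DIDs with jq.
printf '%s\n' \
  '{"did":"did:plc:aaa","cid":"c1"}' \
  '{"did":"did:plc:aaa","cid":"c2"}' \
  '{"did":"did:plc:bbb","cid":"c3"}' \
  | jq -r .did | sort -u
```

In practice you would pipe the real command, e.g. `plcbundle export -count 1000 | jq -r .did | sort -u`.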
Backfill#
# Fetch all bundles and stream to stdout
plcbundle backfill > all_operations.jsonl
# Start from specific bundle
plcbundle backfill -start 100 -end 200
API Reference#
Types#
type BundleManager struct { ... }

type Bundle struct {
	BundleNumber int
	StartTime    time.Time
	EndTime      time.Time
	Operations   []PLCOperation
	DIDCount     int
	Hash         string
	// ...
}

type PLCOperation struct {
	DID       string
	Operation map[string]interface{}
	CID       string
	CreatedAt time.Time
	RawJSON   []byte
}
Methods#
// Create
New(bundleDir, plcURL string) (*BundleManager, error)
// Sync
FetchNext(ctx) (*Bundle, error)
Export(ctx, afterTime, count) ([]PLCOperation, error)
// Query
Load(ctx, bundleNumber) (*Bundle, error)
GetIndex() *Index
GetInfo() map[string]interface{}
// Verify
Verify(ctx, bundleNumber) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)
// Manage
Scan() (*DirectoryScanResult, error)
Close()
Configuration#
type Config struct {
	BundleDir        string           // Storage directory
	CompressionLevel CompressionLevel // Compression level
	VerifyOnLoad     bool             // Verify hashes when loading
	Logger           Logger           // Custom logger
}

// Compression levels
const (
	CompressionFastest // Fastest compression
	CompressionDefault // Balanced
	CompressionBetter  // Better compression (default)
	CompressionBest    // Best compression
)
Bundle Format#
- File naming: NNNNNN.jsonl.zst (e.g., 000001.jsonl.zst)
- Size: 10,000 operations per bundle
- Compression: Zstandard
- Index: plc_bundles.json (metadata and chain info)
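Since a bundle is just zstd-compressed JSONL, you can inspect one without the library. A minimal sketch, assuming the standard zstd CLI is installed (the sample file is made up for illustration):

```shell
# Round-trip a tiny JSONL file through zstd, the same container format
# bundles use (a real bundle is NNNNNN.jsonl.zst with 10,000 lines).
printf '%s\n' '{"did":"did:plc:example"}' > sample.jsonl
zstd -q -f sample.jsonl -o sample.jsonl.zst

# Decompress to stdout and count operations (one per line)
zstd -dc sample.jsonl.zst | wc -l
```

A real bundle can be streamed the same way, e.g. `zstd -dc 000001.jsonl.zst | head -n 1` to peek at its first operation.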
Best Practices#
- Regular Updates: Run periodic updates (every 5-10 minutes) to stay synchronized
- Error Handling: Treat the "insufficient operations" error as a "caught up" signal
- Verification: Periodically verify chain integrity
- Rate Limiting: The default of 90 req/min is safe for the PLC directory
- Storage: Plan for ~1-2 GB per million operations (compressed)
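The storage bullet can be turned into a quick back-of-envelope calculation. A sketch using the README's own ~1-2 GB per million operations estimate (estimateGB is a helper written for this example, not part of the library):

```go
package main

import "fmt"

// estimateGB applies the rule of thumb of roughly 1-2 GB of
// compressed storage per million PLC operations.
func estimateGB(ops int) (low, high float64) {
	millions := float64(ops) / 1e6
	return millions * 1.0, millions * 2.0
}

func main() {
	// e.g. planning for 25 million operations
	low, high := estimateGB(25_000_000)
	fmt.Printf("~%.0f-%.0f GB\n", low, high) // ~25-50 GB
}
```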
License#
MIT
Contributing#
Contributions welcome! Please open an issue or PR.