PLC Bundle#
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⣽⣥⡈⠧⣂⢧⢾⠕⠞⠡⠊⠁⣐⠉⠀⠉⢍⠀⠉⠌⡉⠀⠂⠁⠱⠉⠁⢝⠻⠎⣬⢌⡌⣬⣡⣀⣢⣄⡄⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⢸⣿⣿⢿⣾⣯⣑⢄⡂⠀⠄⠂⠀⠀⢀⠀⠀⠐⠀⠀⠀⠀⠀⠀⠀⠀⠄⠐⠀⠀⠀⠀⣄⠭⠂⠈⠜⣩⣿⢝⠃⠀⠁⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⣻⡟⠏⠀⠚⠈⠚⡉⡝⢶⣱⢤⣅⠈⠀⠄⠀⠀⠀⠀⠀⠠⠀⠀⡂⠐⣤⢕⡪⢼⣈⡹⡇⠏⠏⠋⠅⢃⣪⡏⡇⡍⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣻⡄⠀⠀⠀⢠⠌⠃⠐⠉⢡⠱⠧⠝⡯⣮⢶⣴⣤⡆⢐⣣⢅⣮⡟⠦⠍⠉⠀⠁⠐⠀⠀⠀⠄⠐⠡⣽⡸⣎⢁⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢈⡻⣧⠀⠁⠐⠀⠀⠀⠀⠀⠀⠊⠀⠕⢀⡉⠈⡫⠽⡿⡟⠿⠟⠁⠀⠀⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠬⠥⣋⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⡀⣾⡍⠕⡀⠀⠀⠀⠄⠠⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠥⣤⢌⠀⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠄⢀⠀⢝⢞⣫⡆⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣽⡶⡄⠐⡀⠀⠀⠀⠀⠀⠀⢀⠀⠄⠀⠀⠀⠄⠁⠇⣷⡆⠀⠀⠀⢀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡸⢝⣮⠍⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⢀⠀⢾⣷⠀⠠⡀⠀⠀⠀⠀⢀⠀⠀⠀⠀⠀⠁⡁⠀⠀⣾⡥⠖⠀⠀⠀⠂⠀⠀⠀⠀⠀⠁⠀⡀⠁⠀⠀⠻⢳⣻⢄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣞⡙⠨⣀⠠⠄⠀⠂⠀⠀⠀⠈⢀⠀⠀⠀⠀⠀⠤⢚⢢⣟⠀⠀⠀⠀⡐⠀⠀⡀⠀⠀⠀⠀⠁⠈⠌⠊⣯⣮⡏⠡⠂⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣻⡟⡄⡡⣄⠀⠠⠀⠀⡅⠀⠐⠀⡀⠀⡀⠀⠄⠈⠃⠳⠪⠤⠀⠀⠀⠀⡀⠀⠂⠀⠀⠀⠁⠈⢠⣠⠒⠻⣻⡧⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠪⡎⠠⢌⠑⡀⠂⠀⠄⠠⠀⠠⠀⠁⡀⠠⠠⡀⣀⠜⢏⡅⠀⠀⡀⠁⠀⠀⠁⠁⠐⠄⡀⢀⠂⠀⠄⢑⣿⣿⣿⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠼⣻⠧⣣⣀⠐⠨⠁⠕⢈⢀⢀⡁⠀⠈⠠⢀⠀⠐⠜⣽⡗⡤⠀⠂⠀⠠⠀⢂⠠⠀⠁⠄⠀⠔⠀⠑⣨⣿⢯⠋⡅⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⡚⣷⣭⠎⢃⡗⠄⡄⢀⠁⠀⠅⢀⢅⡀⠠⠀⢠⡀⡩⠷⢇⠀⡀⠄⡠⠤⠆⣀⡀⠄⠉⣠⠃⠴⠀⠈⢁⣿⡛⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠘⡬⡿⣿⡏⡻⡯⠌⢁⢛⠠⠓⠐⠐⠐⠌⠃⠋⠂⡢⢰⣈⢏⣰⠂⠈⠀⠠⠒⠡⠌⠫⠭⠩⠢⡬⠆⠿⢷⢿⡽⡧⠉⠊⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣷⣺⣗⣿⡶⡎⡅⣣⢎⠠⡅⣢⡖⠴⠬⡈⠂⡨⢡⠾⣣⣢⠀⠀⡹⠄⡄⠄⡇⣰⡖⡊⠔⢹⣄⣿⣭⣵⣿⢷⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠩⣿⣿⣲⣿⣷⣟⣼⠟⣬⢉⡠⣪⢜⣂⣁⠥⠓⠚⡁⢶⣷⣠⠂⡄⡢⣀⡐⠧⢆⣒⡲⡳⡫⢟⡃⢪⡧⣟⡟⣯⠐⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢺⠟⢿⢟⢻⡗⡮⡿⣲⢷⣆⣏⣇⡧⣄⢖⠾⡷⣿⣤⢳⢷⣣⣦⡜⠗⣭⢂⠩⣹⢿⡲⢎⡧⣕⣖⣓⣽⡿⡖⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠂⠂⠏⠿⢻⣥⡪⢽⣳⣳⣥⡶⣫⣍⢐⣥⣻⣾⡻⣅⢭⡴⢭⣿⠕⣧⡭⣞⣻⣣⣻⢿⠟⠛⠙⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠋⠫⠯⣍⢻⣿⣿⣷⣕⣵⣹⣽⣿⣷⣇⡏⣿⡿⣍⡝⠵⠯⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⚠️ Preview Version - Do Not Use in Production!
This project and plcbundle specification is currently unstable and under heavy development. Things can break at any time. Bundle hashes or data formats may change. Do not use this for production systems. Please wait for the
1.0release.
plcbundle archives AT Protocol's DID PLC Directory operations into immutable, cryptographically-chained bundles of 10,000 operations. Each bundle is hashed (SHA-256), compressed (zstd), and linked to the previous bundle, creating a verifiable chain of DID operations.
This repository contains a reference library and a CLI tool written in Go language.
The technical specification for the plcbundle V1 format, index, and creation process can be found in the specification document.
Features#
- 📦 Bundle Management: Automatically organize PLC operations into compressed bundles (10,000 operations each)
- 🔄 Transparent Sync: Fetch and cache PLC operations with automatic deduplication
- 🗜️ Efficient Storage: Zstandard compression with configurable levels
- ✅ Integrity: SHA-256 hash verification and blockchain-like chain validation
- 🔍 Indexing: Fast bundle lookup and gap detection
- 📊 Export: Query operations by time range
Installation#
go get github.com/atscan/plcbundle
For the CLI tool:
go install github.com/atscan/plcbundle/cmd/plcbundle@latest
Quick Start (Library)#
package main
import (
"context"
"log"
"time"
plcbundle "github.com/atscan/plcbundle"
)
func main() {
// Create a bundle manager
mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
// Fetch latest bundles
ctx := context.Background()
bundle, err := mgr.FetchNext(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("Fetched bundle %d with %d operations",
bundle.BundleNumber, len(bundle.Operations))
}
Library Usage#
1. Basic Setup#
import (
"context"
plcbundle "github.com/atscan/plcbundle"
)
// Create manager with defaults
mgr, err := plcbundle.New("./bundles", "https://plc.directory")
if err != nil {
log.Fatal(err)
}
defer mgr.Close()
2. Custom Configuration#
import (
"github.com/atscan/plcbundle/bundle"
"github.com/atscan/plcbundle/plc"
)
// Custom config
config := &bundle.Config{
BundleDir: "./my_bundles",
CompressionLevel: bundle.CompressionBest,
VerifyOnLoad: true,
Logger: myCustomLogger,
}
// Custom PLC client with rate limiting
plcClient := plc.NewClient("https://plc.directory",
plc.WithRateLimit(60, time.Minute), // 60 req/min
plc.WithTimeout(30*time.Second),
)
mgr, err := bundle.NewManager(config, plcClient)
3. Transparent Synchronization (Main Use Case)#
This is the primary pattern for keeping your local PLC mirror up-to-date:
package main
import (
"context"
"log"
"time"
plcbundle "github.com/atscan/plcbundle"
)
type PLCSync struct {
mgr *plcbundle.BundleManager
ctx context.Context
cancel context.CancelFunc
}
func NewPLCSync(bundleDir string) (*PLCSync, error) {
mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
sync := &PLCSync{
mgr: mgr,
ctx: ctx,
cancel: cancel,
}
return sync, nil
}
func (s *PLCSync) Start(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
log.Println("Starting PLC synchronization...")
for {
select {
case <-ticker.C:
if err := s.Update(); err != nil {
log.Printf("Update error: %v", err)
}
case <-s.ctx.Done():
return
}
}
}
func (s *PLCSync) Update() error {
log.Println("Checking for new bundles...")
for {
bundle, err := s.mgr.FetchNext(s.ctx)
if err != nil {
// Check if we're caught up
if isEndOfData(err) {
log.Println("✓ Up to date!")
return nil
}
return err
}
log.Printf("✓ Fetched bundle %06d (%d ops, %d DIDs)",
bundle.BundleNumber,
len(bundle.Operations),
bundle.DIDCount)
}
}
func (s *PLCSync) Stop() {
s.cancel()
s.mgr.Close()
}
func isEndOfData(err error) bool {
return err != nil &&
(strings.Contains(err.Error(), "insufficient operations") ||
strings.Contains(err.Error(), "caught up"))
}
// Usage
func main() {
sync, err := NewPLCSync("./plc_bundles")
if err != nil {
log.Fatal(err)
}
defer sync.Stop()
// Update every 5 minutes
sync.Start(5 * time.Minute)
}
4. Getting Bundles#
ctx := context.Background()
// Get all bundles
index := mgr.GetIndex()
bundles := index.GetBundles()
for _, meta := range bundles {
log.Printf("Bundle %06d: %d ops, %s to %s",
meta.BundleNumber,
meta.OperationCount,
meta.StartTime.Format(time.RFC3339),
meta.EndTime.Format(time.RFC3339))
}
// Load specific bundle
bundle, err := mgr.Load(ctx, 1)
if err != nil {
log.Fatal(err)
}
log.Printf("Loaded %d operations", len(bundle.Operations))
5. Getting Operations from Bundles#
// Load a bundle and iterate operations
bundle, err := mgr.Load(ctx, 1)
if err != nil {
log.Fatal(err)
}
for _, op := range bundle.Operations {
log.Printf("DID: %s, CID: %s, Time: %s",
op.DID,
op.CID,
op.CreatedAt.Format(time.RFC3339))
// Access operation data
if opType, ok := op.Operation["type"].(string); ok {
log.Printf(" Type: %s", opType)
}
}
6. Export Operations by Time Range#
// Export operations after a specific time
afterTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
operations, err := mgr.Export(ctx, afterTime, 5000)
if err != nil {
log.Fatal(err)
}
log.Printf("Exported %d operations", len(operations))
// Process operations
for _, op := range operations {
// Your processing logic
processOperation(op)
}
7. Periodic Update Pattern#
// Simple periodic updater
func runPeriodicUpdate(mgr *plcbundle.BundleManager, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
ctx := context.Background()
// Try to fetch next bundle
bundle, err := mgr.FetchNext(ctx)
if err != nil {
if strings.Contains(err.Error(), "insufficient operations") {
log.Println("Caught up!")
continue
}
log.Printf("Error: %v", err)
continue
}
log.Printf("New bundle %d: %d operations",
bundle.BundleNumber,
len(bundle.Operations))
// Process new operations
for _, op := range bundle.Operations {
handleOperation(op)
}
}
}
// Usage
go runPeriodicUpdate(mgr, 10*time.Minute)
8. Verify Integrity#
// Verify specific bundle
result, err := mgr.Verify(ctx, 1)
if err != nil {
log.Fatal(err)
}
if result.Valid {
log.Println("✓ Bundle is valid")
} else {
log.Printf("✗ Invalid: %v", result.Error)
}
// Verify entire chain
chainResult, err := mgr.VerifyChain(ctx)
if err != nil {
log.Fatal(err)
}
if chainResult.Valid {
log.Printf("✓ Chain verified: %d bundles", chainResult.ChainLength)
} else {
log.Printf("✗ Chain broken at bundle %d: %s",
chainResult.BrokenAt,
chainResult.Error)
}
9. Scan Directory (Re-index)#
// Scan directory and rebuild index from existing bundles
result, err := mgr.Scan()
if err != nil {
log.Fatal(err)
}
log.Printf("Scanned %d bundles", result.BundleCount)
if len(result.MissingGaps) > 0 {
log.Printf("Warning: Missing bundles: %v", result.MissingGaps)
}
10. Complete Example: Background Sync Service#
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
plcbundle "github.com/atscan/plcbundle"
)
type PLCService struct {
mgr *plcbundle.BundleManager
updateCh chan struct{}
stopCh chan struct{}
}
func NewPLCService(bundleDir string) (*PLCService, error) {
mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
if err != nil {
return nil, err
}
return &PLCService{
mgr: mgr,
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
}, nil
}
func (s *PLCService) Start() {
log.Println("Starting PLC service...")
// Initial scan
if _, err := s.mgr.Scan(); err != nil {
log.Printf("Scan warning: %v", err)
}
// Start update loop
go s.updateLoop()
// Periodic trigger
go s.periodicTrigger(5 * time.Minute)
}
func (s *PLCService) updateLoop() {
for {
select {
case <-s.updateCh:
s.fetchNewBundles()
case <-s.stopCh:
return
}
}
}
func (s *PLCService) periodicTrigger(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.TriggerUpdate()
case <-s.stopCh:
return
}
}
}
func (s *PLCService) TriggerUpdate() {
select {
case s.updateCh <- struct{}{}:
default:
// Update already in progress
}
}
func (s *PLCService) fetchNewBundles() {
ctx := context.Background()
fetched := 0
for {
bundle, err := s.mgr.FetchNext(ctx)
if err != nil {
if isEndOfData(err) {
if fetched > 0 {
log.Printf("✓ Fetched %d new bundles", fetched)
}
return
}
log.Printf("Fetch error: %v", err)
return
}
fetched++
log.Printf("Bundle %06d: %d operations",
bundle.BundleNumber,
len(bundle.Operations))
}
}
func (s *PLCService) GetBundles() []*plcbundle.BundleMetadata {
return s.mgr.GetIndex().GetBundles()
}
func (s *PLCService) GetOperations(bundleNum int) ([]plcbundle.PLCOperation, error) {
ctx := context.Background()
bundle, err := s.mgr.Load(ctx, bundleNum)
if err != nil {
return nil, err
}
return bundle.Operations, nil
}
func (s *PLCService) Stop() {
close(s.stopCh)
s.mgr.Close()
}
func main() {
service, err := NewPLCService("./plc_data")
if err != nil {
log.Fatal(err)
}
service.Start()
// Wait for interrupt
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
service.Stop()
}
CLI Tool Usage#
Fetch bundles#
# Fetch next bundle
plcbundle fetch
# Fetch specific number of bundles
plcbundle fetch -count 10
# Fetch all available bundles
plcbundle fetch -count 0
Scan directory#
# Scan and rebuild index
plcbundle scan
Verify integrity#
# Verify specific bundle
plcbundle verify -bundle 1
# Verify entire chain
plcbundle verify
# Verbose output
plcbundle verify -v
Show information#
# General info
plcbundle info
# Specific bundle info
plcbundle info -bundle 1
Export operations#
# Export operations to stdout (JSONL)
plcbundle export -count 1000 > operations.jsonl
# Export after specific time
plcbundle export -after "2024-01-01T00:00:00Z" -count 5000
Backfill#
# Fetch all bundles and stream to stdout
plcbundle backfill > all_operations.jsonl
# Start from specific bundle
plcbundle backfill -start 100 -end 200
API Reference#
Types#
type BundleManager struct { ... }
type Bundle struct {
BundleNumber int
StartTime time.Time
EndTime time.Time
Operations []PLCOperation
DIDCount int
Hash string
// ...
}
type PLCOperation struct {
DID string
Operation map[string]interface{}
CID string
CreatedAt time.Time
RawJSON []byte
}
Methods#
// Create
New(bundleDir, plcURL string) (*BundleManager, error)
// Sync
FetchNext(ctx) (*Bundle, error)
Export(ctx, afterTime, count) ([]PLCOperation, error)
// Query
Load(ctx, bundleNumber) (*Bundle, error)
GetIndex() *Index
GetInfo() map[string]interface{}
// Verify
Verify(ctx, bundleNumber) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)
// Manage
Scan() (*DirectoryScanResult, error)
Close()
Configuration#
type Config struct {
BundleDir string // Storage directory
CompressionLevel CompressionLevel // Compression level
VerifyOnLoad bool // Verify hashes when loading
Logger Logger // Custom logger
}
License#
MIT
Contributing#
Contributions welcome! Please open an issue or PR.