A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

Go 96.8%
Makefile 1.5%
Shell 1.4%
Other 0.3%
62 3 47

Clone this repository

https://tangled.org/atscan.net/plcbundle https://tangled.org/did:plc:ft3tl5dxjn4psdk6asenqn3r/plcbundle
git@tangled.org:atscan.net/plcbundle git@tangled.org:did:plc:ft3tl5dxjn4psdk6asenqn3r/plcbundle

For self-hosted knots, clone URLs may differ based on your setup.

Download tar.gz
README.md

PLC Bundle#

	⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⣽⣥⡈⠧⣂⢧⢾⠕⠞⠡⠊⠁⣐⠉⠀⠉⢍⠀⠉⠌⡉⠀⠂⠁⠱⠉⠁⢝⠻⠎⣬⢌⡌⣬⣡⣀⣢⣄⡄⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⢸⣿⣿⢿⣾⣯⣑⢄⡂⠀⠄⠂⠀⠀⢀⠀⠀⠐⠀⠀⠀⠀⠀⠀⠀⠀⠄⠐⠀⠀⠀⠀⣄⠭⠂⠈⠜⣩⣿⢝⠃⠀⠁⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⣻⡟⠏⠀⠚⠈⠚⡉⡝⢶⣱⢤⣅⠈⠀⠄⠀⠀⠀⠀⠀⠠⠀⠀⡂⠐⣤⢕⡪⢼⣈⡹⡇⠏⠏⠋⠅⢃⣪⡏⡇⡍⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣻⡄⠀⠀⠀⢠⠌⠃⠐⠉⢡⠱⠧⠝⡯⣮⢶⣴⣤⡆⢐⣣⢅⣮⡟⠦⠍⠉⠀⠁⠐⠀⠀⠀⠄⠐⠡⣽⡸⣎⢁⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢈⡻⣧⠀⠁⠐⠀⠀⠀⠀⠀⠀⠊⠀⠕⢀⡉⠈⡫⠽⡿⡟⠿⠟⠁⠀⠀⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠬⠥⣋⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⡀⣾⡍⠕⡀⠀⠀⠀⠄⠠⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠥⣤⢌⠀⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠄⢀⠀⢝⢞⣫⡆⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣽⡶⡄⠐⡀⠀⠀⠀⠀⠀⠀⢀⠀⠄⠀⠀⠀⠄⠁⠇⣷⡆⠀⠀⠀⢀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡸⢝⣮⠍⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⢀⠀⢾⣷⠀⠠⡀⠀⠀⠀⠀⢀⠀⠀⠀⠀⠀⠁⡁⠀⠀⣾⡥⠖⠀⠀⠀⠂⠀⠀⠀⠀⠀⠁⠀⡀⠁⠀⠀⠻⢳⣻⢄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣞⡙⠨⣀⠠⠄⠀⠂⠀⠀⠀⠈⢀⠀⠀⠀⠀⠀⠤⢚⢢⣟⠀⠀⠀⠀⡐⠀⠀⡀⠀⠀⠀⠀⠁⠈⠌⠊⣯⣮⡏⠡⠂⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣻⡟⡄⡡⣄⠀⠠⠀⠀⡅⠀⠐⠀⡀⠀⡀⠀⠄⠈⠃⠳⠪⠤⠀⠀⠀⠀⡀⠀⠂⠀⠀⠀⠁⠈⢠⣠⠒⠻⣻⡧⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠪⡎⠠⢌⠑⡀⠂⠀⠄⠠⠀⠠⠀⠁⡀⠠⠠⡀⣀⠜⢏⡅⠀⠀⡀⠁⠀⠀⠁⠁⠐⠄⡀⢀⠂⠀⠄⢑⣿⣿⣿⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠼⣻⠧⣣⣀⠐⠨⠁⠕⢈⢀⢀⡁⠀⠈⠠⢀⠀⠐⠜⣽⡗⡤⠀⠂⠀⠠⠀⢂⠠⠀⠁⠄⠀⠔⠀⠑⣨⣿⢯⠋⡅⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⡚⣷⣭⠎⢃⡗⠄⡄⢀⠁⠀⠅⢀⢅⡀⠠⠀⢠⡀⡩⠷⢇⠀⡀⠄⡠⠤⠆⣀⡀⠄⠉⣠⠃⠴⠀⠈⢁⣿⡛⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠘⡬⡿⣿⡏⡻⡯⠌⢁⢛⠠⠓⠐⠐⠐⠌⠃⠋⠂⡢⢰⣈⢏⣰⠂⠈⠀⠠⠒⠡⠌⠫⠭⠩⠢⡬⠆⠿⢷⢿⡽⡧⠉⠊⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣷⣺⣗⣿⡶⡎⡅⣣⢎⠠⡅⣢⡖⠴⠬⡈⠂⡨⢡⠾⣣⣢⠀⠀⡹⠄⡄⠄⡇⣰⡖⡊⠔⢹⣄⣿⣭⣵⣿⢷⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠩⣿⣿⣲⣿⣷⣟⣼⠟⣬⢉⡠⣪⢜⣂⣁⠥⠓⠚⡁⢶⣷⣠⠂⡄⡢⣀⡐⠧⢆⣒⡲⡳⡫⢟⡃⢪⡧⣟⡟⣯⠐⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢺⠟⢿⢟⢻⡗⡮⡿⣲⢷⣆⣏⣇⡧⣄⢖⠾⡷⣿⣤⢳⢷⣣⣦⡜⠗⣭⢂⠩⣹⢿⡲⢎⡧⣕⣖⣓⣽⡿⡖⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠂⠂⠏⠿⢻⣥⡪⢽⣳⣳⣥⡶⣫⣍⢐⣥⣻⣾⡻⣅⢭⡴⢭⣿⠕⣧⡭⣞⣻⣣⣻⢿⠟⠛⠙⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠋⠫⠯⣍⢻⣿⣿⣷⣕⣵⣹⣽⣿⣷⣇⡏⣿⡿⣍⡝⠵⠯⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀

⚠️ Preview Version - Do Not Use in Production!

This project and plcbundle specification is currently unstable and under heavy development. Things can break at any time. Bundle hashes or data formats may change. Do not use this for production systems. Please wait for the 1.0 release.

A Go library and CLI tool for managing DID PLC Directory bundles with transparent synchronization, compression, and verification.

Features#

  • 📦 Bundle Management: Automatically organize PLC operations into compressed bundles (10,000 operations each)
  • 🔄 Transparent Sync: Fetch and cache PLC operations with automatic deduplication
  • 🗜️ Efficient Storage: Zstandard compression with configurable levels
  • Integrity: SHA-256 hash verification and blockchain-like chain validation
  • 🔍 Indexing: Fast bundle lookup and gap detection
  • 📊 Export: Query operations by time range

Installation#

go get github.com/atscan/plcbundle

For the CLI tool:

go install github.com/atscan/plcbundle/cmd/plcbundle@latest

Quick Start (Library)#

package main

import (
    "context"
    "log"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

func main() {
    // Create a bundle manager
    mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()
    
    // Fetch latest bundles
    ctx := context.Background()
    bundle, err := mgr.FetchNext(ctx)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Fetched bundle %d with %d operations", 
        bundle.BundleNumber, len(bundle.Operations))
}

Library Usage#

1. Basic Setup#

import (
    "context"
    plcbundle "github.com/atscan/plcbundle"
)

// Create manager with defaults
mgr, err := plcbundle.New("./bundles", "https://plc.directory")
if err != nil {
    log.Fatal(err)
}
defer mgr.Close()

2. Custom Configuration#

import (
    "github.com/atscan/plcbundle/bundle"
    "github.com/atscan/plcbundle/plc"
)

// Custom config
config := &bundle.Config{
    BundleDir:        "./my_bundles",
    CompressionLevel: bundle.CompressionBest,
    VerifyOnLoad:     true,
    Logger:           myCustomLogger,
}

// Custom PLC client with rate limiting
plcClient := plc.NewClient("https://plc.directory",
    plc.WithRateLimit(60, time.Minute),  // 60 req/min
    plc.WithTimeout(30*time.Second),
)

mgr, err := bundle.NewManager(config, plcClient)

3. Transparent Synchronization (Main Use Case)#

This is the primary pattern for keeping your local PLC mirror up-to-date:

package main

import (
    "context"
    "log"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

type PLCSync struct {
    mgr    *plcbundle.BundleManager
    ctx    context.Context
    cancel context.CancelFunc
}

func NewPLCSync(bundleDir string) (*PLCSync, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }
    
    ctx, cancel := context.WithCancel(context.Background())
    
    sync := &PLCSync{
        mgr:    mgr,
        ctx:    ctx,
        cancel: cancel,
    }
    
    return sync, nil
}

func (s *PLCSync) Start(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    log.Println("Starting PLC synchronization...")
    
    for {
        select {
        case <-ticker.C:
            if err := s.Update(); err != nil {
                log.Printf("Update error: %v", err)
            }
        case <-s.ctx.Done():
            return
        }
    }
}

func (s *PLCSync) Update() error {
    log.Println("Checking for new bundles...")
    
    for {
        bundle, err := s.mgr.FetchNext(s.ctx)
        if err != nil {
            // Check if we're caught up
            if isEndOfData(err) {
                log.Println("✓ Up to date!")
                return nil
            }
            return err
        }
        
        log.Printf("✓ Fetched bundle %06d (%d ops, %d DIDs)",
            bundle.BundleNumber, 
            len(bundle.Operations),
            bundle.DIDCount)
    }
}

func (s *PLCSync) Stop() {
    s.cancel()
    s.mgr.Close()
}

func isEndOfData(err error) bool {
    return err != nil && 
        (strings.Contains(err.Error(), "insufficient operations") ||
         strings.Contains(err.Error(), "caught up"))
}

// Usage
func main() {
    sync, err := NewPLCSync("./plc_bundles")
    if err != nil {
        log.Fatal(err)
    }
    defer sync.Stop()
    
    // Update every 5 minutes
    sync.Start(5 * time.Minute)
}

4. Getting Bundles#

ctx := context.Background()

// Get all bundles
index := mgr.GetIndex()
bundles := index.GetBundles()

for _, meta := range bundles {
    log.Printf("Bundle %06d: %d ops, %s to %s",
        meta.BundleNumber,
        meta.OperationCount,
        meta.StartTime.Format(time.RFC3339),
        meta.EndTime.Format(time.RFC3339))
}

// Load specific bundle
bundle, err := mgr.Load(ctx, 1)
if err != nil {
    log.Fatal(err)
}

log.Printf("Loaded %d operations", len(bundle.Operations))

5. Getting Operations from Bundles#

// Load a bundle and iterate operations
bundle, err := mgr.Load(ctx, 1)
if err != nil {
    log.Fatal(err)
}

for _, op := range bundle.Operations {
    log.Printf("DID: %s, CID: %s, Time: %s",
        op.DID,
        op.CID,
        op.CreatedAt.Format(time.RFC3339))
    
    // Access operation data
    if opType, ok := op.Operation["type"].(string); ok {
        log.Printf("  Type: %s", opType)
    }
}

6. Export Operations by Time Range#

// Export operations after a specific time
afterTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
operations, err := mgr.Export(ctx, afterTime, 5000)
if err != nil {
    log.Fatal(err)
}

log.Printf("Exported %d operations", len(operations))

// Process operations
for _, op := range operations {
    // Your processing logic
    processOperation(op)
}

7. Periodic Update Pattern#

// Simple periodic updater
func runPeriodicUpdate(mgr *plcbundle.BundleManager, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for range ticker.C {
        ctx := context.Background()
        
        // Try to fetch next bundle
        bundle, err := mgr.FetchNext(ctx)
        if err != nil {
            if strings.Contains(err.Error(), "insufficient operations") {
                log.Println("Caught up!")
                continue
            }
            log.Printf("Error: %v", err)
            continue
        }
        
        log.Printf("New bundle %d: %d operations", 
            bundle.BundleNumber,
            len(bundle.Operations))
        
        // Process new operations
        for _, op := range bundle.Operations {
            handleOperation(op)
        }
    }
}

// Usage
go runPeriodicUpdate(mgr, 10*time.Minute)

8. Verify Integrity#

// Verify specific bundle
result, err := mgr.Verify(ctx, 1)
if err != nil {
    log.Fatal(err)
}

if result.Valid {
    log.Println("✓ Bundle is valid")
} else {
    log.Printf("✗ Invalid: %v", result.Error)
}

// Verify entire chain
chainResult, err := mgr.VerifyChain(ctx)
if err != nil {
    log.Fatal(err)
}

if chainResult.Valid {
    log.Printf("✓ Chain verified: %d bundles", chainResult.ChainLength)
} else {
    log.Printf("✗ Chain broken at bundle %d: %s", 
        chainResult.BrokenAt,
        chainResult.Error)
}

9. Scan Directory (Re-index)#

// Scan directory and rebuild index from existing bundles
result, err := mgr.Scan()
if err != nil {
    log.Fatal(err)
}

log.Printf("Scanned %d bundles", result.BundleCount)
if len(result.MissingGaps) > 0 {
    log.Printf("Warning: Missing bundles: %v", result.MissingGaps)
}

10. Complete Example: Background Sync Service#

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

type PLCService struct {
    mgr       *plcbundle.BundleManager
    updateCh  chan struct{}
    stopCh    chan struct{}
}

func NewPLCService(bundleDir string) (*PLCService, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }
    
    return &PLCService{
        mgr:      mgr,
        updateCh: make(chan struct{}, 1),
        stopCh:   make(chan struct{}),
    }, nil
}

func (s *PLCService) Start() {
    log.Println("Starting PLC service...")
    
    // Initial scan
    if _, err := s.mgr.Scan(); err != nil {
        log.Printf("Scan warning: %v", err)
    }
    
    // Start update loop
    go s.updateLoop()
    
    // Periodic trigger
    go s.periodicTrigger(5 * time.Minute)
}

func (s *PLCService) updateLoop() {
    for {
        select {
        case <-s.updateCh:
            s.fetchNewBundles()
        case <-s.stopCh:
            return
        }
    }
}

func (s *PLCService) periodicTrigger(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            s.TriggerUpdate()
        case <-s.stopCh:
            return
        }
    }
}

func (s *PLCService) TriggerUpdate() {
    select {
    case s.updateCh <- struct{}{}:
    default:
        // Update already in progress
    }
}

func (s *PLCService) fetchNewBundles() {
    ctx := context.Background()
    fetched := 0
    
    for {
        bundle, err := s.mgr.FetchNext(ctx)
        if err != nil {
            if isEndOfData(err) {
                if fetched > 0 {
                    log.Printf("✓ Fetched %d new bundles", fetched)
                }
                return
            }
            log.Printf("Fetch error: %v", err)
            return
        }
        
        fetched++
        log.Printf("Bundle %06d: %d operations", 
            bundle.BundleNumber,
            len(bundle.Operations))
    }
}

func (s *PLCService) GetBundles() []*plcbundle.BundleMetadata {
    return s.mgr.GetIndex().GetBundles()
}

func (s *PLCService) GetOperations(bundleNum int) ([]plcbundle.PLCOperation, error) {
    ctx := context.Background()
    bundle, err := s.mgr.Load(ctx, bundleNum)
    if err != nil {
        return nil, err
    }
    return bundle.Operations, nil
}

func (s *PLCService) Stop() {
    close(s.stopCh)
    s.mgr.Close()
}

func main() {
    service, err := NewPLCService("./plc_data")
    if err != nil {
        log.Fatal(err)
    }
    
    service.Start()
    
    // Wait for interrupt
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh
    
    log.Println("Shutting down...")
    service.Stop()
}

CLI Tool Usage#

Fetch bundles#

# Fetch next bundle
plcbundle fetch

# Fetch specific number of bundles
plcbundle fetch -count 10

# Fetch all available bundles
plcbundle fetch -count 0

Scan directory#

# Scan and rebuild index
plcbundle scan

Verify integrity#

# Verify specific bundle
plcbundle verify -bundle 1

# Verify entire chain
plcbundle verify

# Verbose output
plcbundle verify -v

Show information#

# General info
plcbundle info

# Specific bundle info
plcbundle info -bundle 1

Export operations#

# Export operations to stdout (JSONL)
plcbundle export -count 1000 > operations.jsonl

# Export after specific time
plcbundle export -after "2024-01-01T00:00:00Z" -count 5000

Backfill#

# Fetch all bundles and stream to stdout
plcbundle backfill > all_operations.jsonl

# Start from specific bundle
plcbundle backfill -start 100 -end 200

API Reference#

Types#

type BundleManager struct { ... }
type Bundle struct {
    BundleNumber int
    StartTime    time.Time
    EndTime      time.Time
    Operations   []PLCOperation
    DIDCount     int
    Hash         string
    // ...
}

type PLCOperation struct {
    DID       string
    Operation map[string]interface{}
    CID       string
    CreatedAt time.Time
    RawJSON   []byte
}

Methods#

// Create
New(bundleDir, plcURL string) (*BundleManager, error)

// Sync
FetchNext(ctx) (*Bundle, error)
Export(ctx, afterTime, count) ([]PLCOperation, error)

// Query
Load(ctx, bundleNumber) (*Bundle, error)
GetIndex() *Index
GetInfo() map[string]interface{}

// Verify
Verify(ctx, bundleNumber) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)

// Manage
Scan() (*DirectoryScanResult, error)
Close()

Configuration#

type Config struct {
    BundleDir        string             // Storage directory
    CompressionLevel CompressionLevel   // Compression level
    VerifyOnLoad     bool               // Verify hashes when loading
    Logger           Logger             // Custom logger
}

// Compression levels
const (
    CompressionFastest  // Fastest compression
    CompressionDefault  // Balanced
    CompressionBetter   // Better compression (default)
    CompressionBest     // Best compression
)

Bundle Format#

  • File naming: NNNNNN.jsonl.zst (e.g., 000001.jsonl.zst)
  • Size: 10,000 operations per bundle
  • Compression: Zstandard
  • Index: plc_bundles.json (metadata and chain info)

Best Practices#

  1. Regular Updates: Run periodic updates (5-10 minutes) to stay synchronized
  2. Error Handling: Handle "insufficient operations" error as "caught up" signal
  3. Verification: Periodically verify chain integrity
  4. Rate Limiting: Default 90 req/min is safe for PLC directory
  5. Storage: Plan ~1-2 GB per million operations (compressed)

License#

MIT

Contributing#

Contributions welcome! Please open an issue or PR.