A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

Go 95.5%
Makefile 2.4%
Shell 2.1%
33 3 47

Clone this repository

https://tangled.org/atscan.net/plcbundle https://tangled.org/did:plc:ft3tl5dxjn4psdk6asenqn3r/plcbundle
git@tangled.org:atscan.net/plcbundle git@tangled.org:did:plc:ft3tl5dxjn4psdk6asenqn3r/plcbundle

For self-hosted knots, clone URLs may differ based on your setup.

Download tar.gz
README.md

PLC Bundle#

A Go library and CLI tool for managing DID PLC Directory bundles with transparent synchronization, compression, and verification.

Features#

  • 📦 Bundle Management: Automatically organize PLC operations into compressed bundles (10,000 operations each)
  • 🔄 Transparent Sync: Fetch and cache PLC operations with automatic deduplication
  • 🗜️ Efficient Storage: Zstandard compression with configurable levels
  • Integrity: SHA-256 hash verification and blockchain-like chain validation
  • 🔍 Indexing: Fast bundle lookup and gap detection
  • 📊 Export: Query operations by time range

Installation#

go get github.com/atscan/plcbundle

For the CLI tool:

go install github.com/atscan/plcbundle/cmd/plcbundle@latest

Quick Start (Library)#

package main

import (
    "context"
    "log"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

func main() {
    // Create a bundle manager
    mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()
    
    // Fetch latest bundles
    ctx := context.Background()
    bundle, err := mgr.FetchNext(ctx)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Fetched bundle %d with %d operations", 
        bundle.BundleNumber, len(bundle.Operations))
}

Library Usage#

1. Basic Setup#

import (
    "context"
    plcbundle "github.com/atscan/plcbundle"
)

// Create manager with defaults
mgr, err := plcbundle.New("./bundles", "https://plc.directory")
if err != nil {
    log.Fatal(err)
}
defer mgr.Close()

2. Custom Configuration#

import (
    "github.com/atscan/plcbundle/bundle"
    "github.com/atscan/plcbundle/plc"
)

// Custom config
config := &bundle.Config{
    BundleDir:        "./my_bundles",
    CompressionLevel: bundle.CompressionBest,
    VerifyOnLoad:     true,
    Logger:           myCustomLogger,
}

// Custom PLC client with rate limiting
plcClient := plc.NewClient("https://plc.directory",
    plc.WithRateLimit(60, time.Minute),  // 60 req/min
    plc.WithTimeout(30*time.Second),
)

mgr, err := bundle.NewManager(config, plcClient)

3. Transparent Synchronization (Main Use Case)#

This is the primary pattern for keeping your local PLC mirror up-to-date:

package main

import (
    "context"
    "log"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

type PLCSync struct {
    mgr    *plcbundle.BundleManager
    ctx    context.Context
    cancel context.CancelFunc
}

func NewPLCSync(bundleDir string) (*PLCSync, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }
    
    ctx, cancel := context.WithCancel(context.Background())
    
    sync := &PLCSync{
        mgr:    mgr,
        ctx:    ctx,
        cancel: cancel,
    }
    
    return sync, nil
}

func (s *PLCSync) Start(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    log.Println("Starting PLC synchronization...")
    
    for {
        select {
        case <-ticker.C:
            if err := s.Update(); err != nil {
                log.Printf("Update error: %v", err)
            }
        case <-s.ctx.Done():
            return
        }
    }
}

func (s *PLCSync) Update() error {
    log.Println("Checking for new bundles...")
    
    for {
        bundle, err := s.mgr.FetchNext(s.ctx)
        if err != nil {
            // Check if we're caught up
            if isEndOfData(err) {
                log.Println("✓ Up to date!")
                return nil
            }
            return err
        }
        
        log.Printf("✓ Fetched bundle %06d (%d ops, %d DIDs)",
            bundle.BundleNumber, 
            len(bundle.Operations),
            bundle.DIDCount)
    }
}

func (s *PLCSync) Stop() {
    s.cancel()
    s.mgr.Close()
}

func isEndOfData(err error) bool {
    return err != nil && 
        (strings.Contains(err.Error(), "insufficient operations") ||
         strings.Contains(err.Error(), "caught up"))
}

// Usage
func main() {
    sync, err := NewPLCSync("./plc_bundles")
    if err != nil {
        log.Fatal(err)
    }
    defer sync.Stop()
    
    // Update every 5 minutes
    sync.Start(5 * time.Minute)
}

4. Getting Bundles#

ctx := context.Background()

// Get all bundles
index := mgr.GetIndex()
bundles := index.GetBundles()

for _, meta := range bundles {
    log.Printf("Bundle %06d: %d ops, %s to %s",
        meta.BundleNumber,
        meta.OperationCount,
        meta.StartTime.Format(time.RFC3339),
        meta.EndTime.Format(time.RFC3339))
}

// Load specific bundle
bundle, err := mgr.Load(ctx, 1)
if err != nil {
    log.Fatal(err)
}

log.Printf("Loaded %d operations", len(bundle.Operations))

5. Getting Operations from Bundles#

// Load a bundle and iterate operations
bundle, err := mgr.Load(ctx, 1)
if err != nil {
    log.Fatal(err)
}

for _, op := range bundle.Operations {
    log.Printf("DID: %s, CID: %s, Time: %s",
        op.DID,
        op.CID,
        op.CreatedAt.Format(time.RFC3339))
    
    // Access operation data
    if opType, ok := op.Operation["type"].(string); ok {
        log.Printf("  Type: %s", opType)
    }
}

6. Export Operations by Time Range#

// Export operations after a specific time
afterTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
operations, err := mgr.Export(ctx, afterTime, 5000)
if err != nil {
    log.Fatal(err)
}

log.Printf("Exported %d operations", len(operations))

// Process operations
for _, op := range operations {
    // Your processing logic
    processOperation(op)
}

7. Periodic Update Pattern#

// Simple periodic updater
func runPeriodicUpdate(mgr *plcbundle.BundleManager, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for range ticker.C {
        ctx := context.Background()
        
        // Try to fetch next bundle
        bundle, err := mgr.FetchNext(ctx)
        if err != nil {
            if strings.Contains(err.Error(), "insufficient operations") {
                log.Println("Caught up!")
                continue
            }
            log.Printf("Error: %v", err)
            continue
        }
        
        log.Printf("New bundle %d: %d operations", 
            bundle.BundleNumber,
            len(bundle.Operations))
        
        // Process new operations
        for _, op := range bundle.Operations {
            handleOperation(op)
        }
    }
}

// Usage
go runPeriodicUpdate(mgr, 10*time.Minute)

8. Verify Integrity#

// Verify specific bundle
result, err := mgr.Verify(ctx, 1)
if err != nil {
    log.Fatal(err)
}

if result.Valid {
    log.Println("✓ Bundle is valid")
} else {
    log.Printf("✗ Invalid: %v", result.Error)
}

// Verify entire chain
chainResult, err := mgr.VerifyChain(ctx)
if err != nil {
    log.Fatal(err)
}

if chainResult.Valid {
    log.Printf("✓ Chain verified: %d bundles", chainResult.ChainLength)
} else {
    log.Printf("✗ Chain broken at bundle %d: %s", 
        chainResult.BrokenAt,
        chainResult.Error)
}

9. Scan Directory (Re-index)#

// Scan directory and rebuild index from existing bundles
result, err := mgr.Scan()
if err != nil {
    log.Fatal(err)
}

log.Printf("Scanned %d bundles", result.BundleCount)
if len(result.MissingGaps) > 0 {
    log.Printf("Warning: Missing bundles: %v", result.MissingGaps)
}

10. Complete Example: Background Sync Service#

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

type PLCService struct {
    mgr       *plcbundle.BundleManager
    updateCh  chan struct{}
    stopCh    chan struct{}
}

func NewPLCService(bundleDir string) (*PLCService, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }
    
    return &PLCService{
        mgr:      mgr,
        updateCh: make(chan struct{}, 1),
        stopCh:   make(chan struct{}),
    }, nil
}

func (s *PLCService) Start() {
    log.Println("Starting PLC service...")
    
    // Initial scan
    if _, err := s.mgr.Scan(); err != nil {
        log.Printf("Scan warning: %v", err)
    }
    
    // Start update loop
    go s.updateLoop()
    
    // Periodic trigger
    go s.periodicTrigger(5 * time.Minute)
}

func (s *PLCService) updateLoop() {
    for {
        select {
        case <-s.updateCh:
            s.fetchNewBundles()
        case <-s.stopCh:
            return
        }
    }
}

func (s *PLCService) periodicTrigger(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            s.TriggerUpdate()
        case <-s.stopCh:
            return
        }
    }
}

func (s *PLCService) TriggerUpdate() {
    select {
    case s.updateCh <- struct{}{}:
    default:
        // Update already in progress
    }
}

func (s *PLCService) fetchNewBundles() {
    ctx := context.Background()
    fetched := 0
    
    for {
        bundle, err := s.mgr.FetchNext(ctx)
        if err != nil {
            if isEndOfData(err) {
                if fetched > 0 {
                    log.Printf("✓ Fetched %d new bundles", fetched)
                }
                return
            }
            log.Printf("Fetch error: %v", err)
            return
        }
        
        fetched++
        log.Printf("Bundle %06d: %d operations", 
            bundle.BundleNumber,
            len(bundle.Operations))
    }
}

func (s *PLCService) GetBundles() []*plcbundle.BundleMetadata {
    return s.mgr.GetIndex().GetBundles()
}

func (s *PLCService) GetOperations(bundleNum int) ([]plcbundle.PLCOperation, error) {
    ctx := context.Background()
    bundle, err := s.mgr.Load(ctx, bundleNum)
    if err != nil {
        return nil, err
    }
    return bundle.Operations, nil
}

func (s *PLCService) Stop() {
    close(s.stopCh)
    s.mgr.Close()
}

func main() {
    service, err := NewPLCService("./plc_data")
    if err != nil {
        log.Fatal(err)
    }
    
    service.Start()
    
    // Wait for interrupt
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh
    
    log.Println("Shutting down...")
    service.Stop()
}

CLI Tool Usage#

Fetch bundles#

# Fetch next bundle
plcbundle fetch

# Fetch specific number of bundles
plcbundle fetch -count 10

# Fetch all available bundles
plcbundle fetch -count 0

Scan directory#

# Scan and rebuild index
plcbundle scan

Verify integrity#

# Verify specific bundle
plcbundle verify -bundle 1

# Verify entire chain
plcbundle verify

# Verbose output
plcbundle verify -v

Show information#

# General info
plcbundle info

# Specific bundle info
plcbundle info -bundle 1

Export operations#

# Export operations to stdout (JSONL)
plcbundle export -count 1000 > operations.jsonl

# Export after specific time
plcbundle export -after "2024-01-01T00:00:00Z" -count 5000

Backfill#

# Fetch all bundles and stream to stdout
plcbundle backfill > all_operations.jsonl

# Start from specific bundle
plcbundle backfill -start 100 -end 200

API Reference#

Types#

type BundleManager struct { ... }
type Bundle struct {
    BundleNumber int
    StartTime    time.Time
    EndTime      time.Time
    Operations   []PLCOperation
    DIDCount     int
    Hash         string
    // ...
}

type PLCOperation struct {
    DID       string
    Operation map[string]interface{}
    CID       string
    CreatedAt time.Time
    RawJSON   []byte
}

Methods#

// Create
New(bundleDir, plcURL string) (*BundleManager, error)

// Sync
FetchNext(ctx) (*Bundle, error)
Export(ctx, afterTime, count) ([]PLCOperation, error)

// Query
Load(ctx, bundleNumber) (*Bundle, error)
GetIndex() *Index
GetInfo() map[string]interface{}

// Verify
Verify(ctx, bundleNumber) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)

// Manage
Scan() (*DirectoryScanResult, error)
Close()

Configuration#

type Config struct {
    BundleDir        string             // Storage directory
    CompressionLevel CompressionLevel   // Compression level
    VerifyOnLoad     bool               // Verify hashes when loading
    Logger           Logger             // Custom logger
}

// Compression levels
const (
    CompressionFastest  // Fastest compression
    CompressionDefault  // Balanced
    CompressionBetter   // Better compression (default)
    CompressionBest     // Best compression
)

Bundle Format#

  • File naming: NNNNNN.jsonl.zst (e.g., 000001.jsonl.zst)
  • Size: 10,000 operations per bundle
  • Compression: Zstandard
  • Index: plc_bundles.json (metadata and chain info)

Best Practices#

  1. Regular Updates: Run periodic updates (5-10 minutes) to stay synchronized
  2. Error Handling: Handle "insufficient operations" error as "caught up" signal
  3. Verification: Periodically verify chain integrity
  4. Rate Limiting: Default 90 req/min is safe for PLC directory
  5. Storage: Plan ~1-2 GB per million operations (compressed)

License#

MIT

Contributing#

Contributions welcome! Please open an issue or PR.