A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

Go 96.8%
Makefile 1.5%
Shell 1.4%
Other 0.3%
64 3 47

Clone this repository

https://tangled.org/atscan.net/plcbundle https://tangled.org/did:plc:ft3tl5dxjn4psdk6asenqn3r/plcbundle
git@tangled.org:atscan.net/plcbundle git@tangled.org:did:plc:ft3tl5dxjn4psdk6asenqn3r/plcbundle

For self-hosted knots, clone URLs may differ based on your setup.

Download tar.gz
README.md

PLC Bundle#

	⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠀⡀⠀⠀⠀⠀⠀⠀⢀⠀⠀⡀⠀⢀⠀⢀⡀⣤⡢⣤⡤⡀⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⡄⡄⠐⡀⠈⣀⠀⡠⡠⠀⣢⣆⢌⡾⢙⠺⢽⠾⡋⣻⡷⡫⢵⣭⢦⣴⠦⠀⢠⠀⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢠⣤⣽⣥⡈⠧⣂⢧⢾⠕⠞⠡⠊⠁⣐⠉⠀⠉⢍⠀⠉⠌⡉⠀⠂⠁⠱⠉⠁⢝⠻⠎⣬⢌⡌⣬⣡⣀⣢⣄⡄⠀⡀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⢸⣿⣿⢿⣾⣯⣑⢄⡂⠀⠄⠂⠀⠀⢀⠀⠀⠐⠀⠀⠀⠀⠀⠀⠀⠀⠄⠐⠀⠀⠀⠀⣄⠭⠂⠈⠜⣩⣿⢝⠃⠀⠁⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢀⣻⡟⠏⠀⠚⠈⠚⡉⡝⢶⣱⢤⣅⠈⠀⠄⠀⠀⠀⠀⠀⠠⠀⠀⡂⠐⣤⢕⡪⢼⣈⡹⡇⠏⠏⠋⠅⢃⣪⡏⡇⡍⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣻⡄⠀⠀⠀⢠⠌⠃⠐⠉⢡⠱⠧⠝⡯⣮⢶⣴⣤⡆⢐⣣⢅⣮⡟⠦⠍⠉⠀⠁⠐⠀⠀⠀⠄⠐⠡⣽⡸⣎⢁⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⢈⡻⣧⠀⠁⠐⠀⠀⠀⠀⠀⠀⠊⠀⠕⢀⡉⠈⡫⠽⡿⡟⠿⠟⠁⠀⠀⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⠬⠥⣋⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⡀⣾⡍⠕⡀⠀⠀⠀⠄⠠⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠥⣤⢌⠀⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠄⢀⠀⢝⢞⣫⡆⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣽⡶⡄⠐⡀⠀⠀⠀⠀⠀⠀⢀⠀⠄⠀⠀⠀⠄⠁⠇⣷⡆⠀⠀⠀⢀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⡸⢝⣮⠍⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⢀⠀⢾⣷⠀⠠⡀⠀⠀⠀⠀⢀⠀⠀⠀⠀⠀⠁⡁⠀⠀⣾⡥⠖⠀⠀⠀⠂⠀⠀⠀⠀⠀⠁⠀⡀⠁⠀⠀⠻⢳⣻⢄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣞⡙⠨⣀⠠⠄⠀⠂⠀⠀⠀⠈⢀⠀⠀⠀⠀⠀⠤⢚⢢⣟⠀⠀⠀⠀⡐⠀⠀⡀⠀⠀⠀⠀⠁⠈⠌⠊⣯⣮⡏⠡⠂⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⣻⡟⡄⡡⣄⠀⠠⠀⠀⡅⠀⠐⠀⡀⠀⡀⠀⠄⠈⠃⠳⠪⠤⠀⠀⠀⠀⡀⠀⠂⠀⠀⠀⠁⠈⢠⣠⠒⠻⣻⡧⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠪⡎⠠⢌⠑⡀⠂⠀⠄⠠⠀⠠⠀⠁⡀⠠⠠⡀⣀⠜⢏⡅⠀⠀⡀⠁⠀⠀⠁⠁⠐⠄⡀⢀⠂⠀⠄⢑⣿⣿⣿⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠼⣻⠧⣣⣀⠐⠨⠁⠕⢈⢀⢀⡁⠀⠈⠠⢀⠀⠐⠜⣽⡗⡤⠀⠂⠀⠠⠀⢂⠠⠀⠁⠄⠀⠔⠀⠑⣨⣿⢯⠋⡅⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⡚⣷⣭⠎⢃⡗⠄⡄⢀⠁⠀⠅⢀⢅⡀⠠⠀⢠⡀⡩⠷⢇⠀⡀⠄⡠⠤⠆⣀⡀⠄⠉⣠⠃⠴⠀⠈⢁⣿⡛⡯⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠘⡬⡿⣿⡏⡻⡯⠌⢁⢛⠠⠓⠐⠐⠐⠌⠃⠋⠂⡢⢰⣈⢏⣰⠂⠈⠀⠠⠒⠡⠌⠫⠭⠩⠢⡬⠆⠿⢷⢿⡽⡧⠉⠊⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠺⣷⣺⣗⣿⡶⡎⡅⣣⢎⠠⡅⣢⡖⠴⠬⡈⠂⡨⢡⠾⣣⣢⠀⠀⡹⠄⡄⠄⡇⣰⡖⡊⠔⢹⣄⣿⣭⣵⣿⢷⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠩⣿⣿⣲⣿⣷⣟⣼⠟⣬⢉⡠⣪⢜⣂⣁⠥⠓⠚⡁⢶⣷⣠⠂⡄⡢⣀⡐⠧⢆⣒⡲⡳⡫⢟⡃⢪⡧⣟⡟⣯⠐⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⢺⠟⢿⢟⢻⡗⡮⡿⣲⢷⣆⣏⣇⡧⣄⢖⠾⡷⣿⣤⢳⢷⣣⣦⡜⠗⣭⢂⠩⣹⢿⡲⢎⡧⣕⣖⣓⣽⡿⡖⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠂⠂⠏⠿⢻⣥⡪⢽⣳⣳⣥⡶⣫⣍⢐⣥⣻⣾⡻⣅⢭⡴⢭⣿⠕⣧⡭⣞⣻⣣⣻⢿⠟⠛⠙⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠄⠋⠫⠯⣍⢻⣿⣿⣷⣕⣵⣹⣽⣿⣷⣇⡏⣿⡿⣍⡝⠵⠯⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠐⠠⠁⠋⢣⠓⡍⣫⠹⣿⣿⣷⡿⠯⠺⠁⠁⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀
⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠋⢀⠋⢈⡿⠿⠁⠉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀

⚠️ Preview Version - Do Not Use in Production!

This project and plcbundle specification is currently unstable and under heavy development. Things can break at any time. Bundle hashes or data formats may change. Do not use this for production systems. Please wait for the 1.0 release.

plcbundle archives AT Protocol's DID PLC Directory operations into immutable, cryptographically-chained bundles of 10,000 operations. Each bundle is hashed (SHA-256), compressed (zstd), and linked to the previous bundle, creating a verifiable chain of DID operations.

This repository contains a reference library and a CLI tool written in Go language.

The technical specification for the plcbundle V1 format, index, and creation process can be found in the specification document.

Features#

  • 📦 Bundle Management: Automatically organize PLC operations into compressed bundles (10,000 operations each)
  • 🔄 Transparent Sync: Fetch and cache PLC operations with automatic deduplication
  • 🗜️ Efficient Storage: Zstandard compression with configurable levels
  • Integrity: SHA-256 hash verification and blockchain-like chain validation
  • 🔍 Indexing: Fast bundle lookup and gap detection
  • 📊 Export: Query operations by time range

Installation#

go get github.com/atscan/plcbundle

For the CLI tool:

go install github.com/atscan/plcbundle/cmd/plcbundle@latest

Quick Start (Library)#

package main

import (
    "context"
    "log"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

func main() {
    // Create a bundle manager
    mgr, err := plcbundle.New("./plc_data", "https://plc.directory")
    if err != nil {
        log.Fatal(err)
    }
    defer mgr.Close()
    
    // Fetch latest bundles
    ctx := context.Background()
    bundle, err := mgr.FetchNext(ctx)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Fetched bundle %d with %d operations", 
        bundle.BundleNumber, len(bundle.Operations))
}

Library Usage#

1. Basic Setup#

import (
    "context"
    plcbundle "github.com/atscan/plcbundle"
)

// Create manager with defaults
mgr, err := plcbundle.New("./bundles", "https://plc.directory")
if err != nil {
    log.Fatal(err)
}
defer mgr.Close()

2. Custom Configuration#

import (
    "github.com/atscan/plcbundle/bundle"
    "github.com/atscan/plcbundle/plc"
)

// Custom config
config := &bundle.Config{
    BundleDir:        "./my_bundles",
    CompressionLevel: bundle.CompressionBest,
    VerifyOnLoad:     true,
    Logger:           myCustomLogger,
}

// Custom PLC client with rate limiting
plcClient := plc.NewClient("https://plc.directory",
    plc.WithRateLimit(60, time.Minute),  // 60 req/min
    plc.WithTimeout(30*time.Second),
)

mgr, err := bundle.NewManager(config, plcClient)

3. Transparent Synchronization (Main Use Case)#

This is the primary pattern for keeping your local PLC mirror up-to-date:

package main

import (
    "context"
    "log"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

type PLCSync struct {
    mgr    *plcbundle.BundleManager
    ctx    context.Context
    cancel context.CancelFunc
}

func NewPLCSync(bundleDir string) (*PLCSync, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }
    
    ctx, cancel := context.WithCancel(context.Background())
    
    sync := &PLCSync{
        mgr:    mgr,
        ctx:    ctx,
        cancel: cancel,
    }
    
    return sync, nil
}

func (s *PLCSync) Start(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    log.Println("Starting PLC synchronization...")
    
    for {
        select {
        case <-ticker.C:
            if err := s.Update(); err != nil {
                log.Printf("Update error: %v", err)
            }
        case <-s.ctx.Done():
            return
        }
    }
}

func (s *PLCSync) Update() error {
    log.Println("Checking for new bundles...")
    
    for {
        bundle, err := s.mgr.FetchNext(s.ctx)
        if err != nil {
            // Check if we're caught up
            if isEndOfData(err) {
                log.Println("✓ Up to date!")
                return nil
            }
            return err
        }
        
        log.Printf("✓ Fetched bundle %06d (%d ops, %d DIDs)",
            bundle.BundleNumber, 
            len(bundle.Operations),
            bundle.DIDCount)
    }
}

func (s *PLCSync) Stop() {
    s.cancel()
    s.mgr.Close()
}

func isEndOfData(err error) bool {
    return err != nil && 
        (strings.Contains(err.Error(), "insufficient operations") ||
         strings.Contains(err.Error(), "caught up"))
}

// Usage
func main() {
    sync, err := NewPLCSync("./plc_bundles")
    if err != nil {
        log.Fatal(err)
    }
    defer sync.Stop()
    
    // Update every 5 minutes
    sync.Start(5 * time.Minute)
}

4. Getting Bundles#

ctx := context.Background()

// Get all bundles
index := mgr.GetIndex()
bundles := index.GetBundles()

for _, meta := range bundles {
    log.Printf("Bundle %06d: %d ops, %s to %s",
        meta.BundleNumber,
        meta.OperationCount,
        meta.StartTime.Format(time.RFC3339),
        meta.EndTime.Format(time.RFC3339))
}

// Load specific bundle
bundle, err := mgr.Load(ctx, 1)
if err != nil {
    log.Fatal(err)
}

log.Printf("Loaded %d operations", len(bundle.Operations))

5. Getting Operations from Bundles#

// Load a bundle and iterate operations
bundle, err := mgr.Load(ctx, 1)
if err != nil {
    log.Fatal(err)
}

for _, op := range bundle.Operations {
    log.Printf("DID: %s, CID: %s, Time: %s",
        op.DID,
        op.CID,
        op.CreatedAt.Format(time.RFC3339))
    
    // Access operation data
    if opType, ok := op.Operation["type"].(string); ok {
        log.Printf("  Type: %s", opType)
    }
}

6. Export Operations by Time Range#

// Export operations after a specific time
afterTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
operations, err := mgr.Export(ctx, afterTime, 5000)
if err != nil {
    log.Fatal(err)
}

log.Printf("Exported %d operations", len(operations))

// Process operations
for _, op := range operations {
    // Your processing logic
    processOperation(op)
}

7. Periodic Update Pattern#

// Simple periodic updater
func runPeriodicUpdate(mgr *plcbundle.BundleManager, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for range ticker.C {
        ctx := context.Background()
        
        // Try to fetch next bundle
        bundle, err := mgr.FetchNext(ctx)
        if err != nil {
            if strings.Contains(err.Error(), "insufficient operations") {
                log.Println("Caught up!")
                continue
            }
            log.Printf("Error: %v", err)
            continue
        }
        
        log.Printf("New bundle %d: %d operations", 
            bundle.BundleNumber,
            len(bundle.Operations))
        
        // Process new operations
        for _, op := range bundle.Operations {
            handleOperation(op)
        }
    }
}

// Usage
go runPeriodicUpdate(mgr, 10*time.Minute)

8. Verify Integrity#

// Verify specific bundle
result, err := mgr.Verify(ctx, 1)
if err != nil {
    log.Fatal(err)
}

if result.Valid {
    log.Println("✓ Bundle is valid")
} else {
    log.Printf("✗ Invalid: %v", result.Error)
}

// Verify entire chain
chainResult, err := mgr.VerifyChain(ctx)
if err != nil {
    log.Fatal(err)
}

if chainResult.Valid {
    log.Printf("✓ Chain verified: %d bundles", chainResult.ChainLength)
} else {
    log.Printf("✗ Chain broken at bundle %d: %s", 
        chainResult.BrokenAt,
        chainResult.Error)
}

9. Scan Directory (Re-index)#

// Scan directory and rebuild index from existing bundles
result, err := mgr.Scan()
if err != nil {
    log.Fatal(err)
}

log.Printf("Scanned %d bundles", result.BundleCount)
if len(result.MissingGaps) > 0 {
    log.Printf("Warning: Missing bundles: %v", result.MissingGaps)
}

10. Complete Example: Background Sync Service#

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
    
    plcbundle "github.com/atscan/plcbundle"
)

type PLCService struct {
    mgr       *plcbundle.BundleManager
    updateCh  chan struct{}
    stopCh    chan struct{}
}

func NewPLCService(bundleDir string) (*PLCService, error) {
    mgr, err := plcbundle.New(bundleDir, "https://plc.directory")
    if err != nil {
        return nil, err
    }
    
    return &PLCService{
        mgr:      mgr,
        updateCh: make(chan struct{}, 1),
        stopCh:   make(chan struct{}),
    }, nil
}

func (s *PLCService) Start() {
    log.Println("Starting PLC service...")
    
    // Initial scan
    if _, err := s.mgr.Scan(); err != nil {
        log.Printf("Scan warning: %v", err)
    }
    
    // Start update loop
    go s.updateLoop()
    
    // Periodic trigger
    go s.periodicTrigger(5 * time.Minute)
}

func (s *PLCService) updateLoop() {
    for {
        select {
        case <-s.updateCh:
            s.fetchNewBundles()
        case <-s.stopCh:
            return
        }
    }
}

func (s *PLCService) periodicTrigger(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            s.TriggerUpdate()
        case <-s.stopCh:
            return
        }
    }
}

func (s *PLCService) TriggerUpdate() {
    select {
    case s.updateCh <- struct{}{}:
    default:
        // Update already in progress
    }
}

func (s *PLCService) fetchNewBundles() {
    ctx := context.Background()
    fetched := 0
    
    for {
        bundle, err := s.mgr.FetchNext(ctx)
        if err != nil {
            if isEndOfData(err) {
                if fetched > 0 {
                    log.Printf("✓ Fetched %d new bundles", fetched)
                }
                return
            }
            log.Printf("Fetch error: %v", err)
            return
        }
        
        fetched++
        log.Printf("Bundle %06d: %d operations", 
            bundle.BundleNumber,
            len(bundle.Operations))
    }
}

func (s *PLCService) GetBundles() []*plcbundle.BundleMetadata {
    return s.mgr.GetIndex().GetBundles()
}

func (s *PLCService) GetOperations(bundleNum int) ([]plcbundle.PLCOperation, error) {
    ctx := context.Background()
    bundle, err := s.mgr.Load(ctx, bundleNum)
    if err != nil {
        return nil, err
    }
    return bundle.Operations, nil
}

func (s *PLCService) Stop() {
    close(s.stopCh)
    s.mgr.Close()
}

func main() {
    service, err := NewPLCService("./plc_data")
    if err != nil {
        log.Fatal(err)
    }
    
    service.Start()
    
    // Wait for interrupt
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh
    
    log.Println("Shutting down...")
    service.Stop()
}

CLI Tool Usage#

Fetch bundles#

# Fetch next bundle
plcbundle fetch

# Fetch specific number of bundles
plcbundle fetch -count 10

# Fetch all available bundles
plcbundle fetch -count 0

Scan directory#

# Scan and rebuild index
plcbundle scan

Verify integrity#

# Verify specific bundle
plcbundle verify -bundle 1

# Verify entire chain
plcbundle verify

# Verbose output
plcbundle verify -v

Show information#

# General info
plcbundle info

# Specific bundle info
plcbundle info -bundle 1

Export operations#

# Export operations to stdout (JSONL)
plcbundle export -count 1000 > operations.jsonl

# Export after specific time
plcbundle export -after "2024-01-01T00:00:00Z" -count 5000

Backfill#

# Fetch all bundles and stream to stdout
plcbundle backfill > all_operations.jsonl

# Start from specific bundle
plcbundle backfill -start 100 -end 200

API Reference#

Types#

type BundleManager struct { ... }
type Bundle struct {
    BundleNumber int
    StartTime    time.Time
    EndTime      time.Time
    Operations   []PLCOperation
    DIDCount     int
    Hash         string
    // ...
}

type PLCOperation struct {
    DID       string
    Operation map[string]interface{}
    CID       string
    CreatedAt time.Time
    RawJSON   []byte
}

Methods#

// Create
New(bundleDir, plcURL string) (*BundleManager, error)

// Sync
FetchNext(ctx) (*Bundle, error)
Export(ctx, afterTime, count) ([]PLCOperation, error)

// Query
Load(ctx, bundleNumber) (*Bundle, error)
GetIndex() *Index
GetInfo() map[string]interface{}

// Verify
Verify(ctx, bundleNumber) (*VerificationResult, error)
VerifyChain(ctx) (*ChainVerificationResult, error)

// Manage
Scan() (*DirectoryScanResult, error)
Close()

Configuration#

type Config struct {
    BundleDir        string             // Storage directory
    CompressionLevel CompressionLevel   // Compression level
    VerifyOnLoad     bool               // Verify hashes when loading
    Logger           Logger             // Custom logger
}

License#

MIT

Contributing#

Contributions welcome! Please open an issue or PR.