PLC Bundle V1 Example Implementations
#!/usr/bin/env ruby
# frozen_string_literal: true

# plcbundle.rb - Ruby implementation of the plcbundle V1 specification
# Creates compressed, cryptographically chained bundles of DID PLC operations
#
# PLC Bundle v1 Specification:
# https://tangled.org/atscan.net/plcbundle/blob/main/docs/specification.md

require 'json'
require 'digest'
require 'net/http'
require 'uri'
require 'fileutils'
require 'time'
require 'set'
require 'zstd-ruby'

# Configuration constants
BUNDLE_SIZE = 10_000
INDEX_FILE = 'plc_bundles.json'
PLC_URL = 'https://plc.directory'

class PlcBundle
  def initialize(dir)
    @dir = dir
    @pool = []      # Mempool of operations waiting to be bundled
    @seen = Set.new # CID deduplication set (pruned after each bundle)

    FileUtils.mkdir_p(@dir)
    @idx = load_idx
    puts "plcbundle v1 | Dir: #{@dir} | Last: #{@idx[:last_bundle]}\n"

    # Seed deduplication set with boundary CIDs from the previous bundle
    seed_boundary if @idx[:bundles].any?
  end

  def run
    cursor = @idx[:bundles].last&.dig(:end_time)

    loop do
      puts "\nFetch: #{cursor || 'start'}"
      ops = fetch(cursor)
      if ops.nil? || ops.empty?
        puts 'Done.'
        break
      end

      add_ops(ops) # Validate and add to mempool
      cursor = ops.last[:time]
      create_bundle while @pool.size >= BUNDLE_SIZE # Create bundles when ready

      sleep 0.2 # Rate limiting
    end

    save_idx
    puts "\nBundles: #{@idx[:bundles].size} | Pool: #{@pool.size} | Size: #{'%.1f' % (@idx[:total_size_bytes] / 1e6)}MB"
  rescue => e
    puts "\nError: #{e.message}"
    save_idx
  end

  private

  # Fetch operations from the PLC directory export endpoint
  def fetch(after)
    uri = URI("#{PLC_URL}/export?count=1000#{after ? "&after=#{after}" : ''}")
    res = Net::HTTP.get_response(uri)
    return nil unless res.is_a?(Net::HTTPSuccess)

    # Parse each line once and preserve the raw JSON for reproducibility (Spec 4.2)
    res.body.strip.split("\n").map do |line|
      op = JSON.parse(line, symbolize_names: true)
      {**op, raw: line, time: op[:createdAt]}
    end
  rescue
    nil
  end

  # Validate operations and add them to the mempool
  def add_ops(ops)
    last_t = @pool.last&.dig(:time) || @idx[:bundles].last&.dig(:end_time) || ''
    added = 0

    ops.each do |op|
      next if @seen.include?(op[:cid]) # Skip duplicates (boundary + within-batch)

      # Spec 3: Validate chronological order
      raise "Operations out of chronological order" if op[:time] < last_t

      @pool << op
      @seen << op[:cid]
      last_t = op[:time]
      added += 1
    end

    puts " +#{added} ops"
  end

  # Create a bundle file and update the index
  def create_bundle
    ops = @pool.shift(BUNDLE_SIZE)
    parent = @idx[:bundles].last&.dig(:hash) || ''

    # Spec 4.2: Serialize using the raw JSON strings for reproducibility
    jsonl = ops.map { |o| o[:raw] + "\n" }.join

    # Spec 6.3: Calculate hashes
    ch = sha(jsonl)                                                        # Content hash
    h = sha(parent.empty? ? "plcbundle:genesis:#{ch}" : "#{parent}:#{ch}") # Chain hash
    zst = Zstd.compress(jsonl)                                             # Compress

    # Write bundle file
    num = @idx[:last_bundle] + 1
    file = format('%06d.jsonl.zst', num)
    File.binwrite("#{@dir}/#{file}", zst)

    # Create metadata entry
    @idx[:bundles] << {
      bundle_number: num,
      start_time: ops[0][:time],
      end_time: ops[-1][:time],
      operation_count: ops.size,
      did_count: ops.map { |o| o[:did] }.uniq.size,
      hash: h,
      content_hash: ch,
      parent: parent,
      compressed_hash: sha(zst),
      compressed_size: zst.bytesize,
      uncompressed_size: jsonl.bytesize,
      cursor: @idx[:bundles].last&.dig(:end_time) || '',
      created_at: Time.now.utc.iso8601
    }

    @idx[:last_bundle] = num
    @idx[:total_size_bytes] += zst.bytesize

    # Prune seen CIDs: keep only boundary + mempool (memory efficient)
    @seen = boundary_cids(ops) | @pool.map { |o| o[:cid] }.to_set

    save_idx
    puts "#{file} | #{h[0..12]}... | seen:#{@seen.size}"
  end

  # Load the index from disk, or create a new one
  def load_idx
    JSON.parse(File.read("#{@dir}/#{INDEX_FILE}"), symbolize_names: true)
  rescue
    {version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: []}
  end

  # Atomically save the index using temp file + rename
  def save_idx
    @idx[:updated_at] = Time.now.utc.iso8601
    tmp = "#{@dir}/#{INDEX_FILE}.tmp"
    File.write(tmp, JSON.pretty_generate(@idx))
    File.rename(tmp, "#{@dir}/#{INDEX_FILE}")
  end

  # Seed the deduplication set with CIDs from the last bundle's boundary
  def seed_boundary
    last = @idx[:bundles].last
    file = format('%06d.jsonl.zst', last[:bundle_number])

    data = Zstd.decompress(File.binread("#{@dir}/#{file}"))
    ops = data.strip.split("\n").map do |line|
      op = JSON.parse(line)
      {time: op['createdAt'], cid: op['cid']}
    end

    @seen = boundary_cids(ops)
    puts "Seeded: #{@seen.size} CIDs from bundle #{last[:bundle_number]}"
  rescue
    puts "Warning: couldn't seed boundary"
  end

  # Get CIDs of operations sharing the last op's timestamp (the boundary).
  # Resuming from end_time can re-fetch ops at that exact timestamp, so
  # these CIDs are the only ones that must be remembered for dedup.
  def boundary_cids(ops)
    return Set.new if ops.empty?

    t = ops[-1][:time]
    ops.reverse.take_while { |o| o[:time] == t }.map { |o| o[:cid] }.to_set
  end

  # SHA-256 hash helper
  def sha(data)
    Digest::SHA256.hexdigest(data)
  end
end

# Entry point
PlcBundle.new(ARGV[0] || './plc_bundles_rb').run if __FILE__ == $PROGRAM_NAME