PLC Bundle V1 Example Implementations
#!/usr/bin/env ruby
# frozen_string_literal: true

# plcbundle.rb - Ruby implementation of the plcbundle V1 specification
# Creates compressed, cryptographically chained bundles of DID PLC operations
#
# PLC Bundle v1 Specification:
# https://tangled.org/atscan.net/plcbundle/blob/main/docs/specification.md

require 'json'
require 'digest'
require 'net/http'
require 'uri'
require 'fileutils'
require 'time'
require 'set'
require 'zstd-ruby'

# Configuration constants
BUNDLE_SIZE = 10_000
INDEX_FILE = 'plc_bundles.json'
PLC_URL = 'https://plc.directory'

class PlcBundle
  def initialize(dir)
    @dir = dir
    @pool = []       # Mempool of operations waiting to be bundled
    @seen = Set.new  # CID deduplication set (pruned after each bundle)

    FileUtils.mkdir_p(@dir)
    @idx = load_idx
    puts "plcbundle v1 | Dir: #{@dir} | Last: #{@idx[:last_bundle]}\n"

    # Seed deduplication set with boundary CIDs from previous bundle
    seed_boundary if @idx[:bundles].any?
  end

  def run
    cursor = @idx[:bundles].last&.dig(:end_time)

    loop do
      puts "\nFetch: #{cursor || 'start'}"
      ops = fetch(cursor)
      if ops.nil? || ops.empty?  # HTTP failure or exhausted export
        puts 'Done.'
        break
      end

      add_ops(ops)  # Validate and add to mempool
      cursor = ops.last[:time]
      create_bundle while @pool.size >= BUNDLE_SIZE  # Create bundles when ready

      sleep 0.2  # Rate limiting
    end

    save_idx
    puts "\nBundles: #{@idx[:bundles].size} | Pool: #{@pool.size} | Size: #{'%.1f' % (@idx[:total_size_bytes] / 1e6)}MB"
  rescue => e
    puts "\nError: #{e.message}"
    save_idx
  end

  private

  # Fetch operations from the PLC directory export endpoint
  def fetch(after)
    query = "count=1000#{after ? "&after=#{URI.encode_www_form_component(after)}" : ''}"
    res = Net::HTTP.get_response(URI("#{PLC_URL}/export?#{query}"))
    return nil unless res.is_a?(Net::HTTPSuccess)

    # Parse each line once and preserve the raw JSON for reproducibility (Spec 4.2)
    res.body.strip.split("\n").map do |line|
      op = JSON.parse(line, symbolize_names: true)
      {**op, raw: line, time: op[:createdAt]}
    end
  rescue
    nil
  end

  # Process and validate operations before adding to mempool
  def add_ops(ops)
    last_t = @pool.last&.dig(:time) || @idx[:bundles].last&.dig(:end_time) || ''
    added = 0

    ops.each do |op|
      next if @seen.include?(op[:cid])  # Skip duplicates (boundary + within-batch)

      # Spec 3: Validate chronological order
      raise "Out-of-order operation: #{op[:time]} < #{last_t}" if op[:time] < last_t

      @pool << op
      @seen << op[:cid]
      last_t = op[:time]
      added += 1
    end

    puts " +#{added} ops"
  end

  # Create a bundle file and update index
  def create_bundle
    ops = @pool.shift(BUNDLE_SIZE)
    parent = @idx[:bundles].last&.dig(:hash) || ''

    # Spec 4.2: Serialize using raw JSON strings for reproducibility
    jsonl = ops.map { |o| o[:raw] + "\n" }.join

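    # Each hash commits to the parent bundle's hash, so the bundles form a hash
    # chain: altering any earlier bundle invalidates every hash after it. The
    # first bundle has no parent and hashes a fixed genesis prefix instead.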
    # Spec 6.3: Calculate hashes
    ch = sha(jsonl)                                                         # Content hash
    h = sha(parent.empty? ? "plcbundle:genesis:#{ch}" : "#{parent}:#{ch}")  # Chain hash
    zst = Zstd.compress(jsonl)                                              # Compress

    # Write bundle file
    num = @idx[:last_bundle] + 1
    file = format('%06d.jsonl.zst', num)
    File.binwrite("#{@dir}/#{file}", zst)

    # Create metadata entry
    @idx[:bundles] << {
      bundle_number: num,
      start_time: ops[0][:time],
      end_time: ops[-1][:time],
      operation_count: ops.size,
      did_count: ops.map { |o| o[:did] }.uniq.size,
      hash: h,
      content_hash: ch,
      parent: parent,
      compressed_hash: sha(zst),
      compressed_size: zst.bytesize,
      uncompressed_size: jsonl.bytesize,
      cursor: @idx[:bundles].last&.dig(:end_time) || '',
      created_at: Time.now.utc.iso8601
    }

    @idx[:last_bundle] = num
    @idx[:total_size_bytes] += zst.bytesize

    # Prune seen CIDs: only keep boundary + mempool (memory efficient)
    @seen = boundary_cids(ops) | @pool.map { |o| o[:cid] }.to_set

    save_idx
    puts "✓ #{file} | #{h[0..12]}... | seen:#{@seen.size}"
  end

  # Load index from disk or create new
  def load_idx
    JSON.parse(File.read("#{@dir}/#{INDEX_FILE}"), symbolize_names: true)
  rescue
    {version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: []}
  end

  # Atomically save index using temp file + rename
  def save_idx
    @idx[:updated_at] = Time.now.utc.iso8601
    tmp = "#{@dir}/#{INDEX_FILE}.tmp"
    File.write(tmp, JSON.pretty_generate(@idx))
    File.rename(tmp, "#{@dir}/#{INDEX_FILE}")
  end

  # Seed deduplication set with CIDs from last bundle's boundary
  def seed_boundary
    last = @idx[:bundles].last
    file = format('%06d.jsonl.zst', last[:bundle_number])

    data = Zstd.decompress(File.binread("#{@dir}/#{file}"))
    ops = data.strip.split("\n").map do |line|
      op = JSON.parse(line)
      {time: op['createdAt'], cid: op['cid']}
    end

    @seen = boundary_cids(ops)
    puts "Seeded: #{@seen.size} CIDs from bundle #{last[:bundle_number]}"
  rescue
    puts "Warning: couldn't seed boundary"
  end

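  # The export cursor is a timestamp rather than a CID, so a fetch that resumes
  # from a bundle's end_time can return operations sharing that final timestamp
  # again; remembering their CIDs lets add_ops skip the duplicates.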
  # Get CIDs from operations at the same timestamp as the last op (boundary)
  def boundary_cids(ops)
    return Set.new if ops.empty?

    t = ops[-1][:time]
    ops.reverse.take_while { |o| o[:time] == t }.map { |o| o[:cid] }.to_set
  end

  # SHA-256 hash helper
  def sha(data)
    Digest::SHA256.hexdigest(data)
  end
end

# Entry point
PlcBundle.new(ARGV[0] || './plc_bundles_rb').run if __FILE__ == $PROGRAM_NAME
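
A bundle directory produced by this script can be checked offline by replaying the hash chain from Spec 6.3: recompute each bundle's content hash over the decompressed JSONL, then fold it into the chain hash together with the previous bundle's hash. The sketch below is a minimal verifier, assuming the file layout and index format written above; the script name and error messages are illustrative, not part of the specification.

#!/usr/bin/env ruby
# frozen_string_literal: true

# verify_bundles.rb - minimal chain verification sketch (illustrative)

require 'json'
require 'digest'
require 'zstd-ruby'

dir = ARGV[0] || './plc_bundles_rb'
idx = JSON.parse(File.read("#{dir}/plc_bundles.json"), symbolize_names: true)
parent = ''

idx[:bundles].each do |b|
  zst = File.binread(format('%s/%06d.jsonl.zst', dir, b[:bundle_number]))
  abort "bundle #{b[:bundle_number]}: compressed hash mismatch" unless Digest::SHA256.hexdigest(zst) == b[:compressed_hash]

  # Content hash is computed over the decompressed JSONL
  ch = Digest::SHA256.hexdigest(Zstd.decompress(zst))
  abort "bundle #{b[:bundle_number]}: content hash mismatch" unless ch == b[:content_hash]

  # Chain hash folds in the previous bundle's hash (genesis prefix for the first)
  h = Digest::SHA256.hexdigest(parent.empty? ? "plcbundle:genesis:#{ch}" : "#{parent}:#{ch}")
  abort "bundle #{b[:bundle_number]}: chain hash mismatch" unless h == b[:hash]

  parent = h
end

puts "OK: verified #{idx[:bundles].size} bundles"

Run against the output directory, it prints OK with the bundle count, or aborts at the first bundle whose hashes no longer match.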